Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/actions.py: 85%

376 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Reduced schemas for accepting API actions. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import json 1a

8from copy import deepcopy 1a

9from typing import Annotated, Any, ClassVar, Dict, List, Optional, Union 1a

10from uuid import UUID, uuid4 1a

11 

12from pydantic import ( 1a

13 AfterValidator, 

14 ConfigDict, 

15 Field, 

16 field_validator, 

17 model_validator, 

18) 

19 

20import prefect.server.schemas as schemas 1a

21from prefect._internal.schemas.validators import ( 1a

22 get_or_create_run_name, 

23 remove_old_deployment_fields, 

24 validate_cache_key_length, 

25 validate_max_metadata_length, 

26 validate_name_present_on_nonanonymous_blocks, 

27 validate_parameter_openapi_schema, 

28 validate_parameters_conform_to_schema, 

29 validate_parent_and_ref_diff, 

30 validate_schedule_max_scheduled_runs, 

31) 

32from prefect.server.utilities.schemas import get_class_fields_only 1a

33from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a

34from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS 1a

35from prefect.types import ( 1a

36 DateTime, 

37 KeyValueLabels, 

38 Name, 

39 NonEmptyishName, 

40 NonNegativeFloat, 

41 NonNegativeInteger, 

42 PositiveInteger, 

43 StrictVariableValue, 

44) 

45from prefect.types._datetime import now 1a

46from prefect.types._schema import ParameterSchema 1a

47from prefect.types.names import ( 1a

48 ArtifactKey, 

49 BlockDocumentName, 

50 BlockTypeSlug, 

51 VariableName, 

52) 

53from prefect.utilities.names import generate_slug 1a

54from prefect.utilities.templating import find_placeholders 1a

55 

56 

class ActionBaseModel(PrefectBaseModel):
    # Common base for the action schemas in this module. The only thing it
    # adds on top of PrefectBaseModel is a config that forbids extra fields.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

59 

60 

class FlowCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow."""

    # Required: there is no default for the flow name.
    name: Name = Field(..., description="The name of the flow", examples=["my-flow"])

    # Optional collections; each instance gets its own fresh container.
    tags: List[str] = Field(
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    labels: Optional[KeyValueLabels] = Field(
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
        default_factory=dict,
    )

77 

78 

class FlowUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow."""

    # Tags are the only field this schema declares; omitted means an empty list.
    tags: List[str] = Field(
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )

87 

88 

class DeploymentScheduleCreate(ActionBaseModel):
    # Schema for one schedule entry supplied when creating a deployment
    # (used as the element type of `DeploymentCreate.schedules`).

    active: bool = Field(
        True, description="Whether or not the schedule is active."
    )
    # Required: a schedule entry must carry the schedule itself.
    schedule: schemas.schedules.SCHEDULE_TYPES = Field(
        ..., description="The schedule for the deployment."
    )
    max_scheduled_runs: Optional[PositiveInteger] = Field(
        description="The maximum number of scheduled runs for the schedule.",
        default=None,
    )
    parameters: dict[str, Any] = Field(
        description="A dictionary of parameter value overrides.", default_factory=dict
    )
    slug: Optional[str] = Field(
        description="A unique identifier for the schedule.",
        default=None,
    )

    @field_validator("max_scheduled_runs")
    @classmethod
    def validate_max_scheduled_runs(
        cls, v: PositiveInteger | None
    ) -> PositiveInteger | None:
        """
        Delegate to `validate_schedule_max_scheduled_runs`, passing the value
        together with the current PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS
        setting.
        """
        configured_limit = PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
        return validate_schedule_max_scheduled_runs(v, configured_limit)

116 

117 

class DeploymentScheduleUpdate(ActionBaseModel):
    # Schema for one schedule entry supplied when updating a deployment
    # (used as the element type of `DeploymentUpdate.schedules`). Unlike the
    # create variant, `active` and `schedule` are optional here.

    active: Optional[bool] = Field(
        description="Whether or not the schedule is active.", default=None
    )
    schedule: Optional[schemas.schedules.SCHEDULE_TYPES] = Field(
        description="The schedule for the deployment.", default=None
    )

    max_scheduled_runs: Optional[PositiveInteger] = Field(
        description="The maximum number of scheduled runs for the schedule.",
        default=None,
    )
    parameters: dict[str, Any] = Field(
        description="A dictionary of parameter value overrides.", default_factory=dict
    )
    slug: Optional[str] = Field(
        description="A unique identifier for the schedule.",
        default=None,
    )

    @field_validator("max_scheduled_runs")
    @classmethod
    def validate_max_scheduled_runs(
        cls, v: PositiveInteger | None
    ) -> PositiveInteger | None:
        """
        Delegate to `validate_schedule_max_scheduled_runs`, passing the value
        together with the current PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS
        setting.
        """
        configured_limit = PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
        return validate_schedule_max_scheduled_runs(v, configured_limit)

146 

147 

class DeploymentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a deployment."""

    name: str = Field(
        default=...,
        description="The name of the deployment.",
        examples=["my-deployment"],
    )
    flow_id: UUID = Field(
        default=..., description="The ID of the flow associated with the deployment."
    )
    paused: bool = Field(
        default=False, description="Whether or not the deployment is paused."
    )
    schedules: list[DeploymentScheduleCreate] = Field(
        default_factory=lambda: [],
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[PositiveInteger] = Field(
        default=None, description="The deployment's concurrency limit."
    )
    concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field(
        default=None, description="The deployment's concurrency options."
    )
    global_concurrency_limit_id: Optional[UUID] = Field(
        default=None,
        description="The ID of the global concurrency limit to apply to the deployment.",
    )
    enforce_parameter_schema: bool = Field(
        default=True,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    parameter_openapi_schema: Optional[ParameterSchema] = Field(
        default_factory=lambda: {"type": "object", "properties": {}},
        description="The parameter schema of the flow, including defaults.",
        json_schema_extra={"additionalProperties": True},
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
        json_schema_extra={"additionalProperties": True},
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of deployment tags.",
        examples=[["tag-1", "tag-2"]],
    )
    labels: Union[KeyValueLabels, None] = Field(
        default_factory=dict,
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
    )
    pull_steps: Optional[List[dict[str, Any]]] = Field(None)

    work_queue_name: Optional[str] = Field(None)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    storage_document_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    description: Optional[str] = Field(None)
    path: Optional[str] = Field(None)
    version: Optional[str] = Field(None)
    entrypoint: Optional[str] = Field(None)
    job_variables: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides for the flow's infrastructure configuration.",
        json_schema_extra={"additionalProperties": True},
    )

    version_info: Optional[schemas.core.VersionInfo] = Field(
        default=None, description="A description of this version of the deployment."
    )

    def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
        """
        Check that the combination of base_job_template defaults and job_variables
        conforms to the specified schema.

        NOTE: This method does not hydrate block references in default values within the
        base job template to validate them. Failing to do this can cause user-facing
        errors. Instead of this method, use `validate_job_variables_for_deployment`
        function from `prefect_cloud.orion.api.validation`.

        Args:
            base_job_template: A work pool base job template. Only its
                `"variables"` entry is read; when that entry is absent (or
                `None`) no validation is performed.

        Raises:
            Whatever `prefect.utilities.schema_tools.validate` raises when
            called with `raise_on_error=True` and validation fails.
        """
        # This import is here to avoid a circular import
        from prefect.utilities.schema_tools import validate

        # Validate against a private copy of the schema rather than the
        # caller's template (presumably because `preprocess=True` may modify
        # the schema it is handed -- TODO confirm).
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None:
            validate(
                self.job_variables,
                variables_schema,
                raise_on_error=True,
                preprocess=True,
                ignore_required=True,
            )

    @model_validator(mode="before")
    @classmethod
    def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Pass the raw input through `remove_old_deployment_fields`."""
        return remove_old_deployment_fields(values)

    @model_validator(mode="before")
    @classmethod
    def _validate_parameters_conform_to_schema(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Run the parameter and parameter-schema validators over the raw input.

        `parameters` is always replaced with the validator's result;
        `parameter_openapi_schema` is only overwritten when the schema
        validator returns something other than `None`.
        """
        values["parameters"] = validate_parameters_conform_to_schema(
            values.get("parameters", {}), values
        )
        schema = validate_parameter_openapi_schema(
            values.get("parameter_openapi_schema"), values
        )
        if schema is not None:
            values["parameter_openapi_schema"] = schema
        return values

    @model_validator(mode="before")
    @classmethod
    def _validate_concurrency_limits(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Validate that a deployment does not have both a concurrency limit and global concurrency limit."""
        # Truthiness check: both keys must be present with truthy values for
        # the combination to be rejected.
        if values.get("concurrency_limit") and values.get(
            "global_concurrency_limit_id"
        ):
            raise ValueError(
                "A deployment cannot have both a concurrency limit and a global concurrency limit."
            )
        return values

279 

280 

class DeploymentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a deployment."""

    @model_validator(mode="before")
    @classmethod
    def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Pass the raw input through `remove_old_deployment_fields`."""
        return remove_old_deployment_fields(values)

    version: Optional[str] = Field(default=None)
    description: Optional[str] = Field(default=None)
    paused: bool = Field(
        default=False, description="Whether or not the deployment is paused."
    )
    schedules: list[DeploymentScheduleUpdate] = Field(
        default_factory=lambda: [],
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[PositiveInteger] = Field(
        default=None, description="The deployment's concurrency limit."
    )
    concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field(
        default=None, description="The deployment's concurrency options."
    )
    global_concurrency_limit_id: Optional[UUID] = Field(
        default=None,
        description="The ID of the global concurrency limit to apply to the deployment.",
    )
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    parameter_openapi_schema: Optional[ParameterSchema] = Field(
        default=None,
        description="The parameter schema of the flow, including defaults.",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of deployment tags.",
        examples=[["tag-1", "tag-2"]],
    )
    work_queue_name: Optional[str] = Field(default=None)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    path: Optional[str] = Field(default=None)
    job_variables: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Overrides for the flow's infrastructure configuration.",
    )
    pull_steps: Optional[List[dict[str, Any]]] = Field(default=None)
    entrypoint: Optional[str] = Field(default=None)
    storage_document_id: Optional[UUID] = Field(default=None)
    infrastructure_document_id: Optional[UUID] = Field(default=None)
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True)

    version_info: Optional[schemas.core.VersionInfo] = Field(
        default=None, description="A description of this version of the deployment."
    )

    def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
        """
        Check that the combination of base_job_template defaults and job_variables
        conforms to the schema specified in the base_job_template.

        NOTE: This method does not hydrate block references in default values within the
        base job template to validate them. Failing to do this can cause user-facing
        errors. Instead of this method, use `validate_job_variables_for_deployment`
        function from `prefect_cloud.orion.api.validation`.

        Args:
            base_job_template: A work pool base job template. Only its
                `"variables"` entry is read. Nothing is validated when that
                entry is absent (or `None`) or when this update carries no
                `job_variables`.

        Raises:
            The first error returned by
            `prefect.utilities.schema_tools.validate`, if it returns any.
        """
        # This import is here to avoid a circular import
        from prefect.utilities.schema_tools import validate

        # Validate against a private copy of the schema rather than the
        # caller's template (presumably because `preprocess=True` may modify
        # the schema it is handed -- TODO confirm).
        variables_schema = deepcopy(base_job_template.get("variables"))

        if variables_schema is not None and self.job_variables is not None:
            errors = validate(
                self.job_variables,
                variables_schema,
                raise_on_error=False,
                preprocess=True,
                ignore_required=True,
            )
            if errors:
                # Only the first reported error is surfaced; any further
                # errors in the collection are not raised.
                raise next(iter(errors))

    @model_validator(mode="before")
    @classmethod
    def _validate_concurrency_limits(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Validate that a deployment does not have both a concurrency limit and global concurrency limit."""
        # Truthiness check: both keys must be present with truthy values for
        # the combination to be rejected.
        if values.get("concurrency_limit") and values.get(
            "global_concurrency_limit_id"
        ):
            raise ValueError(
                "A deployment cannot have both a concurrency limit and a global concurrency limit."
            )
        return values

385 

386 

class FlowRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run."""

    name: Optional[str] = Field(default=None)
    flow_version: Optional[str] = Field(default=None)
    parameters: Dict[str, Any] = Field(default_factory=dict)
    empirical_policy: schemas.core.FlowRunPolicy = Field(
        default_factory=schemas.core.FlowRunPolicy
    )
    tags: List[str] = Field(default_factory=list)
    infrastructure_pid: Optional[str] = Field(default=None)
    job_variables: Optional[Dict[str, Any]] = Field(default=None)

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Route the raw `name` through `get_or_create_run_name` before field validation."""
        resolved_name = get_or_create_run_name(name)
        return resolved_name

404 

405 

class StateCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a new state."""

    # `type` is the only required field.
    type: schemas.states.StateType = Field(
        ..., description="The type of the state to create"
    )
    name: Optional[str] = Field(
        description="The name of the state to create", default=None
    )
    message: Optional[str] = Field(
        description="The message of the state to create", default=None
    )
    data: Optional[Any] = Field(
        description="The data of the state to create", default=None
    )
    state_details: schemas.states.StateDetails = Field(
        description="The details of the state to create",
        default_factory=schemas.states.StateDetails,
    )

    @model_validator(mode="after")
    def default_name_from_type(self):
        """If a name is not provided, use the type"""
        # The name is derived from the enum value: split on underscores,
        # capitalize each piece and join the pieces with single spaces.
        if self.name is None and self.type:
            words = self.type.value.split("_")
            self.name = " ".join(word.capitalize() for word in words)
        return self

    @model_validator(mode="after")
    def default_scheduled_start_time(self):
        """Give SCHEDULED states a `scheduled_time` of now (UTC) when none is set."""
        is_scheduled = self.type == schemas.states.StateType.SCHEDULED
        if is_scheduled and not self.state_details.scheduled_time:
            self.state_details.scheduled_time = now("UTC")

        return self

445 

446 

class TaskRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a task run"""

    id: Optional[UUID] = Field(
        description="The ID to assign to the task run. If not provided, a random UUID will be generated.",
        default=None,
    )
    # The state is accepted in its StateCreate (action) form.
    state: Optional[StateCreate] = Field(
        description="The state of the task run to create", default=None
    )

    # When no name is supplied the default comes from `generate_slug(2)`.
    name: str = Field(
        examples=["my-task-run"], default_factory=lambda: generate_slug(2)
    )
    flow_run_id: Optional[UUID] = Field(
        description="The flow run id of the task run.", default=None
    )
    task_key: str = Field(
        ..., description="A unique identifier for the task being run."
    )
    dynamic_key: str = Field(
        ...,
        description=(
            "A dynamic key used to differentiate between multiple runs of the same task"
            " within the same flow run."
        ),
    )
    cache_key: Optional[str] = Field(
        description=(
            "An optional cache key. If a COMPLETED state associated with this cache key"
            " is found, the cached COMPLETED state will be used instead of executing"
            " the task run."
        ),
        default=None,
    )
    cache_expiration: Optional[DateTime] = Field(
        description="Specifies when the cached state should expire.", default=None
    )
    task_version: Optional[str] = Field(
        description="The version of the task being run.", default=None
    )
    empirical_policy: schemas.core.TaskRunPolicy = Field(
        default_factory=schemas.core.TaskRunPolicy,
    )
    tags: List[str] = Field(
        description="A list of tags for the task run.",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    labels: Optional[KeyValueLabels] = Field(
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
        default_factory=dict,
    )
    # Maps an input name to a list of input descriptors.
    task_inputs: Dict[
        str,
        List[
            Union[
                schemas.core.TaskRunResult,
                schemas.core.FlowRunResult,
                schemas.core.Parameter,
                schemas.core.Constant,
            ]
        ],
    ] = Field(
        description="The inputs to the task run.",
        default_factory=dict,
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Route the raw `name` through `get_or_create_run_name` before field validation."""
        resolved_name = get_or_create_run_name(name)
        return resolved_name

    @field_validator("cache_key")
    @classmethod
    def validate_cache_key(cls, cache_key: str | None) -> str | None:
        """Delegate the cache key check to `validate_cache_key_length`."""
        checked_key = validate_cache_key_length(cache_key)
        return checked_key

526 

527 

class TaskRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a task run"""

    # When no name is supplied the default comes from `generate_slug(2)`.
    name: str = Field(
        examples=["my-task-run"], default_factory=lambda: generate_slug(2)
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Route the raw `name` through `get_or_create_run_name` before field validation."""
        resolved_name = get_or_create_run_name(name)
        return resolved_name

539 

540 

class FlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run."""

    # The state is accepted in its StateCreate (action) form.
    state: Optional[StateCreate] = Field(
        description="The state of the flow run to create", default=None
    )

    name: str = Field(
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
        default_factory=lambda: generate_slug(2),
    )
    flow_id: UUID = Field(..., description="The id of the flow being run.")
    flow_version: Optional[str] = Field(
        description="The version of the flow being run.", default=None
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
    )
    context: Dict[str, Any] = Field(
        description="The context of the flow run.",
        default_factory=dict,
    )
    parent_task_run_id: Optional[UUID] = Field(default=None)
    infrastructure_document_id: Optional[UUID] = Field(default=None)
    empirical_policy: schemas.core.FlowRunPolicy = Field(
        description="The empirical policy for the flow run.",
        default_factory=schemas.core.FlowRunPolicy,
    )
    tags: List[str] = Field(
        description="A list of tags for the flow run.",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    labels: Optional[KeyValueLabels] = Field(
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
        default_factory=dict,
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key. If a flow run with the same idempotency key"
            " has already been created, the existing flow run will be returned."
        ),
    )
    work_pool_name: Optional[str] = Field(
        description="The name of the work pool to run the flow run in.",
        default=None,
    )
    work_queue_name: Optional[str] = Field(
        description="The name of the work queue to place the flow run in.",
        default=None,
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        description="The job variables to use when setting up flow run infrastructure.",
        default=None,
    )

    # ---- Deprecated fields ----

    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "DEPRECATED: The id of the deployment associated with this flow run, if"
            " available."
        ),
        deprecated=True,
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Route the raw `name` through `get_or_create_run_name` before field validation."""
        resolved_name = get_or_create_run_name(name)
        return resolved_name

618 

619 

class DeploymentFlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run from a deployment."""

    # The state is accepted in its StateCreate (action) form.
    state: Optional[StateCreate] = Field(
        description="The state of the flow run to create", default=None
    )

    name: str = Field(
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
        default_factory=lambda: generate_slug(2),
    )
    parameters: Dict[str, Any] = Field(
        json_schema_extra={"additionalProperties": True},
        default_factory=dict,
    )
    enforce_parameter_schema: Optional[bool] = Field(
        description="Whether or not to enforce the parameter schema on this run.",
        default=None,
    )
    context: Dict[str, Any] = Field(default_factory=dict)
    infrastructure_document_id: Optional[UUID] = Field(default=None)
    empirical_policy: schemas.core.FlowRunPolicy = Field(
        description="The empirical policy for the flow run.",
        default_factory=schemas.core.FlowRunPolicy,
    )
    tags: List[str] = Field(
        description="A list of tags for the flow run.",
        examples=[["tag-1", "tag-2"]],
        default_factory=list,
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key. If a flow run with the same idempotency key"
            " has already been created, the existing flow run will be returned."
        ),
    )
    # Note: here labels default to None, not to an empty dict.
    labels: Optional[KeyValueLabels] = Field(
        default=None,
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
    )
    parent_task_run_id: Optional[UUID] = Field(default=None)
    work_queue_name: Optional[str] = Field(default=None)
    job_variables: Optional[Dict[str, Any]] = Field(
        json_schema_extra={"additionalProperties": True},
        default_factory=dict,
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Route the raw `name` through `get_or_create_run_name` before field validation."""
        resolved_name = get_or_create_run_name(name)
        return resolved_name

677 

678 

class SavedSearchCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a saved search."""

    name: str = Field(..., description="The name of the saved search.")
    # Omitted filters become a fresh empty list per instance.
    filters: list[schemas.core.SavedSearchFilter] = Field(
        description="The filter set for the saved search.", default_factory=list
    )

686 

687 

class ConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a concurrency limit."""

    # Both fields are required.
    tag: str = Field(..., description="A tag the concurrency limit is applied to.")
    concurrency_limit: int = Field(..., description="The concurrency limit.")

695 

696 

class ConcurrencyLimitV2Create(ActionBaseModel):
    """Data used by the Prefect REST API to create a v2 concurrency limit."""

    active: bool = Field(
        True, description="Whether the concurrency limit is active."
    )
    # `name` and `limit` are required; the counters below start at zero.
    name: Name = Field(..., description="The name of the concurrency limit.")
    limit: NonNegativeInteger = Field(..., description="The concurrency limit.")
    active_slots: NonNegativeInteger = Field(
        0, description="The number of active slots."
    )
    denied_slots: NonNegativeInteger = Field(
        0, description="The number of denied slots."
    )
    slot_decay_per_second: NonNegativeFloat = Field(
        0,
        description="The decay rate for active slots when used as a rate limit.",
    )

715 

716 

class ConcurrencyLimitV2Update(ActionBaseModel):
    """Data used by the Prefect REST API to update a v2 concurrency limit."""

    # Every field is optional and defaults to None.
    active: Optional[bool] = Field(default=None)
    name: Optional[Name] = Field(default=None)
    limit: Optional[NonNegativeInteger] = Field(default=None)
    active_slots: Optional[NonNegativeInteger] = Field(default=None)
    denied_slots: Optional[NonNegativeInteger] = Field(default=None)
    slot_decay_per_second: Optional[NonNegativeFloat] = Field(default=None)

726 

727 

class BlockTypeCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block type."""

    # Required identifiers.
    name: Name = Field(..., description="A block type's name")
    slug: BlockTypeSlug = Field(..., description="A block type's slug")

    # Optional presentation metadata.
    logo_url: Optional[str] = Field(  # TODO: HttpUrl
        description="Web URL for the block type's logo", default=None
    )
    documentation_url: Optional[str] = Field(  # TODO: HttpUrl
        description="Web URL for the block type's documentation", default=None
    )
    description: Optional[str] = Field(
        description="A short blurb about the corresponding block's intended use",
        default=None,
    )
    code_example: Optional[str] = Field(
        description="A code snippet demonstrating use of the corresponding block",
        default=None,
    )

747 

748 

class BlockTypeUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block type."""

    # Every field is optional and defaults to None.
    logo_url: Optional[str] = Field(default=None)  # TODO: HttpUrl
    documentation_url: Optional[str] = Field(default=None)  # TODO: HttpUrl
    description: Optional[str] = Field(default=None)
    code_example: Optional[str] = Field(default=None)

    @classmethod
    def updatable_fields(cls) -> set[str]:
        """Return the result of `get_class_fields_only` for this class."""
        field_names = get_class_fields_only(cls)
        return field_names

760 

761 

class BlockSchemaCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block schema."""

    fields: dict[str, Any] = Field(
        description="The block schema's field schema", default_factory=dict
    )
    # Required: the block type the schema is created for.
    block_type_id: UUID = Field(..., description="A block type ID")

    capabilities: List[str] = Field(
        description="A list of Block capabilities",
        default_factory=list,
    )
    # Falls back to the module-wide default block schema version.
    version: str = Field(
        schemas.core.DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

778 

779 

class BlockDocumentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block document."""

    name: Optional[BlockDocumentName] = Field(
        default=None,
        description=(
            "The block document's name. Not required for anonymous block documents."
        ),
    )
    data: Dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    block_schema_id: UUID = Field(default=..., description="A block schema ID")

    block_type_id: UUID = Field(default=..., description="A block type ID")

    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def validate_name_is_present_if_not_anonymous(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Run `validate_name_present_on_nonanonymous_blocks` over the raw input
        and return its result as the data to validate.
        """
        return validate_name_present_on_nonanonymous_blocks(values)

809 

810 

class BlockDocumentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block document."""

    block_schema_id: Optional[UUID] = Field(
        description="A block schema ID", default=None
    )
    data: Dict[str, Any] = Field(
        description="The block document's data", default_factory=dict
    )
    # Plain default (no Field metadata); True unless the caller says otherwise.
    merge_existing_data: bool = True

821 

822 

class BlockDocumentReferenceCreate(ActionBaseModel):
    """Data used to create block document reference."""

    # A fresh uuid4 is generated when the caller supplies no ID.
    id: UUID = Field(
        default_factory=uuid4, description="The block document reference ID"
    )
    parent_block_document_id: UUID = Field(
        default=..., description="ID of the parent block document"
    )
    reference_block_document_id: UUID = Field(
        default=..., description="ID of the nested block document"
    )
    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

    @model_validator(mode="before")
    @classmethod
    def validate_parent_and_ref_are_different(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Run `validate_parent_and_ref_diff` over the raw input and return its
        result as the data to validate.
        """
        return validate_parent_and_ref_diff(values)

842 

843 

class LogCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a log."""

    # Required log record fields.
    name: str = Field(..., description="The logger name.")
    level: int = Field(..., description="The log level.")
    message: str = Field(..., description="The log message.")
    timestamp: DateTime = Field(..., description="The log timestamp.")

    # Optional run identifiers.
    flow_run_id: Optional[UUID] = Field(default=None)
    task_run_id: Optional[UUID] = Field(default=None)

853 

854 

def validate_base_job_template(
    v: Optional[dict[str, Any]],
) -> Optional[dict[str, Any]]:
    """Validate the structure of a work pool's base job template.

    An empty template (``{}``) or ``None`` is returned unchanged. ``None`` must
    be tolerated because this function is also attached to optional
    ``base_job_template`` fields, where an explicit ``null`` would otherwise
    reach ``v.get`` and fail with ``AttributeError`` instead of a validation
    error.

    Args:
        v: The base job template to validate, or ``None``.

    Returns:
        The same object that was passed in, unmodified.

    Raises:
        ValueError: If a non-empty template lacks a truthy
            ``job_configuration`` or ``variables`` entry, or if the job
            configuration references placeholders that are not declared under
            ``variables["properties"]``.
    """
    # Covers both the empty dict and an explicit None.
    if not v:
        return v

    job_config = v.get("job_configuration")
    variables_schema = v.get("variables")
    if not (job_config and variables_schema):
        raise ValueError(
            "The `base_job_template` must contain both a `job_configuration` key"
            " and a `variables` key."
        )

    template_variables: set[str] = set()
    for template in job_config.values():
        # find any variables inside of double curly braces, minus any whitespace
        # e.g. "{{ var1 }}.{{var2}}" -> ["var1", "var2"]
        # convert to json string to handle nested objects and lists
        found_variables = find_placeholders(json.dumps(template))
        template_variables.update({placeholder.name for placeholder in found_variables})

    provided_variables = set(variables_schema.get("properties", {}))
    missing_variables = template_variables - provided_variables
    if missing_variables:
        # Sort so the message is deterministic, and separate names with ", ".
        raise ValueError(
            "The variables specified in the job configuration template must be "
            "present as properties in the variables schema. "
            "Your job configuration uses the following undeclared "
            f"variable(s): {', '.join(sorted(missing_variables))}."
        )
    return v

884 

885 

class WorkPoolCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work pool."""

    # Required; every other field has a default.
    name: NonEmptyishName = Field(
        default=...,
        description="The name of the work pool.",
    )
    description: Optional[str] = Field(
        description="The work pool description.",
        default=None,
    )
    type: str = Field(
        default="prefect-agent",
        description="The work pool type.",
    )
    # Checked by `_validate_base_job_template` below; defaults to a new
    # empty dict per instance.
    base_job_template: Dict[str, Any] = Field(
        description="The work pool's base job template.",
        default_factory=dict,
    )
    is_paused: bool = Field(
        description="Pausing the work pool stops the delivery of all work.",
        default=False,
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        description="A concurrency limit for the work pool.",
        default=None,
    )

    # Defaults to a freshly constructed WorkPoolStorageConfiguration.
    storage_configuration: schemas.core.WorkPoolStorageConfiguration = Field(
        description="The storage configuration for the work pool.",
        default_factory=schemas.core.WorkPoolStorageConfiguration,
    )

    # Register the module-level validator for the `base_job_template` field.
    _validate_base_job_template = field_validator("base_job_template")(
        validate_base_job_template
    )

913 

914 

class WorkPoolUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work pool."""

    # Every field is optional and defaults to None.
    description: Optional[str] = Field(None)
    is_paused: Optional[bool] = Field(None)
    base_job_template: Optional[Dict[str, Any]] = Field(None)
    concurrency_limit: Optional[NonNegativeInteger] = Field(None)
    storage_configuration: Optional[schemas.core.WorkPoolStorageConfiguration] = Field(
        description="The storage configuration for the work pool.",
        default=None,
    )

    # Register the module-level validator for the `base_job_template` field.
    _validate_base_job_template = field_validator("base_job_template")(
        validate_base_job_template
    )

929 

930 

class WorkQueueCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work queue."""

    # Required; every other field has a default.
    name: Name = Field(..., description="The name of the work queue.")
    # Note the default is an empty string rather than None.
    description: Optional[str] = Field(
        description="An optional description for the work queue.",
        default="",
    )
    is_paused: bool = Field(
        description="Whether or not the work queue is paused.",
        default=False,
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None,
        description="The work queue's concurrency limit.",
    )
    priority: Optional[PositiveInteger] = Field(
        default=None,
        description="The queue's priority. Lower values are higher priority (1 is the highest).",
    )

    # DEPRECATED

    filter: Optional[schemas.core.QueueFilter] = Field(
        default=None,
        deprecated=True,
        description="DEPRECATED: Filter criteria for the work queue.",
    )

958 

959 

class WorkQueueUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work queue."""

    name: Optional[str] = Field(default=None)
    description: Optional[str] = Field(default=None)
    # Unlike the other fields here, this one is not Optional and defaults to
    # False rather than None.
    is_paused: bool = Field(
        description="Whether or not the work queue is paused.",
        default=False,
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(default=None)
    priority: Optional[PositiveInteger] = Field(default=None)
    last_polled: Optional[DateTime] = Field(default=None)

    # DEPRECATED

    filter: Optional[schemas.core.QueueFilter] = Field(
        default=None,
        deprecated=True,
        description="DEPRECATED: Filter criteria for the work queue.",
    )

979 

980 

class ArtifactCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create an artifact."""

    # Every field is optional and defaults to None.
    key: Optional[ArtifactKey] = Field(
        default=None, description="An optional unique reference key for this artifact."
    )
    type: Optional[str] = Field(
        default=None,
        description=(
            "An identifier that describes the shape of the data field. e.g. 'result',"
            " 'table', 'markdown'"
        ),
    )
    description: Optional[str] = Field(
        default=None, description="A markdown-enabled description of the artifact."
    )
    data: Optional[Union[Dict[str, Any], Any]] = Field(
        default=None,
        description=(
            "Data associated with the artifact, e.g. a result.; structure depends on"
            " the artifact type."
        ),
    )
    # When provided, the dict is additionally passed through
    # `validate_max_metadata_length` after type validation.
    metadata_: Optional[
        Annotated[dict[str, str], AfterValidator(validate_max_metadata_length)]
    ] = Field(
        default=None,
        description=(
            "User-defined artifact metadata. Content must be string key and value"
            " pairs."
        ),
    )
    flow_run_id: Optional[UUID] = Field(
        default=None, description="The flow run associated with the artifact."
    )
    task_run_id: Optional[UUID] = Field(
        default=None, description="The task run associated with the artifact."
    )

    @classmethod
    def from_result(cls, data: Any | dict[str, Any]) -> "ArtifactCreate":
        """Build an ``ArtifactCreate`` from a result value.

        When ``data`` is a dict, the keys ``artifact_key``, ``artifact_type``
        and ``artifact_description`` are lifted out of it and, if truthy, used
        as the ``key``, ``type`` and ``description`` fields. The remaining
        entries become the artifact's ``data``. Any non-dict value is stored as
        ``data`` unchanged.

        The caller's dict is never modified: extraction happens on a shallow
        copy.

        Args:
            data: The result value, optionally a dict carrying ``artifact_*``
                keys.

        Returns:
            A new ``ArtifactCreate`` instance.
        """
        if not isinstance(data, dict):
            return cls(data=data)

        # Shallow copy so popping the artifact_* keys does not mutate the
        # argument the caller still holds.
        payload = data.copy()
        artifact_info: dict[str, Any] = {}
        for source_key, field_name in (
            ("artifact_key", "key"),
            ("artifact_type", "type"),
            ("artifact_description", "description"),
        ):
            value = payload.pop(source_key, None)
            if value:
                artifact_info[field_name] = value

        return cls(data=payload, **artifact_info)

1037 

1038 

class ArtifactUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update an artifact."""

    # Every field is optional and defaults to None.
    data: Optional[Union[Dict[str, Any], Any]] = Field(default=None)
    description: Optional[str] = Field(default=None)
    # When provided, the dict is additionally passed through
    # `validate_max_metadata_length` after type validation.
    metadata_: Optional[
        Annotated[dict[str, str], AfterValidator(validate_max_metadata_length)]
    ] = Field(default=None)

1047 

1048 

class VariableCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a Variable."""

    # `name` and `value` are required; `tags` defaults to a new empty list.
    name: VariableName = Field(...)
    value: StrictVariableValue = Field(
        ...,
        examples=["my-value"],
        description="The value of the variable",
    )
    tags: list[str] = Field(
        examples=[["tag-1", "tag-2"]],
        description="A list of variable tags",
        default_factory=list,
    )

1063 

1064 

class VariableUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a Variable."""

    # Every field defaults to None.
    name: Optional[VariableName] = Field(None)
    value: StrictVariableValue = Field(
        None,
        examples=["my-value"],
        description="The value of the variable",
    )
    tags: Optional[list[str]] = Field(
        None,
        examples=[["tag-1", "tag-2"]],
        description="A list of variable tags",
    )