Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/actions.py: 76%

365 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import datetime 1a

4from copy import deepcopy 1a

5from typing import Any, Callable, Optional, TypeVar, Union 1a

6from uuid import UUID, uuid4 1a

7 

8import jsonschema 1a

9from pydantic import ( 1a

10 BaseModel, 

11 Field, 

12 field_serializer, 

13 field_validator, 

14 model_validator, 

15) 

16 

17import prefect.client.schemas.objects as objects 1a

18from prefect._internal.schemas.bases import ActionBaseModel 1a

19from prefect._internal.schemas.validators import ( 1a

20 convert_to_strings, 

21 remove_old_deployment_fields, 

22 validate_name_present_on_nonanonymous_blocks, 

23 validate_schedule_max_scheduled_runs, 

24) 

25from prefect._result_records import ResultRecordMetadata 1a

26from prefect.client.schemas.objects import ( 1a

27 StateDetails, 

28 StateType, 

29 WorkPoolStorageConfiguration, 

30) 

31from prefect.client.schemas.schedules import ( 1a

32 SCHEDULE_TYPES, 

33 CronSchedule, 

34 IntervalSchedule, 

35 NoSchedule, 

36 RRuleSchedule, 

37) 

38from prefect.schedules import Schedule 1a

39from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS 1a

40from prefect.types import ( 1a

41 DateTime, 

42 KeyValueLabelsField, 

43 Name, 

44 NameOrEmpty, 

45 NonEmptyishName, 

46 NonNegativeFloat, 

47 NonNegativeInteger, 

48 PositiveInteger, 

49 StrictVariableValue, 

50) 

51from prefect.types._schema import ParameterSchema 1a

52from prefect.types.names import ( 1a

53 ArtifactKey, 

54 BlockDocumentName, 

55 BlockTypeSlug, 

56 VariableName, 

57) 

58from prefect.utilities.collections import visit_collection 1a

59from prefect.utilities.pydantic import get_class_fields_only 1a

60 

# Generic type variable; nothing in the visible part of this module references it.
R = TypeVar("R")

62 

63 

class StateCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a new state."""

    # `type` is the only field without a default.
    type: StateType
    name: Optional[str] = Field(None)
    message: Optional[str] = Field(None, examples=["Run started"])
    # A fresh StateDetails instance is built per model when omitted.
    state_details: StateDetails = Field(default_factory=StateDetails)
    # Accepts ResultRecordMetadata or any other value; defaults to None.
    data: Union[ResultRecordMetadata, Any] = Field(default=None)

74 

75 

class FlowCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow."""

    # Required: no default is supplied.
    name: str = Field(..., description="The name of the flow", examples=["my-flow"])
    # Optional collections; each instance gets its own empty container.
    tags: list[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )

    labels: KeyValueLabelsField = Field(default_factory=dict)

89 

90 

class FlowUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow."""

    # The only field on this model; an empty list when omitted.
    tags: list[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )

99 

100 

# Action schema for one schedule attached to a deployment.
# (Documented with comments rather than a class docstring so the model's
# generated JSON schema stays exactly as it was.)
class DeploymentScheduleCreate(ActionBaseModel):
    # Required: one of the SCHEDULE_TYPES variants.
    schedule: SCHEDULE_TYPES = Field(
        ..., description="The schedule for the deployment."
    )
    active: bool = Field(True, description="Whether or not the schedule is active.")
    max_scheduled_runs: Optional[PositiveInteger] = Field(
        None,
        description="The maximum number of scheduled runs for the schedule.",
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict,
        description="Parameter overrides for the schedule.",
    )
    slug: Optional[str] = Field(
        None,
        description="A unique identifier for the schedule.",
    )

    @field_validator("active", mode="wrap")
    @classmethod
    def validate_active(cls, v: Any, handler: Callable[[Any], Any]) -> bool:
        """Run the standard validation for `active`, rewording any failure.

        Any exception raised by `handler` is replaced with a ValueError that
        reports the offending value and its type.
        """
        try:
            parsed = handler(v)
        except Exception:
            raise ValueError(
                f"active must be able to be parsed as a boolean, got {v!r} of type {type(v)}"
            )
        return parsed

    @field_validator("max_scheduled_runs")
    @classmethod
    def validate_max_scheduled_runs(cls, v: Optional[int]) -> Optional[int]:
        """Check `v` against the PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS setting."""
        configured_limit = PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
        return validate_schedule_max_scheduled_runs(v, configured_limit)

    @classmethod
    def from_schedule(cls, schedule: Schedule) -> "DeploymentScheduleCreate":
        """Build a create-action from a `Schedule`.

        The first of `interval`, `cron`, `rrule` that is not None decides which
        concrete schedule type is built. When all three are None, a
        `NoSchedule` is used and no other attribute of `schedule` is copied.
        """
        if schedule.interval is not None:
            concrete = IntervalSchedule(
                interval=schedule.interval,
                timezone=schedule.timezone,
                anchor_date=schedule.anchor_date,
            )
        elif schedule.cron is not None:
            concrete = CronSchedule(
                cron=schedule.cron,
                timezone=schedule.timezone,
                day_or=schedule.day_or,
            )
        elif schedule.rrule is not None:
            concrete = RRuleSchedule(
                rrule=schedule.rrule,
                timezone=schedule.timezone,
            )
        else:
            return cls(schedule=NoSchedule())

        # The three concrete variants share the same remaining arguments.
        return cls(
            schedule=concrete,
            parameters=schedule.parameters,
            active=schedule.active,
            slug=schedule.slug,
        )

176 

177 

# Action schema for updating a deployment schedule; every field defaults to None.
# (Documented with comments rather than a class docstring so the model's
# generated JSON schema stays exactly as it was.)
class DeploymentScheduleUpdate(ActionBaseModel):
    schedule: Optional[SCHEDULE_TYPES] = Field(
        None, description="The schedule for the deployment."
    )
    active: Optional[bool] = Field(
        None, description="Whether or not the schedule is active."
    )

    max_scheduled_runs: Optional[PositiveInteger] = Field(
        None,
        description="The maximum number of scheduled runs for the schedule.",
    )
    parameters: Optional[dict[str, Any]] = Field(
        None,
        description="Parameter overrides for the schedule.",
    )
    slug: Optional[str] = Field(
        None,
        description="A unique identifier for the schedule.",
    )

    @field_validator("max_scheduled_runs")
    @classmethod
    def validate_max_scheduled_runs(cls, v: Optional[int]) -> Optional[int]:
        """Check `v` against the PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS setting."""
        configured_limit = PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
        return validate_schedule_max_scheduled_runs(v, configured_limit)

205 

206 

class DeploymentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a deployment."""

    @model_validator(mode="before")
    @classmethod
    def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Pass the raw input through `remove_old_deployment_fields` before validation."""
        return remove_old_deployment_fields(values)

    @field_validator("description", "tags", mode="before")
    @classmethod
    def convert_to_strings(
        cls, values: Optional[Union[str, list[str]]]
    ) -> Union[str, list[str]]:
        """Pre-process `description` and `tags` with the shared `convert_to_strings` helper."""
        # Inside the method body this name resolves to the module-level
        # helper imported from prefect._internal.schemas.validators, not to
        # this method (class scope is not visible from function bodies).
        return convert_to_strings(values)

    # Required fields.
    name: NameOrEmpty = Field(..., description="The name of the deployment.")
    flow_id: UUID = Field(..., description="The ID of the flow to deploy.")

    paused: Optional[bool] = Field(default=None)
    schedules: list[DeploymentScheduleCreate] = Field(
        default_factory=lambda: [],
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="The concurrency limit for the deployment.",
    )
    concurrency_options: Optional[objects.ConcurrencyOptions] = Field(
        default=None,
        description="The concurrency options for the deployment.",
    )
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    # Defaults to an empty object schema rather than None.
    parameter_openapi_schema: Optional[ParameterSchema] = Field(
        default_factory=lambda: {"type": "object", "properties": {}}
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    tags: list[str] = Field(default_factory=list)
    labels: KeyValueLabelsField = Field(default_factory=dict)
    pull_steps: Optional[list[dict[str, Any]]] = Field(default=None)

    work_queue_name: Optional[str] = Field(default=None)
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    storage_document_id: Optional[UUID] = Field(default=None)
    infrastructure_document_id: Optional[UUID] = Field(default=None)
    description: Optional[str] = Field(default=None)
    path: Optional[str] = Field(default=None)
    entrypoint: Optional[str] = Field(default=None)
    job_variables: dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )

    # Versioning
    version: Optional[str] = Field(default=None)
    version_info: Optional[objects.VersionInfo] = Field(
        default=None, description="Version information for the deployment."
    )

    # Branching
    branch: Optional[str] = Field(
        default=None, description="The branch of the deployment."
    )
    base: Optional[UUID] = Field(
        default=None, description="The base deployment of the deployment."
    )
    root: Optional[UUID] = Field(
        default=None, description="The root deployment of the deployment."
    )

    def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
        """Check that the combination of base_job_template defaults
        and job_variables conforms to the specified schema.

        Args:
            base_job_template: A work pool base job template. Only its
                "variables" entry is read; the template itself is never
                modified because the schema is deep-copied first.

        Raises:
            jsonschema.ValidationError: If `self.job_variables` does not
                satisfy the (relaxed) variables schema.

        If the template has no "variables" entry there is nothing to validate
        against and the method returns without doing anything, matching
        `DeploymentUpdate.check_valid_configuration`.
        """
        variables_schema = deepcopy(base_job_template.get("variables"))
        if variables_schema is None:
            # Handing None to jsonschema as a schema would raise a TypeError.
            return

        # jsonschema considers required fields, even if that field has a default,
        # to still be required. To get around this we remove the fields from
        # required if there is a default present.
        required = variables_schema.get("required")
        properties = variables_schema.get("properties")
        if required is not None and properties is not None:
            for k, v in properties.items():
                if "default" in v and k in required:
                    required.remove(k)

        jsonschema.validate(self.job_variables, variables_schema)

305 

306 

class DeploymentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a deployment."""

    @model_validator(mode="before")
    @classmethod
    def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Pass the raw input through `remove_old_deployment_fields` before validation."""
        cleaned = remove_old_deployment_fields(values)
        return cleaned

    version: Optional[str] = Field(None)
    version_info: Optional[objects.VersionInfo] = Field(
        None, description="Version information for the deployment."
    )
    description: Optional[str] = Field(None)
    parameters: Optional[dict[str, Any]] = Field(
        None,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    paused: Optional[bool] = Field(
        None, description="Whether or not the deployment is paused."
    )
    schedules: Optional[list[DeploymentScheduleUpdate]] = Field(
        None,
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[int] = Field(
        None,
        description="The concurrency limit for the deployment.",
    )
    concurrency_options: Optional[objects.ConcurrencyOptions] = Field(
        None,
        description="The concurrency options for the deployment.",
    )
    # Unlike most fields here, `tags` defaults to an empty list, not None.
    tags: list[str] = Field(default_factory=list)
    work_queue_name: Optional[str] = Field(None)
    work_pool_name: Optional[str] = Field(
        None,
        description="The name of the deployment's work pool.",
        examples=["my-work-pool"],
    )
    path: Optional[str] = Field(None)
    # Optional, but defaults to an empty dict rather than None.
    job_variables: Optional[dict[str, Any]] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )
    entrypoint: Optional[str] = Field(None)
    storage_document_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    enforce_parameter_schema: Optional[bool] = Field(
        None,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )
    # Defaults to an empty object schema rather than None.
    parameter_openapi_schema: Optional[ParameterSchema] = Field(
        default_factory=lambda: {"type": "object", "properties": {}}
    )
    pull_steps: Optional[list[dict[str, Any]]] = Field(None)

    def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
        """Validate `job_variables` against the template's "variables" schema.

        The schema is deep-copied from `base_job_template`, so the template is
        not modified. Nothing happens when the template has no "variables"
        entry. Otherwise `jsonschema.validate` is run and raises on mismatch.
        """
        schema = deepcopy(base_job_template.get("variables"))
        if schema is None:
            return

        # jsonschema treats a required property as required even when it has
        # a default, so properties that carry a default are dropped from
        # "required" before validating.
        required_names = schema.get("required")
        property_specs = schema.get("properties")
        if required_names is not None and property_specs is not None:
            for prop_name, prop_spec in property_specs.items():
                if "default" in prop_spec and prop_name in required_names:
                    required_names.remove(prop_name)

        jsonschema.validate(self.job_variables, schema)

384 

385 

# Action schema for branching a deployment.
# (Documented with comments rather than a class docstring so the model's
# generated JSON schema stays exactly as it was.)
class DeploymentBranch(ActionBaseModel):
    # Required.
    branch: str = Field(default=..., description="Name of the branch to create")
    options: objects.DeploymentBranchingOptions = Field(
        description="Configuration options for how the deployment should be branched",
        default_factory=objects.DeploymentBranchingOptions,
    )
    overrides: Optional[DeploymentUpdate] = Field(
        None,
        description="Optional values to override in the branched deployment",
    )

    @field_validator("branch")
    @classmethod
    def validate_branch_length(cls, v: str) -> str:
        """Reject branch names that are empty once whitespace is stripped.

        The value is returned unchanged (not stripped) when it is accepted.
        """
        if not v.strip():
            raise ValueError("Branch name cannot be empty or contain only whitespace")
        return v

403 

404 

class FlowRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a flow run."""

    name: Optional[str] = Field(None)
    flow_version: Optional[str] = Field(None)
    # Optional, but defaults to an empty dict rather than None.
    parameters: Optional[dict[str, Any]] = Field(default_factory=dict)
    empirical_policy: objects.FlowRunPolicy = Field(default_factory=objects.FlowRunPolicy)
    tags: list[str] = Field(default_factory=list)
    infrastructure_pid: Optional[str] = Field(None)
    job_variables: Optional[dict[str, Any]] = Field(None)

417 

418 

class TaskRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a task run"""

    id: Optional[UUID] = Field(
        default=None, description="The ID to assign to the task run"
    )
    # TaskRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        None, description="The state of the task run to create"
    )

    name: Optional[str] = Field(None, description="The name of the task run")
    flow_run_id: Optional[UUID] = Field(None)
    # `task_key` and `dynamic_key` are the two required fields.
    task_key: str = Field(
        ..., description="A unique identifier for the task being run."
    )
    dynamic_key: str = Field(
        ...,
        description=(
            "A dynamic key used to differentiate between multiple runs of the same task"
            " within the same flow run."
        ),
    )
    cache_key: Optional[str] = Field(None)
    cache_expiration: Optional[objects.DateTime] = Field(None)
    task_version: Optional[str] = Field(None)
    empirical_policy: objects.TaskRunPolicy = Field(
        default_factory=objects.TaskRunPolicy
    )
    tags: list[str] = Field(default_factory=list)
    labels: KeyValueLabelsField = Field(default_factory=dict)
    # Maps a string key to a list of input descriptors; the member order of
    # the Union is kept exactly as declared.
    task_inputs: dict[
        str,
        list[
            Union[
                objects.TaskRunResult,
                objects.FlowRunResult,
                objects.Parameter,
                objects.Constant,
            ]
        ],
    ] = Field(default_factory=dict)

462 

463 

class TaskRunUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a task run"""

    # The only field this action declares.
    name: Optional[str] = Field(None)

468 

469 

class FlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run."""

    # FlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        None, description="The state of the flow run to create"
    )

    name: Optional[str] = Field(None, description="The name of the flow run.")
    # Required: the only field without a default.
    flow_id: UUID = Field(..., description="The id of the flow being run.")
    deployment_id: Optional[UUID] = Field(None)
    flow_version: Optional[str] = Field(None)
    parameters: dict[str, Any] = Field(
        default_factory=dict, description="The parameters for the flow run."
    )
    context: dict[str, Any] = Field(
        default_factory=dict, description="The context for the flow run."
    )
    parent_task_run_id: Optional[UUID] = Field(None)
    infrastructure_document_id: Optional[UUID] = Field(None)
    empirical_policy: objects.FlowRunPolicy = Field(default_factory=objects.FlowRunPolicy)
    tags: list[str] = Field(default_factory=list)
    idempotency_key: Optional[str] = Field(None)

    labels: KeyValueLabelsField = Field(default_factory=dict)
    work_pool_name: Optional[str] = Field(None)
    work_queue_name: Optional[str] = Field(None)
    job_variables: Optional[dict[str, Any]] = Field(None)

500 

501 

class DeploymentFlowRunCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a flow run from a deployment."""

    # FlowRunCreate states must be provided as StateCreate objects
    state: Optional[StateCreate] = Field(
        default=None, description="The state of the flow run to create"
    )

    name: Optional[str] = Field(default=None, description="The name of the flow run.")
    parameters: dict[str, Any] = Field(
        default_factory=dict, description="The parameters for the flow run."
    )
    enforce_parameter_schema: Optional[bool] = Field(
        default=None,
        description="Whether or not to enforce the parameter schema on this run.",
    )
    context: dict[str, Any] = Field(
        default_factory=dict, description="The context for the flow run."
    )
    infrastructure_document_id: Optional[UUID] = Field(default=None)
    empirical_policy: objects.FlowRunPolicy = Field(
        default_factory=objects.FlowRunPolicy
    )
    tags: list[str] = Field(default_factory=list)
    idempotency_key: Optional[str] = Field(default=None)
    parent_task_run_id: Optional[UUID] = Field(default=None)
    work_queue_name: Optional[str] = Field(default=None)
    job_variables: Optional[dict[str, Any]] = Field(default=None)
    labels: KeyValueLabelsField = Field(default_factory=dict)

    @model_validator(mode="before")
    @classmethod
    def convert_parameters_to_plain_data(cls, values: Any) -> Any:
        """Replace pydantic models nested in `parameters` with plain JSON data.

        Every `BaseModel` found while walking `values["parameters"]` is
        replaced by its `model_dump(mode="json")` output; other values are
        kept as they are.

        Input that is not a dict, or that has no "parameters" key, is returned
        untouched so that regular validation can deal with it. The caller's
        dict is not modified: a shallow copy carrying the converted
        parameters is returned instead.
        """
        if not isinstance(values, dict) or "parameters" not in values:
            return values

        def convert_value(value: Any) -> Any:
            if isinstance(value, BaseModel):
                return value.model_dump(mode="json")
            return value

        plain_parameters = visit_collection(
            values["parameters"], convert_value, return_data=True
        )
        # Rebuilding the dict keeps the original key order while leaving the
        # caller's object alone.
        return {**values, "parameters": plain_parameters}

    @field_serializer("parameters", when_used="json")
    def serialize_parameters(self, value: dict[str, Any]) -> dict[str, Any]:
        """Serialize datetime types as ISO strings instead of timestamps.

        PrefectBaseModel has ser_json_timedelta='float' to serialize timedeltas as floats,
        but this also causes datetime/date/time to serialize as timestamps. This serializer
        overrides that behavior for datetime types while preserving float serialization for
        timedeltas.

        Only dicts and lists are walked recursively; values held in any other
        container type are returned as they are.
        """

        def convert_temporal(v: Any) -> Any:
            if isinstance(v, (datetime.datetime, datetime.date, datetime.time)):
                return v.isoformat()
            elif isinstance(v, dict):
                return {k: convert_temporal(val) for k, val in v.items()}
            elif isinstance(v, list):
                return [convert_temporal(item) for item in v]
            return v

        return {k: convert_temporal(v) for k, v in value.items()}

566 

567 

class SavedSearchCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a saved search."""

    # Required.
    name: str = Field(..., description="The name of the saved search.")
    # An empty list per instance when omitted.
    filters: list[objects.SavedSearchFilter] = Field(
        default_factory=lambda: [],
        description="The filter set for the saved search.",
    )

575 

576 

class ConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a concurrency limit."""

    # Both fields are required.
    tag: str = Field(..., description="A tag the concurrency limit is applied to.")
    concurrency_limit: int = Field(..., description="The concurrency limit.")

584 

585 

class ConcurrencyLimitV2Create(ActionBaseModel):
    """Data used by the Prefect REST API to create a v2 concurrency limit."""

    active: bool = Field(True, description="Whether the concurrency limit is active.")
    # `name` and `limit` are required; the remaining counters default to 0.
    name: Name = Field(..., description="The name of the concurrency limit.")
    limit: NonNegativeInteger = Field(..., description="The concurrency limit.")
    active_slots: NonNegativeInteger = Field(
        0, description="The number of active slots."
    )
    denied_slots: NonNegativeInteger = Field(
        0, description="The number of denied slots."
    )
    slot_decay_per_second: NonNegativeFloat = Field(
        0,
        description="The decay rate for active slots when used as a rate limit.",
    )

604 

605 

class ConcurrencyLimitV2Update(ActionBaseModel):
    """Data used by the Prefect REST API to update a v2 concurrency limit."""

    # Every field is optional and defaults to None.
    active: Optional[bool] = Field(None)
    name: Optional[Name] = Field(None)
    limit: Optional[NonNegativeInteger] = Field(None)
    active_slots: Optional[NonNegativeInteger] = Field(None)
    denied_slots: Optional[NonNegativeInteger] = Field(None)
    slot_decay_per_second: Optional[NonNegativeFloat] = Field(None)

615 

616 

class BlockTypeCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block type."""

    # `name` and `slug` are required; everything else defaults to None.
    name: str = Field(..., description="A block type's name")
    slug: BlockTypeSlug = Field(..., description="A block type's slug")
    logo_url: Optional[objects.HttpUrl] = Field(
        None, description="Web URL for the block type's logo"
    )
    documentation_url: Optional[objects.HttpUrl] = Field(
        None, description="Web URL for the block type's documentation"
    )
    description: Optional[str] = Field(
        None,
        description="A short blurb about the corresponding block's intended use",
    )
    code_example: Optional[str] = Field(
        None,
        description="A code snippet demonstrating use of the corresponding block",
    )

636 

637 

class BlockTypeUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block type."""

    # Every field is optional and defaults to None.
    logo_url: Optional[objects.HttpUrl] = Field(None)
    documentation_url: Optional[objects.HttpUrl] = Field(None)
    description: Optional[str] = Field(None)
    code_example: Optional[str] = Field(None)

    @classmethod
    def updatable_fields(cls) -> set[str]:
        """Return whatever `get_class_fields_only` reports for this class."""
        field_names = get_class_fields_only(cls)
        return field_names

649 

650 

class BlockSchemaCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block schema."""

    # No field is required; each one has a default.
    fields: dict[str, Any] = Field(
        default_factory=dict,
        description="The block schema's field schema",
    )
    block_type_id: Optional[UUID] = Field(None)
    capabilities: list[str] = Field(
        default_factory=list, description="A list of Block capabilities"
    )
    # Falls back to the DEFAULT_BLOCK_SCHEMA_VERSION constant from the objects module.
    version: str = Field(
        objects.DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

666 

667 

class BlockDocumentCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a block document."""

    name: Optional[BlockDocumentName] = Field(
        default=None, description="The name of the block document"
    )
    data: dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    # The two IDs below are the only required fields.
    block_schema_id: UUID = Field(
        default=..., description="The block schema ID for the block document"
    )
    block_type_id: UUID = Field(
        default=..., description="The block type ID for the block document"
    )
    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def validate_name_is_present_if_not_anonymous(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """Run `validate_name_present_on_nonanonymous_blocks` on the raw input.

        Declared as an explicit classmethod, like the other mode="before"
        model validators in this module.
        """
        return validate_name_present_on_nonanonymous_blocks(values)

696 

697 

class BlockDocumentUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a block document."""

    block_schema_id: Optional[UUID] = Field(None, description="A block schema ID")
    data: dict[str, Any] = Field(
        default_factory=dict,
        description="The block document's data",
    )
    # Defaults to True.
    merge_existing_data: bool = Field(
        True,
        description="Whether to merge the existing data with the new data or replace it",
    )

711 

712 

class BlockDocumentReferenceCreate(ActionBaseModel):
    """Data used to create block document reference."""

    # A new uuid4 is generated per instance when no id is supplied.
    id: UUID = Field(default_factory=uuid4)
    # The remaining three fields are required.
    parent_block_document_id: UUID = Field(
        ..., description="ID of block document the reference is nested within"
    )
    reference_block_document_id: UUID = Field(
        ..., description="ID of the nested block document"
    )
    name: str = Field(
        ..., description="The name that the reference is nested under"
    )

726 

727 

class LogCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a log."""

    # Required fields.
    name: str = Field(default=..., description="The logger name.")
    level: int = Field(default=..., description="The log level.")
    message: str = Field(default=..., description="The log message.")
    timestamp: DateTime = Field(default=..., description="The log timestamp.")
    # Optional identifiers; all default to None.
    flow_run_id: Optional[UUID] = Field(default=None)
    task_run_id: Optional[UUID] = Field(default=None)
    worker_id: Optional[UUID] = Field(default=None)

    def model_dump(self, *args: Any, **kwargs: Any) -> dict[str, Any]:
        """
        The worker_id field is only included in logs sent to Prefect Cloud.
        If it's unset, we should not include it in the log payload.

        All positional and keyword arguments are forwarded unchanged to the
        parent `model_dump`.
        """
        data = super().model_dump(*args, **kwargs)
        if self.worker_id is None:
            # The key may already be missing (for example when the caller
            # passed `exclude`, `include` or `exclude_none=True`), so pop
            # with a default instead of risking a KeyError.
            data.pop("worker_id", None)
        return data

748 

749 

class WorkPoolCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work pool."""

    # Required: no default is supplied.
    name: NonEmptyishName = Field(description="The name of the work pool.")
    description: Optional[str] = Field(None)
    type: str = Field(
        description="The work pool type.",
        default="prefect-agent",
    )  # TODO: change default
    base_job_template: dict[str, Any] = Field(
        default_factory=dict,
        description="The base job template for the work pool.",
    )
    is_paused: bool = Field(False, description="Whether the work pool is paused.")
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        None, description="A concurrency limit for the work pool."
    )
    # A fresh WorkPoolStorageConfiguration is built per instance when omitted.
    storage_configuration: WorkPoolStorageConfiguration = Field(
        default_factory=WorkPoolStorageConfiguration,
        description="A storage configuration for the work pool.",
    )

775 

776 

class WorkPoolUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work pool."""

    # Every field is optional and defaults to None.
    description: Optional[str] = Field(None)
    is_paused: Optional[bool] = Field(None)
    base_job_template: Optional[dict[str, Any]] = Field(None)
    # Typed as a plain Optional[int] on this model.
    concurrency_limit: Optional[int] = Field(None)
    storage_configuration: Optional[WorkPoolStorageConfiguration] = Field(
        None,
        description="A storage configuration for the work pool.",
    )

788 

789 

class WorkQueueCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a work queue."""

    # Required.
    name: str = Field(..., description="The name of the work queue.")
    description: Optional[str] = Field(None)
    is_paused: bool = Field(False, description="Whether the work queue is paused.")
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        None,
        description="A concurrency limit for the work queue.",
    )
    priority: Optional[PositiveInteger] = Field(
        None,
        description=(
            "The queue's priority. Lower values are higher priority (1 is the highest)."
        ),
    )

    # DEPRECATED

    # Marked deprecated through the Field's `deprecated=True` flag.
    filter: Optional[objects.QueueFilter] = Field(
        default=None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

817 

818 

class WorkQueueUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a work queue."""

    name: Optional[str] = Field(None)
    description: Optional[str] = Field(None)
    # Not Optional on this model: a plain bool that defaults to False.
    is_paused: bool = Field(
        False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(None)
    priority: Optional[PositiveInteger] = Field(
        default=None, description="The queue's priority."
    )
    last_polled: Optional[DateTime] = Field(None)

    # DEPRECATED

    # Marked deprecated through the Field's `deprecated=True` flag.
    filter: Optional[objects.QueueFilter] = Field(
        default=None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )

840 

841 

class ArtifactCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create an artifact."""

    # Every field is optional and defaults to None.
    key: Optional[ArtifactKey] = Field(None)
    type: Optional[str] = Field(None)
    description: Optional[str] = Field(None)
    data: Optional[Union[dict[str, Any], Any]] = Field(None)
    metadata_: Optional[dict[str, str]] = Field(None)
    flow_run_id: Optional[UUID] = Field(None)
    task_run_id: Optional[UUID] = Field(None)

852 

853 

class ArtifactUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update an artifact."""

    # Every field is optional and defaults to None.
    data: Optional[Union[dict[str, Any], Any]] = Field(None)
    description: Optional[str] = Field(None)
    metadata_: Optional[dict[str, str]] = Field(None)

860 

861 

class VariableCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a Variable."""

    # `name` and `value` are required; `tags` defaults to None.
    name: VariableName = Field(...)
    value: StrictVariableValue = Field(
        ...,
        description="The value of the variable",
        examples=["my-value"],
    )
    tags: Optional[list[str]] = Field(None)

872 

873 

class VariableUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a Variable."""

    name: Optional[VariableName] = Field(None)
    # Annotated as StrictVariableValue (not wrapped in Optional) with a None default.
    value: StrictVariableValue = Field(
        None,
        description="The value of the variable",
        examples=["my-value"],
    )
    tags: Optional[list[str]] = Field(None)

884 

885 

class GlobalConcurrencyLimitCreate(ActionBaseModel):
    """Data used by the Prefect REST API to create a global concurrency limit."""

    # `name` and `limit` have no default and are therefore required.
    name: Name = Field(description="The name of the global concurrency limit.")
    limit: NonNegativeInteger = Field(
        description=(
            "The maximum number of slots that can be occupied on this concurrency"
            " limit."
        ),
    )
    # The remaining fields are Optional but carry non-None defaults.
    active: Optional[bool] = Field(
        True,
        description="Whether or not the concurrency limit is in an active state.",
    )
    active_slots: Optional[NonNegativeInteger] = Field(
        0,
        description="Number of tasks currently using a concurrency slot.",
    )
    slot_decay_per_second: Optional[NonNegativeFloat] = Field(
        0.0,
        description=(
            "Controls the rate at which slots are released when the concurrency limit"
            " is used as a rate limit."
        ),
    )

911 

912 

class GlobalConcurrencyLimitUpdate(ActionBaseModel):
    """Data used by the Prefect REST API to update a global concurrency limit."""

    # Every field shown here is optional and defaults to None.
    name: Optional[Name] = Field(None)
    limit: Optional[NonNegativeInteger] = Field(None)
    active: Optional[bool] = Field(None)
    active_slots: Optional[NonNegativeInteger] = Field(None)
    slot_decay_per_second: Optional[NonNegativeFloat] = Field(None)