Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/core.py: 80%

398 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Full schemas of Prefect REST API objects. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import datetime 1a

8from typing import ( 1a

9 TYPE_CHECKING, 

10 Annotated, 

11 Any, 

12 ClassVar, 

13 Dict, 

14 List, 

15 Optional, 

16 Type, 

17 Union, 

18) 

19from uuid import UUID 1a

20 

21from pydantic import ( 1a

22 AfterValidator, 

23 BaseModel, 

24 ConfigDict, 

25 Field, 

26 StrictBool, 

27 StrictFloat, 

28 StrictInt, 

29 field_validator, 

30 model_validator, 

31) 

32from sqlalchemy.ext.asyncio import AsyncSession 1a

33from typing_extensions import Literal, Self 1a

34 

35from prefect._internal.schemas.validators import ( 1a

36 get_or_create_run_name, 

37 list_length_50_or_less, 

38 set_run_policy_deprecated_fields, 

39 validate_cache_key_length, 

40 validate_default_queue_id_not_none, 

41 validate_max_metadata_length, 

42 validate_name_present_on_nonanonymous_blocks, 

43 validate_not_negative, 

44 validate_parent_and_ref_diff, 

45 validate_schedule_max_scheduled_runs, 

46) 

47from prefect.server.schemas import schedules, states 1a

48from prefect.server.schemas.statuses import WorkPoolStatus 1a

49from prefect.server.utilities.schemas.bases import ( 1a

50 ORMBaseModel, 

51 PrefectBaseModel, 

52 TimeSeriesBaseModel, 

53) 

54from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS 1a

55from prefect.types import ( 1a

56 MAX_VARIABLE_NAME_LENGTH, 

57 DateTime, 

58 LaxUrl, 

59 Name, 

60 NameOrEmpty, 

61 NonEmptyishName, 

62 NonNegativeInteger, 

63 PositiveInteger, 

64 StrictVariableValue, 

65) 

66from prefect.types._datetime import now 1a

67from prefect.types.names import raise_on_name_alphanumeric_dashes_only 1a

68from prefect.utilities.collections import ( 1a

69 AutoEnum, 

70 dict_to_flatdict, 

71 flatdict_to_dict, 

72) 

73from prefect.utilities.names import generate_slug, obfuscate 1a

74 

75if TYPE_CHECKING: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true1a

76 from prefect.server.database import orm_models 

77 

# Default value of `BlockSchema.version` when no version is supplied.
DEFAULT_BLOCK_SCHEMA_VERSION = "non-versioned"

# Label mapping: string keys to strict bool / int / float values or strings.
KeyValueLabels = dict[str, Union[StrictBool, StrictInt, StrictFloat, str]]

81 

82 

class Flow(ORMBaseModel):
    """An ORM representation of flow data."""

    # Required; constrained by the `Name` type.
    name: Name = Field(
        default=...,
        description="The name of the flow",
        examples=["my-flow"],
    )
    # Each instance gets its own empty list by default.
    tags: List[str] = Field(
        default_factory=list,
        description="A list of flow tags",
        examples=[["tag-1", "tag-2"]],
    )
    # `None` is accepted; the default is a fresh empty dict.
    labels: Optional[KeyValueLabels] = Field(
        default_factory=dict,
        description=(
            "A dictionary of key-value labels. "
            "Values can be strings, numbers, or booleans."
        ),
        examples=[{"key": "value1", "key2": 42}],
    )

99 

100 

class FlowRunPolicy(PrefectBaseModel):
    """Defines how a flow run should retry."""

    # Deprecated fields: still accepted, but flagged `deprecated=True`.
    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )

    # Current retry configuration.
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Optional[int] = Field(
        default=None, description="The delay time between retries, in seconds."
    )

    # Pause / resume bookkeeping.
    pause_keys: Optional[set[str]] = Field(
        default_factory=set, description="Tracks pauses this run has observed."
    )
    resuming: Optional[bool] = Field(
        default=False, description="Indicates if this run is resuming from a pause."
    )
    retry_type: Optional[Literal["in_process", "reschedule"]] = Field(
        default=None, description="The type of retry this run is undergoing."
    )

    @model_validator(mode="before")
    @classmethod
    def populate_deprecated_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Run the raw input through `set_run_policy_deprecated_fields`.

        Executes before field validation (``mode="before"``). It is declared
        as an explicit classmethod, matching the field validators in this
        module, because a "before" validator has no instance to bind to.
        """
        return set_run_policy_deprecated_fields(values)

137 

138 

class CreatedBy(BaseModel):
    # Describes who created an object. Every field is optional and defaults
    # to None.
    id: Optional[UUID] = Field(
        default=None,
        description="The id of the creator of the object.",
    )
    type: Optional[str] = Field(
        default=None,
        description="The type of the creator of the object.",
    )
    display_value: Optional[str] = Field(
        default=None,
        description="The display value for the creator.",
    )

149 

150 

class UpdatedBy(BaseModel):
    # Describes who last updated an object. Every field is optional and
    # defaults to None.
    id: Optional[UUID] = Field(
        default=None,
        description="The id of the updater of the object.",
    )
    type: Optional[str] = Field(
        default=None,
        description="The type of the updater of the object.",
    )
    display_value: Optional[str] = Field(
        default=None,
        description="The display value for the updater.",
    )

161 

162 

class ConcurrencyLimitStrategy(AutoEnum):
    """
    Enumeration of concurrency collision strategies.
    """

    # Member values are generated by `AutoEnum.auto()`.
    ENQUEUE = AutoEnum.auto()
    CANCEL_NEW = AutoEnum.auto()

170 

171 

class ConcurrencyOptions(BaseModel):
    """
    Class for storing the concurrency config in database.
    """

    # Required: one of the `ConcurrencyLimitStrategy` members.
    collision_strategy: ConcurrencyLimitStrategy

178 

179 

class FlowRun(TimeSeriesBaseModel, ORMBaseModel):
    """An ORM representation of flow run data."""

    # --- identity and provenance -------------------------------------------
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None,
        description="The id of the flow run's current state.",
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description="The work queue that handled this flow run.",
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )

    # --- inputs and configuration ------------------------------------------
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for the flow run.",
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_value"}],
    )
    empirical_policy: FlowRunPolicy = Field(default_factory=FlowRunPolicy)
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    labels: Optional[KeyValueLabels] = Field(
        default_factory=dict,
        description=(
            "A dictionary of key-value labels. "
            "Values can be strings, numbers, or booleans."
        ),
        examples=[{"key": "value1", "key2": 42}],
    )
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )

    # --- state summary and timing ------------------------------------------
    state_type: Optional[states.StateType] = Field(
        default=None,
        description="The type of the current flow run state.",
    )
    state_name: Optional[str] = Field(
        default=None,
        description="The name of the current flow run state.",
    )
    run_count: int = Field(
        default=0,
        description="The number of times the flow run was executed.",
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None, description="The flow run's expected start time."
    )
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None, description="The next time the flow run is scheduled to start."
    )
    start_time: Optional[DateTime] = Field(
        default=None,
        description="The actual start time.",
    )
    end_time: Optional[DateTime] = Field(
        default=None,
        description="The actual end time.",
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )

    # --- infrastructure and ownership --------------------------------------
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None,
        description="The id of the run's work pool queue.",
    )

    # relationships
    # flow: Flow = None
    # task_runs: List["TaskRun"] = Field(default_factory=list)
    state: Optional[states.State] = Field(
        default=None,
        description="The current state of the flow run.",
    )
    # parent_task_run: "TaskRun" = None

    job_variables: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Variables used as overrides in the base job template",
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Pass the raw `name` input through `get_or_create_run_name`."""
        return get_or_create_run_name(name)

    def __eq__(self, other: Any) -> bool:
        """
        Compare against another flow run schema, ignoring rolling estimates.

        `estimated_run_time` and `estimated_start_time_delta` change between
        repeated queries for the same run, so they are left out of the
        comparison. Anything that is not a `FlowRun` is delegated to the
        parent class.
        """
        if not isinstance(other, FlowRun):
            return super().__eq__(other)

        ignored = {"estimated_run_time", "estimated_start_time_delta"}
        mine = self.model_dump(exclude=ignored)
        theirs = other.model_dump(exclude=ignored)
        return mine == theirs

338 

339 

class TaskRunPolicy(PrefectBaseModel):
    """Defines how a task run should retry."""

    # Deprecated fields: still accepted, but flagged `deprecated=True`.
    max_retries: int = Field(
        default=0,
        description=(
            "The maximum number of retries. Field is not used. Please use `retries`"
            " instead."
        ),
        deprecated=True,
    )
    retry_delay_seconds: float = Field(
        default=0,
        description=(
            "The delay between retries. Field is not used. Please use `retry_delay`"
            " instead."
        ),
        deprecated=True,
    )

    # Current retry configuration.
    retries: Optional[int] = Field(default=None, description="The number of retries.")
    retry_delay: Union[None, int, float, List[int], List[float]] = Field(
        default=None,
        description="A delay time or list of delay times between retries, in seconds.",
    )
    retry_jitter_factor: Optional[float] = Field(
        default=None, description="Determines the amount a retry should jitter"
    )

    @model_validator(mode="before")
    @classmethod
    def populate_deprecated_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Run the raw input through `set_run_policy_deprecated_fields`.

        Executes before field validation (``mode="before"``). It is declared
        as an explicit classmethod, matching the field validators below,
        because a "before" validator has no instance to bind to.
        """
        return set_run_policy_deprecated_fields(values)

    @field_validator("retry_delay")
    @classmethod
    def validate_configured_retry_delays(
        cls, v: int | float | list[int] | list[float] | None
    ) -> int | float | list[int] | list[float] | None:
        """Check `retry_delay` with `list_length_50_or_less`."""
        return list_length_50_or_less(v)

    @field_validator("retry_jitter_factor")
    @classmethod
    def validate_jitter_factor(cls, v: float | None) -> float | None:
        """Check `retry_jitter_factor` with `validate_not_negative`."""
        return validate_not_negative(v)

383 

384 

class RunInput(PrefectBaseModel):
    """
    Base class for classes that represent inputs to runs, which
    could include, constants, parameters, task runs or flow runs.
    """

    # Instances are frozen.
    model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True)

    # Subclasses in this module narrow this to a single `Literal` value.
    input_type: str

394 

395 

class TaskRunResult(RunInput):
    """Represents a task run result input to another task run."""

    # Fixed discriminator value for this input kind.
    input_type: Literal["task_run"] = "task_run"
    # Required UUID.
    id: UUID

401 

402 

class FlowRunResult(RunInput):
    # Run input tagged "flow_run": the flow-run counterpart of `TaskRunResult`.
    input_type: Literal["flow_run"] = "flow_run"
    # Required UUID.
    id: UUID

406 

407 

class Parameter(RunInput):
    """Represents a parameter input to a task run."""

    # Fixed discriminator value for this input kind.
    input_type: Literal["parameter"] = "parameter"
    # Required parameter name.
    name: str

413 

414 

class Constant(RunInput):
    """Represents constant input value to a task run."""

    # Fixed discriminator value for this input kind.
    input_type: Literal["constant"] = "constant"
    # Required string naming the constant's type.
    type: str

420 

421 

class TaskRun(TimeSeriesBaseModel, ORMBaseModel):
    """An ORM representation of task run data."""

    # --- identity -----------------------------------------------------------
    name: str = Field(
        default_factory=lambda: generate_slug(2), examples=["my-task-run"]
    )
    flow_run_id: Optional[UUID] = Field(
        default=None, description="The flow run id of the task run."
    )
    task_key: str = Field(
        default=..., description="A unique identifier for the task being run."
    )
    dynamic_key: str = Field(
        default=...,
        description=(
            "A dynamic key used to differentiate between multiple runs of the same task"
            " within the same flow run."
        ),
    )

    # --- caching ------------------------------------------------------------
    cache_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional cache key. If a COMPLETED state associated with this cache key"
            " is found, the cached COMPLETED state will be used instead of executing"
            " the task run."
        ),
    )
    cache_expiration: Optional[DateTime] = Field(
        default=None, description="Specifies when the cached state should expire."
    )

    # --- configuration ------------------------------------------------------
    task_version: Optional[str] = Field(
        default=None, description="The version of the task being run."
    )
    empirical_policy: TaskRunPolicy = Field(
        default_factory=TaskRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags for the task run.",
        examples=[["tag-1", "tag-2"]],
    )
    labels: Union[KeyValueLabels, None] = Field(
        default_factory=dict,
        description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
        examples=[{"key": "value1", "key2": 42}],
    )
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the current task run state."
    )
    task_inputs: Dict[
        str, List[Union[TaskRunResult, FlowRunResult, Parameter, Constant]]
    ] = Field(
        default_factory=dict,
        description=(
            "Tracks the source of inputs to a task run. Used for internal bookkeeping."
        ),
    )

    # --- state summary and timing -------------------------------------------
    state_type: Optional[states.StateType] = Field(
        default=None, description="The type of the current task run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current task run state."
    )
    run_count: int = Field(
        default=0, description="The number of times the task run has been executed."
    )
    flow_run_run_count: int = Field(
        default=0,
        description=(
            "If the parent flow has retried, this indicates the flow retry this run is"
            " associated with."
        ),
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The task run's expected start time.",
    )

    # the next scheduled start time will be populated
    # whenever the run is in a scheduled state
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the task run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the task run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )

    # relationships
    # flow_run: FlowRun = None
    # subflow_runs: List[FlowRun] = Field(default_factory=list)
    state: Optional[states.State] = Field(
        default=None, description="The current task run state."
    )

    @field_validator("name", mode="before")
    @classmethod
    def set_name(cls, name: str) -> str:
        """Pass the raw `name` input through `get_or_create_run_name`."""
        return get_or_create_run_name(name)

    @field_validator("cache_key")
    @classmethod
    def validate_cache_key(cls, cache_key: Optional[str]) -> Optional[str]:
        """Check `cache_key` with `validate_cache_key_length`.

        The field is declared `Optional[str]`, so the annotation admits
        `None` as well as a string.
        """
        return validate_cache_key_length(cache_key)

544 

545 

class DeploymentSchedule(ORMBaseModel):
    # A schedule attached to a deployment, with an activity flag, an optional
    # cap on scheduled runs, parameter overrides and an optional slug.
    deployment_id: Optional[UUID] = Field(
        default=None,
        description="The deployment id associated with this schedule.",
    )
    schedule: schedules.SCHEDULE_TYPES = Field(
        default=..., description="The schedule for the deployment."
    )
    active: bool = Field(
        default=True, description="Whether or not the schedule is active."
    )
    max_scheduled_runs: Optional[PositiveInteger] = Field(
        default=None,
        description="The maximum number of scheduled runs for the schedule.",
    )
    parameters: dict[str, Any] = Field(
        default_factory=dict, description="A dictionary of parameter value overrides."
    )
    slug: Optional[str] = Field(
        default=None,
        description="A unique slug for the schedule.",
    )

    @field_validator("max_scheduled_runs")
    @classmethod
    def validate_max_scheduled_runs(cls, v: Optional[int]) -> Optional[int]:
        """Check `max_scheduled_runs` against the configured server limit.

        Delegates to `validate_schedule_max_scheduled_runs`, passing the
        current value of `PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS`.
        The field is optional, so the annotation admits `None`.
        """
        return validate_schedule_max_scheduled_runs(
            v, PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
        )

575 

576 

class VersionInfo(PrefectBaseModel, extra="allow"):
    # Both fields are required; `extra="allow"` lets additional keys through.
    type: str = Field(
        default=...,
        description="The type of version info.",
    )
    version: str = Field(
        default=...,
        description="The version of the deployment.",
    )

580 

581 

class Deployment(ORMBaseModel):
    """An ORM representation of deployment data."""

    model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True)

    # --- identity -----------------------------------------------------------
    name: NameOrEmpty = Field(default=..., description="The name of the deployment.")
    version: Optional[str] = Field(
        default=None,
        description="An optional version for the deployment.",
    )
    description: Optional[str] = Field(
        default=None,
        description="A description for the deployment.",
    )
    flow_id: UUID = Field(
        default=...,
        description="The flow id associated with the deployment.",
    )

    # --- scheduling and concurrency ----------------------------------------
    paused: bool = Field(
        default=False,
        description="Whether or not the deployment is paused.",
    )
    schedules: list[DeploymentSchedule] = Field(
        default_factory=lambda: [],
        description="A list of schedules for the deployment.",
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        default=None,
        description="The concurrency limit for the deployment.",
    )
    concurrency_limit_id: Optional[UUID] = Field(
        default=None,
        description="The concurrency limit id associated with the deployment.",
    )
    concurrency_options: Optional[ConcurrencyOptions] = Field(
        default=None,
        description="The concurrency options for the deployment.",
    )

    # --- run configuration --------------------------------------------------
    job_variables: Dict[str, Any] = Field(
        default_factory=dict,
        description="Overrides to apply to flow run infrastructure at runtime.",
    )
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters for flow runs scheduled by the deployment.",
    )
    pull_steps: Optional[list[dict[str, Any]]] = Field(
        default=None,
        description="Pull steps for cloning and running this deployment.",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags for the deployment",
        examples=[["tag-1", "tag-2"]],
    )
    labels: Optional[KeyValueLabels] = Field(
        default_factory=dict,
        description=(
            "A dictionary of key-value labels. "
            "Values can be strings, numbers, or booleans."
        ),
        examples=[{"key": "value1", "key2": 42}],
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description=(
            "The work queue for the deployment. If no work queue is set, work will not"
            " be scheduled."
        ),
    )
    last_polled: Optional[DateTime] = Field(
        default=None,
        description="The last time the deployment was polled for status updates.",
    )
    parameter_openapi_schema: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="The parameter schema of the flow, including defaults.",
    )

    # --- code location and storage ------------------------------------------
    path: Optional[str] = Field(
        default=None,
        description=(
            "The path to the working directory for the workflow, relative to remote"
            " storage or an absolute path."
        ),
    )
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    storage_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining storage used for this flow.",
    )
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use for flow runs.",
    )

    # --- ownership and enforcement ------------------------------------------
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this deployment.",
    )
    updated_by: Optional[UpdatedBy] = Field(
        default=None,
        description="Optional information about the updater of this deployment.",
    )
    work_queue_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the work pool queue to which this deployment is assigned."
        ),
    )
    enforce_parameter_schema: bool = Field(
        default=True,
        description=(
            "Whether or not the deployment should enforce the parameter schema."
        ),
    )

692 

693 

class ConcurrencyLimit(ORMBaseModel):
    """An ORM representation of a concurrency limit."""

    # `tag` and `concurrency_limit` are required.
    tag: str = Field(
        default=...,
        description="A tag the concurrency limit is applied to.",
    )
    concurrency_limit: int = Field(
        default=...,
        description="The concurrency limit.",
    )
    # Defaults to a fresh empty list per instance.
    active_slots: list[UUID] = Field(
        default_factory=lambda: [],
        description="A list of active run ids using a concurrency slot",
    )

705 

706 

class ConcurrencyLimitV2(ORMBaseModel):
    """An ORM representation of a v2 concurrency limit."""

    active: bool = Field(
        default=True,
        description="Whether the concurrency limit is active.",
    )
    # `name` and `limit` are required.
    name: Name = Field(
        default=...,
        description="The name of the concurrency limit.",
    )
    limit: int = Field(
        default=...,
        description="The concurrency limit.",
    )
    # Slot counters start at zero.
    active_slots: int = Field(
        default=0,
        description="The number of active slots.",
    )
    denied_slots: int = Field(
        default=0,
        description="The number of denied slots.",
    )
    slot_decay_per_second: float = Field(
        default=0,
        description="The decay rate for active slots when used as a rate limit.",
    )
    avg_slot_occupancy_seconds: float = Field(
        default=2.0,
        description="The average amount of time a slot is occupied.",
    )

724 

725 

class BlockType(ORMBaseModel):
    """An ORM representation of a block type"""

    # `name` and `slug` are required.
    name: Name = Field(
        default=...,
        description="A block type's name",
    )
    slug: str = Field(
        default=...,
        description="A block type's slug",
    )

    # Optional presentation metadata.
    # TODO: make both URL fields HttpUrl
    logo_url: Optional[LaxUrl] = Field(
        default=None,
        description="Web URL for the block type's logo",
    )
    documentation_url: Optional[LaxUrl] = Field(
        default=None,
        description="Web URL for the block type's documentation",
    )
    description: Optional[str] = Field(
        default=None,
        description="A short blurb about the corresponding block's intended use",
    )
    code_example: Optional[str] = Field(
        default=None,
        description="A code snippet demonstrating use of the corresponding block",
    )

    is_protected: bool = Field(
        default=False,
        description="Protected block types cannot be modified via API.",
    )

748 

749 

class BlockSchema(ORMBaseModel):
    """An ORM representation of a block schema."""

    checksum: str = Field(
        default=...,
        description="The block schema's unique checksum",
    )
    fields: Dict[str, Any] = Field(
        default_factory=dict,
        description="The block schema's field schema",
        json_schema_extra={"additionalProperties": True},
    )
    # Must be supplied, but `None` is an accepted value.
    block_type_id: Optional[UUID] = Field(
        default=...,
        description="A block type ID",
    )
    block_type: Optional[BlockType] = Field(
        default=None,
        description="The associated block type",
    )
    capabilities: List[str] = Field(
        default_factory=list,
        description="A list of Block capabilities",
    )
    # Falls back to the module-level DEFAULT_BLOCK_SCHEMA_VERSION.
    version: str = Field(
        default=DEFAULT_BLOCK_SCHEMA_VERSION,
        description="Human readable identifier for the block schema",
    )

771 

772 

class BlockSchemaReference(ORMBaseModel):
    """An ORM representation of a block schema reference."""

    # Parent side of the reference: id required, object optional.
    parent_block_schema_id: UUID = Field(
        default=...,
        description="ID of block schema the reference is nested within",
    )
    parent_block_schema: Optional[BlockSchema] = Field(
        default=None,
        description="The block schema the reference is nested within",
    )

    # Referenced (nested) side: id required, object optional.
    reference_block_schema_id: UUID = Field(
        default=...,
        description="ID of the nested block schema",
    )
    reference_block_schema: Optional[BlockSchema] = Field(
        default=None,
        description="The nested block schema",
    )

    name: str = Field(
        default=...,
        description="The name that the reference is nested under",
    )

791 

792 

class BlockDocument(ORMBaseModel):
    """An ORM representation of a block document."""

    name: Optional[Name] = Field(
        default=None,
        description=(
            "The block document's name. Not required for anonymous block documents."
        ),
    )
    data: dict[str, Any] = Field(
        default_factory=dict, description="The block document's data"
    )
    block_schema_id: UUID = Field(default=..., description="A block schema ID")
    block_schema: Optional[BlockSchema] = Field(
        default=None, description="The associated block schema"
    )
    block_type_id: UUID = Field(default=..., description="A block type ID")
    block_type_name: Optional[str] = Field(
        default=None, description="The associated block type's name"
    )
    block_type: Optional[BlockType] = Field(
        default=None, description="The associated block type"
    )
    block_document_references: dict[str, dict[str, Any]] = Field(
        default_factory=dict, description="Record of the block document's references"
    )
    is_anonymous: bool = Field(
        default=False,
        description=(
            "Whether the block is anonymous (anonymous blocks are usually created by"
            " Prefect automatically)"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def validate_name_is_present_if_not_anonymous(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """Run the raw input through `validate_name_present_on_nonanonymous_blocks`.

        Executes before field validation (``mode="before"``). It is declared
        as an explicit classmethod because a "before" validator has no
        instance to bind to.
        """
        return validate_name_present_on_nonanonymous_blocks(values)

    @classmethod
    async def from_orm_model(
        cls: type[Self],
        session: AsyncSession,
        orm_block_document: "orm_models.ORMBlockDocument",
        include_secrets: bool = False,
    ) -> Self:
        """Build a `BlockDocument` from its ORM row, obfuscating secrets by default.

        Args:
            session: Session passed to `orm_block_document.decrypt_data`.
            orm_block_document: The ORM block document to convert.
            include_secrets: When False (the default), values at the paths
                listed under the block schema's `secret_fields` are replaced
                with `obfuscate(...)` output.

        Returns:
            A new instance populated from the ORM object and the (possibly
            obfuscated) decrypted data.
        """
        data = await orm_block_document.decrypt_data(session=session)
        # if secrets are not included, obfuscate them based on the schema's
        # `secret_fields`. Note this walks any nested blocks as well. If the
        # nested blocks were recovered from named blocks, they will already
        # be obfuscated, but if nested fields were hardcoded into the parent
        # blocks data, this is the only opportunity to obfuscate them.
        if not include_secrets:
            flat_data = dict_to_flatdict(data)
            # iterate over the (possibly nested) secret fields
            # and obfuscate their data
            for secret_field in orm_block_document.block_schema.fields.get(
                "secret_fields", []
            ):
                secret_key = tuple(secret_field.split("."))
                if flat_data.get(secret_key) is not None:
                    flat_data[secret_key] = obfuscate(flat_data[secret_key])
                # If a wildcard (*) is in the current secret key path, we take the portion
                # of the path before the wildcard and compare it to the same level of each
                # key. A match means that the field is nested under the secret key and should
                # be obfuscated.
                elif "*" in secret_key:
                    wildcard_index = secret_key.index("*")
                    # The prefix is the same for every data key, so slice it once.
                    secret_prefix = secret_key[0:wildcard_index]
                    # Only existing keys are reassigned, so the dict does not
                    # change size while it is being iterated.
                    for data_key in flat_data.keys():
                        if secret_prefix == data_key[0:wildcard_index]:
                            flat_data[data_key] = obfuscate(flat_data[data_key])
            data = flatdict_to_dict(flat_data)
        return cls(
            id=orm_block_document.id,
            created=orm_block_document.created,
            updated=orm_block_document.updated,
            name=orm_block_document.name,
            data=data,
            block_schema_id=orm_block_document.block_schema_id,
            block_schema=orm_block_document.block_schema,
            block_type_id=orm_block_document.block_type_id,
            block_type_name=orm_block_document.block_type_name,
            block_type=orm_block_document.block_type,
            is_anonymous=orm_block_document.is_anonymous,
        )

879 

880 

class BlockDocumentReference(ORMBaseModel):
    """An ORM representation of a block document reference."""

    # Parent side of the reference: id required, object optional.
    parent_block_document_id: UUID = Field(
        default=..., description="ID of block document the reference is nested within"
    )
    parent_block_document: Optional[BlockDocument] = Field(
        default=None, description="The block document the reference is nested within"
    )

    # Referenced (nested) side: id required, object optional.
    reference_block_document_id: UUID = Field(
        default=..., description="ID of the nested block document"
    )
    reference_block_document: Optional[BlockDocument] = Field(
        default=None, description="The nested block document"
    )

    name: str = Field(
        default=..., description="The name that the reference is nested under"
    )

    @model_validator(mode="before")
    @classmethod
    def validate_parent_and_ref_are_different(
        cls, values: dict[str, Any]
    ) -> dict[str, Any]:
        """Run the raw input through `validate_parent_and_ref_diff`.

        Executes before field validation (``mode="before"``). It is declared
        as an explicit classmethod because a "before" validator has no
        instance to bind to.
        """
        return validate_parent_and_ref_diff(values)

905 

906 

class Configuration(ORMBaseModel):
    """An ORM representation of account info."""

    # Both fields are required: a string key and its JSON-object payload.
    key: str = Field(..., description="Account info key")
    value: dict[str, Any] = Field(..., description="Account info")

912 

913 

class SavedSearchFilter(PrefectBaseModel):
    """A filter for a saved search model. Intended for use by the Prefect UI."""

    # Every field is required; `value` is left untyped (`Any`).
    object: str = Field(..., description="The object over which to filter.")
    property: str = Field(
        ..., description="The property of the object on which to filter."
    )
    type: str = Field(..., description="The type of the property.")
    operation: str = Field(
        ...,
        description="The operator to apply to the object. For example, `equals`.",
    )
    value: Any = Field(..., description="A JSON-compatible value for the filter.")

929 

930 

class SavedSearch(ORMBaseModel):
    """An ORM representation of saved search data. Represents a set of filter criteria."""

    name: str = Field(..., description="The name of the saved search.")
    # A fresh empty list is created per instance when no filters are given.
    filters: list[SavedSearchFilter] = Field(
        default_factory=list,
        description="The filter set for the saved search.",
    )

939 

940 

class Log(TimeSeriesBaseModel, ORMBaseModel):
    """An ORM representation of log data."""

    # Required log record contents.
    name: str = Field(..., description="The logger name.")
    level: int = Field(..., description="The log level.")
    message: str = Field(..., description="The log message.")
    timestamp: DateTime = Field(..., description="The log timestamp.")

    # Optional links to the run that emitted the record.
    flow_run_id: Optional[UUID] = Field(
        None, description="The flow run ID associated with the log."
    )
    task_run_id: Optional[UUID] = Field(
        None, description="The task run ID associated with the log."
    )

954 

955 

class QueueFilter(PrefectBaseModel):
    """Filter criteria definition for a work queue."""

    # Both criteria are optional and default to None.
    tags: Optional[list[str]] = Field(
        None,
        description="Only include flow runs with these tags in the work queue.",
    )
    deployment_ids: Optional[list[UUID]] = Field(
        None,
        description="Only include flow runs from these deployments in the work queue.",
    )

967 

968 

class WorkQueue(ORMBaseModel):
    """An ORM representation of a work queue"""

    name: Name = Field(..., description="The name of the work queue.")
    description: Optional[str] = Field(
        "", description="An optional description for the work queue."
    )
    is_paused: bool = Field(
        False, description="Whether or not the work queue is paused."
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        None, description="An optional concurrency limit for the work queue."
    )
    priority: PositiveInteger = Field(
        1,
        description="The queue's priority. Lower values are higher priority (1 is the highest).",
    )
    # Will be required after a future migration
    work_pool_id: Optional[UUID] = Field(
        None, description="The work pool with which the queue is associated."
    )
    # Retained only as a deprecated field; flagged as such for pydantic too.
    filter: Optional[QueueFilter] = Field(
        None,
        description="DEPRECATED: Filter criteria for the work queue.",
        deprecated=True,
    )
    last_polled: Optional[DateTime] = Field(
        None, description="The last time an agent polled this queue for work."
    )

1000 

1001 

class WorkQueueHealthPolicy(PrefectBaseModel):
    # Thresholds used by `evaluate_health_status`. Setting either one to
    # None disables the corresponding check.
    maximum_late_runs: Optional[int] = Field(
        0,
        description=(
            "The maximum number of late runs in the work queue before it is deemed"
            " unhealthy. Defaults to `0`."
        ),
    )
    maximum_seconds_since_last_polled: Optional[int] = Field(
        60,
        description=(
            "The maximum number of time in seconds elapsed since work queue has been"
            " polled before it is deemed unhealthy. Defaults to `60`."
        ),
    )

    def evaluate_health_status(
        self, late_runs_count: int, last_polled: Optional[DateTime] = None
    ) -> bool:
        """
        Decide whether a work queue is healthy under this policy.

        The queue is unhealthy when either enabled check fails: more late
        runs than `maximum_late_runs`, or a last poll that is missing or
        older than `maximum_seconds_since_last_polled` seconds.

        Args:
            late_runs_count: the count of late runs for the work queue.
            last_polled: the last time the work queue was polled, if available.

        Returns:
            bool: whether or not the work queue is healthy.
        """
        late_limit = self.maximum_late_runs
        late_runs_ok = late_limit is None or not (late_runs_count > late_limit)

        poll_limit = self.maximum_seconds_since_last_polled
        poll_ok = True
        if poll_limit is not None:
            # A queue that has never been polled fails the check outright;
            # otherwise compare the elapsed seconds against the limit.
            poll_ok = last_polled is not None and not (
                (now("UTC") - last_polled).total_seconds() > poll_limit
            )

        return late_runs_ok and poll_ok

1047 

1048 

class WorkQueueStatusDetail(PrefectBaseModel):
    # `healthy` and `health_check_policy` are required; the other two
    # fields have defaults.
    healthy: bool = Field(
        default=..., description="Whether or not the work queue is healthy."
    )
    late_runs_count: int = Field(
        0, description="The number of late flow runs in the work queue."
    )
    last_polled: Optional[DateTime] = Field(
        None, description="The last time an agent polled this queue for work."
    )
    health_check_policy: WorkQueueHealthPolicy = Field(
        default=...,
        description="The policy used to determine whether or not the work queue is healthy.",
    )

1063 

1064 

class Agent(ORMBaseModel):
    """An ORM representation of an agent"""

    # When no name is supplied, one is produced by `generate_slug(2)`.
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the agent. If a name is not provided, it will be"
            " auto-generated."
        ),
    )
    work_queue_id: UUID = Field(
        ..., description="The work queue with which the agent is associated."
    )
    last_activity_time: Optional[DateTime] = Field(
        None, description="The last time this agent polled for work."
    )

1081 

1082 

class WorkPoolStorageConfiguration(PrefectBaseModel):
    """A representation of a work pool's storage configuration"""

    # Unknown keys are rejected instead of being silently accepted.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # All three settings are optional and default to None.
    bundle_upload_step: Optional[dict[str, Any]] = Field(
        None,
        description="The step to use for uploading bundles to storage.",
    )
    bundle_execution_step: Optional[dict[str, Any]] = Field(
        None,
        description="The step to use for executing bundles.",
    )
    default_result_storage_block_id: Optional[UUID] = Field(
        None,
        description="The block document ID of the default result storage block.",
    )

1100 

1101 

class WorkPool(ORMBaseModel):
    """An ORM representation of a work pool"""

    name: NonEmptyishName = Field(description="The name of the work pool.")
    description: Optional[str] = Field(
        None, description="A description of the work pool."
    )
    type: str = Field(description="The work pool type.")
    base_job_template: dict[str, Any] = Field(
        default_factory=dict, description="The work pool's base job template."
    )
    is_paused: bool = Field(
        False,
        description="Pausing the work pool stops the delivery of all work.",
    )
    concurrency_limit: Optional[NonNegativeInteger] = Field(
        None, description="A concurrency limit for the work pool."
    )
    status: Optional[WorkPoolStatus] = Field(
        None, description="The current status of the work pool."
    )

    # this required field has a default of None so that the custom validator
    # below will be called and produce a more helpful error message
    default_queue_id: Optional[UUID] = Field(
        None, description="The id of the pool's default queue."
    )

    storage_configuration: WorkPoolStorageConfiguration = Field(
        default_factory=WorkPoolStorageConfiguration,
        description="The storage configuration for the work pool.",
    )

    @field_validator("default_queue_id")
    @classmethod
    def helpful_error_for_missing_default_queue_id(cls, v: UUID | None) -> UUID:
        """
        Run `validate_default_queue_id_not_none` on the default queue id.

        Args:
            v: the `default_queue_id` value being validated.

        Returns:
            Whatever `validate_default_queue_id_not_none` returns for `v`.
        """
        return validate_default_queue_id_not_none(v)

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """
        Validate `obj` into a work pool, then adjust the status for agent pools.

        Validation itself is delegated unchanged to the parent class. When
        `from_attributes` is truthy and `obj.type` is "prefect-agent", the
        resulting model's `status` is reset to None.

        Args:
            obj: the object to validate.
            strict: forwarded to the parent `model_validate`.
            from_attributes: forwarded to the parent `model_validate`; also
                gates the "prefect-agent" status reset.
            context: forwarded to the parent `model_validate`.

        Returns:
            The validated work pool.
        """
        work_pool: WorkPool = super().model_validate(
            obj, strict=strict, from_attributes=from_attributes, context=context
        )
        # `obj.type` is only read on the attribute-based path, as before.
        if from_attributes and obj.type == "prefect-agent":
            work_pool.status = None
        return work_pool

1157 

1158 

class Worker(ORMBaseModel):
    """An ORM representation of a worker"""

    name: str = Field(description="The name of the worker.")
    # NOTE(review): the description below says "queue" although this model
    # is a worker; the text is kept as-is because it is emitted at runtime.
    work_pool_id: UUID = Field(
        description="The work pool with which the queue is associated."
    )
    last_heartbeat_time: Optional[datetime.datetime] = Field(
        default=None, description="The last time the worker process sent a heartbeat."
    )
    heartbeat_interval_seconds: Optional[int] = Field(
        None,
        description="The number of seconds to expect between heartbeats sent by the worker.",
    )

1175 

1176 

# Rebuild the pydantic schemas of Flow and then FlowRun, in that order.
# NOTE(review): presumably needed to resolve forward references to models
# defined after them in this module; confirm.
for _rebuild_target in (Flow, FlowRun):
    _rebuild_target.model_rebuild()
del _rebuild_target  # leave the module namespace as it was

1179 

1180 

class Artifact(ORMBaseModel):
    key: Optional[str] = Field(
        None, description="An optional unique reference key for this artifact."
    )
    type: Optional[str] = Field(
        None,
        description=(
            "An identifier that describes the shape of the data field. e.g. 'result',"
            " 'table', 'markdown'"
        ),
    )
    description: Optional[str] = Field(
        None, description="A markdown-enabled description of the artifact."
    )
    # data will eventually be typed as `Optional[Union[Result, Any]]`
    data: Optional[Union[dict[str, Any], Any]] = Field(
        None,
        description=(
            "Data associated with the artifact, e.g. a result.; structure depends on"
            " the artifact type."
        ),
    )
    metadata_: Optional[dict[str, str]] = Field(
        None,
        description=(
            "User-defined artifact metadata. Content must be string key and value"
            " pairs."
        ),
    )
    flow_run_id: Optional[UUID] = Field(
        None, description="The flow run associated with the artifact."
    )
    task_run_id: Optional[UUID] = Field(
        None, description="The task run associated with the artifact."
    )

    @classmethod
    def from_result(cls, data: Any | dict[str, Any]) -> "Artifact":
        """
        Build an artifact from result data.

        When `data` is a dict, the `artifact_key`, `artifact_type` and
        `artifact_description` entries are removed from the stored data and,
        if truthy, used as the artifact's `key`, `type` and `description`.
        The extraction works on a shallow copy, so the caller's dict is left
        untouched. Non-dict data is stored unchanged.

        Args:
            data: the result payload, either a dict or any other value.

        Returns:
            A new artifact holding the remaining data.
        """
        artifact_info: dict[str, Any] = {}
        if isinstance(data, dict):
            # Copy first so the pops below never mutate the caller's dict.
            data = dict(data)
            for source_key, target_field in (
                ("artifact_key", "key"),
                ("artifact_type", "type"),
                ("artifact_description", "description"),
            ):
                extracted = data.pop(source_key, None)
                if extracted:
                    artifact_info[target_field] = extracted

        return cls(data=data, **artifact_info)

    @field_validator("metadata_")
    @classmethod
    def validate_metadata_length(cls, v: dict[str, str]) -> dict[str, str]:
        """
        Run `validate_max_metadata_length` on the artifact metadata.

        Args:
            v: the `metadata_` value being validated.

        Returns:
            Whatever `validate_max_metadata_length` returns for `v`.
        """
        return validate_max_metadata_length(v)

1239 

1240 

class ArtifactCollection(ORMBaseModel):
    # NOTE(review): `key` is required here (plain `str`, no default) even
    # though its description calls it optional; the text is kept as-is
    # because it is emitted at runtime.
    key: str = Field(description="An optional unique reference key for this artifact.")
    latest_id: UUID = Field(
        description="The latest artifact ID associated with the key."
    )
    type: Optional[str] = Field(
        None,
        description=(
            "An identifier that describes the shape of the data field. e.g. 'result',"
            " 'table', 'markdown'"
        ),
    )
    description: Optional[str] = Field(
        None, description="A markdown-enabled description of the artifact."
    )
    data: Optional[Union[dict[str, Any], Any]] = Field(
        None,
        description=(
            "Data associated with the artifact, e.g. a result.; structure depends on"
            " the artifact type."
        ),
    )
    metadata_: Optional[dict[str, str]] = Field(
        None,
        description=(
            "User-defined artifact metadata. Content must be string key and value"
            " pairs."
        ),
    )
    flow_run_id: Optional[UUID] = Field(
        None, description="The flow run associated with the artifact."
    )
    task_run_id: Optional[UUID] = Field(
        None, description="The task run associated with the artifact."
    )

1276 

1277 

class Variable(ORMBaseModel):
    # Name length is capped at MAX_VARIABLE_NAME_LENGTH.
    name: str = Field(
        ...,
        description="The name of the variable",
        examples=["my-variable"],
        max_length=MAX_VARIABLE_NAME_LENGTH,
    )
    value: StrictVariableValue = Field(
        ...,
        description="The value of the variable",
        examples=["my-value"],
    )
    # A fresh empty list is created per instance when no tags are given.
    tags: list[str] = Field(
        default_factory=list,
        description="A list of variable tags",
        examples=[["tag-1", "tag-2"]],
    )

1295 

1296 

class FlowRunInput(ORMBaseModel):
    flow_run_id: UUID = Field(description="The flow run ID associated with the input.")
    # After normal string validation the key is passed through
    # `raise_on_name_alphanumeric_dashes_only`.
    key: Annotated[
        str, AfterValidator(raise_on_name_alphanumeric_dashes_only)
    ] = Field(description="The key of the input.")
    value: str = Field(description="The value of the input.")
    sender: Optional[str] = Field(None, description="The sender of the input.")

1304 

1305 

class CsrfToken(ORMBaseModel):
    # All three fields are required.
    token: str = Field(..., description="The CSRF token")
    client: str = Field(
        ..., description="The client id associated with the CSRF token"
    )
    expiration: DateTime = Field(
        ..., description="The expiration time of the CSRF token"
    )