Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/core.py: 84%
398 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Full schemas of Prefect REST API objects.
3"""
5from __future__ import annotations 1a
7import datetime 1a
8from typing import ( 1a
9 TYPE_CHECKING,
10 Annotated,
11 Any,
12 ClassVar,
13 Dict,
14 List,
15 Optional,
16 Type,
17 Union,
18)
19from uuid import UUID 1a
21from pydantic import ( 1a
22 AfterValidator,
23 BaseModel,
24 ConfigDict,
25 Field,
26 StrictBool,
27 StrictFloat,
28 StrictInt,
29 field_validator,
30 model_validator,
31)
32from sqlalchemy.ext.asyncio import AsyncSession 1a
33from typing_extensions import Literal, Self 1a
35from prefect._internal.schemas.validators import ( 1a
36 get_or_create_run_name,
37 list_length_50_or_less,
38 set_run_policy_deprecated_fields,
39 validate_cache_key_length,
40 validate_default_queue_id_not_none,
41 validate_max_metadata_length,
42 validate_name_present_on_nonanonymous_blocks,
43 validate_not_negative,
44 validate_parent_and_ref_diff,
45 validate_schedule_max_scheduled_runs,
46)
47from prefect.server.schemas import schedules, states 1a
48from prefect.server.schemas.statuses import WorkPoolStatus 1a
49from prefect.server.utilities.schemas.bases import ( 1a
50 ORMBaseModel,
51 PrefectBaseModel,
52 TimeSeriesBaseModel,
53)
54from prefect.settings import PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS 1a
55from prefect.types import ( 1a
56 MAX_VARIABLE_NAME_LENGTH,
57 DateTime,
58 LaxUrl,
59 Name,
60 NameOrEmpty,
61 NonEmptyishName,
62 NonNegativeInteger,
63 PositiveInteger,
64 StrictVariableValue,
65)
66from prefect.types._datetime import now 1a
67from prefect.types.names import raise_on_name_alphanumeric_dashes_only 1a
68from prefect.utilities.collections import ( 1a
69 AutoEnum,
70 dict_to_flatdict,
71 flatdict_to_dict,
72)
73from prefect.utilities.names import generate_slug, obfuscate 1a
75if TYPE_CHECKING: 75 ↛ 76line 75 didn't jump to line 76 because the condition on line 75 was never true1a
76 from prefect.server.database import orm_models
78DEFAULT_BLOCK_SCHEMA_VERSION = "non-versioned" 1a
80KeyValueLabels = dict[str, Union[StrictBool, StrictInt, StrictFloat, str]] 1a
83class Flow(ORMBaseModel): 1a
84 """An ORM representation of flow data."""
86 name: Name = Field( 1a
87 default=..., description="The name of the flow", examples=["my-flow"]
88 )
89 tags: List[str] = Field( 1a
90 default_factory=list,
91 description="A list of flow tags",
92 examples=[["tag-1", "tag-2"]],
93 )
94 labels: Union[KeyValueLabels, None] = Field( 1a
95 default_factory=dict,
96 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
97 examples=[{"key": "value1", "key2": 42}],
98 )
101class FlowRunPolicy(PrefectBaseModel): 1a
102 """Defines of how a flow run should retry."""
104 max_retries: int = Field( 1a
105 default=0,
106 description=(
107 "The maximum number of retries. Field is not used. Please use `retries`"
108 " instead."
109 ),
110 deprecated=True,
111 )
112 retry_delay_seconds: float = Field( 1a
113 default=0,
114 description=(
115 "The delay between retries. Field is not used. Please use `retry_delay`"
116 " instead."
117 ),
118 deprecated=True,
119 )
120 retries: Optional[int] = Field(default=None, description="The number of retries.") 1a
121 retry_delay: Optional[int] = Field( 1a
122 default=None, description="The delay time between retries, in seconds."
123 )
124 pause_keys: Optional[set[str]] = Field( 1a
125 default_factory=set, description="Tracks pauses this run has observed."
126 )
127 resuming: Optional[bool] = Field( 1a
128 default=False, description="Indicates if this run is resuming from a pause."
129 )
130 retry_type: Optional[Literal["in_process", "reschedule"]] = Field( 1a
131 default=None, description="The type of retry this run is undergoing."
132 )
134 @model_validator(mode="before") 1a
135 def populate_deprecated_fields(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
136 return set_run_policy_deprecated_fields(values) 1bdec
139class CreatedBy(BaseModel): 1a
140 id: Optional[UUID] = Field( 1a
141 default=None, description="The id of the creator of the object."
142 )
143 type: Optional[str] = Field( 1a
144 default=None, description="The type of the creator of the object."
145 )
146 display_value: Optional[str] = Field( 1a
147 default=None, description="The display value for the creator."
148 )
151class UpdatedBy(BaseModel): 1a
152 id: Optional[UUID] = Field( 1a
153 default=None, description="The id of the updater of the object."
154 )
155 type: Optional[str] = Field( 1a
156 default=None, description="The type of the updater of the object."
157 )
158 display_value: Optional[str] = Field( 1a
159 default=None, description="The display value for the updater."
160 )
163class ConcurrencyLimitStrategy(AutoEnum): 1a
164 """
165 Enumeration of concurrency collision strategies.
166 """
168 ENQUEUE = AutoEnum.auto() 1a
169 CANCEL_NEW = AutoEnum.auto() 1a
172class ConcurrencyOptions(BaseModel): 1a
173 """
174 Class for storing the concurrency config in database.
175 """
177 collision_strategy: ConcurrencyLimitStrategy 1a
180class FlowRun(TimeSeriesBaseModel, ORMBaseModel): 1a
181 """An ORM representation of flow run data."""
183 name: str = Field( 1a
184 default_factory=lambda: generate_slug(2),
185 description=(
186 "The name of the flow run. Defaults to a random slug if not specified."
187 ),
188 examples=["my-flow-run"],
189 )
190 flow_id: UUID = Field(default=..., description="The id of the flow being run.") 1a
191 state_id: Optional[UUID] = Field( 1a
192 default=None, description="The id of the flow run's current state."
193 )
194 deployment_id: Optional[UUID] = Field( 1a
195 default=None,
196 description=(
197 "The id of the deployment associated with this flow run, if available."
198 ),
199 )
200 deployment_version: Optional[str] = Field( 1a
201 default=None,
202 description="The version of the deployment associated with this flow run.",
203 examples=["1.0"],
204 )
205 work_queue_name: Optional[str] = Field( 1a
206 default=None, description="The work queue that handled this flow run."
207 )
208 flow_version: Optional[str] = Field( 1a
209 default=None,
210 description="The version of the flow executed in this flow run.",
211 examples=["1.0"],
212 )
213 parameters: Dict[str, Any] = Field( 1a
214 default_factory=dict, description="Parameters for the flow run."
215 )
216 idempotency_key: Optional[str] = Field( 1a
217 default=None,
218 description=(
219 "An optional idempotency key for the flow run. Used to ensure the same flow"
220 " run is not created multiple times."
221 ),
222 )
223 context: Dict[str, Any] = Field( 1a
224 default_factory=dict,
225 description="Additional context for the flow run.",
226 examples=[{"my_var": "my_value"}],
227 )
228 empirical_policy: FlowRunPolicy = Field( 1a
229 default_factory=FlowRunPolicy,
230 )
231 tags: List[str] = Field( 1a
232 default_factory=list,
233 description="A list of tags on the flow run",
234 examples=[["tag-1", "tag-2"]],
235 )
236 labels: Union[KeyValueLabels, None] = Field( 1a
237 default_factory=dict,
238 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
239 examples=[{"key": "value1", "key2": 42}],
240 )
241 parent_task_run_id: Optional[UUID] = Field( 1a
242 default=None,
243 description=(
244 "If the flow run is a subflow, the id of the 'dummy' task in the parent"
245 " flow used to track subflow state."
246 ),
247 )
249 state_type: Optional[states.StateType] = Field( 1a
250 default=None, description="The type of the current flow run state."
251 )
252 state_name: Optional[str] = Field( 1a
253 default=None, description="The name of the current flow run state."
254 )
255 run_count: int = Field( 1a
256 default=0, description="The number of times the flow run was executed."
257 )
258 expected_start_time: Optional[DateTime] = Field( 1a
259 default=None,
260 description="The flow run's expected start time.",
261 )
262 next_scheduled_start_time: Optional[DateTime] = Field( 1a
263 default=None,
264 description="The next time the flow run is scheduled to start.",
265 )
266 start_time: Optional[DateTime] = Field( 1a
267 default=None, description="The actual start time."
268 )
269 end_time: Optional[DateTime] = Field( 1a
270 default=None, description="The actual end time."
271 )
272 total_run_time: datetime.timedelta = Field( 1a
273 default=datetime.timedelta(0),
274 description=(
275 "Total run time. If the flow run was executed multiple times, the time of"
276 " each run will be summed."
277 ),
278 )
279 estimated_run_time: datetime.timedelta = Field( 1a
280 default=datetime.timedelta(0),
281 description="A real-time estimate of the total run time.",
282 )
283 estimated_start_time_delta: datetime.timedelta = Field( 1a
284 default=datetime.timedelta(0),
285 description="The difference between actual and expected start time.",
286 )
287 auto_scheduled: bool = Field( 1a
288 default=False,
289 description="Whether or not the flow run was automatically scheduled.",
290 )
291 infrastructure_document_id: Optional[UUID] = Field( 1a
292 default=None,
293 description="The block document defining infrastructure to use this flow run.",
294 )
295 infrastructure_pid: Optional[str] = Field( 1a
296 default=None,
297 description="The id of the flow run as returned by an infrastructure block.",
298 )
299 created_by: Optional[CreatedBy] = Field( 1a
300 default=None,
301 description="Optional information about the creator of this flow run.",
302 )
303 work_queue_id: Optional[UUID] = Field( 1a
304 default=None, description="The id of the run's work pool queue."
305 )
307 # relationships
308 # flow: Flow = None
309 # task_runs: List["TaskRun"] = Field(default_factory=list)
310 state: Optional[states.State] = Field( 1a
311 default=None, description="The current state of the flow run."
312 )
313 # parent_task_run: "TaskRun" = None
315 job_variables: Optional[Dict[str, Any]] = Field( 1a
316 default=None,
317 description="Variables used as overrides in the base job template",
318 )
320 @field_validator("name", mode="before") 1a
321 @classmethod 1a
322 def set_name(cls, name: str) -> str: 1a
323 return get_or_create_run_name(name) 1bdc
325 def __eq__(self, other: Any) -> bool: 1a
326 """
327 Check for "equality" to another flow run schema
329 Estimates times are rolling and will always change with repeated queries for
330 a flow run so we ignore them during equality checks.
331 """
332 if isinstance(other, FlowRun):
333 exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
334 return self.model_dump(exclude=exclude_fields) == other.model_dump(
335 exclude=exclude_fields
336 )
337 return super().__eq__(other)
340class TaskRunPolicy(PrefectBaseModel): 1a
341 """Defines of how a task run should retry."""
343 max_retries: int = Field( 1a
344 default=0,
345 description=(
346 "The maximum number of retries. Field is not used. Please use `retries`"
347 " instead."
348 ),
349 deprecated=True,
350 )
351 retry_delay_seconds: float = Field( 1a
352 default=0,
353 description=(
354 "The delay between retries. Field is not used. Please use `retry_delay`"
355 " instead."
356 ),
357 deprecated=True,
358 )
359 retries: Optional[int] = Field(default=None, description="The number of retries.") 1a
360 retry_delay: Union[None, int, float, List[int], List[float]] = Field( 1a
361 default=None,
362 description="A delay time or list of delay times between retries, in seconds.",
363 )
364 retry_jitter_factor: Optional[float] = Field( 1a
365 default=None, description="Determines the amount a retry should jitter"
366 )
368 @model_validator(mode="before") 1a
369 def populate_deprecated_fields(cls, values: dict[str, Any]) -> dict[str, Any]: 1a
370 return set_run_policy_deprecated_fields(values) 1bdc
372 @field_validator("retry_delay") 1a
373 @classmethod 1a
374 def validate_configured_retry_delays( 1a
375 cls, v: int | float | list[int] | list[float] | None
376 ) -> int | float | list[int] | list[float] | None:
377 return list_length_50_or_less(v) 1bdc
379 @field_validator("retry_jitter_factor") 1a
380 @classmethod 1a
381 def validate_jitter_factor(cls, v: float | None) -> float | None: 1a
382 return validate_not_negative(v) 1bdc
385class RunInput(PrefectBaseModel): 1a
386 """
387 Base class for classes that represent inputs to runs, which
388 could include, constants, parameters, task runs or flow runs.
389 """
391 model_config: ClassVar[ConfigDict] = ConfigDict(frozen=True) 1a
393 input_type: str 1a
396class TaskRunResult(RunInput): 1a
397 """Represents a task run result input to another task run."""
399 input_type: Literal["task_run"] = "task_run" 1a
400 id: UUID 1a
403class FlowRunResult(RunInput): 1a
404 input_type: Literal["flow_run"] = "flow_run" 1a
405 id: UUID 1a
408class Parameter(RunInput): 1a
409 """Represents a parameter input to a task run."""
411 input_type: Literal["parameter"] = "parameter" 1a
412 name: str 1a
415class Constant(RunInput): 1a
416 """Represents constant input value to a task run."""
418 input_type: Literal["constant"] = "constant" 1a
419 type: str 1a
422class TaskRun(TimeSeriesBaseModel, ORMBaseModel): 1a
423 """An ORM representation of task run data."""
425 name: str = Field( 1a
426 default_factory=lambda: generate_slug(2), examples=["my-task-run"]
427 )
428 flow_run_id: Optional[UUID] = Field( 1a
429 default=None, description="The flow run id of the task run."
430 )
431 task_key: str = Field( 1a
432 default=..., description="A unique identifier for the task being run."
433 )
434 dynamic_key: str = Field( 1a
435 default=...,
436 description=(
437 "A dynamic key used to differentiate between multiple runs of the same task"
438 " within the same flow run."
439 ),
440 )
441 cache_key: Optional[str] = Field( 1a
442 default=None,
443 description=(
444 "An optional cache key. If a COMPLETED state associated with this cache key"
445 " is found, the cached COMPLETED state will be used instead of executing"
446 " the task run."
447 ),
448 )
449 cache_expiration: Optional[DateTime] = Field( 1a
450 default=None, description="Specifies when the cached state should expire."
451 )
452 task_version: Optional[str] = Field( 1a
453 default=None, description="The version of the task being run."
454 )
455 empirical_policy: TaskRunPolicy = Field( 1a
456 default_factory=TaskRunPolicy,
457 )
458 tags: List[str] = Field( 1a
459 default_factory=list,
460 description="A list of tags for the task run.",
461 examples=[["tag-1", "tag-2"]],
462 )
463 labels: Union[KeyValueLabels, None] = Field( 1a
464 default_factory=dict,
465 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
466 examples=[{"key": "value1", "key2": 42}],
467 )
468 state_id: Optional[UUID] = Field( 1a
469 default=None, description="The id of the current task run state."
470 )
471 task_inputs: Dict[ 1a
472 str, List[Union[TaskRunResult, FlowRunResult, Parameter, Constant]]
473 ] = Field(
474 default_factory=dict,
475 description=(
476 "Tracks the source of inputs to a task run. Used for internal bookkeeping."
477 ),
478 )
479 state_type: Optional[states.StateType] = Field( 1a
480 default=None, description="The type of the current task run state."
481 )
482 state_name: Optional[str] = Field( 1a
483 default=None, description="The name of the current task run state."
484 )
485 run_count: int = Field( 1a
486 default=0, description="The number of times the task run has been executed."
487 )
488 flow_run_run_count: int = Field( 1a
489 default=0,
490 description=(
491 "If the parent flow has retried, this indicates the flow retry this run is"
492 " associated with."
493 ),
494 )
495 expected_start_time: Optional[DateTime] = Field( 1a
496 default=None,
497 description="The task run's expected start time.",
498 )
500 # the next scheduled start time will be populated
501 # whenever the run is in a scheduled state
502 next_scheduled_start_time: Optional[DateTime] = Field( 1a
503 default=None,
504 description="The next time the task run is scheduled to start.",
505 )
506 start_time: Optional[DateTime] = Field( 1a
507 default=None, description="The actual start time."
508 )
509 end_time: Optional[DateTime] = Field( 1a
510 default=None, description="The actual end time."
511 )
512 total_run_time: datetime.timedelta = Field( 1a
513 default=datetime.timedelta(0),
514 description=(
515 "Total run time. If the task run was executed multiple times, the time of"
516 " each run will be summed."
517 ),
518 )
519 estimated_run_time: datetime.timedelta = Field( 1a
520 default=datetime.timedelta(0),
521 description="A real-time estimate of total run time.",
522 )
523 estimated_start_time_delta: datetime.timedelta = Field( 1a
524 default=datetime.timedelta(0),
525 description="The difference between actual and expected start time.",
526 )
528 # relationships
529 # flow_run: FlowRun = None
530 # subflow_runs: List[FlowRun] = Field(default_factory=list)
531 state: Optional[states.State] = Field( 1a
532 default=None, description="The current task run state."
533 )
535 @field_validator("name", mode="before") 1a
536 @classmethod 1a
537 def set_name(cls, name: str) -> str: 1a
538 return get_or_create_run_name(name) 1bdc
540 @field_validator("cache_key") 1a
541 @classmethod 1a
542 def validate_cache_key(cls, cache_key: str) -> str: 1a
543 return validate_cache_key_length(cache_key) 1bdc
546class DeploymentSchedule(ORMBaseModel): 1a
547 deployment_id: Optional[UUID] = Field( 1a
548 default=None,
549 description="The deployment id associated with this schedule.",
550 )
551 schedule: schedules.SCHEDULE_TYPES = Field( 1a
552 default=..., description="The schedule for the deployment."
553 )
554 active: bool = Field( 1a
555 default=True, description="Whether or not the schedule is active."
556 )
557 max_scheduled_runs: Optional[PositiveInteger] = Field( 1a
558 default=None,
559 description="The maximum number of scheduled runs for the schedule.",
560 )
561 parameters: dict[str, Any] = Field( 1a
562 default_factory=dict, description="A dictionary of parameter value overrides."
563 )
564 slug: Optional[str] = Field( 1a
565 default=None,
566 description="A unique slug for the schedule.",
567 )
569 @field_validator("max_scheduled_runs") 1a
570 @classmethod 1a
571 def validate_max_scheduled_runs(cls, v: int) -> int: 1a
572 return validate_schedule_max_scheduled_runs(
573 v, PREFECT_DEPLOYMENT_SCHEDULE_MAX_SCHEDULED_RUNS.value()
574 )
577class VersionInfo(PrefectBaseModel, extra="allow"): 1a
578 type: str = Field(default=..., description="The type of version info.") 1a
579 version: str = Field(default=..., description="The version of the deployment.") 1a
582class Deployment(ORMBaseModel): 1a
583 """An ORM representation of deployment data."""
585 model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True) 1a
587 name: NameOrEmpty = Field(default=..., description="The name of the deployment.") 1a
588 version: Optional[str] = Field( 1a
589 default=None, description="An optional version for the deployment."
590 )
591 description: Optional[str] = Field( 1a
592 default=None, description="A description for the deployment."
593 )
594 flow_id: UUID = Field( 1a
595 default=..., description="The flow id associated with the deployment."
596 )
597 paused: bool = Field( 1a
598 default=False, description="Whether or not the deployment is paused."
599 )
600 schedules: list[DeploymentSchedule] = Field( 1a
601 default_factory=lambda: [],
602 description="A list of schedules for the deployment.",
603 )
604 concurrency_limit: Optional[NonNegativeInteger] = Field( 1a
605 default=None, description="The concurrency limit for the deployment."
606 )
607 concurrency_limit_id: Optional[UUID] = Field( 1a
608 default=None,
609 description="The concurrency limit id associated with the deployment.",
610 )
611 concurrency_options: Optional[ConcurrencyOptions] = Field( 1a
612 default=None, description="The concurrency options for the deployment."
613 )
614 job_variables: Dict[str, Any] = Field( 1a
615 default_factory=dict,
616 description="Overrides to apply to flow run infrastructure at runtime.",
617 )
618 parameters: Dict[str, Any] = Field( 1a
619 default_factory=dict,
620 description="Parameters for flow runs scheduled by the deployment.",
621 )
622 pull_steps: Optional[list[dict[str, Any]]] = Field( 1a
623 default=None,
624 description="Pull steps for cloning and running this deployment.",
625 )
626 tags: List[str] = Field( 1a
627 default_factory=list,
628 description="A list of tags for the deployment",
629 examples=[["tag-1", "tag-2"]],
630 )
631 labels: Union[KeyValueLabels, None] = Field( 1a
632 default_factory=dict,
633 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
634 examples=[{"key": "value1", "key2": 42}],
635 )
636 work_queue_name: Optional[str] = Field( 1a
637 default=None,
638 description=(
639 "The work queue for the deployment. If no work queue is set, work will not"
640 " be scheduled."
641 ),
642 )
643 last_polled: Optional[DateTime] = Field( 1a
644 default=None,
645 description="The last time the deployment was polled for status updates.",
646 )
647 parameter_openapi_schema: Optional[Dict[str, Any]] = Field( 1a
648 default_factory=dict,
649 description="The parameter schema of the flow, including defaults.",
650 )
651 path: Optional[str] = Field( 1a
652 default=None,
653 description=(
654 "The path to the working directory for the workflow, relative to remote"
655 " storage or an absolute path."
656 ),
657 )
658 entrypoint: Optional[str] = Field( 1a
659 default=None,
660 description=(
661 "The path to the entrypoint for the workflow, relative to the `path`."
662 ),
663 )
664 storage_document_id: Optional[UUID] = Field( 1a
665 default=None,
666 description="The block document defining storage used for this flow.",
667 )
668 infrastructure_document_id: Optional[UUID] = Field( 1a
669 default=None,
670 description="The block document defining infrastructure to use for flow runs.",
671 )
672 created_by: Optional[CreatedBy] = Field( 1a
673 default=None,
674 description="Optional information about the creator of this deployment.",
675 )
676 updated_by: Optional[UpdatedBy] = Field( 1a
677 default=None,
678 description="Optional information about the updater of this deployment.",
679 )
680 work_queue_id: Optional[UUID] = Field( 1a
681 default=None,
682 description=(
683 "The id of the work pool queue to which this deployment is assigned."
684 ),
685 )
686 enforce_parameter_schema: bool = Field( 1a
687 default=True,
688 description=(
689 "Whether or not the deployment should enforce the parameter schema."
690 ),
691 )
694class ConcurrencyLimit(ORMBaseModel): 1a
695 """An ORM representation of a concurrency limit."""
697 tag: str = Field( 1a
698 default=..., description="A tag the concurrency limit is applied to."
699 )
700 concurrency_limit: int = Field(default=..., description="The concurrency limit.") 1a
701 active_slots: list[UUID] = Field( 1a
702 default_factory=lambda: [],
703 description="A list of active run ids using a concurrency slot",
704 )
707class ConcurrencyLimitV2(ORMBaseModel): 1a
708 """An ORM representation of a v2 concurrency limit."""
710 active: bool = Field( 1a
711 default=True, description="Whether the concurrency limit is active."
712 )
713 name: Name = Field(default=..., description="The name of the concurrency limit.") 1a
714 limit: int = Field(default=..., description="The concurrency limit.") 1a
715 active_slots: int = Field(default=0, description="The number of active slots.") 1a
716 denied_slots: int = Field(default=0, description="The number of denied slots.") 1a
717 slot_decay_per_second: float = Field( 1a
718 default=0,
719 description="The decay rate for active slots when used as a rate limit.",
720 )
721 avg_slot_occupancy_seconds: float = Field( 1a
722 default=2.0, description="The average amount of time a slot is occupied."
723 )
726class BlockType(ORMBaseModel): 1a
727 """An ORM representation of a block type"""
729 name: Name = Field(default=..., description="A block type's name") 1a
730 slug: str = Field(default=..., description="A block type's slug") 1a
731 logo_url: Optional[LaxUrl] = Field( # TODO: make it HttpUrl 1a
732 default=None, description="Web URL for the block type's logo"
733 )
734 documentation_url: Optional[LaxUrl] = Field( # TODO: make it HttpUrl 1a
735 default=None, description="Web URL for the block type's documentation"
736 )
737 description: Optional[str] = Field( 1a
738 default=None,
739 description="A short blurb about the corresponding block's intended use",
740 )
741 code_example: Optional[str] = Field( 1a
742 default=None,
743 description="A code snippet demonstrating use of the corresponding block",
744 )
745 is_protected: bool = Field( 1a
746 default=False, description="Protected block types cannot be modified via API."
747 )
750class BlockSchema(ORMBaseModel): 1a
751 """An ORM representation of a block schema."""
753 checksum: str = Field(default=..., description="The block schema's unique checksum") 1a
754 fields: Dict[str, Any] = Field( 1a
755 default_factory=dict,
756 description="The block schema's field schema",
757 json_schema_extra={"additionalProperties": True},
758 )
759 block_type_id: Optional[UUID] = Field(default=..., description="A block type ID") 1a
760 block_type: Optional[BlockType] = Field( 1a
761 default=None, description="The associated block type"
762 )
763 capabilities: List[str] = Field( 1a
764 default_factory=list,
765 description="A list of Block capabilities",
766 )
767 version: str = Field( 1a
768 default=DEFAULT_BLOCK_SCHEMA_VERSION,
769 description="Human readable identifier for the block schema",
770 )
773class BlockSchemaReference(ORMBaseModel): 1a
774 """An ORM representation of a block schema reference."""
776 parent_block_schema_id: UUID = Field( 1a
777 default=..., description="ID of block schema the reference is nested within"
778 )
779 parent_block_schema: Optional[BlockSchema] = Field( 1a
780 default=None, description="The block schema the reference is nested within"
781 )
782 reference_block_schema_id: UUID = Field( 1a
783 default=..., description="ID of the nested block schema"
784 )
785 reference_block_schema: Optional[BlockSchema] = Field( 1a
786 default=None, description="The nested block schema"
787 )
788 name: str = Field( 1a
789 default=..., description="The name that the reference is nested under"
790 )
793class BlockDocument(ORMBaseModel): 1a
794 """An ORM representation of a block document."""
796 name: Optional[Name] = Field( 1a
797 default=None,
798 description=(
799 "The block document's name. Not required for anonymous block documents."
800 ),
801 )
802 data: dict[str, Any] = Field( 1a
803 default_factory=dict, description="The block document's data"
804 )
805 block_schema_id: UUID = Field(default=..., description="A block schema ID") 1a
806 block_schema: Optional[BlockSchema] = Field( 1a
807 default=None, description="The associated block schema"
808 )
809 block_type_id: UUID = Field(default=..., description="A block type ID") 1a
810 block_type_name: Optional[str] = Field( 1a
811 default=None, description="The associated block type's name"
812 )
813 block_type: Optional[BlockType] = Field( 1a
814 default=None, description="The associated block type"
815 )
816 block_document_references: dict[str, dict[str, Any]] = Field( 1a
817 default_factory=dict, description="Record of the block document's references"
818 )
819 is_anonymous: bool = Field( 1a
820 default=False,
821 description=(
822 "Whether the block is anonymous (anonymous blocks are usually created by"
823 " Prefect automatically)"
824 ),
825 )
827 @model_validator(mode="before") 1a
828 def validate_name_is_present_if_not_anonymous( 1a
829 cls, values: dict[str, Any]
830 ) -> dict[str, Any]:
831 return validate_name_present_on_nonanonymous_blocks(values)
833 @classmethod 1a
834 async def from_orm_model( 1a
835 cls: type[Self],
836 session: AsyncSession,
837 orm_block_document: "orm_models.ORMBlockDocument",
838 include_secrets: bool = False,
839 ) -> Self:
840 data = await orm_block_document.decrypt_data(session=session)
841 # if secrets are not included, obfuscate them based on the schema's
842 # `secret_fields`. Note this walks any nested blocks as well. If the
843 # nested blocks were recovered from named blocks, they will already
844 # be obfuscated, but if nested fields were hardcoded into the parent
845 # blocks data, this is the only opportunity to obfuscate them.
846 if not include_secrets:
847 flat_data = dict_to_flatdict(data)
848 # iterate over the (possibly nested) secret fields
849 # and obfuscate their data
850 for secret_field in orm_block_document.block_schema.fields.get(
851 "secret_fields", []
852 ):
853 secret_key = tuple(secret_field.split("."))
854 if flat_data.get(secret_key) is not None:
855 flat_data[secret_key] = obfuscate(flat_data[secret_key])
856 # If a wildcard (*) is in the current secret key path, we take the portion
857 # of the path before the wildcard and compare it to the same level of each
858 # key. A match means that the field is nested under the secret key and should
859 # be obfuscated.
860 elif "*" in secret_key:
861 wildcard_index = secret_key.index("*")
862 for data_key in flat_data.keys():
863 if secret_key[0:wildcard_index] == data_key[0:wildcard_index]:
864 flat_data[data_key] = obfuscate(flat_data[data_key])
865 data = flatdict_to_dict(flat_data)
866 return cls(
867 id=orm_block_document.id,
868 created=orm_block_document.created,
869 updated=orm_block_document.updated,
870 name=orm_block_document.name,
871 data=data,
872 block_schema_id=orm_block_document.block_schema_id,
873 block_schema=orm_block_document.block_schema,
874 block_type_id=orm_block_document.block_type_id,
875 block_type_name=orm_block_document.block_type_name,
876 block_type=orm_block_document.block_type,
877 is_anonymous=orm_block_document.is_anonymous,
878 )
881class BlockDocumentReference(ORMBaseModel): 1a
882 """An ORM representation of a block document reference."""
884 parent_block_document_id: UUID = Field( 1a
885 default=..., description="ID of block document the reference is nested within"
886 )
887 parent_block_document: Optional[BlockDocument] = Field( 1a
888 default=None, description="The block document the reference is nested within"
889 )
890 reference_block_document_id: UUID = Field( 1a
891 default=..., description="ID of the nested block document"
892 )
893 reference_block_document: Optional[BlockDocument] = Field( 1a
894 default=None, description="The nested block document"
895 )
896 name: str = Field( 1a
897 default=..., description="The name that the reference is nested under"
898 )
900 @model_validator(mode="before") 1a
901 def validate_parent_and_ref_are_different( 1a
902 cls, values: dict[str, Any]
903 ) -> dict[str, Any]:
904 return validate_parent_and_ref_diff(values)
907class Configuration(ORMBaseModel): 1a
908 """An ORM representation of account info."""
910 key: str = Field(default=..., description="Account info key") 1a
911 value: Dict[str, Any] = Field(default=..., description="Account info") 1a
914class SavedSearchFilter(PrefectBaseModel): 1a
915 """A filter for a saved search model. Intended for use by the Prefect UI."""
917 object: str = Field(default=..., description="The object over which to filter.") 1a
918 property: str = Field( 1a
919 default=..., description="The property of the object on which to filter."
920 )
921 type: str = Field(default=..., description="The type of the property.") 1a
922 operation: str = Field( 1a
923 default=...,
924 description="The operator to apply to the object. For example, `equals`.",
925 )
926 value: Any = Field( 1a
927 default=..., description="A JSON-compatible value for the filter."
928 )
931class SavedSearch(ORMBaseModel): 1a
932 """An ORM representation of saved search data. Represents a set of filter criteria."""
934 name: str = Field(default=..., description="The name of the saved search.") 1a
935 filters: list[SavedSearchFilter] = Field( 1a
936 default_factory=lambda: [],
937 description="The filter set for the saved search.",
938 )
941class Log(TimeSeriesBaseModel, ORMBaseModel): 1a
942 """An ORM representation of log data."""
944 name: str = Field(default=..., description="The logger name.") 1a
945 level: int = Field(default=..., description="The log level.") 1a
946 message: str = Field(default=..., description="The log message.") 1a
947 timestamp: DateTime = Field(default=..., description="The log timestamp.") 1a
948 flow_run_id: Optional[UUID] = Field( 1a
949 default=None, description="The flow run ID associated with the log."
950 )
951 task_run_id: Optional[UUID] = Field( 1a
952 default=None, description="The task run ID associated with the log."
953 )
956class QueueFilter(PrefectBaseModel): 1a
957 """Filter criteria definition for a work queue."""
959 tags: Optional[list[str]] = Field( 1a
960 default=None,
961 description="Only include flow runs with these tags in the work queue.",
962 )
963 deployment_ids: Optional[list[UUID]] = Field( 1a
964 default=None,
965 description="Only include flow runs from these deployments in the work queue.",
966 )
969class WorkQueue(ORMBaseModel): 1a
970 """An ORM representation of a work queue"""
972 name: Name = Field(default=..., description="The name of the work queue.") 1a
973 description: Optional[str] = Field( 1a
974 default="", description="An optional description for the work queue."
975 )
976 is_paused: bool = Field( 1a
977 default=False, description="Whether or not the work queue is paused."
978 )
979 concurrency_limit: Optional[NonNegativeInteger] = Field( 1a
980 default=None, description="An optional concurrency limit for the work queue."
981 )
982 priority: PositiveInteger = Field( 1a
983 default=1,
984 description=(
985 "The queue's priority. Lower values are higher priority (1 is the highest)."
986 ),
987 )
988 # Will be required after a future migration
989 work_pool_id: Optional[UUID] = Field( 1a
990 default=None, description="The work pool with which the queue is associated."
991 )
992 filter: Optional[QueueFilter] = Field( 1a
993 default=None,
994 description="DEPRECATED: Filter criteria for the work queue.",
995 deprecated=True,
996 )
997 last_polled: Optional[DateTime] = Field( 1a
998 default=None, description="The last time an agent polled this queue for work."
999 )
1002class WorkQueueHealthPolicy(PrefectBaseModel): 1a
1003 maximum_late_runs: Optional[int] = Field( 1a
1004 default=0,
1005 description=(
1006 "The maximum number of late runs in the work queue before it is deemed"
1007 " unhealthy. Defaults to `0`."
1008 ),
1009 )
1010 maximum_seconds_since_last_polled: Optional[int] = Field( 1a
1011 default=60,
1012 description=(
1013 "The maximum number of time in seconds elapsed since work queue has been"
1014 " polled before it is deemed unhealthy. Defaults to `60`."
1015 ),
1016 )
1018 def evaluate_health_status( 1a
1019 self, late_runs_count: int, last_polled: Optional[DateTime] = None
1020 ) -> bool:
1021 """
1022 Given empirical information about the state of the work queue, evaluate its health status.
1024 Args:
1025 late_runs: the count of late runs for the work queue.
1026 last_polled: the last time the work queue was polled, if available.
1028 Returns:
1029 bool: whether or not the work queue is healthy.
1030 """
1031 healthy = True
1032 if (
1033 self.maximum_late_runs is not None
1034 and late_runs_count > self.maximum_late_runs
1035 ):
1036 healthy = False
1038 if self.maximum_seconds_since_last_polled is not None:
1039 if (
1040 last_polled is None
1041 or (now("UTC") - last_polled).total_seconds()
1042 > self.maximum_seconds_since_last_polled
1043 ):
1044 healthy = False
1046 return healthy
1049class WorkQueueStatusDetail(PrefectBaseModel): 1a
1050 healthy: bool = Field(..., description="Whether or not the work queue is healthy.") 1a
1051 late_runs_count: int = Field( 1a
1052 default=0, description="The number of late flow runs in the work queue."
1053 )
1054 last_polled: Optional[DateTime] = Field( 1a
1055 default=None, description="The last time an agent polled this queue for work."
1056 )
1057 health_check_policy: WorkQueueHealthPolicy = Field( 1a
1058 ...,
1059 description=(
1060 "The policy used to determine whether or not the work queue is healthy."
1061 ),
1062 )
1065class Agent(ORMBaseModel): 1a
1066 """An ORM representation of an agent"""
1068 name: str = Field( 1a
1069 default_factory=lambda: generate_slug(2),
1070 description=(
1071 "The name of the agent. If a name is not provided, it will be"
1072 " auto-generated."
1073 ),
1074 )
1075 work_queue_id: UUID = Field( 1a
1076 default=..., description="The work queue with which the agent is associated."
1077 )
1078 last_activity_time: Optional[DateTime] = Field( 1a
1079 default=None, description="The last time this agent polled for work."
1080 )
1083class WorkPoolStorageConfiguration(PrefectBaseModel): 1a
1084 """A representation of a work pool's storage configuration"""
1086 model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") 1a
1088 bundle_upload_step: Optional[dict[str, Any]] = Field( 1a
1089 default=None,
1090 description="The step to use for uploading bundles to storage.",
1091 )
1092 bundle_execution_step: Optional[dict[str, Any]] = Field( 1a
1093 default=None,
1094 description="The step to use for executing bundles.",
1095 )
1096 default_result_storage_block_id: Optional[UUID] = Field( 1a
1097 default=None,
1098 description="The block document ID of the default result storage block.",
1099 )
1102class WorkPool(ORMBaseModel): 1a
1103 """An ORM representation of a work pool"""
1105 name: NonEmptyishName = Field( 1a
1106 description="The name of the work pool.",
1107 )
1108 description: Optional[str] = Field( 1a
1109 default=None, description="A description of the work pool."
1110 )
1111 type: str = Field(description="The work pool type.") 1a
1112 base_job_template: Dict[str, Any] = Field( 1a
1113 default_factory=dict, description="The work pool's base job template."
1114 )
1115 is_paused: bool = Field( 1a
1116 default=False,
1117 description="Pausing the work pool stops the delivery of all work.",
1118 )
1119 concurrency_limit: Optional[NonNegativeInteger] = Field( 1a
1120 default=None, description="A concurrency limit for the work pool."
1121 )
1122 status: Optional[WorkPoolStatus] = Field( 1a
1123 default=None, description="The current status of the work pool."
1124 )
1126 # this required field has a default of None so that the custom validator
1127 # below will be called and produce a more helpful error message
1128 default_queue_id: Optional[UUID] = Field( 1a
1129 default=None, description="The id of the pool's default queue."
1130 )
1132 storage_configuration: WorkPoolStorageConfiguration = Field( 1a
1133 default_factory=WorkPoolStorageConfiguration,
1134 description="The storage configuration for the work pool.",
1135 )
1137 @field_validator("default_queue_id") 1a
1138 def helpful_error_for_missing_default_queue_id(cls, v: UUID | None) -> UUID: 1a
1139 return validate_default_queue_id_not_none(v) 1b
1141 @classmethod 1a
1142 def model_validate( 1a
1143 cls: Type[Self],
1144 obj: Any,
1145 *,
1146 strict: Optional[bool] = None,
1147 from_attributes: Optional[bool] = None,
1148 context: Optional[dict[str, Any]] = None,
1149 ) -> Self:
1150 parsed: WorkPool = super().model_validate( 1b
1151 obj, strict=strict, from_attributes=from_attributes, context=context
1152 )
1153 if from_attributes: 1153 ↛ 1156line 1153 didn't jump to line 1156 because the condition on line 1153 was always true1b
1154 if obj.type == "prefect-agent": 1154 ↛ 1156line 1154 didn't jump to line 1156 because the condition on line 1154 was always true1b
1155 parsed.status = None 1b
1156 return parsed 1b
1159class Worker(ORMBaseModel): 1a
1160 """An ORM representation of a worker"""
1162 name: str = Field(description="The name of the worker.") 1a
1163 work_pool_id: UUID = Field( 1a
1164 description="The work pool with which the queue is associated."
1165 )
1166 last_heartbeat_time: Optional[datetime.datetime] = Field( 1a
1167 None, description="The last time the worker process sent a heartbeat."
1168 )
1169 heartbeat_interval_seconds: Optional[int] = Field( 1a
1170 default=None,
1171 description=(
1172 "The number of seconds to expect between heartbeats sent by the worker."
1173 ),
1174 )
1177Flow.model_rebuild() 1a
1178FlowRun.model_rebuild() 1a
1181class Artifact(ORMBaseModel): 1a
1182 key: Optional[str] = Field( 1a
1183 default=None, description="An optional unique reference key for this artifact."
1184 )
1185 type: Optional[str] = Field( 1a
1186 default=None,
1187 description=(
1188 "An identifier that describes the shape of the data field. e.g. 'result',"
1189 " 'table', 'markdown'"
1190 ),
1191 )
1192 description: Optional[str] = Field( 1a
1193 default=None, description="A markdown-enabled description of the artifact."
1194 )
1195 # data will eventually be typed as `Optional[Union[Result, Any]]`
1196 data: Optional[Union[Dict[str, Any], Any]] = Field( 1a
1197 default=None,
1198 description=(
1199 "Data associated with the artifact, e.g. a result.; structure depends on"
1200 " the artifact type."
1201 ),
1202 )
1203 metadata_: Optional[dict[str, str]] = Field( 1a
1204 default=None,
1205 description=(
1206 "User-defined artifact metadata. Content must be string key and value"
1207 " pairs."
1208 ),
1209 )
1210 flow_run_id: Optional[UUID] = Field( 1a
1211 default=None, description="The flow run associated with the artifact."
1212 )
1213 task_run_id: Optional[UUID] = Field( 1a
1214 default=None, description="The task run associated with the artifact."
1215 )
1217 @classmethod 1a
1218 def from_result(cls, data: Any | dict[str, Any]) -> "Artifact": 1a
1219 artifact_info: dict[str, Any] = dict() 1bc
1220 if isinstance(data, dict): 1220 ↛ 1221line 1220 didn't jump to line 1221 because the condition on line 1220 was never true1bc
1221 artifact_key = data.pop("artifact_key", None)
1222 if artifact_key:
1223 artifact_info["key"] = artifact_key
1225 artifact_type = data.pop("artifact_type", None)
1226 if artifact_type:
1227 artifact_info["type"] = artifact_type
1229 description = data.pop("artifact_description", None)
1230 if description:
1231 artifact_info["description"] = description
1233 return cls(data=data, **artifact_info) 1bc
1235 @field_validator("metadata_") 1a
1236 @classmethod 1a
1237 def validate_metadata_length(cls, v: dict[str, str]) -> dict[str, str]: 1a
1238 return validate_max_metadata_length(v) 1bdc
1241class ArtifactCollection(ORMBaseModel): 1a
1242 key: str = Field(description="An optional unique reference key for this artifact.") 1a
1243 latest_id: UUID = Field( 1a
1244 description="The latest artifact ID associated with the key."
1245 )
1246 type: Optional[str] = Field( 1a
1247 default=None,
1248 description=(
1249 "An identifier that describes the shape of the data field. e.g. 'result',"
1250 " 'table', 'markdown'"
1251 ),
1252 )
1253 description: Optional[str] = Field( 1a
1254 default=None, description="A markdown-enabled description of the artifact."
1255 )
1256 data: Optional[Union[Dict[str, Any], Any]] = Field( 1a
1257 default=None,
1258 description=(
1259 "Data associated with the artifact, e.g. a result.; structure depends on"
1260 " the artifact type."
1261 ),
1262 )
1263 metadata_: Optional[Dict[str, str]] = Field( 1a
1264 default=None,
1265 description=(
1266 "User-defined artifact metadata. Content must be string key and value"
1267 " pairs."
1268 ),
1269 )
1270 flow_run_id: Optional[UUID] = Field( 1a
1271 default=None, description="The flow run associated with the artifact."
1272 )
1273 task_run_id: Optional[UUID] = Field( 1a
1274 default=None, description="The task run associated with the artifact."
1275 )
1278class Variable(ORMBaseModel): 1a
1279 name: str = Field( 1a
1280 default=...,
1281 description="The name of the variable",
1282 examples=["my-variable"],
1283 max_length=MAX_VARIABLE_NAME_LENGTH,
1284 )
1285 value: StrictVariableValue = Field( 1a
1286 default=...,
1287 description="The value of the variable",
1288 examples=["my-value"],
1289 )
1290 tags: List[str] = Field( 1a
1291 default_factory=list,
1292 description="A list of variable tags",
1293 examples=[["tag-1", "tag-2"]],
1294 )
1297class FlowRunInput(ORMBaseModel): 1a
1298 flow_run_id: UUID = Field(description="The flow run ID associated with the input.") 1a
1299 key: Annotated[str, AfterValidator(raise_on_name_alphanumeric_dashes_only)] = Field( 1a
1300 description="The key of the input."
1301 )
1302 value: str = Field(description="The value of the input.") 1a
1303 sender: Optional[str] = Field(default=None, description="The sender of the input.") 1a
1306class CsrfToken(ORMBaseModel): 1a
1307 token: str = Field( 1a
1308 default=...,
1309 description="The CSRF token",
1310 )
1311 client: str = Field( 1a
1312 default=..., description="The client id associated with the CSRF token"
1313 )
1314 expiration: DateTime = Field( 1a
1315 default=..., description="The expiration time of the CSRF token"
1316 )