Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/responses.py: 84%
223 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Schemas for special responses from the Prefect REST API.
3"""
5import datetime 1a
6from typing import Any, ClassVar, Dict, List, Optional, Type, Union 1a
7from uuid import UUID 1a
9from pydantic import BaseModel, ConfigDict, Field, model_validator 1a
10from typing_extensions import Literal, Self 1a
12import prefect.server.schemas as schemas 1a
13from prefect.server.schemas.core import ( 1a
14 CreatedBy,
15 FlowRunPolicy,
16 UpdatedBy,
17 WorkQueueStatusDetail,
18)
19from prefect.server.utilities.schemas.bases import ORMBaseModel, PrefectBaseModel 1a
20from prefect.types import DateTime, KeyValueLabelsField 1a
21from prefect.types._datetime import create_datetime_instance 1a
22from prefect.utilities.collections import AutoEnum 1a
23from prefect.utilities.names import generate_slug 1a
26class SetStateStatus(AutoEnum): 1a
27 """Enumerates return statuses for setting run states."""
29 ACCEPT = AutoEnum.auto() 1a
30 REJECT = AutoEnum.auto() 1a
31 ABORT = AutoEnum.auto() 1a
32 WAIT = AutoEnum.auto() 1a
35class StateAcceptDetails(PrefectBaseModel): 1a
36 """Details associated with an ACCEPT state transition."""
38 type: Literal["accept_details"] = Field( 1a
39 default="accept_details",
40 description=(
41 "The type of state transition detail. Used to ensure pydantic does not"
42 " coerce into a different type."
43 ),
44 )
47class StateRejectDetails(PrefectBaseModel): 1a
48 """Details associated with a REJECT state transition."""
50 type: Literal["reject_details"] = Field( 1a
51 default="reject_details",
52 description=(
53 "The type of state transition detail. Used to ensure pydantic does not"
54 " coerce into a different type."
55 ),
56 )
57 reason: Optional[str] = Field( 1a
58 default=None, description="The reason why the state transition was rejected."
59 )
62class StateAbortDetails(PrefectBaseModel): 1a
63 """Details associated with an ABORT state transition."""
65 type: Literal["abort_details"] = Field( 1a
66 default="abort_details",
67 description=(
68 "The type of state transition detail. Used to ensure pydantic does not"
69 " coerce into a different type."
70 ),
71 )
72 reason: Optional[str] = Field( 1a
73 default=None, description="The reason why the state transition was aborted."
74 )
77class StateWaitDetails(PrefectBaseModel): 1a
78 """Details associated with a WAIT state transition."""
80 type: Literal["wait_details"] = Field( 1a
81 default="wait_details",
82 description=(
83 "The type of state transition detail. Used to ensure pydantic does not"
84 " coerce into a different type."
85 ),
86 )
87 delay_seconds: int = Field( 1a
88 default=...,
89 description=(
90 "The length of time in seconds the client should wait before transitioning"
91 " states."
92 ),
93 )
94 reason: Optional[str] = Field( 1a
95 default=None, description="The reason why the state transition should wait."
96 )
99class HistoryResponseState(PrefectBaseModel): 1a
100 """Represents a single state's history over an interval."""
102 state_type: schemas.states.StateType = Field( 1a
103 default=..., description="The state type."
104 )
105 state_name: str = Field(default=..., description="The state name.") 1a
106 count_runs: int = Field( 1a
107 default=...,
108 description="The number of runs in the specified state during the interval.",
109 )
110 sum_estimated_run_time: datetime.timedelta = Field( 1a
111 default=...,
112 description="The total estimated run time of all runs during the interval.",
113 )
114 sum_estimated_lateness: datetime.timedelta = Field( 1a
115 default=...,
116 description=(
117 "The sum of differences between actual and expected start time during the"
118 " interval."
119 ),
120 )
123class HistoryResponse(PrefectBaseModel): 1a
124 """Represents a history of aggregation states over an interval"""
126 interval_start: DateTime = Field( 1a
127 default=..., description="The start date of the interval."
128 )
129 interval_end: DateTime = Field( 1a
130 default=..., description="The end date of the interval."
131 )
132 states: List[HistoryResponseState] = Field( 1a
133 default=..., description="A list of state histories during the interval."
134 )
136 @model_validator(mode="before") 1a
137 @classmethod 1a
138 def validate_timestamps( 1a
139 cls, values: dict
140 ) -> dict: # TODO: remove this, handle with ORM
141 d = {"interval_start": None, "interval_end": None}
142 for field in d.keys():
143 val = values.get(field)
144 if isinstance(val, datetime.datetime):
145 d[field] = create_datetime_instance(values[field])
146 else:
147 d[field] = val
149 return {**values, **d}
152StateResponseDetails = Union[ 1a
153 StateAcceptDetails, StateWaitDetails, StateRejectDetails, StateAbortDetails
154]
157class OrchestrationResult(PrefectBaseModel): 1a
158 """
159 A container for the output of state orchestration.
160 """
162 state: Optional[schemas.states.State] 1a
163 status: SetStateStatus 1a
164 details: StateResponseDetails 1a
167class WorkerFlowRunResponse(PrefectBaseModel): 1a
168 model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True) 1a
170 work_pool_id: UUID 1a
171 work_queue_id: UUID 1a
172 flow_run: schemas.core.FlowRun 1a
175class FlowRunResponse(ORMBaseModel): 1a
176 name: str = Field( 1a
177 default_factory=lambda: generate_slug(2),
178 description=(
179 "The name of the flow run. Defaults to a random slug if not specified."
180 ),
181 examples=["my-flow-run"],
182 )
183 flow_id: UUID = Field(default=..., description="The id of the flow being run.") 1a
184 state_id: Optional[UUID] = Field( 1a
185 default=None, description="The id of the flow run's current state."
186 )
187 deployment_id: Optional[UUID] = Field( 1a
188 default=None,
189 description=(
190 "The id of the deployment associated with this flow run, if available."
191 ),
192 )
193 deployment_version: Optional[str] = Field( 1a
194 default=None,
195 description="The version of the deployment associated with this flow run.",
196 examples=["1.0"],
197 )
198 work_queue_id: Optional[UUID] = Field( 1a
199 default=None, description="The id of the run's work pool queue."
200 )
201 work_queue_name: Optional[str] = Field( 1a
202 default=None, description="The work queue that handled this flow run."
203 )
204 flow_version: Optional[str] = Field( 1a
205 default=None,
206 description="The version of the flow executed in this flow run.",
207 examples=["1.0"],
208 )
209 parameters: Dict[str, Any] = Field( 1a
210 default_factory=dict, description="Parameters for the flow run."
211 )
212 idempotency_key: Optional[str] = Field( 1a
213 default=None,
214 description=(
215 "An optional idempotency key for the flow run. Used to ensure the same flow"
216 " run is not created multiple times."
217 ),
218 )
219 context: Dict[str, Any] = Field( 1a
220 default_factory=dict,
221 description="Additional context for the flow run.",
222 examples=[{"my_var": "my_val"}],
223 )
224 empirical_policy: FlowRunPolicy = Field( 1a
225 default_factory=FlowRunPolicy,
226 )
227 tags: List[str] = Field( 1a
228 default_factory=list,
229 description="A list of tags on the flow run",
230 examples=[["tag-1", "tag-2"]],
231 )
232 labels: KeyValueLabelsField 1a
233 parent_task_run_id: Optional[UUID] = Field( 1a
234 default=None,
235 description=(
236 "If the flow run is a subflow, the id of the 'dummy' task in the parent"
237 " flow used to track subflow state."
238 ),
239 )
240 state_type: Optional[schemas.states.StateType] = Field( 1a
241 default=None, description="The type of the current flow run state."
242 )
243 state_name: Optional[str] = Field( 1a
244 default=None, description="The name of the current flow run state."
245 )
246 run_count: int = Field( 1a
247 default=0, description="The number of times the flow run was executed."
248 )
249 expected_start_time: Optional[DateTime] = Field( 1a
250 default=None,
251 description="The flow run's expected start time.",
252 )
253 next_scheduled_start_time: Optional[DateTime] = Field( 1a
254 default=None,
255 description="The next time the flow run is scheduled to start.",
256 )
257 start_time: Optional[DateTime] = Field( 1a
258 default=None, description="The actual start time."
259 )
260 end_time: Optional[DateTime] = Field( 1a
261 default=None, description="The actual end time."
262 )
263 total_run_time: datetime.timedelta = Field( 1a
264 default=datetime.timedelta(0),
265 description=(
266 "Total run time. If the flow run was executed multiple times, the time of"
267 " each run will be summed."
268 ),
269 )
270 estimated_run_time: datetime.timedelta = Field( 1a
271 default=datetime.timedelta(0),
272 description="A real-time estimate of the total run time.",
273 )
274 estimated_start_time_delta: datetime.timedelta = Field( 1a
275 default=datetime.timedelta(0),
276 description="The difference between actual and expected start time.",
277 )
278 auto_scheduled: bool = Field( 1a
279 default=False,
280 description="Whether or not the flow run was automatically scheduled.",
281 )
282 infrastructure_document_id: Optional[UUID] = Field( 1a
283 default=None,
284 description="The block document defining infrastructure to use this flow run.",
285 )
286 infrastructure_pid: Optional[str] = Field( 1a
287 default=None,
288 description="The id of the flow run as returned by an infrastructure block.",
289 )
290 created_by: Optional[CreatedBy] = Field( 1a
291 default=None,
292 description="Optional information about the creator of this flow run.",
293 )
294 work_pool_id: Optional[UUID] = Field( 1a
295 default=None,
296 description="The id of the flow run's work pool.",
297 )
298 work_pool_name: Optional[str] = Field( 1a
299 default=None,
300 description="The name of the flow run's work pool.",
301 examples=["my-work-pool"],
302 )
303 state: Optional[schemas.states.State] = Field( 1a
304 default=None, description="The current state of the flow run."
305 )
306 job_variables: Optional[Dict[str, Any]] = Field( 1a
307 default=None,
308 description="Variables used as overrides in the base job template",
309 )
311 @classmethod 1a
312 def model_validate( 1a
313 cls: Type[Self],
314 obj: Any,
315 *,
316 strict: Optional[bool] = None,
317 from_attributes: Optional[bool] = None,
318 context: Optional[dict[str, Any]] = None,
319 ) -> Self:
320 response = super().model_validate(obj) 1bc
322 if from_attributes: 322 ↛ 330line 322 didn't jump to line 330 because the condition on line 322 was always true1bc
323 if obj.work_queue: 323 ↛ 324line 323 didn't jump to line 324 because the condition on line 323 was never true1bc
324 response.work_queue_id = obj.work_queue.id
325 response.work_queue_name = obj.work_queue.name
326 if obj.work_queue.work_pool:
327 response.work_pool_id = obj.work_queue.work_pool.id
328 response.work_pool_name = obj.work_queue.work_pool.name
330 return response 1bc
332 def __eq__(self, other: Any) -> bool: 1a
333 """
334 Check for "equality" to another flow run schema
336 Estimates times are rolling and will always change with repeated queries for
337 a flow run so we ignore them during equality checks.
338 """
339 if isinstance(other, FlowRunResponse):
340 exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
341 return self.model_dump(exclude=exclude_fields) == other.model_dump(
342 exclude=exclude_fields
343 )
344 return super().__eq__(other)
347class TaskRunResponse(ORMBaseModel): 1a
348 name: str = Field( 1a
349 default_factory=lambda: generate_slug(2),
350 description=(
351 "The name of the task run. Defaults to a random slug if not specified."
352 ),
353 examples=["my-task-run"],
354 )
355 flow_run_id: Optional[UUID] = Field( 1a
356 default=None, description="The id of the flow run this task run belongs to."
357 )
358 task_key: str = Field( 1a
359 default=..., description="The key of the task this run represents."
360 )
361 state_id: Optional[UUID] = Field( 1a
362 default=None, description="The id of the task run's current state."
363 )
364 state: Optional[schemas.states.State] = Field( 1a
365 default=None, description="The current state of the task run."
366 )
367 task_version: Optional[str] = Field( 1a
368 default=None,
369 description="The version of the task executed in this task run.",
370 examples=["1.0"],
371 )
372 parameters: dict[str, Any] = Field( 1a
373 default_factory=dict, description="Parameters for the task run."
374 )
375 task_inputs: dict[ 1a
376 str,
377 list[
378 Union[
379 schemas.core.TaskRunResult,
380 schemas.core.FlowRunResult,
381 schemas.core.Parameter,
382 schemas.core.Constant,
383 ]
384 ],
385 ] = Field(default_factory=dict, description="Inputs provided to the task run.")
386 context: dict[str, Any] = Field( 1a
387 default_factory=dict,
388 description="Additional context for the task run.",
389 examples=[{"my_var": "my_val"}],
390 )
391 empirical_policy: schemas.core.TaskRunPolicy = Field( 1a
392 default_factory=schemas.core.TaskRunPolicy,
393 description="The task run's empirical retry policy.",
394 )
395 tags: list[str] = Field( 1a
396 default_factory=list,
397 description="A list of tags for the task run.",
398 examples=[["tag-1", "tag-2"]],
399 )
402class DeploymentResponse(ORMBaseModel): 1a
403 name: str = Field(default=..., description="The name of the deployment.") 1a
404 version: Optional[str] = Field( 1a
405 default=None, description="An optional version for the deployment."
406 )
407 description: Optional[str] = Field( 1a
408 default=None, description="A description for the deployment."
409 )
410 flow_id: UUID = Field( 1a
411 default=..., description="The flow id associated with the deployment."
412 )
413 paused: bool = Field( 1a
414 default=False, description="Whether or not the deployment is paused."
415 )
416 schedules: List[schemas.core.DeploymentSchedule] = Field( 1a
417 default_factory=list, description="A list of schedules for the deployment."
418 )
419 concurrency_limit: Optional[int] = Field( 1a
420 default=None,
421 description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.",
422 deprecated=True,
423 )
424 global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field( 1a
425 default=None,
426 description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.",
427 )
428 concurrency_options: Optional[schemas.core.ConcurrencyOptions] = Field( 1a
429 default=None,
430 description="The concurrency options for the deployment.",
431 )
432 job_variables: Dict[str, Any] = Field( 1a
433 default_factory=dict,
434 description="Overrides to apply to the base infrastructure block at runtime.",
435 )
436 parameters: Dict[str, Any] = Field( 1a
437 default_factory=dict,
438 description="Parameters for flow runs scheduled by the deployment.",
439 )
440 tags: List[str] = Field( 1a
441 default_factory=list,
442 description="A list of tags for the deployment",
443 examples=[["tag-1", "tag-2"]],
444 )
445 labels: KeyValueLabelsField 1a
446 work_queue_name: Optional[str] = Field( 1a
447 default=None,
448 description=(
449 "The work queue for the deployment. If no work queue is set, work will not"
450 " be scheduled."
451 ),
452 )
453 work_queue_id: Optional[UUID] = Field( 1a
454 default=None,
455 description="The id of the work pool queue to which this deployment is assigned.",
456 )
457 last_polled: Optional[DateTime] = Field( 1a
458 default=None,
459 description="The last time the deployment was polled for status updates.",
460 )
461 parameter_openapi_schema: Optional[Dict[str, Any]] = Field( 1a
462 default=None,
463 description="The parameter schema of the flow, including defaults.",
464 json_schema_extra={"additionalProperties": True},
465 )
466 path: Optional[str] = Field( 1a
467 default=None,
468 description=(
469 "The path to the working directory for the workflow, relative to remote"
470 " storage or an absolute path."
471 ),
472 )
473 pull_steps: Optional[list[dict[str, Any]]] = Field( 1a
474 default=None, description="Pull steps for cloning and running this deployment."
475 )
476 entrypoint: Optional[str] = Field( 1a
477 default=None,
478 description=(
479 "The path to the entrypoint for the workflow, relative to the `path`."
480 ),
481 )
482 storage_document_id: Optional[UUID] = Field( 1a
483 default=None,
484 description="The block document defining storage used for this flow.",
485 )
486 infrastructure_document_id: Optional[UUID] = Field( 1a
487 default=None,
488 description="The block document defining infrastructure to use for flow runs.",
489 )
490 created_by: Optional[CreatedBy] = Field( 1a
491 default=None,
492 description="Optional information about the creator of this deployment.",
493 )
494 updated_by: Optional[UpdatedBy] = Field( 1a
495 default=None,
496 description="Optional information about the updater of this deployment.",
497 )
498 work_pool_name: Optional[str] = Field( 1a
499 default=None, description="The name of the deployment's work pool."
500 )
501 status: Optional[schemas.statuses.DeploymentStatus] = Field( 1a
502 default=schemas.statuses.DeploymentStatus.NOT_READY,
503 description="Whether the deployment is ready to run flows.",
504 )
505 enforce_parameter_schema: bool = Field( 1a
506 default=True,
507 description=(
508 "Whether or not the deployment should enforce the parameter schema."
509 ),
510 )
512 @classmethod 1a
513 def model_validate( 1a
514 cls: Type[Self],
515 obj: Any,
516 *,
517 strict: Optional[bool] = None,
518 from_attributes: Optional[bool] = None,
519 context: Optional[dict[str, Any]] = None,
520 ) -> Self:
521 response = super().model_validate( 1bdc
522 obj, strict=strict, from_attributes=from_attributes, context=context
523 )
525 if from_attributes: 525 ↛ 532line 525 didn't jump to line 532 because the condition on line 525 was always true1bdc
526 if obj.work_queue: 1bdc
527 response.work_queue_id = obj.work_queue.id
528 response.work_queue_name = obj.work_queue.name
529 if obj.work_queue.work_pool: 529 ↛ 532line 529 didn't jump to line 532 because the condition on line 529 was always true
530 response.work_pool_name = obj.work_queue.work_pool.name
532 return response 1bdc
535class WorkQueueResponse(schemas.core.WorkQueue): 1a
536 work_pool_name: Optional[str] = Field( 1a
537 default=None,
538 description="The name of the work pool the work pool resides within.",
539 )
540 status: Optional[schemas.statuses.WorkQueueStatus] = Field( 1a
541 default=None, description="The queue status."
542 )
544 @classmethod 1a
545 def model_validate( 1a
546 cls: Type[Self],
547 obj: Any,
548 *,
549 strict: Optional[bool] = None,
550 from_attributes: Optional[bool] = None,
551 context: Optional[dict[str, Any]] = None,
552 ) -> Self:
553 response = super().model_validate( 1b
554 obj, strict=strict, from_attributes=from_attributes, context=context
555 )
557 if from_attributes: 557 ↛ 561line 557 didn't jump to line 561 because the condition on line 557 was always true1b
558 if obj.work_pool: 558 ↛ 561line 558 didn't jump to line 561 because the condition on line 558 was always true1b
559 response.work_pool_name = obj.work_pool.name 1b
561 return response 1b
564class WorkQueueWithStatus(WorkQueueResponse, WorkQueueStatusDetail): 1a
565 """Combines a work queue and its status details into a single object"""
568DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30 1a
569INACTIVITY_HEARTBEAT_MULTIPLE = 3 1a
572class WorkerResponse(schemas.core.Worker): 1a
573 status: schemas.statuses.WorkerStatus = Field( 1a
574 schemas.statuses.WorkerStatus.OFFLINE,
575 description="Current status of the worker.",
576 )
578 @classmethod 1a
579 def model_validate( 1a
580 cls: Type[Self],
581 obj: Any,
582 *,
583 strict: Optional[bool] = None,
584 from_attributes: Optional[bool] = None,
585 context: Optional[dict[str, Any]] = None,
586 ) -> Self:
587 worker = super().model_validate(
588 obj, strict=strict, from_attributes=from_attributes, context=context
589 )
591 if from_attributes:
592 offline_horizon = datetime.datetime.now(
593 tz=datetime.timezone.utc
594 ) - datetime.timedelta(
595 seconds=(
596 worker.heartbeat_interval_seconds
597 or DEFAULT_HEARTBEAT_INTERVAL_SECONDS
598 )
599 * INACTIVITY_HEARTBEAT_MULTIPLE
600 )
601 if worker.last_heartbeat_time > offline_horizon:
602 worker.status = schemas.statuses.WorkerStatus.ONLINE
603 else:
604 worker.status = schemas.statuses.WorkerStatus.OFFLINE
606 return worker
609class GlobalConcurrencyLimitResponse(ORMBaseModel): 1a
610 """
611 A response object for global concurrency limits.
612 """
614 active: bool = Field( 1a
615 default=True, description="Whether the global concurrency limit is active."
616 )
617 name: str = Field( 1a
618 default=..., description="The name of the global concurrency limit."
619 )
620 limit: int = Field(default=..., description="The concurrency limit.") 1a
621 active_slots: int = Field(default=..., description="The number of active slots.") 1a
622 slot_decay_per_second: float = Field( 1a
623 default=2.0,
624 description="The decay rate for active slots when used as a rate limit.",
625 )
628class FlowPaginationResponse(BaseModel): 1a
629 results: list[schemas.core.Flow] 1a
630 count: int 1a
631 limit: int 1a
632 pages: int 1a
633 page: int 1a
636class FlowRunPaginationResponse(BaseModel): 1a
637 results: list[FlowRunResponse] 1a
638 count: int 1a
639 limit: int 1a
640 pages: int 1a
641 page: int 1a
644class TaskRunPaginationResponse(BaseModel): 1a
645 results: list[TaskRunResponse] 1a
646 count: int 1a
647 limit: int 1a
648 pages: int 1a
649 page: int 1a
652class DeploymentPaginationResponse(BaseModel): 1a
653 results: list[DeploymentResponse] 1a
654 count: int 1a
655 limit: int 1a
656 pages: int 1a
657 page: int 1a
660class SchemaValuePropertyError(BaseModel): 1a
661 property: str 1a
662 errors: List["SchemaValueError"] 1a
665class SchemaValueIndexError(BaseModel): 1a
666 index: int 1a
667 errors: List["SchemaValueError"] 1a
670SchemaValueError = Union[str, SchemaValuePropertyError, SchemaValueIndexError] 1a
673class SchemaValuesValidationResponse(BaseModel): 1a
674 errors: List[SchemaValueError] 1a
675 valid: bool 1a