Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/responses.py: 82%
153 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import datetime 1a
2from typing import TYPE_CHECKING, Any, ClassVar, Generic, Optional, TypeVar, Union 1a
3from uuid import UUID 1a
5from pydantic import ConfigDict, Field 1a
6from typing_extensions import Literal 1a
8import prefect.client.schemas.objects as objects 1a
9from prefect._internal.schemas.bases import ObjectBaseModel, PrefectBaseModel 1a
10from prefect._internal.schemas.fields import CreatedBy, UpdatedBy 1a
11from prefect.types import DateTime, KeyValueLabelsField 1a
12from prefect.utilities.collections import AutoEnum 1a
13from prefect.utilities.names import generate_slug 1a
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a
16 from prefect.events.schemas.events import RelatedResource
18T = TypeVar("T") 1a
21class SetStateStatus(AutoEnum): 1a
22 """Enumerates return statuses for setting run states."""
24 ACCEPT = AutoEnum.auto() 1a
25 REJECT = AutoEnum.auto() 1a
26 ABORT = AutoEnum.auto() 1a
27 WAIT = AutoEnum.auto() 1a
30class StateAcceptDetails(PrefectBaseModel): 1a
31 """Details associated with an ACCEPT state transition."""
33 type: Literal["accept_details"] = Field( 1a
34 default="accept_details",
35 description=(
36 "The type of state transition detail. Used to ensure pydantic does not"
37 " coerce into a different type."
38 ),
39 )
42class StateRejectDetails(PrefectBaseModel): 1a
43 """Details associated with a REJECT state transition."""
45 type: Literal["reject_details"] = Field( 1a
46 default="reject_details",
47 description=(
48 "The type of state transition detail. Used to ensure pydantic does not"
49 " coerce into a different type."
50 ),
51 )
52 reason: Optional[str] = Field( 1a
53 default=None, description="The reason why the state transition was rejected."
54 )
57class StateAbortDetails(PrefectBaseModel): 1a
58 """Details associated with an ABORT state transition."""
60 type: Literal["abort_details"] = Field( 1a
61 default="abort_details",
62 description=(
63 "The type of state transition detail. Used to ensure pydantic does not"
64 " coerce into a different type."
65 ),
66 )
67 reason: Optional[str] = Field( 1a
68 default=None, description="The reason why the state transition was aborted."
69 )
72class StateWaitDetails(PrefectBaseModel): 1a
73 """Details associated with a WAIT state transition."""
75 type: Literal["wait_details"] = Field( 1a
76 default="wait_details",
77 description=(
78 "The type of state transition detail. Used to ensure pydantic does not"
79 " coerce into a different type."
80 ),
81 )
82 delay_seconds: int = Field( 1a
83 default=...,
84 description=(
85 "The length of time in seconds the client should wait before transitioning"
86 " states."
87 ),
88 )
89 reason: Optional[str] = Field( 1a
90 default=None, description="The reason why the state transition should wait."
91 )
94class HistoryResponseState(PrefectBaseModel): 1a
95 """Represents a single state's history over an interval."""
97 state_type: objects.StateType = Field(default=..., description="The state type.") 1a
98 state_name: str = Field(default=..., description="The state name.") 1a
99 count_runs: int = Field( 1a
100 default=...,
101 description="The number of runs in the specified state during the interval.",
102 )
103 sum_estimated_run_time: datetime.timedelta = Field( 1a
104 default=...,
105 description="The total estimated run time of all runs during the interval.",
106 )
107 sum_estimated_lateness: datetime.timedelta = Field( 1a
108 default=...,
109 description=(
110 "The sum of differences between actual and expected start time during the"
111 " interval."
112 ),
113 )
116class HistoryResponse(PrefectBaseModel): 1a
117 """Represents a history of aggregation states over an interval"""
119 interval_start: DateTime = Field( 1a
120 default=..., description="The start date of the interval."
121 )
122 interval_end: DateTime = Field( 1a
123 default=..., description="The end date of the interval."
124 )
125 states: list[HistoryResponseState] = Field( 1a
126 default=..., description="A list of state histories during the interval."
127 )
130StateResponseDetails = Union[ 1a
131 StateAcceptDetails, StateWaitDetails, StateRejectDetails, StateAbortDetails
132]
135class OrchestrationResult(PrefectBaseModel, Generic[T]): 1a
136 """
137 A container for the output of state orchestration.
138 """
140 state: Optional[objects.State[T]] 1a
141 status: SetStateStatus 1a
142 details: StateResponseDetails 1a
145class WorkerFlowRunResponse(PrefectBaseModel): 1a
146 model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True) 1a
148 work_pool_id: UUID 1a
149 work_queue_id: UUID 1a
150 flow_run: objects.FlowRun 1a
153class FlowRunResponse(ObjectBaseModel): 1a
154 name: str = Field( 1a
155 default_factory=lambda: generate_slug(2),
156 description=(
157 "The name of the flow run. Defaults to a random slug if not specified."
158 ),
159 examples=["my-flow-run"],
160 )
161 flow_id: UUID = Field(default=..., description="The id of the flow being run.") 1a
162 state_id: Optional[UUID] = Field( 1a
163 default=None, description="The id of the flow run's current state."
164 )
165 deployment_id: Optional[UUID] = Field( 1a
166 default=None,
167 description=(
168 "The id of the deployment associated with this flow run, if available."
169 ),
170 )
171 deployment_version: Optional[str] = Field( 1a
172 default=None,
173 description="The version of the deployment associated with this flow run.",
174 examples=["1.0"],
175 )
176 work_queue_name: Optional[str] = Field( 1a
177 default=None, description="The work queue that handled this flow run."
178 )
179 flow_version: Optional[str] = Field( 1a
180 default=None,
181 description="The version of the flow executed in this flow run.",
182 examples=["1.0"],
183 )
184 parameters: dict[str, Any] = Field( 1a
185 default_factory=dict, description="Parameters for the flow run."
186 )
187 idempotency_key: Optional[str] = Field( 1a
188 default=None,
189 description=(
190 "An optional idempotency key for the flow run. Used to ensure the same flow"
191 " run is not created multiple times."
192 ),
193 )
194 context: dict[str, Any] = Field( 1a
195 default_factory=dict,
196 description="Additional context for the flow run.",
197 examples=[{"my_var": "my_val"}],
198 )
199 empirical_policy: objects.FlowRunPolicy = Field( 1a
200 default_factory=objects.FlowRunPolicy,
201 )
202 tags: list[str] = Field( 1a
203 default_factory=list,
204 description="A list of tags on the flow run",
205 examples=[["tag-1", "tag-2"]],
206 )
207 labels: KeyValueLabelsField 1a
208 parent_task_run_id: Optional[UUID] = Field( 1a
209 default=None,
210 description=(
211 "If the flow run is a subflow, the id of the 'dummy' task in the parent"
212 " flow used to track subflow state."
213 ),
214 )
215 run_count: int = Field( 1a
216 default=0, description="The number of times the flow run was executed."
217 )
218 expected_start_time: Optional[DateTime] = Field( 1a
219 default=None,
220 description="The flow run's expected start time.",
221 )
222 next_scheduled_start_time: Optional[DateTime] = Field( 1a
223 default=None,
224 description="The next time the flow run is scheduled to start.",
225 )
226 start_time: Optional[DateTime] = Field( 1a
227 default=None, description="The actual start time."
228 )
229 end_time: Optional[DateTime] = Field( 1a
230 default=None, description="The actual end time."
231 )
232 total_run_time: datetime.timedelta = Field( 1a
233 default=datetime.timedelta(0),
234 description=(
235 "Total run time. If the flow run was executed multiple times, the time of"
236 " each run will be summed."
237 ),
238 )
239 estimated_run_time: datetime.timedelta = Field( 1a
240 default=datetime.timedelta(0),
241 description="A real-time estimate of the total run time.",
242 )
243 estimated_start_time_delta: datetime.timedelta = Field( 1a
244 default=datetime.timedelta(0),
245 description="The difference between actual and expected start time.",
246 )
247 auto_scheduled: bool = Field( 1a
248 default=False,
249 description="Whether or not the flow run was automatically scheduled.",
250 )
251 infrastructure_document_id: Optional[UUID] = Field( 1a
252 default=None,
253 description="The block document defining infrastructure to use this flow run.",
254 )
255 infrastructure_pid: Optional[str] = Field( 1a
256 default=None,
257 description="The id of the flow run as returned by an infrastructure block.",
258 )
259 created_by: Optional[CreatedBy] = Field( 1a
260 default=None,
261 description="Optional information about the creator of this flow run.",
262 )
263 work_queue_id: Optional[UUID] = Field( 1a
264 default=None, description="The id of the run's work pool queue."
265 )
267 work_pool_id: Optional[UUID] = Field( 1a
268 description="The work pool with which the queue is associated."
269 )
270 work_pool_name: Optional[str] = Field( 1a
271 default=None,
272 description="The name of the flow run's work pool.",
273 examples=["my-work-pool"],
274 )
275 state: Optional[objects.State] = Field( 1a
276 default=None,
277 description="The state of the flow run.",
278 examples=["objects.State(type=objects.StateType.COMPLETED)"],
279 )
280 job_variables: Optional[dict[str, Any]] = Field( 1a
281 default=None, description="Job variables for the flow run."
282 )
284 # These are server-side optimizations and should not be present on client models
285 # TODO: Deprecate these fields
287 state_type: Optional[objects.StateType] = Field( 1a
288 default=None, description="The type of the current flow run state."
289 )
290 state_name: Optional[str] = Field( 1a
291 default=None, description="The name of the current flow run state."
292 )
294 def __eq__(self, other: Any) -> bool: 1a
295 """
296 Check for "equality" to another flow run schema
298 Estimates times are rolling and will always change with repeated queries for
299 a flow run so we ignore them during equality checks.
300 """
301 if isinstance(other, objects.FlowRun):
302 exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
303 return self.model_dump(exclude=exclude_fields) == other.model_dump(
304 exclude=exclude_fields
305 )
306 return super().__eq__(other)
309class DeploymentResponse(ObjectBaseModel): 1a
310 name: str = Field(default=..., description="The name of the deployment.") 1a
312 # Versionining
313 version: Optional[str] = Field( 1a
314 default=None, description="An optional version for the deployment."
315 )
316 version_id: Optional[UUID] = Field( 1a
317 default=None, description="The ID of the current version of the deployment."
318 )
319 version_info: Optional[objects.VersionInfo] = Field( 1a
320 default=None, description="A description of this version of the deployment."
321 )
323 # Branching
324 branch: Optional[str] = Field( 1a
325 default=None, description="The branch of the deployment."
326 )
327 base: Optional[UUID] = Field( 1a
328 default=None, description="The base deployment of the deployment."
329 )
330 root: Optional[UUID] = Field( 1a
331 default=None, description="The root deployment of the deployment."
332 )
334 description: Optional[str] = Field( 1a
335 default=None, description="A description for the deployment."
336 )
337 flow_id: UUID = Field( 1a
338 default=..., description="The flow id associated with the deployment."
339 )
340 concurrency_limit: Optional[int] = Field( 1a
341 default=None,
342 description="DEPRECATED: Prefer `global_concurrency_limit`. Will always be None for backwards compatibility. Will be removed after December 2024.",
343 deprecated=True,
344 )
345 global_concurrency_limit: Optional["GlobalConcurrencyLimitResponse"] = Field( 1a
346 default=None,
347 description="The global concurrency limit object for enforcing the maximum number of flow runs that can be active at once.",
348 )
349 concurrency_options: Optional[objects.ConcurrencyOptions] = Field( 1a
350 default=None,
351 description="The concurrency options for the deployment.",
352 )
353 paused: bool = Field( 1a
354 default=False, description="Whether or not the deployment is paused."
355 )
356 concurrency_options: Optional[objects.ConcurrencyOptions] = Field( 1a
357 default=None,
358 description="The concurrency options for the deployment.",
359 )
360 schedules: list[objects.DeploymentSchedule] = Field( 1a
361 default_factory=list, description="A list of schedules for the deployment."
362 )
363 job_variables: dict[str, Any] = Field( 1a
364 default_factory=dict,
365 description="Overrides to apply to flow run infrastructure at runtime.",
366 )
367 parameters: dict[str, Any] = Field( 1a
368 default_factory=dict,
369 description="Parameters for flow runs scheduled by the deployment.",
370 )
371 pull_steps: Optional[list[dict[str, Any]]] = Field( 1a
372 default=None,
373 description="Pull steps for cloning and running this deployment.",
374 )
375 tags: list[str] = Field( 1a
376 default_factory=list,
377 description="A list of tags for the deployment",
378 examples=[["tag-1", "tag-2"]],
379 )
380 labels: KeyValueLabelsField 1a
381 work_queue_name: Optional[str] = Field( 1a
382 default=None,
383 description=(
384 "The work queue for the deployment. If no work queue is set, work will not"
385 " be scheduled."
386 ),
387 )
388 last_polled: Optional[DateTime] = Field( 1a
389 default=None,
390 description="The last time the deployment was polled for status updates.",
391 )
392 parameter_openapi_schema: Optional[dict[str, Any]] = Field( 1a
393 default=None,
394 description="The parameter schema of the flow, including defaults.",
395 )
396 path: Optional[str] = Field( 1a
397 default=None,
398 description=(
399 "The path to the working directory for the workflow, relative to remote"
400 " storage or an absolute path."
401 ),
402 )
403 entrypoint: Optional[str] = Field( 1a
404 default=None,
405 description=(
406 "The path to the entrypoint for the workflow, relative to the `path`."
407 ),
408 )
409 storage_document_id: Optional[UUID] = Field( 1a
410 default=None,
411 description="The block document defining storage used for this flow.",
412 )
413 infrastructure_document_id: Optional[UUID] = Field( 1a
414 default=None,
415 description="The block document defining infrastructure to use for flow runs.",
416 )
417 created_by: Optional[CreatedBy] = Field( 1a
418 default=None,
419 description="Optional information about the creator of this deployment.",
420 )
421 updated_by: Optional[UpdatedBy] = Field( 1a
422 default=None,
423 description="Optional information about the updater of this deployment.",
424 )
425 work_queue_id: Optional[UUID] = Field( 1a
426 default=None,
427 description=(
428 "The id of the work pool queue to which this deployment is assigned."
429 ),
430 )
431 enforce_parameter_schema: bool = Field( 1a
432 default=True,
433 description=(
434 "Whether or not the deployment should enforce the parameter schema."
435 ),
436 )
437 work_pool_name: Optional[str] = Field( 1a
438 default=None,
439 description="The name of the deployment's work pool.",
440 )
441 status: Optional[objects.DeploymentStatus] = Field( 1a
442 default=None,
443 description="Current status of the deployment.",
444 )
446 def as_related_resource(self, role: str = "deployment") -> "RelatedResource": 1a
447 from prefect.events.schemas.events import RelatedResource
449 labels = {
450 "prefect.resource.id": f"prefect.deployment.{self.id}",
451 "prefect.resource.role": role,
452 "prefect.resource.name": self.name,
453 }
455 if self.branch:
456 labels["prefect.deployment.branch"] = self.branch
458 if self.base:
459 labels["prefect.deployment.base"] = f"prefect.deployment.{self.base}"
461 if self.root:
462 labels["prefect.deployment.root"] = f"prefect.deployment.{self.root}"
464 if self.version_id and self.version_info:
465 labels["prefect.deployment.version-id"] = str(self.version_id)
466 labels["prefect.deployment.version-type"] = self.version_info.type
467 labels["prefect.deployment.version"] = self.version_info.version
469 return RelatedResource(labels)
472class MinimalConcurrencyLimitResponse(PrefectBaseModel): 1a
473 model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore") 1a
475 id: UUID 1a
476 name: str 1a
477 limit: int 1a
480class ConcurrencyLimitWithLeaseResponse(PrefectBaseModel): 1a
481 model_config: ClassVar[ConfigDict] = ConfigDict(extra="ignore") 1a
483 lease_id: UUID 1a
484 limits: list[MinimalConcurrencyLimitResponse] 1a
487class GlobalConcurrencyLimitResponse(ObjectBaseModel): 1a
488 """
489 A response object for global concurrency limits.
490 """
492 active: bool = Field( 1a
493 default=True, description="Whether the global concurrency limit is active."
494 )
495 name: str = Field( 1a
496 default=..., description="The name of the global concurrency limit."
497 )
498 limit: int = Field(default=..., description="The concurrency limit.") 1a
499 active_slots: int = Field(default=..., description="The number of active slots.") 1a
500 slot_decay_per_second: float = Field( 1a
501 default=2.0,
502 description="The decay rate for active slots when used as a rate limit.",
503 )