Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/schemas/states.py: 51%
188 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2State schemas.
3"""
5import warnings 1a
6from datetime import timedelta 1a
7from typing import ( 1a
8 TYPE_CHECKING,
9 Any,
10 ClassVar,
11 Literal,
12 Optional,
13 TypeVar,
14 Union,
15 overload,
16)
17from uuid import UUID 1a
19from pydantic import ConfigDict, Field, ValidationInfo, field_validator, model_validator 1a
20from typing_extensions import Self 1a
22from prefect._internal.uuid7 import uuid7 1a
23from prefect.client.schemas import objects 1a
24from prefect.server.utilities.schemas.bases import ( 1a
25 PrefectBaseModel,
26 TimeSeriesBaseModel,
27)
28from prefect.types._datetime import DateTime, now 1a
29from prefect.utilities.collections import AutoEnum 1a
31if TYPE_CHECKING: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true1a
32 from prefect.server.database.orm_models import ORMFlowRunState, ORMTaskRunState
33 from prefect.server.schemas.actions import StateCreate
36R = TypeVar("R") 1a
37_State = TypeVar("_State", bound="State") 1a
40class StateType(AutoEnum): 1a
41 """Enumeration of state types."""
43 SCHEDULED = AutoEnum.auto() 1a
44 PENDING = AutoEnum.auto() 1a
45 RUNNING = AutoEnum.auto() 1a
46 COMPLETED = AutoEnum.auto() 1a
47 FAILED = AutoEnum.auto() 1a
48 CANCELLED = AutoEnum.auto() 1a
49 CRASHED = AutoEnum.auto() 1a
50 PAUSED = AutoEnum.auto() 1a
51 CANCELLING = AutoEnum.auto() 1a
54class CountByState(PrefectBaseModel): 1a
55 COMPLETED: int = Field(default=0) 1a
56 PENDING: int = Field(default=0) 1a
57 RUNNING: int = Field(default=0) 1a
58 FAILED: int = Field(default=0) 1a
59 CANCELLED: int = Field(default=0) 1a
60 CRASHED: int = Field(default=0) 1a
61 PAUSED: int = Field(default=0) 1a
62 CANCELLING: int = Field(default=0) 1a
63 SCHEDULED: int = Field(default=0) 1a
65 @field_validator("*") 1a
66 @classmethod 1a
67 def check_key(cls, value: Optional[Any], info: ValidationInfo) -> Optional[Any]: 1a
68 if info.field_name not in StateType.__members__:
69 raise ValueError(f"{info.field_name} is not a valid StateType")
70 return value
73TERMINAL_STATES = { 1a
74 StateType.COMPLETED,
75 StateType.CANCELLED,
76 StateType.FAILED,
77 StateType.CRASHED,
78}
81class StateDetails(PrefectBaseModel): 1a
82 flow_run_id: Optional[UUID] = None 1a
83 task_run_id: Optional[UUID] = None 1a
84 # for task runs that represent subflows, the subflow's run ID
85 child_flow_run_id: Optional[UUID] = None 1a
86 scheduled_time: Optional[DateTime] = None 1a
87 cache_key: Optional[str] = None 1a
88 cache_expiration: Optional[DateTime] = None 1a
89 deferred: Optional[bool] = False 1a
90 untrackable_result: bool = False 1a
91 pause_timeout: Optional[DateTime] = None 1a
92 pause_reschedule: bool = False 1a
93 pause_key: Optional[str] = None 1a
94 run_input_keyset: Optional[dict[str, str]] = None 1a
95 refresh_cache: Optional[bool] = None 1a
96 retriable: Optional[bool] = None 1a
97 transition_id: Optional[UUID] = None 1a
98 task_parameters_id: Optional[UUID] = None 1a
99 # Captures the trace_id and span_id of the span where this state was created
100 traceparent: Optional[str] = None 1a
101 # The ID of the lease that is currently holding the deployment concurrency slot
102 # for this run.
103 deployment_concurrency_lease_id: Optional[UUID] = None 1a
106class StateBaseModel(TimeSeriesBaseModel): 1a
107 def orm_dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]: 1a
108 """
109 This method is used as a convenience method for constructing fixtues by first
110 building a `State` schema object and converting it into an ORM-compatible
111 format. Because the `data` field is not writable on ORM states, this method
112 omits the `data` field entirely for the purposes of constructing an ORM model.
113 If state data is required, an artifact must be created separately.
114 """
116 schema_dict = self.model_dump(*args, **kwargs)
117 # remove the data field in order to construct a state ORM model
118 schema_dict.pop("data", None)
119 return schema_dict
122class State(StateBaseModel): 1a
123 """Represents the state of a run."""
125 model_config: ClassVar[ConfigDict] = ConfigDict(from_attributes=True) 1a
127 type: StateType 1a
128 name: Optional[str] = Field(default=None) 1a
129 timestamp: DateTime = Field(default_factory=lambda: now("UTC")) 1a
130 message: Optional[str] = Field(default=None, examples=["Run started"]) 1a
131 data: Optional[Any] = Field( 1a
132 default=None,
133 description=(
134 "Data associated with the state, e.g. a result. "
135 "Content must be storable as JSON."
136 ),
137 )
138 state_details: StateDetails = Field(default_factory=StateDetails) 1a
140 @classmethod 1a
141 def from_orm_without_result( 1a
142 cls,
143 orm_state: Union["ORMFlowRunState", "ORMTaskRunState"],
144 with_data: Optional[Any] = None,
145 ) -> Self:
146 """
147 During orchestration, ORM states can be instantiated prior to inserting results
148 into the artifact table and the `data` field will not be eagerly loaded. In
149 these cases, sqlalchemy will attempt to lazily load the the relationship, which
150 will fail when called within a synchronous pydantic method.
152 This method will construct a `State` object from an ORM model without a loaded
153 artifact and attach data passed using the `with_data` argument to the `data`
154 field.
155 """
157 field_keys = cls.model_fields.keys()
158 state_data: dict[str, Any] = {
159 field: getattr(orm_state, field, None)
160 for field in field_keys
161 if field != "data"
162 }
163 state_data["data"] = with_data
164 return cls(**state_data)
166 @model_validator(mode="after") 1a
167 def default_name_from_type(self) -> Self: 1a
168 """If a name is not provided, use the type"""
169 # if `type` is not in `values` it means the `type` didn't pass its own
170 # validation check and an error will be raised after this function is called
171 name = self.name
172 if name is None and self.type:
173 self.name = " ".join([v.capitalize() for v in self.type.value.split("_")])
174 return self
176 @model_validator(mode="after") 1a
177 def default_scheduled_start_time(self) -> Self: 1a
178 from prefect.server.schemas.states import StateType
180 if self.type == StateType.SCHEDULED:
181 if not self.state_details.scheduled_time:
182 self.state_details.scheduled_time = now("UTC")
184 return self
186 def is_scheduled(self) -> bool: 1a
187 return self.type == StateType.SCHEDULED
189 def is_pending(self) -> bool: 1a
190 return self.type == StateType.PENDING
192 def is_running(self) -> bool: 1a
193 return self.type == StateType.RUNNING
195 def is_completed(self) -> bool: 1a
196 return self.type == StateType.COMPLETED
198 def is_failed(self) -> bool: 1a
199 return self.type == StateType.FAILED
201 def is_crashed(self) -> bool: 1a
202 return self.type == StateType.CRASHED
204 def is_cancelled(self) -> bool: 1a
205 return self.type == StateType.CANCELLED
207 def is_cancelling(self) -> bool: 1a
208 return self.type == StateType.CANCELLING
210 def is_final(self) -> bool: 1a
211 return self.type in TERMINAL_STATES
213 def is_paused(self) -> bool: 1a
214 return self.type == StateType.PAUSED
216 def fresh_copy(self, **kwargs: Any) -> Self: 1a
217 """
218 Return a fresh copy of the state with a new ID.
219 """
220 return self.model_copy(
221 update={
222 "id": uuid7(),
223 "created": now("UTC"),
224 "updated": now("UTC"),
225 "timestamp": now("UTC"),
226 },
227 deep=True,
228 **kwargs,
229 )
231 @overload 1a
232 def result(self, raise_on_failure: Literal[True] = ...) -> Any: ... 232 ↛ exitline 232 didn't return from function 'result' because 1a
234 @overload 1a
235 def result( 235 ↛ exitline 235 didn't return from function 'result' because 1a
236 self, raise_on_failure: Literal[False] = False
237 ) -> Union[Any, Exception]: ...
239 @overload 1a
240 def result(self, raise_on_failure: bool = ...) -> Union[Any, Exception]: ... 240 ↛ exitline 240 didn't return from function 'result' because 1a
242 def result(self, raise_on_failure: bool = True) -> Union[Any, Exception]: 1a
243 # Backwards compatible `result` handling on the server-side schema
244 from prefect.states import State
246 warnings.warn(
247 (
248 "`result` is no longer supported by"
249 " `prefect.server.schemas.states.State` and will be removed in a future"
250 " release. When result retrieval is needed, use `prefect.states.State`."
251 ),
252 DeprecationWarning,
253 stacklevel=2,
254 )
256 state: State[Any] = objects.State.model_validate(self)
257 return state.result(raise_on_failure=raise_on_failure)
259 def to_state_create(self) -> "StateCreate": 1a
260 from prefect.server.schemas.actions import StateCreate
262 return StateCreate(
263 type=self.type,
264 name=self.name,
265 message=self.message,
266 data=self.data,
267 state_details=self.state_details,
268 )
270 def __repr__(self) -> str: 1a
271 """
272 Generates a complete state representation appropriate for introspection
273 and debugging, including the result:
275 `MyCompletedState(message="my message", type=COMPLETED, result=...)`
276 """
277 result = self.data
279 display = dict(
280 message=repr(self.message),
281 type=str(self.type.value),
282 result=repr(result),
283 )
285 return f"{self.name}({', '.join(f'{k}={v}' for k, v in display.items())})"
287 def __str__(self) -> str: 1a
288 """
289 Generates a simple state representation appropriate for logging:
291 `MyCompletedState("my message", type=COMPLETED)`
292 """
294 display: list[str] = []
296 if self.message:
297 display.append(repr(self.message))
299 if self.type.value.lower() != (self.name or "").lower():
300 display.append(f"type={self.type.value}")
302 return f"{self.name}({', '.join(display)})"
304 def __hash__(self) -> int: 1a
305 return hash(
306 (
307 getattr(self.state_details, "flow_run_id", None),
308 getattr(self.state_details, "task_run_id", None),
309 self.timestamp,
310 self.type,
311 )
312 )
315def Scheduled( 1a
316 scheduled_time: Optional[DateTime] = None,
317 cls: type[_State] = State,
318 **kwargs: Any,
319) -> _State:
320 """Convenience function for creating `Scheduled` states.
322 Returns:
323 State: a Scheduled state
324 """
325 # NOTE: `scheduled_time` must come first for backwards compatibility
327 state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))
328 if scheduled_time is None:
329 scheduled_time = now("UTC")
330 elif state_details.scheduled_time:
331 raise ValueError("An extra scheduled_time was provided in state_details")
332 state_details.scheduled_time = scheduled_time
334 return cls(type=StateType.SCHEDULED, state_details=state_details, **kwargs)
337def Completed(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
338 """Convenience function for creating `Completed` states.
340 Returns:
341 State: a Completed state
342 """
343 return cls(type=StateType.COMPLETED, **kwargs)
346def Running(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
347 """Convenience function for creating `Running` states.
349 Returns:
350 State: a Running state
351 """
352 return cls(type=StateType.RUNNING, **kwargs)
355def Failed(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
356 """Convenience function for creating `Failed` states.
358 Returns:
359 State: a Failed state
360 """
361 return cls(type=StateType.FAILED, **kwargs)
364def Crashed(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
365 """Convenience function for creating `Crashed` states.
367 Returns:
368 State: a Crashed state
369 """
370 return cls(type=StateType.CRASHED, **kwargs)
373def Cancelling(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
374 """Convenience function for creating `Cancelling` states.
376 Returns:
377 State: a Cancelling state
378 """
379 return cls(type=StateType.CANCELLING, **kwargs)
382def Cancelled(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
383 """Convenience function for creating `Cancelled` states.
385 Returns:
386 State: a Cancelled state
387 """
388 return cls(type=StateType.CANCELLED, **kwargs)
391def Pending(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
392 """Convenience function for creating `Pending` states.
394 Returns:
395 State: a Pending state
396 """
397 return cls(type=StateType.PENDING, **kwargs)
400def Paused( 1a
401 cls: type[_State] = State,
402 timeout_seconds: Optional[int] = None,
403 pause_expiration_time: Optional[DateTime] = None,
404 reschedule: bool = False,
405 pause_key: Optional[str] = None,
406 **kwargs: Any,
407) -> _State:
408 """Convenience function for creating `Paused` states.
410 Returns:
411 State: a Paused state
412 """
413 state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))
415 if state_details.pause_timeout:
416 raise ValueError("An extra pause timeout was provided in state_details")
418 if pause_expiration_time is not None and timeout_seconds is not None:
419 raise ValueError(
420 "Cannot supply both a pause_expiration_time and timeout_seconds"
421 )
423 if pause_expiration_time:
424 state_details.pause_timeout = pause_expiration_time
425 elif timeout_seconds is not None:
426 state_details.pause_timeout = now("UTC") + timedelta(seconds=timeout_seconds)
428 state_details.pause_reschedule = reschedule
429 state_details.pause_key = pause_key
431 return cls(type=StateType.PAUSED, state_details=state_details, **kwargs)
434def Suspended( 1a
435 cls: type[_State] = State,
436 timeout_seconds: Optional[int] = None,
437 pause_expiration_time: Optional[DateTime] = None,
438 pause_key: Optional[str] = None,
439 **kwargs: Any,
440) -> _State:
441 """Convenience function for creating `Suspended` states.
443 Returns:
444 State: a Suspended state
445 """
446 return Paused(
447 cls=cls,
448 name="Suspended",
449 reschedule=True,
450 timeout_seconds=timeout_seconds,
451 pause_expiration_time=pause_expiration_time,
452 pause_key=pause_key,
453 **kwargs,
454 )
457def AwaitingRetry( 1a
458 cls: type[_State] = State,
459 scheduled_time: Optional[DateTime] = None,
460 **kwargs: Any,
461) -> _State:
462 """Convenience function for creating `AwaitingRetry` states.
464 Returns:
465 State: an AwaitingRetry state
466 """
467 return Scheduled(
468 cls=cls, scheduled_time=scheduled_time, name="AwaitingRetry", **kwargs
469 )
472def AwaitingConcurrencySlot( 1a
473 cls: type[_State] = State,
474 scheduled_time: Optional[DateTime] = None,
475 **kwargs: Any,
476) -> _State:
477 """Convenience function for creating `AwaitingConcurrencySlot` states.
479 Returns:
480 State: an AwaitingConcurrencySlot state
481 """
482 return Scheduled(
483 cls=cls,
484 scheduled_time=scheduled_time,
485 name="AwaitingConcurrencySlot",
486 **kwargs,
487 )
490def Retrying(cls: type[_State] = State, **kwargs: Any) -> _State: 1a
491 """Convenience function for creating `Retrying` states.
493 Returns:
494 State: a Retrying state
495 """
496 return cls(type=StateType.RUNNING, name="Retrying", **kwargs)
499def Late( 1a
500 cls: type[_State] = State,
501 scheduled_time: Optional[DateTime] = None,
502 **kwargs: Any,
503) -> _State:
504 """Convenience function for creating `Late` states.
506 Returns:
507 State: a Late state
508 """
509 return Scheduled(cls=cls, scheduled_time=scheduled_time, name="Late", **kwargs)