Coverage for /usr/local/lib/python3.12/site-packages/prefect/states.py: 13%
361 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4import datetime 1a
5import sys 1a
6import traceback 1a
7import uuid 1a
8from collections import Counter 1a
9from types import GeneratorType, TracebackType 1a
10from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Type 1a
12import anyio 1a
13import httpx 1a
14from opentelemetry import propagate 1a
15from typing_extensions import TypeGuard 1a
17from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
18from prefect.client.schemas.objects import State, StateDetails, StateType 1a
19from prefect.exceptions import ( 1a
20 CancelledRun,
21 CrashedRun,
22 FailedRun,
23 MissingContextError,
24 MissingResult,
25 PausedRun,
26 TerminationSignal,
27 UnfinishedRun,
28)
29from prefect.logging.loggers import get_logger, get_run_logger 1a
30from prefect.types._datetime import now 1a
31from prefect.utilities.annotations import BaseAnnotation 1a
32from prefect.utilities.collections import ensure_iterable 1a
34if TYPE_CHECKING: 34 ↛ 35line 34 didn't jump to line 35 because the condition on line 34 was never true1a
35 import logging
37 from prefect.client.schemas.actions import StateCreate
38 from prefect.results import (
39 R,
40 ResultStore,
41 )
# Module-level logger named "states"; functions in this module fall back to it
# when `get_run_logger()` raises `MissingContextError`.
43logger: "logging.Logger" = get_logger("states") 1a
def to_state_create(state: State) -> "StateCreate":
    """
    Build the `StateCreate` payload used to set a run's state through the API.

    Only result data is forwarded. When `state.data` is a `ResultRecord` and
    `should_persist_result()` is true, the record's metadata is sent; in every
    other case `data` is `None`, so non-result data stays local.
    """
    from prefect.client.schemas.actions import StateCreate
    from prefect.results import ResultRecord, should_persist_result

    payload_data = None
    if isinstance(state.data, ResultRecord) and should_persist_result():
        payload_data = state.data.metadata  # pyright: ignore[reportUnknownMemberType] unable to narrow ResultRecord type

    return StateCreate(
        type=state.type,
        name=state.name,
        message=state.message,
        data=payload_data,
        state_details=state.state_details,
    )
async def get_state_result(
    state: "State[R]",
    raise_on_failure: bool = True,
    retry_result_failure: bool = True,
) -> "R":
    """
    Resolve and return the result carried by `state`.

    Thin public wrapper that forwards both flags unchanged to
    `_get_state_result`. See `State.result()`.
    """
    resolved = await _get_state_result(
        state,
        raise_on_failure=raise_on_failure,
        retry_result_failure=retry_result_failure,
    )
    return resolved
# Upper bound on read attempts made by `_get_state_result_data_with_retries`
# when `retry_result_failure` is enabled.
92RESULT_READ_MAXIMUM_ATTEMPTS = 10 1a
# Delay between read attempts, passed to `asyncio.sleep` (i.e. seconds).
93RESULT_READ_RETRY_DELAY = 0.25 1a
async def _get_state_result_data_with_retries(
    state: "State[R]", retry_result_failure: bool = True
) -> "R":
    """
    Read the result referenced by `state.data`, retrying when the read fails.

    Results may be written asynchronously, possibly after their corresponding
    state has been written and events have been emitted, so some grace is given
    for missing results. A failure may surface as a missing file, a short read,
    or another error depending on the result storage backend, which is why any
    `Exception` triggers a retry.

    With `retry_result_failure=False` a single attempt is made; otherwise up to
    `RESULT_READ_MAXIMUM_ATTEMPTS` attempts are made, sleeping
    `RESULT_READ_RETRY_DELAY` between them. The last failure is re-raised.
    """
    from prefect._result_records import ResultRecordMetadata
    from prefect.results import ResultStore

    max_attempts = 1 if retry_result_failure is False else RESULT_READ_MAXIMUM_ATTEMPTS

    for attempt in range(1, max_attempts + 1):
        try:
            if not isinstance(state.data, ResultRecordMetadata):
                return await state.data.get()
            record = await ResultStore._from_metadata(state.data)
            return record.result
        except Exception as read_error:
            # Out of attempts: surface the original error to the caller.
            if attempt == max_attempts:
                raise
            logger.debug(
                "Exception %r while reading result, retry %s/%s in %ss...",
                read_error,
                attempt,
                max_attempts,
                RESULT_READ_RETRY_DELAY,
            )
            await asyncio.sleep(RESULT_READ_RETRY_DELAY)
async def _get_state_result(
    state: "State[R]", raise_on_failure: bool, retry_result_failure: bool = True
) -> "R":
    """
    Internal implementation for `get_state_result` without async backwards compatibility

    Raises:
        PausedRun: if the state is paused.
        UnfinishedRun: if the state is not final.
        MissingResult: if the state carries no data and is not failed, crashed
            or cancelled.

    When `raise_on_failure` is true and the state is crashed, failed or
    cancelled, the exception from `aget_state_exception` is raised. When the
    state has no data but is failed, crashed or cancelled, that exception is
    returned instead of raised.
    """
    from prefect.results import ResultRecord, ResultRecordMetadata

    # Paused states are not truly terminal and do not have results associated with them
    if state.is_paused():
        raise PausedRun("Run is paused, its result is not available.", state=state)

    if not state.is_final():
        raise UnfinishedRun(
            f"Run is in {state.type.name} state, its result is not available."
        )

    if raise_on_failure and (
        state.is_crashed() or state.is_failed() or state.is_cancelled()
    ):
        raise await aget_state_exception(state)

    data = state.data

    if isinstance(data, ResultRecordMetadata):
        # Only metadata is attached; the payload must be read from storage.
        return await _get_state_result_data_with_retries(
            state, retry_result_failure=retry_result_failure
        )

    if isinstance(data, ResultRecord):
        return data.result

    if data is not None:
        # The result is attached directly
        return data

    if state.is_failed() or state.is_crashed() or state.is_cancelled():
        return await aget_state_exception(state)

    raise MissingResult(
        "State data is missing. "
        "Typically, this occurs when result persistence is disabled and the "
        "state has been retrieved from the API."
    )
def format_exception(exc: BaseException, tb: Optional[TracebackType] = None) -> str:
    """
    Render an exception as text.

    Args:
        exc: The exception to format.
        tb: Optional traceback. When given, the full traceback is rendered with
            `traceback.format_exception`; otherwise only `"<TypeName>: <message>"`
            is produced.

    Returns:
        The formatted text. For exception types whose module starts with
        `"prefect."`, the fully qualified `module.Name` is shortened to `Name`.
    """
    exc_type = type(exc)
    if tb is None:
        formatted = f"{exc_type.__name__}: {exc}"
    else:
        formatted = "".join(traceback.format_exception(exc_type, exc, tb=tb))

    # Trim `prefect` module paths from our exception types
    if exc_type.__module__.startswith("prefect."):
        formatted = formatted.replace(
            f"{exc_type.__module__}.{exc_type.__name__}", exc_type.__name__
        )

    return formatted
async def exception_to_crashed_state(
    exc: BaseException,
    result_store: Optional["ResultStore"] = None,
) -> State:
    """
    Convert an exception raised _outside_ of user code into a 'Crashed' state.

    The state message is chosen from the exception type (runtime cancellation,
    keyboard interrupt, termination signal, system exit, httpx timeout/connect
    error, or anything else). When a `result_store` is given, the exception is
    wrapped in a result record created under a fresh random key; otherwise the
    exception object itself is attached as the state's data.
    """
    if isinstance(exc, anyio.get_cancelled_exc_class()):
        crash_message = "Execution was cancelled by the runtime environment."
    elif isinstance(exc, KeyboardInterrupt):
        crash_message = "Execution was aborted by an interrupt signal."
    elif isinstance(exc, TerminationSignal):
        crash_message = "Execution was aborted by a termination signal."
    elif isinstance(exc, SystemExit):
        crash_message = "Execution was aborted by Python system exit call."
    elif isinstance(exc, (httpx.TimeoutException, httpx.ConnectError)):
        try:
            request: httpx.Request = exc.request
        except RuntimeError:
            # The request property is not set
            crash_message = (
                "Request failed while attempting to contact the server:"
                f" {format_exception(exc)}"
            )
        else:
            # TODO: We can check if this is actually our API url
            crash_message = f"Request to {request.url} failed: {format_exception(exc)}."
    else:
        crash_message = (
            "Execution was interrupted by an unexpected exception:"
            f" {format_exception(exc)}"
        )

    if not result_store:
        # Attach the exception for local usage, will not be available when retrieved
        # from the API
        crash_data = exc
    else:
        crash_data = result_store.create_result_record(exc, key=uuid.uuid4().hex)

    return Crashed(message=crash_message, data=crash_data)
async def exception_to_failed_state(
    exc: Optional[BaseException] = None,
    result_store: Optional["ResultStore"] = None,
    write_result: bool = False,
    **kwargs: Any,
) -> State[BaseException]:
    """
    Convenience function for creating `Failed` states from exceptions

    Args:
        exc: The exception to record. When `None`, the exception currently being
            handled (per `sys.exc_info()`) is used.
        result_store: When given, the exception is wrapped in a result record
            created under a fresh random key; otherwise the exception object
            itself is attached as the state's data.
        write_result: When true (and a `result_store` is given), the record is
            persisted. A persistence failure is logged as a warning, not raised.
        **kwargs: Forwarded to `Failed`. A `message` entry, if present, is used
            as a prefix for the formatted exception.

    Returns:
        A `Failed` state whose `state_details.retriable` is set to `False`.

    Raises:
        ValueError: if `exc` is `None` and no exception is being handled.
    """
    try:
        local_logger = get_run_logger()
    except MissingContextError:
        local_logger = logger

    # Compare against None explicitly: an exception instance may be falsy
    # (e.g. it defines `__len__` or `__bool__`) and must still count as passed.
    if exc is None:
        _, exc, _ = sys.exc_info()
        if exc is None:
            raise ValueError(
                "Exception was not passed and no active exception could be found."
            )

    if result_store:
        key = uuid.uuid4().hex
        data = result_store.create_result_record(exc, key=key)
        if write_result:
            try:
                await result_store.apersist_result_record(data)
            except Exception as nested_exc:
                local_logger.warning(
                    "Failed to write result: %s Execution will continue, but the result has not been written",
                    nested_exc,
                )
    else:
        # Attach the exception for local usage, will not be available when retrieved
        # from the API
        data = exc

    # An explicit `message=None` is treated like an absent message instead of
    # failing on `None + str` below.
    existing_message = kwargs.pop("message", None) or ""
    if existing_message and not existing_message.endswith(" "):
        existing_message += " "

    # TODO: Consider if we want to include traceback information, it is intentionally
    # excluded from messages for now
    message = existing_message + format_exception(exc)

    state = Failed(data=data, message=message, **kwargs)
    state.state_details.retriable = False

    return state
async def _persist_result_record_or_warn(
    result_store: "ResultStore",
    result_record: Any,
    run_logger: Any,
) -> None:
    """Persist `result_record` via `result_store`; on failure log a warning instead of raising."""
    try:
        await result_store.apersist_result_record(result_record)
    except Exception as exc:
        run_logger.warning(
            "Encountered an error while persisting result: %s Execution will continue, but the result has not been persisted",
            exc,
        )


async def return_value_to_state(
    retval: "R",
    result_store: "ResultStore",
    key: Optional[str] = None,
    expiration: Optional[datetime.datetime] = None,
    write_result: bool = False,
) -> "State[R]":
    """
    Given a return value from a user's function, create a `State` the run should
    be placed in.

    - If data is returned, we create a 'COMPLETED' state with the data
    - If a single, manually created state is returned, we use that state as given
        (manual creation is determined by the lack of ids)
    - If an upstream state or iterable of upstream states is returned, we apply the
        aggregate rule

    The aggregate rule says that given multiple states we will determine the final state
    such that:

    - If all of the states are COMPLETED the final state is COMPLETED
    - Otherwise, if any state is CANCELLED the final state is CANCELLED
    - Otherwise, if any state is PAUSED the final state is PAUSED
    - Otherwise the final state is FAILED
    - The states will be placed in the final state `data` attribute

    Callers should resolve all futures into states before passing return values to this
    function.

    Args:
        retval: The value returned by the user's function.
        result_store: Store used to create (and optionally persist) result records.
        key: Storage key passed through to `create_result_record`.
        expiration: Expiration passed through to `create_result_record`.
        write_result: When true, the created record is persisted; a persistence
            failure is logged as a warning and execution continues.
    """
    from prefect.results import (
        ResultRecord,
        ResultRecordMetadata,
    )

    try:
        local_logger = get_run_logger()
    except MissingContextError:
        local_logger = logger

    if (
        isinstance(retval, State)
        # Check for manual creation
        and not retval.state_details.flow_run_id
        and not retval.state_details.task_run_id
    ):
        state = retval
        # Unless the user has already constructed a result explicitly, use the store
        # to update the data to the correct type
        if not isinstance(state.data, (ResultRecord, ResultRecordMetadata)):
            result_record = result_store.create_result_record(
                state.data,
                key=key,
                expiration=expiration,
            )
            if write_result:
                await _persist_result_record_or_warn(
                    result_store, result_record, local_logger
                )
            state.data = result_record
        return state

    # Determine a new state from the aggregate of contained states
    if isinstance(retval, State) or is_state_iterable(retval):
        states = StateGroup(ensure_iterable(retval))

        # Determine the new state type together with a nice message for the aggregate
        if states.all_completed():
            new_state_type = StateType.COMPLETED
            message = "All states completed."
        elif states.any_cancelled():
            new_state_type = StateType.CANCELLED
            message = f"{states.cancelled_count}/{states.total_count} states cancelled."
        elif states.any_paused():
            new_state_type = StateType.PAUSED
            message = f"{states.paused_count}/{states.total_count} states paused."
        else:
            new_state_type = StateType.FAILED
            if states.any_failed():
                message = f"{states.fail_count}/{states.total_count} states failed."
            elif not states.all_final():
                message = (
                    f"{states.not_final_count}/{states.total_count} states are not final."
                )
            else:
                message = "Given states: " + states.counts_message()

        # TODO: We may actually want to set the data to a `StateGroup` object and just
        #       allow it to be unpacked into a tuple and such so users can interact with
        #       it
        result_record = result_store.create_result_record(
            retval,
            key=key,
            expiration=expiration,
        )
        if write_result:
            await _persist_result_record_or_warn(
                result_store, result_record, local_logger
            )
        return State(
            type=new_state_type,
            message=message,
            data=result_record,
        )

    # Generators aren't portable, implicitly convert them to a list.
    if isinstance(retval, GeneratorType):
        data = list(retval)
    else:
        data = retval

    # Otherwise, they just gave data and this is a completed retval
    if isinstance(data, ResultRecord):
        return Completed(data=data)

    result_record = result_store.create_result_record(
        data,
        key=key,
        expiration=expiration,
    )
    if write_result:
        await _persist_result_record_or_warn(result_store, result_record, local_logger)
    return Completed(data=result_record)
async def aget_state_exception(state: State) -> BaseException:
    """
    Get the exception from a state asynchronously.

    Resolution rules, applied to the state's result:

    - `None`: a wrapper exception carrying the state message (or a default message).
    - an exception instance: returned as is.
    - a string: a wrapper exception with the string as its message.
    - a state: the exception of that inner state.
    - an iterable of states: the exception of the first failed, crashed or
      cancelled state.

    The wrapper exception type is:
        - `FailedRun` if the state type is FAILED.
        - `CrashedRun` if the state type is CRASHED.
        - `CancelledRun` if the state type is CANCELLED.

    Raises:
        ValueError: if the state is not failed, crashed or cancelled, or if the
            result is an iterable of states none of which is.
        TypeError: if the result is not of a known type.
    """
    from prefect._result_records import ResultRecord, ResultRecordMetadata
    from prefect.results import ResultStore

    if state.is_failed():
        wrapper, default_message = FailedRun, "Run failed."
    elif state.is_crashed():
        wrapper, default_message = CrashedRun, "Run crashed."
    elif state.is_cancelled():
        wrapper, default_message = CancelledRun, "Run cancelled."
    else:
        raise ValueError(f"Expected failed or crashed state got {state!r}.")

    if isinstance(state.data, ResultRecord):
        result = state.data.result
    elif isinstance(state.data, ResultRecordMetadata):
        record = await ResultStore._from_metadata(state.data)
        result = record.result
    else:
        # Covers both missing data (None) and a result attached directly.
        result = state.data

    if result is None:
        return wrapper(state.message or default_message)

    if isinstance(result, BaseException):
        return result

    if isinstance(result, str):
        return wrapper(result)

    if isinstance(result, State):
        # Return the exception from the inner state
        return await aget_state_exception(result)

    if not is_state_iterable(result):
        raise TypeError(
            f"Unexpected result for failed state: {result!r} —— "
            f"{type(result).__name__} cannot be resolved into an exception"
        )

    # Return the first failure
    for inner_state in result:
        if (
            inner_state.is_failed()
            or inner_state.is_crashed()
            or inner_state.is_cancelled()
        ):
            return await aget_state_exception(inner_state)

    raise ValueError(
        "Failed state result was an iterable of states but none were failed."
    )
@async_dispatch(aget_state_exception)
def get_state_exception(state: State) -> BaseException:
    """
    Get the exception from a state.

    Resolution rules, applied to the state's result:

    - `None`: a wrapper exception carrying the state message (or a default message).
    - an exception instance: returned as is.
    - a string: a wrapper exception with the string as its message.
    - a state: the exception of that inner state.
    - an iterable of states: the exception of the first failed, crashed or
      cancelled state.

    The wrapper exception type is:
        - `FailedRun` if the state type is FAILED.
        - `CrashedRun` if the state type is CRASHED.
        - `CancelledRun` if the state type is CANCELLED.

    Raises:
        ValueError: if the state is not failed, crashed or cancelled, if result
            metadata has no `storage_key`, or if the result is an iterable of
            states none of which is failed, crashed or cancelled.
        TypeError: if the result is not of a known type.
    """
    from prefect._result_records import ResultRecord, ResultRecordMetadata
    from prefect.results import ResultStore, resolve_result_storage

    if state.is_failed():
        wrapper, default_message = FailedRun, "Run failed."
    elif state.is_crashed():
        wrapper, default_message = CrashedRun, "Run crashed."
    elif state.is_cancelled():
        wrapper, default_message = CancelledRun, "Run cancelled."
    else:
        raise ValueError(f"Expected failed or crashed state got {state!r}.")

    if isinstance(state.data, ResultRecord):
        result = state.data.result
    elif isinstance(state.data, ResultRecordMetadata):
        # Inline sync version of _from_metadata
        metadata = state.data
        storage_block = (
            None
            if metadata.storage_block_id is None
            else resolve_result_storage(metadata.storage_block_id, _sync=True)
        )
        store = ResultStore(
            result_storage=storage_block, serializer=metadata.serializer
        )
        if metadata.storage_key is None:
            raise ValueError(
                "storage_key is required to hydrate a result record from metadata"
            )
        result = store.read(metadata.storage_key).result
    else:
        # Covers both missing data (None) and a result attached directly.
        result = state.data

    if result is None:
        return wrapper(state.message or default_message)

    if isinstance(result, BaseException):
        return result

    if isinstance(result, str):
        return wrapper(result)

    if isinstance(result, State):
        # Return the exception from the inner state
        return get_state_exception(result)

    if not is_state_iterable(result):
        raise TypeError(
            f"Unexpected result for failed state: {result!r} —— "
            f"{type(result).__name__} cannot be resolved into an exception"
        )

    # Return the first failure
    for inner_state in result:
        if (
            inner_state.is_failed()
            or inner_state.is_crashed()
            or inner_state.is_cancelled()
        ):
            return get_state_exception(inner_state)

    raise ValueError(
        "Failed state result was an iterable of states but none were failed."
    )
async def araise_state_exception(state: State) -> None:
    """
    Raise the exception contained in a FAILED, CRASHED or CANCELLED state
    asynchronously; do nothing for any other state.
    """
    if state.is_failed() or state.is_crashed() or state.is_cancelled():
        raise await aget_state_exception(state)
    return None
@async_dispatch(araise_state_exception)
def raise_state_exception(state: State) -> None:
    """
    Raise the exception contained in a FAILED, CRASHED or CANCELLED state;
    do nothing for any other state.
    """
    if state.is_failed() or state.is_crashed() or state.is_cancelled():
        raise get_state_exception(state)
    return None
def is_state_iterable(obj: Any) -> TypeGuard[Iterable[State]]:
    """
    Check whether the given object is a non-empty iterable containing only states.

    Supported iterables are:
    - set
    - list
    - tuple

    Other iterables will return `False` even if they contain states, as do
    `BaseAnnotation` instances and empty collections.
    """
    # We do not check for arbitrary iterables because this is not intended to be used
    # for things like dictionaries, dataframes, or pydantic models
    if isinstance(obj, BaseAnnotation):
        return False
    if not isinstance(obj, (list, set, tuple)):
        return False
    if not obj:
        return False
    return all(isinstance(item, State) for item in obj)
class StateGroup:
    """Summary of a collection of states: per-type counts plus helper predicates."""

    def __init__(self, states: list[State]) -> None:
        counts = self._get_type_counts(states)
        n_total = len(states)
        n_final = sum(state.is_final() for state in states)

        self.states: list[State] = states
        # A Counter, so state types that never occurred read as 0.
        self.type_counts: dict[StateType, int] = counts
        self.total_count: int = n_total
        self.cancelled_count: int = counts[StateType.CANCELLED]
        self.final_count: int = n_final
        self.not_final_count: int = n_total - n_final
        self.paused_count: int = counts[StateType.PAUSED]

    @property
    def fail_count(self) -> int:
        """Number of FAILED plus CRASHED states."""
        failed = self.type_counts[StateType.FAILED]
        crashed = self.type_counts[StateType.CRASHED]
        return failed + crashed

    def all_completed(self) -> bool:
        """True when every state is COMPLETED."""
        return self.total_count == self.type_counts[StateType.COMPLETED]

    def any_cancelled(self) -> bool:
        """True when at least one state is CANCELLED."""
        return self.cancelled_count > 0

    def any_failed(self) -> bool:
        """True when at least one state is FAILED or CRASHED."""
        return any(
            self.type_counts[state_type] > 0
            for state_type in (StateType.FAILED, StateType.CRASHED)
        )

    def any_paused(self) -> bool:
        """True when at least one state is PAUSED."""
        return self.paused_count > 0

    def all_final(self) -> bool:
        """True when every state is final."""
        return self.total_count == self.final_count

    def counts_message(self) -> str:
        """Comma-separated summary: total, non-final count (if any), then each non-zero type count."""
        parts = [f"total={self.total_count}"]
        if self.not_final_count:
            parts.append(f"not_final={self.not_final_count}")

        for state_type, count in self.type_counts.items():
            if count:
                parts.append(f"{state_type.value!r}={count}")

        return ", ".join(parts)

    @staticmethod
    def _get_type_counts(states: Iterable[State]) -> Dict[StateType, int]:
        """Count how many states there are of each state type."""
        return Counter(state.type for state in states)

    def __repr__(self) -> str:
        return f"StateGroup<{self.counts_message()}>"
def _traced(cls: Type["State[R]"], **kwargs: Any) -> "State[R]":
    """
    Instantiate `cls` with `kwargs`, stamping trace context onto its state details.

    Any `state_details` in `kwargs` (default: empty) is validated into a
    `StateDetails`; its `traceparent` is then set from the carrier filled by
    `propagate.inject` (`None` when the carrier has no such entry).
    """
    raw_details = kwargs.pop("state_details", {})
    state_details = StateDetails.model_validate(raw_details)

    trace_carrier = {}
    propagate.inject(trace_carrier)
    state_details.traceparent = trace_carrier.get("traceparent")

    return cls(**kwargs, state_details=state_details)
def Scheduled(
    cls: Type["State[R]"] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs: Any,
) -> "State[R]":
    """Convenience function for creating `Scheduled` states.

    `scheduled_time` defaults to `now()`. Supplying it together with a
    `state_details` that already has a `scheduled_time` raises `ValueError`.

    Returns:
        State: a Scheduled state
    """
    state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))

    if scheduled_time is not None:
        if state_details.scheduled_time:
            raise ValueError("An extra scheduled_time was provided in state_details")
    else:
        # NOTE(review): a `scheduled_time` already present in `state_details` is
        # overwritten with `now()` on this path — confirm that is intended.
        scheduled_time = now()

    state_details.scheduled_time = scheduled_time

    return _traced(cls, type=StateType.SCHEDULED, state_details=state_details, **kwargs)
def Completed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type COMPLETED via `_traced`.

    Returns:
        State: a Completed state
    """
    completed_state = _traced(cls, type=StateType.COMPLETED, **kwargs)
    return completed_state
def Running(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type RUNNING via `_traced`.

    Returns:
        State: a Running state
    """
    running_state = _traced(cls, type=StateType.RUNNING, **kwargs)
    return running_state
def Failed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type FAILED via `_traced`.

    Returns:
        State: a Failed state
    """
    failed_state = _traced(cls, type=StateType.FAILED, **kwargs)
    return failed_state
def Crashed(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type CRASHED via `_traced`.

    Returns:
        State: a Crashed state
    """
    crashed_state = _traced(cls, type=StateType.CRASHED, **kwargs)
    return crashed_state
def Cancelling(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type CANCELLING via `_traced`.

    Returns:
        State: a Cancelling state
    """
    cancelling_state = _traced(cls, type=StateType.CANCELLING, **kwargs)
    return cancelling_state
def Cancelled(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type CANCELLED via `_traced`.

    Returns:
        State: a Cancelled state
    """
    cancelled_state = _traced(cls, type=StateType.CANCELLED, **kwargs)
    return cancelled_state
def Pending(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type PENDING via `_traced`.

    Returns:
        State: a Pending state
    """
    pending_state = _traced(cls, type=StateType.PENDING, **kwargs)
    return pending_state
def Paused(
    cls: Type["State[R]"] = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime.datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> "State[R]":
    """Convenience function for creating `Paused` states.

    The pause timeout is taken from `pause_expiration_time`, or computed as
    `now()` plus `timeout_seconds`; when neither is given no timeout is set.

    Raises:
        ValueError: if `state_details` already carries a pause timeout, or if
            both `pause_expiration_time` and `timeout_seconds` are supplied.

    Returns:
        State: a Paused state
    """
    state_details = StateDetails.model_validate(kwargs.pop("state_details", {}))

    if state_details.pause_timeout:
        raise ValueError("An extra pause timeout was provided in state_details")

    if pause_expiration_time is not None and timeout_seconds is not None:
        raise ValueError(
            "Cannot supply both a pause_expiration_time and timeout_seconds"
        )

    if pause_expiration_time is not None or timeout_seconds is not None:
        if pause_expiration_time:
            state_details.pause_timeout = pause_expiration_time
        else:
            state_details.pause_timeout = now() + datetime.timedelta(
                seconds=timeout_seconds or 0
            )

    state_details.pause_reschedule = reschedule
    state_details.pause_key = pause_key

    return _traced(cls, type=StateType.PAUSED, state_details=state_details, **kwargs)
def Suspended(
    cls: Type["State[R]"] = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime.datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> "State[R]":
    """Create a `Paused` state named "Suspended" with `reschedule=True`.

    Returns:
        State: a Suspended state
    """
    suspended_state = Paused(
        cls=cls,
        name="Suspended",
        reschedule=True,
        timeout_seconds=timeout_seconds,
        pause_expiration_time=pause_expiration_time,
        pause_key=pause_key,
        **kwargs,
    )
    return suspended_state
def AwaitingRetry(
    cls: Type["State[R]"] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs: Any,
) -> "State[R]":
    """Create a `Scheduled` state named "AwaitingRetry".

    Returns:
        State: an AwaitingRetry state
    """
    awaiting_retry_state = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="AwaitingRetry",
        **kwargs,
    )
    return awaiting_retry_state
def AwaitingConcurrencySlot(
    cls: Type["State[R]"] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs: Any,
) -> "State[R]":
    """Create a `Scheduled` state named "AwaitingConcurrencySlot".

    Returns:
        State: an AwaitingConcurrencySlot state
    """
    awaiting_slot_state = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="AwaitingConcurrencySlot",
        **kwargs,
    )
    return awaiting_slot_state
def Retrying(cls: Type["State[R]"] = State, **kwargs: Any) -> "State[R]":
    """Create a state of type RUNNING named "Retrying" via `_traced`.

    Returns:
        State: a Retrying state
    """
    retrying_state = _traced(cls, type=StateType.RUNNING, name="Retrying", **kwargs)
    return retrying_state
def Late(
    cls: Type["State[R]"] = State,
    scheduled_time: Optional[datetime.datetime] = None,
    **kwargs: Any,
) -> "State[R]":
    """Create a `Scheduled` state named "Late".

    Returns:
        State: a Late state
    """
    late_state = Scheduled(
        cls=cls,
        scheduled_time=scheduled_time,
        name="Late",
        **kwargs,
    )
    return late_state