Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/rules.py: 33%
276 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Prefect's flow and task-run orchestration machinery.
4This module contains all the core concepts necessary to implement Prefect's state
5orchestration engine. These states correspond to intuitive descriptions of all the
6points that a Prefect flow or task can observe executing user code and intervene, if
7necessary. A detailed description of states can be found in our concept
8[documentation](https://docs.prefect.io/v3/concepts/states).
10Prefect's orchestration engine operates under the assumption that no governed user code
11will execute without first requesting Prefect REST API validate a change in state and record
12metadata about the run. With all attempts to run user code being checked against a
13Prefect instance, the Prefect REST API database becomes the unambiguous source of truth for managing
14the execution of complex interacting workflows. Orchestration rules can be implemented
15as discrete units of logic that operate against each state transition and can be fully
16observable, extensible, and customizable -- all without needing to store or parse a
17single line of user code.
18"""
20from __future__ import annotations 1a
22import contextlib 1a
23from types import TracebackType 1a
24from typing import ( 1a
25 TYPE_CHECKING,
26 Any,
27 ClassVar,
28 Generic,
29 Iterable,
30 Optional,
31 TypeVar,
32 Union,
33)
35from pydantic import ConfigDict, Field 1a
36from sqlalchemy.ext.asyncio import AsyncSession 1a
37from typing_extensions import Self 1a
39from prefect.logging import get_logger 1a
40from prefect.server.database import PrefectDBInterface, inject_db, orm_models 1a
41from prefect.server.database.dependencies import db_injector 1a
42from prefect.server.exceptions import OrchestrationError 1a
43from prefect.server.models import artifacts, flow_runs 1a
44from prefect.server.schemas import core, states 1a
45from prefect.server.schemas.responses import ( 1a
46 SetStateStatus,
47 StateAbortDetails,
48 StateAcceptDetails,
49 StateRejectDetails,
50 StateResponseDetails,
51 StateWaitDetails,
52)
53from prefect.server.utilities.schemas import PrefectBaseModel 1a
55if TYPE_CHECKING: 55 ↛ 56line 55 didn't jump to line 56 because the condition on line 55 was never true1a
56 from logging import Logger
# Every state type a flow- or task-run transition may start from or move to.
# `None` is included because a run may have no state yet, and because a rule may
# decide that no state should be written at all.
ALL_ORCHESTRATION_STATES: set[states.StateType | None] = set(states.StateType) | {
    None
}

# State types the schema module declares terminal, materialized as a plain set.
TERMINAL_STATES = {*states.TERMINAL_STATES}

# Shared logger for this module.
logger: "Logger" = get_logger("server")

# T: the ORM run model a context wraps.
T = TypeVar("T", bound=orm_models.Run)
# RP: the run-policy schema that belongs to that run model.
RP = TypeVar("RP", bound=Union[core.FlowRunPolicy, core.TaskRunPolicy])
class OrchestrationContext(PrefectBaseModel, Generic[T, RP]):
    """
    Carries the details of one state transition while orchestration rules govern it.

    Note:
        Do not instantiate `OrchestrationContext` directly; use the flow- or
        task-specific subclasses, `FlowOrchestrationContext` and
        `TaskOrchestrationContext`.

    Whenever a flow or task run tries to change state, the Prefect REST API gets
    to decide whether the transition may proceed. Everything relevant to that
    decision lives on an `OrchestrationContext`, which is then governed by nested
    orchestration rules built on the `BaseOrchestrationRule` ABC.

    Within a transition a state may be `None`. The initial state is `None` when a
    run is setting a state for the first time. The proposed state may become
    `None` when a rule decides that no state change should occur, in which case
    nothing is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response
        orchestration_error: the exception raised by a rule's `before_transition`
            hook, if any (set by `BaseOrchestrationRule.__aenter__`)
        parameters: a free-form dictionary; `safe_copy` shallow-copies it
        client_version: optional client version string
        run: the ORM run model that is changing state

    Args:
        session: a SQLAlchemy database session
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)

    session: AsyncSession
    initial_state: Optional[states.State] = None
    proposed_state: Optional[states.State] = None
    validated_state: Optional[states.State] = Field(default=None)
    rule_signature: list[str] = Field(default_factory=list)
    finalization_signature: list[str] = Field(default_factory=list)
    response_status: SetStateStatus = Field(default=SetStateStatus.ACCEPT)
    response_details: StateResponseDetails = Field(default_factory=StateAcceptDetails)
    orchestration_error: Optional[Exception] = Field(default=None)
    parameters: dict[Any, Any] = Field(default_factory=dict)
    client_version: Optional[str] = None
    run: T

    @property
    def initial_state_type(self) -> Optional[states.StateType]:
        """The type of `self.initial_state`, or `None` when there is no such state."""
        state = self.initial_state
        if not state:
            return None
        return state.type

    @property
    def proposed_state_type(self) -> Optional[states.StateType]:
        """The type of `self.proposed_state`, or `None` when there is no such state."""
        state = self.proposed_state
        if not state:
            return None
        return state.type

    @property
    def validated_state_type(self) -> Optional[states.StateType]:
        """The type of `self.validated_state`, or `None` when there is no such state."""
        state = self.validated_state
        if not state:
            return None
        return state.type

    @property
    def run_settings(self) -> RP:
        """
        Run-level settings used to orchestrate the state transition.

        Raises:
            NotImplementedError: always; subclasses provide the real settings.
        """
        raise NotImplementedError(
            "Run-level settings are not supported for this context"
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe copy to hand to orchestration rules.

        Rules read transition details from the context, and mutating the live
        objects could have unintended side effects. The copy made here has its
        own copies of the three state objects and of the `parameters` dict, so
        rules can be given those without touching the originals. All other
        attributes come from `model_copy()` unchanged.

        Returns:
            A mutation-safe copy of the `OrchestrationContext`
        """

        def _duplicate(state: Optional[states.State]) -> Optional[states.State]:
            # Same truthiness test as for a missing state: falsy maps to None.
            return state.model_copy() if state else None

        duplicate = self.model_copy()
        duplicate.initial_state = _duplicate(self.initial_state)
        duplicate.proposed_state = _duplicate(self.proposed_state)
        duplicate.validated_state = _duplicate(self.validated_state)
        duplicate.parameters = self.parameters.copy()
        return duplicate

    def entry_context(
        self,
    ) -> tuple[Optional[states.State], Optional[states.State], Self]:
        """
        Produce the arguments passed to hooks that run before validation.

        Hooks that fire before a transition is committed share one calling
        convention: `(initial_state, proposed_state, context)`. All three values
        come from a single `safe_copy()` of this context.
        """
        snapshot = self.safe_copy()
        return (snapshot.initial_state, snapshot.proposed_state, snapshot)

    def exit_context(
        self,
    ) -> tuple[Optional[states.State], Optional[states.State], Self]:
        """
        Produce the arguments passed to hooks that run after validation.

        Hooks that fire after a transition is committed share one calling
        convention: `(initial_state, validated_state, context)`. All three values
        come from a single `safe_copy()` of this context.
        """
        snapshot = self.safe_copy()
        return (snapshot.initial_state, snapshot.validated_state, snapshot)

    async def flow_run(self) -> orm_models.FlowRun | None:
        """
        Look up the flow run associated with this context.

        Raises:
            NotImplementedError: always; subclasses provide the lookup.
        """
        raise NotImplementedError("Flow run is not supported for this context")
class FlowOrchestrationContext(
    OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Carries the details of one flow run state transition while rules govern it.

    Whenever a flow run tries to change state, the Prefect REST API gets to
    decide whether the transition may proceed. Everything relevant to that
    decision lives on this context, which is then governed by nested
    orchestration rules built on the `BaseOrchestrationRule` ABC.

    Within a transition a state may be `None`. The initial state is `None` when
    the run is setting a state for the first time. The proposed state may become
    `None` when a rule decides that no state change should occur, in which case
    nothing new is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response

    Args:
        session: a SQLAlchemy database session
        run: the flow run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    run: orm_models.FlowRun

    @inject_db
    async def validate_proposed_state(self, db: PrefectDBInterface):
        """
        Validate the proposed state by committing it to the database.

        Once rules have governed the context, the proposed state is added to the
        current SQLAlchemy session and flushed. `self.validated_state` is set to
        the flushed state and the run's state is updated to match.

        If the proposed state is `None`, no new state is written and
        `self.validated_state` reflects the run's current state.

        Any exception raised while validating is logged and the proposed state
        is cleared. Exceptions that `is_client_retryable_exception` recognizes
        are re-raised (the API layer turns them into a 503); all others are
        turned into an ABORT response carrying the error as the reason.

        Returns:
            None
        """
        # (circular import)
        from prefect.server.api.server import is_client_retryable_exception

        try:
            await self._validate_proposed_state()
        except Exception as error:
            logger.exception("Encountered error during state validation")
            self.proposed_state = None

            if is_client_retryable_exception(error):
                # Do not capture retryable database exceptions, this exception
                # will be raised as a 503 in the API layer
                raise

            self.response_status = SetStateStatus.ABORT
            self.response_details = StateAbortDetails(
                reason=f"Error validating state: {error!r}"
            )

    @db_injector
    async def _validate_proposed_state(self, db: PrefectDBInterface):
        """
        Write the proposed state (if any) and populate `self.validated_state`.

        With a proposed state: its result data, unless absent or marked
        "unpersisted", is stored as an artifact linked to this flow run, and a
        new `FlowRunState` row is built from the remaining payload.

        Without a proposed state: the run's current ORM state is reused and its
        result data is read back from the referenced artifact, if there is one.
        """
        proposed = self.proposed_state

        if proposed is not None:
            payload = proposed.model_dump_for_orm()
            state_data = payload.pop("data", None)

            is_unpersisted = (
                isinstance(state_data, dict)
                and state_data.get("type") == "unpersisted"
            )
            if state_data is not None and not is_unpersisted:
                result_artifact = core.Artifact.from_result(state_data)
                result_artifact.flow_run_id = self.run.id
                await artifacts.create_artifact(self.session, result_artifact)
                payload["result_artifact_id"] = result_artifact.id

            orm_state = db.FlowRunState(flow_run_id=self.run.id, **payload)
        else:
            orm_state = self.run.state
            state_data = None
            if orm_state is not None and orm_state.result_artifact_id is not None:
                # We cannot access `self.run.state.data` directly for unknown reasons
                artifact = await artifacts.read_artifact(
                    self.session, orm_state.result_artifact_id
                )
                state_data = artifact.data if artifact else None

        if not orm_state:
            # The run has no state and none was proposed.
            self.validated_state = None
            return

        self.session.add(orm_state)
        self.run.set_state(orm_state)

        await self.session.flush()
        self.validated_state = states.State.from_orm_without_result(
            orm_state, with_data=state_data
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe copy to hand to orchestration rules.

        Delegates to `OrchestrationContext.safe_copy`, which guards the state
        objects and parameters against accidental mutation by rules.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `FlowOrchestrationContext`
        """
        duplicate = super().safe_copy()
        return duplicate

    @property
    def run_settings(self) -> core.FlowRunPolicy:
        """Run-level settings used to orchestrate the state transition."""
        flow_run = self.run
        return flow_run.empirical_policy

    async def task_run(self) -> None:
        """A flow run context has no task run; always returns `None`."""
        return None

    async def flow_run(self) -> orm_models.FlowRun:
        """Return the flow run this context governs."""
        return self.run
class TaskOrchestrationContext(
    OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy]
):
    """
    Carries the details of one task run state transition while rules govern it.

    Whenever a task run tries to change state, the Prefect REST API gets to
    decide whether the transition may proceed. Everything relevant to that
    decision lives on this context, which is then governed by nested
    orchestration rules built on the `BaseOrchestrationRule` ABC.

    Within a transition a state may be `None`. The initial state is `None` when
    the run is setting a state for the first time. The proposed state may become
    `None` when a rule decides that no state change should occur, in which case
    nothing new is written to the database.

    Attributes:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of the run
        proposed_state: the proposed state the run is transitioning into
        validated_state: a proposed state that has been committed to the database
        rule_signature: a record of rules that have fired on entry into a
            managed context, currently only used for debugging purposes
        finalization_signature: a record of rules that have fired on exit from a
            managed context, currently only used for debugging purposes
        response_status: a SetStateStatus object used to build the API response
        response_details: a StateResponseDetails object used to build the API
            response

    Args:
        session: a SQLAlchemy database session
        run: the task run attempting to change state
        initial_state: the initial state of a run
        proposed_state: the proposed state a run is transitioning into
    """

    run: orm_models.TaskRun

    @inject_db
    async def validate_proposed_state(self, db: PrefectDBInterface):
        """
        Validate the proposed state by committing it to the database.

        Once rules have governed the context, the proposed state is added to the
        current SQLAlchemy session and flushed. `self.validated_state` is set to
        the flushed state and the run's state is updated to match.

        If the proposed state is `None`, no new state is written and
        `self.validated_state` reflects the run's current state.

        Any exception raised while validating is logged and the proposed state
        is cleared. Exceptions that `is_client_retryable_exception` recognizes
        are re-raised (the API layer turns them into a 503); all others are
        turned into an ABORT response carrying the error as the reason.

        Returns:
            None
        """
        # (circular import)
        from prefect.server.api.server import is_client_retryable_exception

        try:
            await self._validate_proposed_state()
        except Exception as error:
            logger.exception("Encountered error during state validation")
            self.proposed_state = None

            if is_client_retryable_exception(error):
                # Do not capture retryable database exceptions, this exception
                # will be raised as a 503 in the API layer
                raise

            self.response_status = SetStateStatus.ABORT
            self.response_details = StateAbortDetails(
                reason=f"Error validating state: {error!r}"
            )

    @db_injector
    async def _validate_proposed_state(self, db: PrefectDBInterface):
        """
        Write the proposed state (if any) and populate `self.validated_state`.

        With a proposed state: its result data, unless absent or marked
        "unpersisted", is stored as an artifact linked to this task run (and to
        its flow run when the task run has one), and a new `TaskRunState` row is
        built from the remaining payload.

        Without a proposed state: the run's current ORM state is reused and its
        result data is read back from the referenced artifact, if there is one.
        """
        proposed = self.proposed_state

        if proposed is not None:
            payload = proposed.model_dump_for_orm()
            state_data = payload.pop("data", None)

            is_unpersisted = (
                isinstance(state_data, dict)
                and state_data.get("type") == "unpersisted"
            )
            if state_data is not None and not is_unpersisted:
                result_artifact = core.Artifact.from_result(state_data)
                result_artifact.task_run_id = self.run.id

                if self.run.flow_run_id is not None:
                    result_artifact.flow_run_id = self.run.flow_run_id

                await artifacts.create_artifact(self.session, result_artifact)
                payload["result_artifact_id"] = result_artifact.id

            orm_state = db.TaskRunState(task_run_id=self.run.id, **payload)
        else:
            orm_state = self.run.state
            state_data = None
            if orm_state is not None and orm_state.result_artifact_id is not None:
                # We cannot access `self.run.state.data` directly for unknown reasons
                artifact = await artifacts.read_artifact(
                    self.session, orm_state.result_artifact_id
                )
                state_data = artifact.data if artifact else None

        if not orm_state:
            # The run has no state and none was proposed.
            self.validated_state = None
            return

        self.session.add(orm_state)
        self.run.set_state(orm_state)

        await self.session.flush()
        self.validated_state = states.State.from_orm_without_result(
            orm_state, with_data=state_data
        )

    def safe_copy(self) -> Self:
        """
        Build a mostly-mutation-safe copy to hand to orchestration rules.

        Delegates to `OrchestrationContext.safe_copy`, which guards the state
        objects and parameters against accidental mutation by rules.

        Note:
            `self.run` is an ORM model, and even when copied is unsafe to mutate

        Returns:
            A mutation-safe copy of `TaskOrchestrationContext`
        """
        duplicate = super().safe_copy()
        return duplicate

    @property
    def run_settings(self) -> core.TaskRunPolicy:
        """Run-level settings used to orchestrate the state transition."""
        task_run = self.run
        return task_run.empirical_policy

    async def task_run(self) -> orm_models.TaskRun:
        """Return the task run this context governs."""
        return self.run

    async def flow_run(self) -> orm_models.FlowRun | None:
        """
        Read the flow run that owns this task run.

        Returns:
            `None` when the task run has no `flow_run_id`; otherwise whatever
            `flow_runs.read_flow_run` returns for that id.
        """
        parent_id = self.run.flow_run_id
        if parent_id is None:
            return None
        return await flow_runs.read_flow_run(
            session=self.session,
            flow_run_id=parent_id,
        )
534class BaseOrchestrationRule( 1a
535 contextlib.AbstractAsyncContextManager[OrchestrationContext[T, RP]]
536):
537 """
538 An abstract base class used to implement a discrete piece of orchestration logic.
540 An `OrchestrationRule` is a stateful context manager that directly governs a state
541 transition. Complex orchestration is achieved by nesting multiple rules.
542 Each rule runs against an `OrchestrationContext` that contains the transition
543 details; this context is then passed to subsequent rules. The context can be
544 modified by hooks that fire before and after a new state is validated and committed
545 to the database. These hooks will fire as long as the state transition is
546 considered "valid" and govern a transition by either modifying the proposed state
547 before it is validated or by producing a side-effect.
549 A state transition occurs whenever a flow- or task- run changes state, prompting
550 Prefect REST API to decide whether or not this transition can proceed. The current state of
551 the run is referred to as the "initial state", and the state a run is
552 attempting to transition into is the "proposed state". Together, the initial state
553 transitioning into the proposed state is the intended transition that is governed
554 by these orchestration rules. After using rules to enter a runtime context, the
555 `OrchestrationContext` will contain a proposed state that has been governed by
556 each rule, and at that point can validate the proposed state and commit it to
557 the database. The validated state will be set on the context as
558 `context.validated_state`, and rules will call the `self.after_transition` hook
559 upon exiting the managed context.
561 Examples:
563 Create a rule:
565 ```python
566 class BasicRule(BaseOrchestrationRule):
567 # allowed initial state types
568 FROM_STATES = [StateType.RUNNING]
569 # allowed proposed state types
570 TO_STATES = [StateType.COMPLETED, StateType.FAILED]
572 async def before_transition(self, initial_state, proposed_state, ctx):
573 # side effects and proposed state mutation can happen here
574 ...
576 async def after_transition(self, initial_state, validated_state, ctx):
577 # operations on states that have been validated can happen here
578 ...
580 async def cleanup(self, initial_state, validated_state, ctx):
581 # reverts side effects generated by `before_transition` if necessary
582 ...
583 ```
585 Use a rule:
587 ```python
588 intended_transition = (StateType.RUNNING, StateType.COMPLETED)
589 async with BasicRule(context, *intended_transition):
590 # context.proposed_state has been governed by BasicRule
591 ...
592 ```
594 Use multiple rules:
596 ```python
597 rules = [BasicRule, BasicRule]
598 intended_transition = (StateType.RUNNING, StateType.COMPLETED)
599 async with contextlib.AsyncExitStack() as stack:
600 for rule in rules:
601 await stack.enter_async_context(rule(context, *intended_transition))
603 # context.proposed_state has been governed by all rules
604 ...
605 ```
607 Attributes:
608 FROM_STATES: list of valid initial state types this rule governs
609 TO_STATES: list of valid proposed state types this rule governs
610 context: the orchestration context
611 from_state_type: the state type a run is currently in
612 to_state_type: the intended proposed state type prior to any orchestration
614 Args:
615 context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is
616 passed between rules
617 from_state_type: The state type of the initial state of a run, if this
618 state type is not contained in `FROM_STATES`, no hooks will fire
619 to_state_type: The state type of the proposed state before orchestration, if
620 this state type is not contained in `TO_STATES`, no hooks will fire
621 """
623 FROM_STATES: set[states.StateType | None] = set() 1a
624 TO_STATES: set[states.StateType | None] = set() 1a
def __init__(
    self,
    context: OrchestrationContext[T, RP],
    from_state_type: Optional[states.StateType],
    to_state_type: Optional[states.StateType],
):
    """
    Bind the rule to a context and to the transition it is meant to govern.

    Args:
        context: the orchestration context passed between rules
        from_state_type: state type of the run's initial state
        to_state_type: state type of the proposed state before orchestration
    """
    self.context = context
    self.from_state_type, self.to_state_type = from_state_type, to_state_type
    # Tri-state cache used by `invalid()` / `fizzled()`: None until first evaluated.
    self._invalid_on_entry = None
async def __aenter__(self) -> OrchestrationContext[T, RP]:
    """
    Enter an async runtime context governed by this rule.

    The `async with` statement binds the governed `OrchestrationContext` to the
    target of its `as` clause. When the rule is invalid for the transition,
    entering does nothing. Otherwise `self.before_transition` fires and the
    rule is recorded in `context.rule_signature`.

    If the hook raises, the error is logged, the proposed state is cleared, the
    response becomes an ABORT naming this rule, and the exception is stored on
    `context.orchestration_error`. The exception is not propagated.
    """
    if not await self.invalid():
        try:
            hook_args = self.context.entry_context()
            await self.before_transition(*hook_args)
            self.context.rule_signature.append(str(self.__class__))
        except Exception as hook_error:
            rule_cls = self.__class__
            reason = (
                f"Aborting orchestration due to error in {rule_cls!r}:"
                f" !{hook_error!r}"
            )
            logger.exception(
                f"Error running before-transition hook in rule {rule_cls!r}:"
                f" !{hook_error!r}"
            )

            context = self.context
            context.proposed_state = None
            context.response_status = SetStateStatus.ABORT
            context.response_details = StateAbortDetails(reason=reason)
            context.orchestration_error = hook_error

    return self.context
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """
    Exit the async runtime context governed by this rule.

    Exactly one of three things happens:

    - the rule was invalid on entry: nothing fires;
    - the rule was valid on entry but the transition no longer matches (the
      rule "fizzled"): `self.cleanup` fires so side effects produced by
      `self.before_transition` can be reverted;
    - the rule is still valid: `self.after_transition` fires and the rule is
      recorded in `context.finalization_signature`.

    The exception arguments are accepted for the context-manager protocol and
    are not inspected here.
    """
    # Snapshot first, exactly as the hooks expect: (initial, validated, context).
    hook_args = self.context.exit_context()

    if await self.invalid():
        return

    if await self.fizzled():
        await self.cleanup(*hook_args)
        return

    await self.after_transition(*hook_args)
    self.context.finalization_signature.append(str(self.__class__))
async def before_transition(
    self,
    initial_state: Optional[states.State],
    proposed_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that fires before a state is committed to the database.

    The base implementation does nothing. Subclasses override it to produce
    side effects or to alter the proposed state through one of four methods:
    `self.reject_transition`, `self.delay_transition`,
    `self.abort_transition`, and `self.rename_state`.

    Note:
        The hook is not perfectly isolated from the live transition. It is an
        ordinary instance method with access to `self`, and therefore to
        `self.context`, which should never be modified directly. In addition,
        `context.run` is an ORM model, and mutating the run can cause
        unintended writes to the database.

    Args:
        initial_state: The initial state of a transition
        proposed_state: The proposed state of a transition
        context: A safe copy of the `OrchestrationContext`; apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """
    return None
async def after_transition(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that fires after a state is committed to the database.

    The base implementation does nothing; subclasses override it to act on the
    validated state.

    Args:
        initial_state: The initial state of a transition
        validated_state: The governed state that has been committed to the
            database
        context: A safe copy of the `OrchestrationContext`; apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """
    return None
async def cleanup(
    self,
    initial_state: Optional[states.State],
    validated_state: Optional[states.State],
    context: OrchestrationContext[T, RP],
) -> None:
    """
    Hook that fires on exit when the rule has fizzled.

    Its purpose is to revert side effects produced by `self.before_transition`
    once the transition is found to be invalid on exit. This lets several rules
    run in sequence without any of them tracking what the others did. The base
    implementation does nothing.

    Args:
        initial_state: The initial state of a transition
        validated_state: The governed state that has been committed to the
            database
        context: A safe copy of the `OrchestrationContext`; apart from
            `context.run`, mutating it has no effect on the broader
            orchestration environment.

    Returns:
        None
    """
    return None
async def invalid(self) -> bool:
    """
    Determine whether this rule is invalid for the transition.

    An invalid rule does nothing: no hooks fire on entry or on exit. A rule is
    invalid when either of the state types it was instantiated with falls
    outside `self.FROM_STATES` / `self.TO_STATES`, or when the context proposes
    a transition that differs from the one the rule was instantiated with.

    The verdict is cached on `self._invalid_on_entry`; invalid and fizzled are
    mutually exclusive and that attribute carries the distinction.

    Returns:
        True if the rule is invalid, False otherwise.
    """
    outside_governed_states = (
        self.from_state_type not in self.FROM_STATES
        or self.to_state_type not in self.TO_STATES
    )

    if outside_governed_states:
        self._invalid_on_entry = True
    elif self._invalid_on_entry is None:
        # First evaluation: compare against the context's actual transition.
        self._invalid_on_entry = await self.invalid_transition()

    return self._invalid_on_entry
async def fizzled(self) -> bool:
    """
    Determine whether this rule has fizzled and needs its side effects reverted.

    A rule fizzles when the transition was valid on entry (so
    `self.before_transition` fired) but is no longer valid when the governed
    context exits, most likely because another rule changed the transition.

    Returns:
        True if the rule is fizzled, False otherwise.
    """
    entered_as_valid = not self._invalid_on_entry
    if entered_as_valid:
        return await self.invalid_transition()
    # A rule that was invalid on entry never fired, so it cannot fizzle.
    return False
async def invalid_transition(self) -> bool:
    """
    Determine whether the context's transition differs from this rule's.

    The rule was instantiated with a (from, to) pair of state types. If the
    context's current initial or proposed state type no longer equals that
    pair, the transition is invalid. Whether that makes the rule "invalid" or
    "fizzled" depends on when the check happens.

    Returns:
        True if the transition is invalid, False otherwise.
    """
    context = self.context
    current_initial = context.initial_state_type
    current_proposed = context.proposed_state_type

    origin_matches = self.from_state_type == current_initial
    target_matches = self.to_state_type == current_proposed
    return not (origin_matches and target_matches)
async def reject_transition(
    self, state: Optional[states.State], reason: str
) -> None:
    """
    Reject a proposed transition before it is validated.

    The proposed state on the context is replaced with `state`, and the reason
    for the rejection is recorded on the context's response. The rule's own
    `to_state_type` is updated to match, so the rule does not fizzle even
    though the proposed state type has changed.

    Args:
        state: The new proposed state. If `None`, the current run state will be
            returned in the result instead.
        reason: The reason for rejecting the transition

    Raises:
        RuntimeError: if the context already holds a validated state.
        OrchestrationError: if `state` is `None` and the run has no current
            state to fall back on.
    """
    context = self.context

    # don't run if the transition is already validated
    if context.validated_state:
        raise RuntimeError("The transition is already validated")

    # the current state will be used if a new one is not provided, which is
    # only possible when the run actually has a current state
    if state is None and self.from_state_type is None:
        raise OrchestrationError(
            "The current run has no state; this transition cannot be "
            "rejected without providing a new state."
        )

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None if state is None else state.type
    context.proposed_state = state

    context.response_status = SetStateStatus.REJECT
    context.response_details = StateRejectDetails(reason=reason)
async def delay_transition(self, delay_seconds: int, reason: str) -> None:
    """
    Delay the proposed transition before it has been validated.

    The proposed state is set to `None`, signaling to the
    `OrchestrationContext` that no state should be written to the database.
    The delay and its reason are recorded on the context. The rule's own
    `to_state_type` is cleared as well, so the rule will not fizzle even though
    the proposed state type changes.

    Args:
        delay_seconds: The number of seconds the transition should be delayed
        reason: The reason for delaying the transition

    Raises:
        RuntimeError: if the transition has already been validated
    """
    ctx = self.context

    # delaying is only possible before the transition is validated
    if ctx.validated_state:
        raise RuntimeError("The transition is already validated")

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None
    ctx.proposed_state = None

    ctx.response_status = SetStateStatus.WAIT
    ctx.response_details = StateWaitDetails(
        delay_seconds=delay_seconds,
        reason=reason,
    )
async def abort_transition(self, reason: str) -> None:
    """
    Abort the proposed transition before it has been validated.

    Aborting expects no further action to occur for this run. The proposed
    state is set to `None`, signaling to the `OrchestrationContext` that no
    state should be written to the database, and the reason is recorded on the
    context. The rule's own `to_state_type` is cleared as well, so the rule
    will not fizzle even though the proposed state type changes.

    Args:
        reason: The reason for aborting the transition

    Raises:
        RuntimeError: if the transition has already been validated
    """
    ctx = self.context

    # aborting is only possible before the transition is validated
    if ctx.validated_state:
        raise RuntimeError("The transition is already validated")

    # a rule that mutates state should not fizzle itself
    self.to_state_type = None
    ctx.proposed_state = None

    ctx.response_status = SetStateStatus.ABORT
    ctx.response_details = StateAbortDetails(reason=reason)
async def rename_state(self, state_name: str) -> None:
    """
    Set the "name" attribute on the proposed state, if there is one.

    A state's name is a human-readable annotation describing how a run is
    progressing. Only the name is touched here, never the canonical state
    TYPE, so renaming will not fizzle or invalidate any other rules that
    govern this state transition. Nothing happens when the proposed state is
    `None`.

    Args:
        state_name: The name to assign to the proposed state
    """
    proposed = self.context.proposed_state
    if proposed is None:
        return
    proposed.name = state_name
async def update_context_parameters(self, key: str, value: Any) -> None:
    """
    Store a key-value pair in the context's "parameters" dictionary.

    This is a lightweight, ephemeral way to pass information between
    orchestration rules without going through the database or some other
    side-effect. It can be used to break up large rules for ease of testing or
    comprehension, but rules coupled this way (or any other way) are no longer
    independent, and their order in the orchestration policy priority will
    matter.

    Args:
        key: The parameter name to set
        value: The value to store under `key`, replacing any existing value
    """
    self.context.parameters[key] = value
class FlowRunOrchestrationRule(
    BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]
):
    """`BaseOrchestrationRule` parameterized for flow runs and flow run policies."""
class TaskRunOrchestrationRule(
    BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]
):
    """`BaseOrchestrationRule` parameterized for task runs and task run policies."""
class GenericOrchestrationRule(
    BaseOrchestrationRule[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """`BaseOrchestrationRule` parameterized for any run with either policy type."""
969class BaseUniversalTransform( 1a
970 contextlib.AbstractAsyncContextManager[OrchestrationContext[T, RP]]
971):
972 """
973 An abstract base class used to implement privileged bookkeeping logic.
975 Warning:
976 In almost all cases, use the `BaseOrchestrationRule` base class instead.
978 Beyond the orchestration rules implemented with the `BaseOrchestrationRule` ABC,
979 universal transforms are not stateful: `before_transition` fires on every state
980 transition, and `after_transition` fires unless the context has recorded an
981 orchestration error. Neither hook is skipped here when the proposed state is `None`.
982 Because there are no guardrails in place to prevent directly mutating state or other
983 parts of the orchestration context, universal transforms should only be used with care.
985 Attributes:
986 FROM_STATES: for compatibility with `BaseOrchestrationPolicy`
987 TO_STATES: for compatibility with `BaseOrchestrationPolicy`
988 context: the orchestration context
989 from_state_type: the state type a run is currently in
990 to_state_type: the intended proposed state type prior to any orchestration
992 Args:
993 context: A `FlowOrchestrationContext` or `TaskOrchestrationContext` that is
994 passed between transforms
995 """
997 # `BaseUniversalTransform` will always fire on non-null transitions
998 FROM_STATES: Iterable[states.StateType | None] = ALL_ORCHESTRATION_STATES 1a
999 TO_STATES: Iterable[states.StateType | None] = ALL_ORCHESTRATION_STATES 1a
1001 def __init__( 1a
1002 self,
1003 context: OrchestrationContext[T, RP],
1004 from_state_type: Optional[states.StateType],
1005 to_state_type: Optional[states.StateType],
1006 ):
1007 self.context = context
1008 self.from_state_type = from_state_type
1009 self.to_state_type = to_state_type
1011 async def __aenter__(self) -> OrchestrationContext[T, RP]: 1a
1012 """
1013 Enter an async runtime context governed by this transform.
1015 The `with` statement will bind a governed `OrchestrationContext` to the target
1016 specified by the `as` clause. `self.before_transition` always fires on entry,
1017 even if the transition has been nullified and `context.proposed_state` is `None`;
1018 hooks that must skip nullified transitions can check `self.nullified_transition()`.
1019 This transform is then recorded in `context.rule_signature`.
1020 """
1022 await self.before_transition(self.context)
1023 self.context.rule_signature.append(str(self.__class__))
1024 return self.context
1026 async def __aexit__( 1a
1027 self,
1028 exc_type: type[BaseException] | None,
1029 exc_val: BaseException | None,
1030 exc_tb: TracebackType | None,
1031 ) -> None:
1032 """
1033 Exit the async runtime context governed by this transform.
1035 If the context has recorded an orchestration error upon exiting this transform's
1036 context, `self.after_transition` does not fire. Otherwise it fires, including for
1037 nullified transitions; this method does not check `context.proposed_state`.
1038 """
1040 if not self.exception_in_transition():
1041 await self.after_transition(self.context)
1042 self.context.finalization_signature.append(str(self.__class__))
1044 async def before_transition(self, context: OrchestrationContext[T, RP]) -> None: 1a
1045 """
1046 Implements a hook that fires before a state is committed to the database.
1048 Args:
1049 context: the `OrchestrationContext` that contains transition details
1051 Returns:
1052 None
1053 """
1055 async def after_transition(self, context: OrchestrationContext[T, RP]) -> None: 1a
1056 """
1057 Implements a hook that can fire after a state is committed to the database.
1059 Args:
1060 context: the `OrchestrationContext` that contains transition details
1062 Returns:
1063 None
1064 """
1066 def nullified_transition(self) -> bool: 1a
1067 """
1068 Determines if the transition has been nullified.
1070 Transitions are nullified if the proposed state is `None`, indicating that
1071 nothing should be written to the database.
1073 Returns:
1074 True if the transition is nullified, False otherwise.
1075 """
1077 return self.context.proposed_state is None
1079 def exception_in_transition(self) -> bool: 1a
1080 """
1081 Determines if the transition has encountered an exception.
1083 Returns:
1084 True if the transition has encountered an exception, False otherwise.
1085 """
1087 return self.context.orchestration_error is not None
class TaskRunUniversalTransform(
    BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]
):
    """`BaseUniversalTransform` parameterized for task runs and task run policies."""
class FlowRunUniversalTransform(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """`BaseUniversalTransform` parameterized for flow runs and flow run policies."""
class GenericUniversalTransform(
    BaseUniversalTransform[
        orm_models.Run,
        Union[core.FlowRunPolicy, core.TaskRunPolicy],
    ]
):
    """`BaseUniversalTransform` parameterized for any run with either policy type."""
# Alias for an `OrchestrationContext` over `orm_models.Run` that accepts either
# a flow run policy or a task run policy.
GenericOrchestrationContext = OrchestrationContext[
    orm_models.Run,
    Union[core.FlowRunPolicy, core.TaskRunPolicy],
]