Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/policies.py: 97%
26 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Policies are collections of orchestration rules and transforms.
4Prefect implements (most) orchestration with logic that governs a Prefect flow or task
5changing state. Policies organize of orchestration logic both to provide an ordering
6mechanism as well as provide observability into the orchestration process.
8While Prefect's orchestration rules can gracefully run independently of one another, ordering can still have an impact on
9the observed behavior of the system. For example, it makes no sense to secure a
10concurrency slot for a run if a cached state exists. Furthermore, policies, provide a
11mechanism to configure and observe exactly what logic will fire against a transition.
12"""
14from __future__ import annotations 1a
16from abc import ABC, abstractmethod 1a
17from typing import Generic, TypeVar, Union 1a
19from prefect.server.database import orm_models 1a
20from prefect.server.orchestration.rules import ( 1a
21 BaseOrchestrationRule,
22 BaseUniversalTransform,
23)
24from prefect.server.schemas import core, states 1a
26T = TypeVar("T", bound=orm_models.Run) 1a
27RP = TypeVar("RP", bound=Union[core.FlowRunPolicy, core.TaskRunPolicy]) 1a
30class BaseOrchestrationPolicy(ABC, Generic[T, RP]): 1a
31 """
32 An abstract base class used to organize orchestration rules in priority order.
34 Different collections of orchestration rules might be used to govern various kinds
35 of transitions. For example, flow-run states and task-run states might require
36 different orchestration logic.
37 """
39 @staticmethod 1a
40 @abstractmethod 1a
41 def priority() -> list[ 1a
42 type[BaseUniversalTransform[T, RP] | BaseOrchestrationRule[T, RP]]
43 ]:
44 """
45 A list of orchestration rules in priority order.
46 """
48 return []
50 @classmethod 1a
51 def compile_transition_rules( 1a
52 cls,
53 from_state: states.StateType | None = None,
54 to_state: states.StateType | None = None,
55 ) -> list[type[BaseUniversalTransform[T, RP] | BaseOrchestrationRule[T, RP]]]:
56 """
57 Returns rules in policy that are valid for the specified state transition.
58 """
60 transition_rules: list[ 1bcd
61 type[BaseUniversalTransform[T, RP] | BaseOrchestrationRule[T, RP]]
62 ] = []
63 for rule in cls.priority(): 1bcd
64 if from_state in rule.FROM_STATES and to_state in rule.TO_STATES: 1bcd
65 transition_rules.append(rule) 1bcd
66 return transition_rules 1bcd
69class TaskRunOrchestrationPolicy( 1a
70 BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]
71):
72 pass 1a
75class FlowRunOrchestrationPolicy( 1a
76 BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]
77):
78 pass 1a
81class GenericOrchestrationPolicy( 1a
82 BaseOrchestrationPolicy[
83 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
84 ]
85):
86 pass 1a