Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/policies.py: 97%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Policies are collections of orchestration rules and transforms. 

3 

4Prefect implements (most) orchestration with logic that governs a Prefect flow or task 

5changing state. Policies organize orchestration logic both to provide an ordering

6mechanism as well as provide observability into the orchestration process. 

7 

8While Prefect's orchestration rules can gracefully run independently of one another, ordering can still have an impact on 

9the observed behavior of the system. For example, it makes no sense to secure a 

10concurrency slot for a run if a cached state exists. Furthermore, policies provide a

11mechanism to configure and observe exactly what logic will fire against a transition. 

12""" 

13 

14from __future__ import annotations 1a

15 

16from abc import ABC, abstractmethod 1a

17from typing import Generic, TypeVar, Union 1a

18 

19from prefect.server.database import orm_models 1a

20from prefect.server.orchestration.rules import ( 1a

21 BaseOrchestrationRule, 

22 BaseUniversalTransform, 

23) 

24from prefect.server.schemas import core, states 1a

25 

# Type parameter for the ORM run model a policy operates on; bounded by
# `orm_models.Run`.
T = TypeVar(
    "T",
    bound=orm_models.Run,
)

# Type parameter for the run-policy schema paired with `T`; bounded by either
# the flow-run or the task-run policy schema.
RP = TypeVar(
    "RP",
    bound=Union[core.FlowRunPolicy, core.TaskRunPolicy],
)

28 

29 

class BaseOrchestrationPolicy(ABC, Generic[T, RP]):
    """
    Abstract base class for an ordered collection of orchestration rules.

    Subclasses declare their rules, in priority order, by implementing
    `priority`. `compile_transition_rules` then narrows that ordering down to
    the rules that apply to one particular state transition.

    Separate policies may govern separate kinds of transitions; flow-run and
    task-run states, for example, might need different orchestration logic.
    """

    @staticmethod
    @abstractmethod
    def priority() -> list[
        type[BaseUniversalTransform[T, RP] | BaseOrchestrationRule[T, RP]]
    ]:
        """
        Return this policy's rule and transform classes in priority order.

        The base implementation returns an empty list; because the method is
        abstract, concrete policies must override it.
        """

        return []

    @classmethod
    def compile_transition_rules(
        cls,
        from_state: states.StateType | None = None,
        to_state: states.StateType | None = None,
    ) -> list[type[BaseUniversalTransform[T, RP] | BaseOrchestrationRule[T, RP]]]:
        """
        Select the rules in this policy that apply to a state transition.

        Args:
            from_state: state type being left, or `None`.
            to_state: state type being entered, or `None`.

        Returns:
            The entries of `cls.priority()`, in their original order, whose
            `FROM_STATES` contains `from_state` and whose `TO_STATES` contains
            `to_state`.
        """

        # Filtering preserves the relative order given by `priority()`.
        return [
            candidate
            for candidate in cls.priority()
            if from_state in candidate.FROM_STATES
            and to_state in candidate.TO_STATES
        ]

67 

68 

class TaskRunOrchestrationPolicy(
    BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]
):
    """
    Orchestration policy specialized for task runs.

    Binds the base policy's type parameters to `orm_models.TaskRun` and
    `core.TaskRunPolicy`. It adds no behavior of its own and does not
    implement the abstract `priority`, so concrete policies must supply it.
    """

73 

74 

class FlowRunOrchestrationPolicy(
    BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Orchestration policy specialized for flow runs.

    Binds the base policy's type parameters to `orm_models.FlowRun` and
    `core.FlowRunPolicy`. It adds no behavior of its own and does not
    implement the abstract `priority`, so concrete policies must supply it.
    """

79 

80 

class GenericOrchestrationPolicy(
    BaseOrchestrationPolicy[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Orchestration policy that is not tied to a single kind of run.

    Binds the base policy's type parameters to `orm_models.Run` and to the
    union of the flow-run and task-run policy schemas. It adds no behavior of
    its own and does not implement the abstract `priority`, so concrete
    policies must supply it.
    """