Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/dependencies.py: 34%

74 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Injected orchestration dependencies 

3""" 

4 

5from __future__ import annotations 1a

6 

7from contextlib import contextmanager 1a

8from typing import Any, Awaitable, Callable, TypedDict, cast 1a

9 

10from packaging.version import Version 1a

11 

12from prefect.server.orchestration.policies import ( 1a

13 FlowRunOrchestrationPolicy, 

14 TaskRunOrchestrationPolicy, 

15) 

16 

# Provider signatures: each provider is a zero-argument callable whose awaited
# result is the injected value.

# Awaits to a task-run orchestration policy *class* (not an instance).
TaskRunPolicyProvider = Callable[[], Awaitable[type[TaskRunOrchestrationPolicy]]]

# Awaits to a flow-run orchestration policy *class* (not an instance).
FlowRunPolicyProvider = Callable[[], Awaitable[type[FlowRunOrchestrationPolicy]]]

# Awaits to a string-keyed dictionary of orchestration parameters.
ParameterProvider = Callable[[], Awaitable[dict[str, Any]]]

20 

21 

class OrchestrationDependencies(TypedDict):
    """Shape of the module-level registry of injectable orchestration providers.

    Every key is required; a value of ``None`` means no provider is registered
    for that slot.
    """

    # Provider of the task-run orchestration policy class, or None.
    task_policy_provider: TaskRunPolicyProvider | None
    # Provider of the flow-run orchestration policy class, or None.
    flow_policy_provider: FlowRunPolicyProvider | None
    # Provider of the task-run orchestration parameters, or None.
    task_orchestration_parameters_provider: ParameterProvider | None
    # Provider of the flow-run orchestration parameters, or None.
    flow_orchestration_parameters_provider: ParameterProvider | None

27 

28 

# Process-wide registry of the currently injected providers. Every slot starts
# out as None (nothing injected); the ``provide_*`` functions in this module
# read it and the ``temporary_*`` context managers rewrite it in place.
ORCHESTRATION_DEPENDENCIES: OrchestrationDependencies = {
    "task_policy_provider": None,
    "flow_policy_provider": None,
    "task_orchestration_parameters_provider": None,
    "flow_orchestration_parameters_provider": None,
}

35 

# Exact version strings, compared as plain strings rather than parsed versions.
# NOTE(review): consumers live outside this module; going by the name, these
# are the worker releases that handle deployment concurrency themselves --
# confirm against the call sites before editing the list.
WORKER_VERSIONS_THAT_MANAGE_DEPLOYMENT_CONCURRENCY = {
    "3.0.0rc20",
    "3.0.0",
    "3.0.1",
    "3.0.2",
    "3.0.3",
}

43 

# Parsed ``packaging`` version, so comparisons against it follow version
# ordering rather than string ordering.
# NOTE(review): going by the name, this is the oldest client release that
# supports concurrency-limit leasing -- confirm against the call sites.
MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING = Version("3.4.11")

45 

46 

async def provide_task_policy() -> type[TaskRunOrchestrationPolicy]:
    """Resolve the task-run orchestration policy class currently in effect.

    Returns:
        The class produced by awaiting the provider registered under
        ``"task_policy_provider"`` in ``ORCHESTRATION_DEPENDENCIES``, or
        ``CoreTaskPolicy`` when no provider is registered.
    """
    registered_provider = ORCHESTRATION_DEPENDENCIES.get("task_policy_provider")
    if registered_provider is not None:
        return await registered_provider()

    # Deferred import: only needed on the default path (presumably to avoid an
    # import cycle with core_policy -- confirm).
    from prefect.server.orchestration.core_policy import CoreTaskPolicy

    return CoreTaskPolicy

56 

57 

async def provide_flow_policy() -> type[FlowRunOrchestrationPolicy]:
    """Resolve the flow-run orchestration policy class currently in effect.

    Returns:
        The class produced by awaiting the provider registered under
        ``"flow_policy_provider"`` in ``ORCHESTRATION_DEPENDENCIES``, or
        ``CoreFlowPolicy`` when no provider is registered.
    """
    registered_provider = ORCHESTRATION_DEPENDENCIES.get("flow_policy_provider")
    if registered_provider is not None:
        return await registered_provider()

    # Deferred import: only needed on the default path (presumably to avoid an
    # import cycle with core_policy -- confirm).
    from prefect.server.orchestration.core_policy import CoreFlowPolicy

    return CoreFlowPolicy

69 

70 

async def provide_task_orchestration_parameters() -> dict[str, Any]:
    """Resolve the orchestration parameters for task runs.

    Returns:
        The dictionary produced by awaiting the provider registered under
        ``"task_orchestration_parameters_provider"``, or a new empty
        dictionary when no provider is registered.
    """
    registered_provider = ORCHESTRATION_DEPENDENCIES.get(
        "task_orchestration_parameters_provider"
    )
    if registered_provider is not None:
        return await registered_provider()

    # Nothing injected: hand back a fresh dict so callers never share state.
    return cast(dict[str, Any], {})

80 

81 

async def provide_flow_orchestration_parameters() -> dict[str, Any]:
    """Resolve the orchestration parameters for flow runs.

    Returns:
        The dictionary produced by awaiting the provider registered under
        ``"flow_orchestration_parameters_provider"``, or a new empty
        dictionary when no provider is registered.
    """
    registered_provider = ORCHESTRATION_DEPENDENCIES.get(
        "flow_orchestration_parameters_provider"
    )
    if registered_provider is not None:
        return await registered_provider()

    # Nothing injected: hand back a fresh dict so callers never share state.
    return cast(dict[str, Any], {})

91 

92 

@contextmanager
def temporary_task_policy(tmp_task_policy: type[TaskRunOrchestrationPolicy]):
    """Install ``tmp_task_policy`` as the task policy for the ``with`` body.

    The ``"task_policy_provider"`` slot of ``ORCHESTRATION_DEPENDENCIES`` is
    pointed at a provider that returns ``tmp_task_policy``. Whatever was in
    the slot on entry is put back on exit, including when the body raises.

    Args:
        tmp_task_policy: Policy class the temporary provider should return.
    """
    previous_provider = ORCHESTRATION_DEPENDENCIES["task_policy_provider"]

    # The registry stores async providers, so wrap the class in a coroutine.
    async def _provide_tmp_policy():
        return tmp_task_policy

    try:
        ORCHESTRATION_DEPENDENCIES["task_policy_provider"] = _provide_tmp_policy
        yield
    finally:
        ORCHESTRATION_DEPENDENCIES["task_policy_provider"] = previous_provider

105 

106 

@contextmanager
def temporary_flow_policy(tmp_flow_policy: type[FlowRunOrchestrationPolicy]):
    """Install ``tmp_flow_policy`` as the flow policy for the ``with`` body.

    The ``"flow_policy_provider"`` slot of ``ORCHESTRATION_DEPENDENCIES`` is
    pointed at a provider that returns ``tmp_flow_policy``. Whatever was in
    the slot on entry is put back on exit, including when the body raises.

    Args:
        tmp_flow_policy: Policy class the temporary provider should return.
    """
    previous_provider = ORCHESTRATION_DEPENDENCIES["flow_policy_provider"]

    # The registry stores async providers, so wrap the class in a coroutine.
    async def _provide_tmp_policy():
        return tmp_flow_policy

    try:
        ORCHESTRATION_DEPENDENCIES["flow_policy_provider"] = _provide_tmp_policy
        yield
    finally:
        ORCHESTRATION_DEPENDENCIES["flow_policy_provider"] = previous_provider

119 

120 

@contextmanager
def temporary_task_orchestration_parameters(
    tmp_orchestration_parameters: dict[str, Any],
):
    """Install temporary task orchestration parameters for the ``with`` body.

    The ``"task_orchestration_parameters_provider"`` slot of
    ``ORCHESTRATION_DEPENDENCIES`` is pointed at a provider that returns
    ``tmp_orchestration_parameters`` (the same object, not a copy). Whatever
    was in the slot on entry is put back on exit, including when the body
    raises.

    Args:
        tmp_orchestration_parameters: Dictionary the temporary provider
            should return.
    """
    slot = "task_orchestration_parameters_provider"
    previous_provider = ORCHESTRATION_DEPENDENCIES[slot]

    # The registry stores async providers, so wrap the dict in a coroutine.
    async def _provide_tmp_parameters():
        return tmp_orchestration_parameters

    try:
        ORCHESTRATION_DEPENDENCIES[slot] = _provide_tmp_parameters
        yield
    finally:
        ORCHESTRATION_DEPENDENCIES[slot] = previous_provider

141 

142 

@contextmanager
def temporary_flow_orchestration_parameters(
    tmp_orchestration_parameters: dict[str, Any],
):
    """Install temporary flow orchestration parameters for the ``with`` body.

    The ``"flow_orchestration_parameters_provider"`` slot of
    ``ORCHESTRATION_DEPENDENCIES`` is pointed at a provider that returns
    ``tmp_orchestration_parameters`` (the same object, not a copy). Whatever
    was in the slot on entry is put back on exit, including when the body
    raises.

    Args:
        tmp_orchestration_parameters: Dictionary the temporary provider
            should return.
    """
    slot = "flow_orchestration_parameters_provider"
    previous_provider = ORCHESTRATION_DEPENDENCIES[slot]

    # The registry stores async providers, so wrap the dict in a coroutine.
    async def _provide_tmp_parameters():
        return tmp_orchestration_parameters

    try:
        ORCHESTRATION_DEPENDENCIES[slot] = _provide_tmp_parameters
        yield
    finally:
        ORCHESTRATION_DEPENDENCIES[slot] = previous_provider