Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/dependencies.py: 34%
74 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Injected orchestration dependencies
3"""
5from __future__ import annotations 1a
7from contextlib import contextmanager 1a
8from typing import Any, Awaitable, Callable, TypedDict, cast 1a
10from packaging.version import Version 1a
12from prefect.server.orchestration.policies import ( 1a
13 FlowRunOrchestrationPolicy,
14 TaskRunOrchestrationPolicy,
15)
17TaskRunPolicyProvider = Callable[[], Awaitable[type[TaskRunOrchestrationPolicy]]] 1a
18FlowRunPolicyProvider = Callable[[], Awaitable[type[FlowRunOrchestrationPolicy]]] 1a
19ParameterProvider = Callable[[], Awaitable[dict[str, Any]]] 1a
22class OrchestrationDependencies(TypedDict): 1a
23 task_policy_provider: TaskRunPolicyProvider | None 1a
24 flow_policy_provider: FlowRunPolicyProvider | None 1a
25 task_orchestration_parameters_provider: ParameterProvider | None 1a
26 flow_orchestration_parameters_provider: ParameterProvider | None 1a
29ORCHESTRATION_DEPENDENCIES: OrchestrationDependencies = { 1a
30 "task_policy_provider": None,
31 "flow_policy_provider": None,
32 "task_orchestration_parameters_provider": None,
33 "flow_orchestration_parameters_provider": None,
34}
36WORKER_VERSIONS_THAT_MANAGE_DEPLOYMENT_CONCURRENCY = { 1a
37 "3.0.0rc20",
38 "3.0.0",
39 "3.0.1",
40 "3.0.2",
41 "3.0.3",
42}
44MIN_CLIENT_VERSION_FOR_CONCURRENCY_LIMIT_LEASING = Version("3.4.11") 1a
47async def provide_task_policy() -> type[TaskRunOrchestrationPolicy]: 1a
48 policy_provider = ORCHESTRATION_DEPENDENCIES.get("task_policy_provider")
50 if policy_provider is None:
51 from prefect.server.orchestration.core_policy import CoreTaskPolicy
53 return CoreTaskPolicy
55 return await policy_provider()
58async def provide_flow_policy() -> type[FlowRunOrchestrationPolicy]: 1a
59 policy_provider = ORCHESTRATION_DEPENDENCIES.get("flow_policy_provider")
61 if policy_provider is None:
62 from prefect.server.orchestration.core_policy import (
63 CoreFlowPolicy,
64 )
66 return CoreFlowPolicy
68 return await policy_provider()
71async def provide_task_orchestration_parameters() -> dict[str, Any]: 1a
72 parameter_provider = ORCHESTRATION_DEPENDENCIES.get(
73 "task_orchestration_parameters_provider"
74 )
76 if parameter_provider is None:
77 return cast(dict[str, Any], dict())
79 return await parameter_provider()
82async def provide_flow_orchestration_parameters() -> dict[str, Any]: 1a
83 parameter_provider = ORCHESTRATION_DEPENDENCIES.get(
84 "flow_orchestration_parameters_provider"
85 )
87 if parameter_provider is None:
88 return cast(dict[str, Any], dict())
90 return await parameter_provider()
93@contextmanager 1a
94def temporary_task_policy(tmp_task_policy: type[TaskRunOrchestrationPolicy]): 1a
95 starting_task_policy = ORCHESTRATION_DEPENDENCIES["task_policy_provider"]
97 async def policy_lambda():
98 return tmp_task_policy
100 try:
101 ORCHESTRATION_DEPENDENCIES["task_policy_provider"] = policy_lambda
102 yield
103 finally:
104 ORCHESTRATION_DEPENDENCIES["task_policy_provider"] = starting_task_policy
107@contextmanager 1a
108def temporary_flow_policy(tmp_flow_policy: type[FlowRunOrchestrationPolicy]): 1a
109 starting_flow_policy = ORCHESTRATION_DEPENDENCIES["flow_policy_provider"]
111 async def policy_lambda():
112 return tmp_flow_policy
114 try:
115 ORCHESTRATION_DEPENDENCIES["flow_policy_provider"] = policy_lambda
116 yield
117 finally:
118 ORCHESTRATION_DEPENDENCIES["flow_policy_provider"] = starting_flow_policy
121@contextmanager 1a
122def temporary_task_orchestration_parameters( 1a
123 tmp_orchestration_parameters: dict[str, Any],
124):
125 starting_task_orchestration_parameters = ORCHESTRATION_DEPENDENCIES[
126 "task_orchestration_parameters_provider"
127 ]
129 async def parameter_lambda():
130 return tmp_orchestration_parameters
132 try:
133 ORCHESTRATION_DEPENDENCIES["task_orchestration_parameters_provider"] = (
134 parameter_lambda
135 )
136 yield
137 finally:
138 ORCHESTRATION_DEPENDENCIES["task_orchestration_parameters_provider"] = (
139 starting_task_orchestration_parameters
140 )
143@contextmanager 1a
144def temporary_flow_orchestration_parameters( 1a
145 tmp_orchestration_parameters: dict[str, Any],
146):
147 starting_flow_orchestration_parameters = ORCHESTRATION_DEPENDENCIES[
148 "flow_orchestration_parameters_provider"
149 ]
151 async def parameter_lambda():
152 return tmp_orchestration_parameters
154 try:
155 ORCHESTRATION_DEPENDENCIES["flow_orchestration_parameters_provider"] = (
156 parameter_lambda
157 )
158 yield
159 finally:
160 ORCHESTRATION_DEPENDENCIES["flow_orchestration_parameters_provider"] = (
161 starting_flow_orchestration_parameters
162 )