Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/global_policy.py: 21%
132 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Bookkeeping logic that fires on every state transition.
4For clarity, `GlobalFlowPolicy` and `GlobalTaskPolicy` contain all transition logic
5implemented using `BaseUniversalTransform`. None of these operations modify state, and regardless of what orchestration the Prefect REST API might
6enforce on a transition, the global policies contain Prefect's necessary bookkeeping.
7Because these transforms record information about the validated state committed to the
8state database, they should be the most deeply nested contexts in the orchestration loop.
9"""
11from __future__ import annotations 1a
13from typing import Any, Union, cast 1a
15from packaging.version import Version 1a
17import prefect.server.models as models 1a
18from prefect.server.database import orm_models 1a
19from prefect.server.orchestration.policies import BaseOrchestrationPolicy 1a
20from prefect.server.orchestration.rules import ( 1a
21 BaseOrchestrationRule,
22 BaseUniversalTransform,
23 FlowOrchestrationContext,
24 FlowRunUniversalTransform,
25 GenericOrchestrationContext,
26 OrchestrationContext,
27 TaskOrchestrationContext,
28 TaskRunUniversalTransform,
29)
30from prefect.server.schemas import core 1a
31from prefect.server.schemas.core import FlowRunPolicy 1a
def COMMON_GLOBAL_TRANSFORMS() -> list[
    type[
        BaseUniversalTransform[
            orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
        ]
    ]
]:
    """
    Return the universal transforms shared by the flow-run and task-run policies.

    A fresh list is built on every call, so callers may concatenate or extend
    the result without affecting each other. The transform classes are looked
    up at call time, which is why this function can appear before their
    definitions in the module.
    """
    shared_transforms = [
        SetRunStateType,
        SetRunStateName,
        SetRunStateTimestamp,
        SetStartTime,
        SetEndTime,
        IncrementRunTime,
        SetExpectedStartTime,
        SetNextScheduledStartTime,
        UpdateStateDetails,
    ]
    return shared_transforms
class GlobalFlowPolicy(BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]):
    """
    Ordered global transforms applied to flow-run state transitions.

    These transforms are intended to run immediately before and after a state
    transition is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]],
            type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
        ]
    ]:
        """
        Return the shared transforms followed by the flow-run-only transforms.
        """
        # The cast only widens the static type of the shared list; it has no
        # runtime effect on the list contents.
        shared = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )
        return [
            *shared,
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
            IncrementFlowRunCount,
            RemoveResumingIndicator,
        ]
class GlobalTaskPolicy(BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]):
    """
    Ordered global transforms applied to task-run state transitions.

    These transforms are intended to run immediately before and after a state
    transition is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]],
            type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
        ]
    ]:
        """
        Return the shared transforms followed by the task-run-only transform.
        """
        # The cast only widens the static type of the shared list; it has no
        # runtime effect on the list contents.
        shared = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )
        return [*shared, IncrementTaskRunCount]
class SetRunStateType(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Mirrors the proposed state's type onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Copy `proposed_state.type` to `run.state_type`.

        Does nothing when the transition is nullified or no state is proposed.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        context.run.state_type = proposed.type
class SetRunStateName(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Mirrors the proposed state's name onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Copy `proposed_state.name` to `run.state_name`.

        Does nothing when the transition is nullified or no state is proposed.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        context.run.state_name = proposed.name
class SetStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Stamps the run's start time the first time it enters a running state.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Set `run.start_time` to the proposed state's timestamp.

        Only applies when the proposed state is running and the run has no
        start time yet, so an existing start time is never overwritten.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        first_time_running = proposed.is_running() and context.run.start_time is None
        if first_time_running:
            context.run.start_time = proposed.timestamp
class SetRunStateTimestamp(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Mirrors the proposed state's timestamp onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Copy `proposed_state.timestamp` to `run.state_timestamp`.

        Does nothing when the transition is nullified or no state is proposed.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        context.run.state_timestamp = proposed.timestamp
class SetEndTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Maintains the run's end time as it enters or leaves a final state.

    With normal client usage, a run will not transition out of a terminal state.
    However, it's possible to force these transitions manually via the API. While
    leaving a terminal state, the end time will be unset.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Clear or set `run.end_time` based on the initial and proposed states.

        The end time is cleared when moving from a final state to a non-final
        one. It is set to the proposed state's timestamp when entering a final
        state, but only if the run has a start time and no end time yet.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        previous = context.initial_state

        # leaving a final state for a non-final one: the run is no longer finished
        if previous and previous.is_final() and not proposed.is_final():
            context.run.end_time = None

        # entering a final state: stamp the end time once, and only for runs
        # that actually started
        if (
            proposed.is_final()
            and context.run.start_time
            and not context.run.end_time
        ):
            context.run.end_time = proposed.timestamp
class IncrementRunTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Accumulates the time a run has spent in running states.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Add the time spent in the running state being exited to `run.total_run_time`.

        The increment is the difference between the proposed state's timestamp
        and the initial state's timestamp. Nothing is added unless the initial
        state exists and is running.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None:
            return

        previous = context.initial_state
        if not previous or not previous.is_running():
            # not exiting a running state, so there is nothing to account for
            return

        context.run.total_run_time += proposed.timestamp - previous.timestamp
class IncrementFlowRunCount(
    FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Counts how many times a flow run enters a running state. For use with retries.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """
        Increment `run.run_count` when the proposed state is running.

        The count is left untouched when the run's empirical policy marks it
        as resuming, provided the request carries no "api-version" parameter
        or one that is at least 0.8.4.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state
        if proposed is None or not proposed.is_running():
            return

        # do not increment the run count if resuming a paused flow
        client_version = context.parameters.get("api-version", None)
        honors_resuming = client_version is None or client_version >= Version("0.8.4")
        if honors_resuming and context.run.empirical_policy.resuming:
            return

        context.run.run_count += 1
class RemoveResumingIndicator(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Clears the flag on a flow run that marks it as resuming.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """
        Reset `empirical_policy.resuming` to False on running or final states.

        Only applies when the request carries no "api-version" parameter or
        one that is at least 0.8.4. The policy is replaced with a new
        `FlowRunPolicy` built from the old one's dumped fields rather than
        mutated in place.
        """
        if self.nullified_transition():
            return

        proposed = context.proposed_state

        client_version = context.parameters.get("api-version", None)
        supports_resuming = client_version is None or client_version >= Version(
            "0.8.4"
        )
        if not supports_resuming:
            return

        if proposed is None:
            return
        if not (proposed.is_running() or proposed.is_final()):
            return

        if context.run.empirical_policy.resuming:
            policy_fields = {
                **context.run.empirical_policy.model_dump(),
                "resuming": False,
            }
            context.run.empirical_policy = FlowRunPolicy(**policy_fields)
class IncrementTaskRunCount(TaskRunUniversalTransform):
    """
    Counts how many times a task run enters a running state. For use with retries.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy]
    ) -> None:
        """
        Increment `run.run_count` when the proposed state is running.
        """
        if self.nullified_transition():
            return

        entering = context.proposed_state
        if entering is None or not entering.is_running():
            return

        context.run.run_count += 1
class SetExpectedStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Fills in the run's expected start time when it has not been set yet.

    For scheduled states, this estimate is simply the scheduled time. For other states,
    this is set to the time the proposed state was created by Prefect.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Set `run.expected_start_time` from the proposed state if it is unset.

        An existing expected start time is never overwritten.
        """
        if self.nullified_transition():
            return

        # only the first state that reaches this point provides the estimate
        if context.run.expected_start_time or context.proposed_state is None:
            return

        proposed = context.proposed_state
        context.run.expected_start_time = (
            proposed.state_details.scheduled_time
            if proposed.is_scheduled()
            else proposed.timestamp
        )
class SetNextScheduledStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Keeps `run.next_scheduled_start_time` in sync with scheduled states.

    When a run enters a scheduled state, `run.next_scheduled_start_time` is set to
    the state's scheduled time. When leaving a scheduled state,
    `run.next_scheduled_start_time` is unset.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        """
        Clear the next scheduled start time on exit, then set it on entry.

        Both steps can apply to one transition: moving between two scheduled
        states clears the old value and then records the new scheduled time.
        """
        if self.nullified_transition():
            return

        previous = context.initial_state
        if previous and previous.is_scheduled():
            # exiting a scheduled state
            context.run.next_scheduled_start_time = None

        proposed = context.proposed_state
        if proposed is not None and proposed.is_scheduled():
            # entering a scheduled state
            context.run.next_scheduled_start_time = (
                proposed.state_details.scheduled_time
            )
class UpdateSubflowParentTask(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Propagates a subflow's validated state to its parent task run.
    """

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """
        Set the parent task run's state to a copy of the validated flow-run state.

        Only applies to flow runs that have a parent task run id and a
        validated state. The copy's `child_flow_run_id` is pointed at this
        flow run before it is written with `force=True`.
        """
        if self.nullified_transition():
            return

        # not a subflow, or nothing was validated: nothing to propagate
        if context.run.parent_task_run_id is None or context.validated_state is None:
            return

        # work on a fresh copy so the flow run's own state is not mutated
        parent_task_state = context.validated_state.fresh_copy()
        parent_task_state.state_details.child_flow_run_id = context.run.id

        await models.task_runs.set_task_run_state(
            session=context.session,
            task_run_id=context.run.parent_task_run_id,
            state=parent_task_state,
            force=True,
        )
class UpdateSubflowStateDetails(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Points a subflow's proposed state at the task run that tracks it in the
    parent flow run.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        """
        Copy `run.parent_task_run_id` into the proposed state's `task_run_id`.

        Only applies to flow runs that have a parent task run id.
        """
        if self.nullified_transition():
            return

        tracking_task_run_id = context.run.parent_task_run_id
        if tracking_task_run_id is None or context.proposed_state is None:
            return

        context.proposed_state.state_details.task_run_id = tracking_task_run_id
class UpdateStateDetails(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Update a state's references to a corresponding flow- or task- run.
    """

    async def before_transition(
        self,
        context: GenericOrchestrationContext,
    ) -> None:
        """
        Record the owning run's identifiers on the proposed state's details.

        For a flow orchestration context, `flow_run_id` is set from the flow
        run. For a task orchestration context, both `flow_run_id` and
        `task_run_id` are set from the task run. Any other context type is
        left untouched.
        """
        if self.nullified_transition():
            return

        # Guard explicitly against a missing proposed state, as every sibling
        # transform does, instead of relying on `nullified_transition()` to
        # have ruled it out (its definition is not visible here). This also
        # skips the run lookup when there is nothing to annotate.
        proposed_state = context.proposed_state
        if proposed_state is None:
            return

        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            proposed_state.state_details.flow_run_id = flow_run.id

        elif isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            proposed_state.state_details.flow_run_id = task_run.flow_run_id
            proposed_state.state_details.task_run_id = task_run.id