Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/global_policy.py: 66%
132 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Bookkeeping logic that fires on every state transition.
4For clarity, `GlobalFlowPolicy` and `GlobalTaskPolicy` contain all transition logic
5implemented using `BaseUniversalTransform`. None of these operations modify state, and regardless of what orchestration the Prefect REST API might
6enforce on a transition, the global policies contain Prefect's necessary bookkeeping.
7Because these transforms record information about the validated state committed to the
8state database, they should be the most deeply nested contexts in the orchestration loop.
9"""
11from __future__ import annotations 1c
13from typing import Any, Union, cast 1c
15from packaging.version import Version 1c
17import prefect.server.models as models 1c
18from prefect.server.database import orm_models 1c
19from prefect.server.orchestration.policies import BaseOrchestrationPolicy 1c
20from prefect.server.orchestration.rules import ( 1c
21 BaseOrchestrationRule,
22 BaseUniversalTransform,
23 FlowOrchestrationContext,
24 FlowRunUniversalTransform,
25 GenericOrchestrationContext,
26 OrchestrationContext,
27 TaskOrchestrationContext,
28 TaskRunUniversalTransform,
29)
30from prefect.server.schemas import core 1c
31from prefect.server.schemas.core import FlowRunPolicy 1c
def COMMON_GLOBAL_TRANSFORMS() -> list[
    type[
        BaseUniversalTransform[
            orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
        ]
    ]
]:
    """
    Return the transforms shared by the flow-run and task-run global policies.

    The transforms are listed in the order the policies use them. A new list is
    built on every call.
    """
    shared_transforms = [
        SetRunStateType,
        SetRunStateName,
        SetRunStateTimestamp,
        SetStartTime,
        SetEndTime,
        IncrementRunTime,
        SetExpectedStartTime,
        SetNextScheduledStartTime,
        UpdateStateDetails,
    ]
    return shared_transforms
class GlobalFlowPolicy(BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]):
    """
    Bookkeeping transforms applied to flow-run-state transitions, in priority order.

    The transforms are meant to wrap a state transition, running immediately before
    and after it is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]],
            type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
        ]
    ]:
        """
        Return the shared transforms followed by the flow-run-only transforms.
        """
        # the shared list is typed against the generic run; narrow it for flow runs
        shared_transforms = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )
        return [
            *shared_transforms,
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
            IncrementFlowRunCount,
            RemoveResumingIndicator,
        ]
class GlobalTaskPolicy(BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]):
    """
    Bookkeeping transforms applied to task-run-state transitions, in priority order.

    The transforms are meant to wrap a state transition, running immediately before
    and after it is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]],
            type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
        ]
    ]:
        """
        Return the shared transforms followed by the task-run-only transform.
        """
        # the shared list is typed against the generic run; narrow it for task runs
        shared_transforms = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )
        return [*shared_transforms, IncrementTaskRunCount]
class SetRunStateType(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's type onto the run before a transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        # mirror the incoming state's type on the run
        context.run.state_type = incoming_state.type
class SetRunStateName(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's name onto the run before a transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        # mirror the incoming state's name on the run
        context.run.state_name = incoming_state.name
class SetStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Stamps the run's start time the first time it enters a running state.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        # only the first running state may set the start time
        first_time_running = (
            incoming_state.is_running() and context.run.start_time is None
        )
        if first_time_running:
            context.run.start_time = incoming_state.timestamp
class SetRunStateTimestamp(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's timestamp onto the run before a transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        # the run tracks when it last changed state
        context.run.state_timestamp = incoming_state.timestamp
class SetEndTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Maintains the run's end time as it enters and leaves terminal states.

    Entering a final state stamps the end time when the run has a start time and
    no end time yet. Normal client usage never moves a run out of a terminal state,
    but such a transition can be forced manually through the API; in that case the
    end time is cleared.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        run = context.run
        outgoing_state = context.initial_state

        if incoming_state.is_final():
            # entering a final state: stamp the end time once, for started runs only
            if run.start_time and not run.end_time:
                run.end_time = incoming_state.timestamp
        elif outgoing_state and outgoing_state.is_final():
            # leaving a final state for a non-final one: the run is no longer ended
            run.end_time = None
class IncrementRunTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Accumulates the time a run has spent in running states.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        outgoing_state = context.initial_state
        if outgoing_state and outgoing_state.is_running():
            # time spent running is the gap between the two states' timestamps
            elapsed = incoming_state.timestamp - outgoing_state.timestamp
            context.run.total_run_time += elapsed
class IncrementFlowRunCount(
    FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Counts how many times a flow run enters a running state. For use with retries.

    When the request carries no "api-version" parameter, or one at or above 0.8.4,
    a run flagged as resuming is not counted.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None or not incoming_state.is_running():
            return

        # resuming a paused flow must not count as another run
        client_version = context.parameters.get("api-version", None)
        honors_resuming = client_version is None or client_version >= Version(
            "0.8.4"
        )
        if honors_resuming and context.run.empirical_policy.resuming:
            return

        context.run.run_count += 1
class RemoveResumingIndicator(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Clears the `resuming` flag on a flow run's empirical policy.

    The flag is cleared when the proposed state is running or final, provided the
    request carries no "api-version" parameter or one at or above 0.8.4.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state

        client_version = context.parameters.get("api-version", None)
        version_applies = client_version is None or client_version >= Version(
            "0.8.4"
        )
        if not version_applies:
            return

        if incoming_state is None or not (
            incoming_state.is_running() or incoming_state.is_final()
        ):
            return

        if not context.run.empirical_policy.resuming:
            return

        # rebuild the policy with the flag switched off and assign the new object
        policy_fields = context.run.empirical_policy.model_dump()
        context.run.empirical_policy = FlowRunPolicy(
            **{**policy_fields, "resuming": False}
        )
class IncrementTaskRunCount(TaskRunUniversalTransform):
    """
    Counts how many times a task run enters a running state. For use with retries.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        incoming_state = context.proposed_state
        if incoming_state is None or not incoming_state.is_running():
            return

        # every entry into a running state counts as one more run
        context.run.run_count += 1
class SetExpectedStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Fills in the run's expected start time when it has not been set yet.

    A scheduled proposed state contributes its scheduled time; any other proposed
    state contributes its own timestamp.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        # an existing estimate is never overwritten
        if context.run.expected_start_time:
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        context.run.expected_start_time = (
            incoming_state.state_details.scheduled_time
            if incoming_state.is_scheduled()
            else incoming_state.timestamp
        )
class SetNextScheduledStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Keeps `run.next_scheduled_start_time` in step with scheduled states.

    Leaving a scheduled state unsets the value; entering a scheduled state sets it
    to that state's scheduled time.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        run = context.run

        # leaving a scheduled state: drop the stale scheduled time
        outgoing_state = context.initial_state
        if outgoing_state and outgoing_state.is_scheduled():
            run.next_scheduled_start_time = None

        # entering a scheduled state: record when the run is due
        incoming_state = context.proposed_state
        if incoming_state is not None and incoming_state.is_scheduled():
            run.next_scheduled_start_time = incoming_state.state_details.scheduled_time
class UpdateSubflowParentTask(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Propagates a subflow's validated state to its parent task run.
    """

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        # flow runs without a parent task run are not subflows
        parent_task_run_id = context.run.parent_task_run_id
        if parent_task_run_id is None:
            return

        validated_state = context.validated_state
        if validated_state is None:
            return

        # work on a copy so the flow run's own state is left untouched
        parent_task_state = validated_state.fresh_copy()

        # point the parent task's state at this subflow run
        parent_task_state.state_details.child_flow_run_id = context.run.id

        await models.task_runs.set_task_run_state(
            session=context.session,
            task_run_id=parent_task_run_id,
            state=parent_task_state,
            force=True,
        )
class UpdateSubflowStateDetails(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Stamps a subflow's proposed state with the id of the parent task run that
    tracks it.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        # flow runs without a parent task run are not subflows
        parent_task_run_id = context.run.parent_task_run_id
        if parent_task_run_id is None:
            return

        incoming_state = context.proposed_state
        if incoming_state is None:
            return

        incoming_state.state_details.task_run_id = parent_task_run_id
class UpdateStateDetails(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Update a state's references to a corresponding flow- or task- run.

    For a flow orchestration context the proposed state's `flow_run_id` is set to
    the flow run's id. For a task orchestration context both `flow_run_id` and
    `task_run_id` are set from the task run. Any other context type is left
    untouched.
    """

    async def before_transition(
        self,
        context: GenericOrchestrationContext,
    ) -> None:
        if self.nullified_transition():
            return

        # Guard against a missing proposed state, as the sibling transforms do,
        # before dereferencing `state_details`.
        proposed_state = context.proposed_state
        if proposed_state is None:
            return

        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            proposed_state.state_details.flow_run_id = flow_run.id

        elif isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            proposed_state.state_details.flow_run_id = task_run.flow_run_id
            proposed_state.state_details.task_run_id = task_run.id