Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/global_policy.py: 66%

132 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Bookkeeping logic that fires on every state transition. 

3 

4For clarity, `GlobalFlowPolicy` and `GlobalTaskPolicy` contain all transition logic 

5implemented using `BaseUniversalTransform`. None of these operations modify state, and regardless of what orchestration the Prefect REST API might 

6enforce on a transition, the global policies contain Prefect's necessary bookkeeping. 

7Because these transforms record information about the validated state committed to the 

8state database, they should be the most deeply nested contexts in the orchestration loop. 

9""" 

10 

11from __future__ import annotations 1c

12 

13from typing import Any, Union, cast 1c

14 

15from packaging.version import Version 1c

16 

17import prefect.server.models as models 1c

18from prefect.server.database import orm_models 1c

19from prefect.server.orchestration.policies import BaseOrchestrationPolicy 1c

20from prefect.server.orchestration.rules import ( 1c

21 BaseOrchestrationRule, 

22 BaseUniversalTransform, 

23 FlowOrchestrationContext, 

24 FlowRunUniversalTransform, 

25 GenericOrchestrationContext, 

26 OrchestrationContext, 

27 TaskOrchestrationContext, 

28 TaskRunUniversalTransform, 

29) 

30from prefect.server.schemas import core 1c

31from prefect.server.schemas.core import FlowRunPolicy 1c

32 

33 

def COMMON_GLOBAL_TRANSFORMS() -> list[
    type[
        BaseUniversalTransform[
            orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
        ]
    ]
]:
    """
    Return the universal transforms shared by the flow-run and task-run policies.

    A fresh list is built on every call; the transforms appear in the order
    in which they are listed below.
    """
    shared_transforms: list[
        type[
            BaseUniversalTransform[
                orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
            ]
        ]
    ] = [
        SetRunStateType,
        SetRunStateName,
        SetRunStateTimestamp,
        SetStartTime,
        SetEndTime,
        IncrementRunTime,
        SetExpectedStartTime,
        SetNextScheduledStartTime,
        UpdateStateDetails,
    ]
    return shared_transforms

52 

53 

class GlobalFlowPolicy(BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]):
    """
    Ordered global transforms applied to flow-run-state transitions.

    The transforms are intended to execute immediately before and after a
    state transition is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]],
            type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
        ]
    ]:
        """
        Return the shared global transforms followed by the flow-run-only ones.
        """
        # `cast` only informs the type checker; the list is used as returned
        shared_transforms = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )

        # shared transforms first, then the transforms specific to flow runs
        return [
            *shared_transforms,
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
            IncrementFlowRunCount,
            RemoveResumingIndicator,
        ]

85 

86 

class GlobalTaskPolicy(BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]):
    """
    Ordered global transforms applied to task-run-state transitions.

    The transforms are intended to execute immediately before and after a
    state transition is validated.
    """

    @staticmethod
    def priority() -> list[
        Union[
            type[BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]],
            type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
        ]
    ]:
        """
        Return the shared global transforms followed by the task-run-only one.
        """
        # `cast` only informs the type checker; the list is used as returned
        shared_transforms = cast(
            list[
                Union[
                    type[
                        BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]
                    ],
                    type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]],
                ]
            ],
            COMMON_GLOBAL_TRANSFORMS(),
        )

        # shared transforms first, then the transform specific to task runs
        return [*shared_transforms, IncrementTaskRunCount]

113 

114 

class SetRunStateType(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's type onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        # mirror the incoming state's type on the run
        context.run.state_type = new_state.type

133 

134 

class SetRunStateName(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's name onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        # mirror the incoming state's name on the run
        context.run.state_name = new_state.name

153 

154 

class SetStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Stamps the run's start time the first time it enters a running state.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        # nothing to do unless a running state is proposed for a run that
        # has no start time recorded yet
        if not new_state.is_running() or context.run.start_time is not None:
            return

        context.run.start_time = new_state.timestamp

175 

176 

class SetRunStateTimestamp(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Copies the proposed state's timestamp onto the run during a state transition.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        # mirror the incoming state's timestamp on the run
        context.run.state_timestamp = new_state.timestamp

195 

196 

class SetEndTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Maintains the run's end time as it enters or leaves a final state.

    Under normal client usage a run does not transition out of a terminal
    state, but such transitions can be forced manually through the API. When
    a run leaves a final state for a non-final one, its end time is cleared.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        previous_state = context.initial_state
        run = context.run

        # leaving a final state for a non-final one: drop the stale end time
        if previous_state and previous_state.is_final() and not new_state.is_final():
            run.end_time = None

        if not new_state.is_final():
            return

        # entering a final state: a run that has started but has no end time
        # yet receives the new state's timestamp
        if run.start_time and not run.end_time:
            run.end_time = new_state.timestamp

231 

232 

class IncrementRunTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Accumulates the time a run has spent in the running state.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        previous_state = context.initial_state
        # only transitions that leave a running state add to the total
        if not (previous_state and previous_state.is_running()):
            return

        # time spent in the state being left
        context.run.total_run_time += new_state.timestamp - previous_state.timestamp

255 

256 

class IncrementFlowRunCount(
    FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Counts how many times a flow run enters a running state. For use with retries.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        if new_state is None or not new_state.is_running():
            return

        # when no api version is supplied, or it is at least 0.8.4, a flow
        # run flagged as resuming does not count as another run
        client_version = context.parameters.get("api-version", None)
        resume_aware = client_version is None or client_version >= Version("0.8.4")
        if resume_aware and context.run.empirical_policy.resuming:
            return

        context.run.run_count += 1

281 

282 

class RemoveResumingIndicator(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Clears the `resuming` flag on a flow run's empirical policy.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state

        # skip when an api version is supplied and it is older than 0.8.4
        client_version = context.parameters.get("api-version", None)
        if client_version is not None and not client_version >= Version("0.8.4"):
            return

        # the flag is only cleared when entering a running or final state
        if new_state is None or not (new_state.is_running() or new_state.is_final()):
            return

        if not context.run.empirical_policy.resuming:
            return

        # rebuild the policy from a dump with `resuming` switched off
        policy_fields = {
            **context.run.empirical_policy.model_dump(),
            "resuming": False,
        }
        context.run.empirical_policy = FlowRunPolicy(**policy_fields)

307 

308 

class IncrementTaskRunCount(TaskRunUniversalTransform):
    """
    Counts how many times a task run enters a running state. For use with retries.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        new_state = context.proposed_state
        # only a proposed running state bumps the counter
        if new_state is None or not new_state.is_running():
            return

        context.run.run_count += 1

326 

327 

class SetExpectedStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Fills in a run's expected start time when it has not been set yet.

    A proposed scheduled state contributes its scheduled time; any other
    proposed state contributes its own timestamp.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        # an expected start time that is already present is left untouched
        if context.run.expected_start_time:
            return

        new_state = context.proposed_state
        if new_state is None:
            return

        context.run.expected_start_time = (
            new_state.state_details.scheduled_time
            if new_state.is_scheduled()
            else new_state.timestamp
        )

354 

355 

class SetNextScheduledStartTime(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Keeps `run.next_scheduled_start_time` in step with scheduled states.

    Leaving a scheduled state unsets `run.next_scheduled_start_time`; entering
    a scheduled state sets it to that state's scheduled time.
    """

    async def before_transition(
        self, context: GenericOrchestrationContext[orm_models.Run, Any]
    ) -> None:
        if self.nullified_transition():
            return

        previous_state = context.initial_state
        # exiting a scheduled state: forget the old scheduled start time
        if previous_state and previous_state.is_scheduled():
            context.run.next_scheduled_start_time = None

        new_state = context.proposed_state
        # entering a scheduled state: record its scheduled time
        if new_state is not None and new_state.is_scheduled():
            context.run.next_scheduled_start_time = (
                new_state.state_details.scheduled_time
            )

384 

385 

class UpdateSubflowParentTask(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Propagates a subflow run's validated state to its parent task run.
    """

    async def after_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        # flow runs without a parent task run are not subflows
        if context.run.parent_task_run_id is None:
            return
        if context.validated_state is None:
            return

        # work on a copy so the flow run's own state is not mutated
        parent_task_state = context.validated_state.fresh_copy()

        # point the task's "child flow run id" at this subflow run
        parent_task_state.state_details.child_flow_run_id = context.run.id

        await models.task_runs.set_task_run_state(
            session=context.session,
            task_run_id=context.run.parent_task_run_id,
            state=parent_task_state,
            force=True,
        )

416 

417 

class UpdateSubflowStateDetails(
    BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]
):
    """
    Points a subflow's proposed state at the tracking task run in the parent
    flow run.
    """

    async def before_transition(
        self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy]
    ) -> None:
        if self.nullified_transition():
            return

        # flow runs without a parent task run are not subflows
        if context.run.parent_task_run_id is None:
            return
        if context.proposed_state is None:
            return

        state_details = context.proposed_state.state_details
        state_details.task_run_id = context.run.parent_task_run_id

440 

441 

class UpdateStateDetails(
    BaseUniversalTransform[
        orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy]
    ]
):
    """
    Update a state's references to a corresponding flow- or task- run.

    For a flow orchestration context the proposed state's `flow_run_id` is
    set from the flow run. For a task orchestration context both
    `flow_run_id` and `task_run_id` are set from the task run. Any other
    context type is left untouched.
    """

    async def before_transition(
        self,
        context: GenericOrchestrationContext,
    ) -> None:
        if self.nullified_transition():
            return

        # Explicit guard, matching the other global transforms in this
        # module: without a proposed state there are no details to update.
        if context.proposed_state is None:
            return

        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            context.proposed_state.state_details.flow_run_id = flow_run.id

        elif isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            context.proposed_state.state_details.flow_run_id = task_run.flow_run_id
            context.proposed_state.state_details.task_run_id = task_run.id

465 context.proposed_state.state_details.task_run_id = task_run.id 1bda