Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/orchestration/global_policy.py: 21%

132 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Bookkeeping logic that fires on every state transition. 

3 

4For clarity, `GlobalFlowpolicy` and `GlobalTaskPolicy` contain all transition logic 

5implemented using `BaseUniversalTransform`. None of these operations modify state, and regardless of what orchestration Prefect REST API might 

6enforce on a transition, the global policies contain Prefect's necessary bookkeeping. 

7Because these transforms record information about the validated state committed to the 

8state database, they should be the most deeply nested contexts in orchestration loop. 

9""" 

10 

11from __future__ import annotations 1a

12 

13from typing import Any, Union, cast 1a

14 

15from packaging.version import Version 1a

16 

17import prefect.server.models as models 1a

18from prefect.server.database import orm_models 1a

19from prefect.server.orchestration.policies import BaseOrchestrationPolicy 1a

20from prefect.server.orchestration.rules import ( 1a

21 BaseOrchestrationRule, 

22 BaseUniversalTransform, 

23 FlowOrchestrationContext, 

24 FlowRunUniversalTransform, 

25 GenericOrchestrationContext, 

26 OrchestrationContext, 

27 TaskOrchestrationContext, 

28 TaskRunUniversalTransform, 

29) 

30from prefect.server.schemas import core 1a

31from prefect.server.schemas.core import FlowRunPolicy 1a

32 

33 

34def COMMON_GLOBAL_TRANSFORMS() -> list[ 1a

35 type[ 

36 BaseUniversalTransform[ 

37 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

38 ] 

39 ] 

40]: 

41 return [ 

42 SetRunStateType, 

43 SetRunStateName, 

44 SetRunStateTimestamp, 

45 SetStartTime, 

46 SetEndTime, 

47 IncrementRunTime, 

48 SetExpectedStartTime, 

49 SetNextScheduledStartTime, 

50 UpdateStateDetails, 

51 ] 

52 

53 

54class GlobalFlowPolicy(BaseOrchestrationPolicy[orm_models.FlowRun, core.FlowRunPolicy]): 1a

55 """ 

56 Global transforms that run against flow-run-state transitions in priority order. 

57 

58 These transforms are intended to run immediately before and after a state transition 

59 is validated. 

60 """ 

61 

62 @staticmethod 1a

63 def priority() -> list[ 1a

64 Union[ 

65 type[BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy]], 

66 type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]], 

67 ] 

68 ]: 

69 return cast( 

70 list[ 

71 Union[ 

72 type[ 

73 BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy] 

74 ], 

75 type[BaseOrchestrationRule[orm_models.FlowRun, core.FlowRunPolicy]], 

76 ] 

77 ], 

78 COMMON_GLOBAL_TRANSFORMS(), 

79 ) + [ 

80 UpdateSubflowParentTask, 

81 UpdateSubflowStateDetails, 

82 IncrementFlowRunCount, 

83 RemoveResumingIndicator, 

84 ] 

85 

86 

87class GlobalTaskPolicy(BaseOrchestrationPolicy[orm_models.TaskRun, core.TaskRunPolicy]): 1a

88 """ 

89 Global transforms that run against task-run-state transitions in priority order. 

90 

91 These transforms are intended to run immediately before and after a state transition 

92 is validated. 

93 """ 

94 

95 @staticmethod 1a

96 def priority() -> list[ 1a

97 Union[ 

98 type[BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy]], 

99 type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]], 

100 ] 

101 ]: 

102 return cast( 

103 list[ 

104 Union[ 

105 type[ 

106 BaseUniversalTransform[orm_models.TaskRun, core.TaskRunPolicy] 

107 ], 

108 type[BaseOrchestrationRule[orm_models.TaskRun, core.TaskRunPolicy]], 

109 ] 

110 ], 

111 COMMON_GLOBAL_TRANSFORMS(), 

112 ) + [IncrementTaskRunCount] 

113 

114 

115class SetRunStateType( 1a

116 BaseUniversalTransform[ 

117 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

118 ] 

119): 

120 """ 

121 Updates the state type of a run on a state transition. 

122 """ 

123 

124 async def before_transition( 1a

125 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

126 ) -> None: 

127 if self.nullified_transition(): 

128 return 

129 

130 # record the new state's type 

131 if context.proposed_state is not None: 

132 context.run.state_type = context.proposed_state.type 

133 

134 

135class SetRunStateName( 1a

136 BaseUniversalTransform[ 

137 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

138 ] 

139): 

140 """ 

141 Updates the state name of a run on a state transition. 

142 """ 

143 

144 async def before_transition( 1a

145 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

146 ) -> None: 

147 if self.nullified_transition(): 

148 return 

149 

150 if context.proposed_state is not None: 

151 # record the new state's name 

152 context.run.state_name = context.proposed_state.name 

153 

154 

155class SetStartTime( 1a

156 BaseUniversalTransform[ 

157 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

158 ] 

159): 

160 """ 

161 Records the time a run enters a running state for the first time. 

162 """ 

163 

164 async def before_transition( 1a

165 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

166 ) -> None: 

167 if self.nullified_transition(): 

168 return 

169 

170 if context.proposed_state is not None: 

171 # if entering a running state and no start time is set... 

172 if context.proposed_state.is_running() and context.run.start_time is None: 

173 # set the start time 

174 context.run.start_time = context.proposed_state.timestamp 

175 

176 

177class SetRunStateTimestamp( 1a

178 BaseUniversalTransform[ 

179 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

180 ] 

181): 

182 """ 

183 Records the time a run changes states. 

184 """ 

185 

186 async def before_transition( 1a

187 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

188 ) -> None: 

189 if self.nullified_transition(): 

190 return 

191 

192 if context.proposed_state is not None: 

193 # record the new state's timestamp 

194 context.run.state_timestamp = context.proposed_state.timestamp 

195 

196 

197class SetEndTime( 1a

198 BaseUniversalTransform[ 

199 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

200 ] 

201): 

202 """ 

203 Records the time a run enters a terminal state. 

204 

205 With normal client usage, a run will not transition out of a terminal state. 

206 However, it's possible to force these transitions manually via the API. While 

207 leaving a terminal state, the end time will be unset. 

208 """ 

209 

210 async def before_transition( 1a

211 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

212 ) -> None: 

213 if self.nullified_transition(): 

214 return 

215 

216 if context.proposed_state is not None: 

217 # if exiting a final state for a non-final state... 

218 if ( 

219 context.initial_state 

220 and context.initial_state.is_final() 

221 and not context.proposed_state.is_final() 

222 ): 

223 # clear the end time 

224 context.run.end_time = None 

225 

226 # if entering a final state... 

227 if context.proposed_state.is_final(): 

228 # if the run has a start time and no end time, give it one 

229 if context.run.start_time and not context.run.end_time: 

230 context.run.end_time = context.proposed_state.timestamp 

231 

232 

233class IncrementRunTime( 1a

234 BaseUniversalTransform[ 

235 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

236 ] 

237): 

238 """ 

239 Records the amount of time a run spends in the running state. 

240 """ 

241 

242 async def before_transition( 1a

243 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

244 ) -> None: 

245 if self.nullified_transition(): 

246 return 

247 

248 if context.proposed_state is not None: 

249 # if exiting a running state... 

250 if context.initial_state and context.initial_state.is_running(): 

251 # increment the run time by the time spent in the previous state 

252 context.run.total_run_time += ( 

253 context.proposed_state.timestamp - context.initial_state.timestamp 

254 ) 

255 

256 

257class IncrementFlowRunCount( 1a

258 FlowRunUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy] 

259): 

260 """ 

261 Records the number of times a run enters a running state. For use with retries. 

262 """ 

263 

264 async def before_transition( 1a

265 self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy] 

266 ) -> None: 

267 if self.nullified_transition(): 

268 return 

269 

270 if context.proposed_state is not None: 

271 # if entering a running state... 

272 if context.proposed_state.is_running(): 

273 # do not increment the run count if resuming a paused flow 

274 api_version = context.parameters.get("api-version", None) 

275 if api_version is None or api_version >= Version("0.8.4"): 

276 if context.run.empirical_policy.resuming: 

277 return 

278 

279 # increment the run count 

280 context.run.run_count += 1 

281 

282 

283class RemoveResumingIndicator( 1a

284 BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy] 

285): 

286 """ 

287 Removes the indicator on a flow run that marks it as resuming. 

288 """ 

289 

290 async def before_transition( 1a

291 self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy] 

292 ) -> None: 

293 if self.nullified_transition(): 

294 return 

295 

296 proposed_state = context.proposed_state 

297 

298 api_version = context.parameters.get("api-version", None) 

299 if api_version is None or api_version >= Version("0.8.4"): 

300 if proposed_state is not None and ( 

301 proposed_state.is_running() or proposed_state.is_final() 

302 ): 

303 if context.run.empirical_policy.resuming: 

304 updated_policy = context.run.empirical_policy.model_dump() 

305 updated_policy["resuming"] = False 

306 context.run.empirical_policy = FlowRunPolicy(**updated_policy) 

307 

308 

309class IncrementTaskRunCount(TaskRunUniversalTransform): 1a

310 """ 

311 Records the number of times a run enters a running state. For use with retries. 

312 """ 

313 

314 async def before_transition( 1a

315 self, context: OrchestrationContext[orm_models.TaskRun, core.TaskRunPolicy] 

316 ) -> None: 

317 if self.nullified_transition(): 

318 return 

319 

320 proposed_state = context.proposed_state 

321 

322 # if entering a running state... 

323 if proposed_state is not None and proposed_state.is_running(): 

324 # increment the run count 

325 context.run.run_count += 1 

326 

327 

328class SetExpectedStartTime( 1a

329 BaseUniversalTransform[ 

330 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

331 ] 

332): 

333 """ 

334 Estimates the time a state is expected to start running if not set. 

335 

336 For scheduled states, this estimate is simply the scheduled time. For other states, 

337 this is set to the time the proposed state was created by Prefect. 

338 """ 

339 

340 async def before_transition( 1a

341 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

342 ) -> None: 

343 if self.nullified_transition(): 

344 return 

345 

346 # set expected start time if this is the first state 

347 if not context.run.expected_start_time and context.proposed_state is not None: 

348 if context.proposed_state.is_scheduled(): 

349 context.run.expected_start_time = ( 

350 context.proposed_state.state_details.scheduled_time 

351 ) 

352 else: 

353 context.run.expected_start_time = context.proposed_state.timestamp 

354 

355 

356class SetNextScheduledStartTime( 1a

357 BaseUniversalTransform[ 

358 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

359 ] 

360): 

361 """ 

362 Records the scheduled time on a run. 

363 

364 When a run enters a scheduled state, `run.next_scheduled_start_time` is set to 

365 the state's scheduled time. When leaving a scheduled state, 

366 `run.next_scheduled_start_time` is unset. 

367 """ 

368 

369 async def before_transition( 1a

370 self, context: GenericOrchestrationContext[orm_models.Run, Any] 

371 ) -> None: 

372 if self.nullified_transition(): 

373 return 

374 

375 # remove the next scheduled start time if exiting a scheduled state 

376 if context.initial_state and context.initial_state.is_scheduled(): 

377 context.run.next_scheduled_start_time = None 

378 

379 # set next scheduled start time if entering a scheduled state 

380 if context.proposed_state is not None and context.proposed_state.is_scheduled(): 

381 context.run.next_scheduled_start_time = ( 

382 context.proposed_state.state_details.scheduled_time 

383 ) 

384 

385 

386class UpdateSubflowParentTask( 1a

387 BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy] 

388): 

389 """ 

390 Whenever a subflow changes state, it must update its parent task run's state. 

391 """ 

392 

393 async def after_transition( 1a

394 self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy] 

395 ) -> None: 

396 if self.nullified_transition(): 

397 return 

398 

399 # only applies to flow runs with a parent task run id 

400 if ( 

401 context.run.parent_task_run_id is not None 

402 and context.validated_state is not None 

403 ): 

404 # avoid mutation of the flow run state 

405 subflow_parent_task_state = context.validated_state.fresh_copy() 

406 

407 # set the task's "child flow run id" to be the subflow run id 

408 subflow_parent_task_state.state_details.child_flow_run_id = context.run.id 

409 

410 await models.task_runs.set_task_run_state( 

411 session=context.session, 

412 task_run_id=context.run.parent_task_run_id, 

413 state=subflow_parent_task_state, 

414 force=True, 

415 ) 

416 

417 

418class UpdateSubflowStateDetails( 1a

419 BaseUniversalTransform[orm_models.FlowRun, core.FlowRunPolicy] 

420): 

421 """ 

422 Update a child subflow state's references to a corresponding tracking task run id 

423 in the parent flow run 

424 """ 

425 

426 async def before_transition( 1a

427 self, context: OrchestrationContext[orm_models.FlowRun, core.FlowRunPolicy] 

428 ) -> None: 

429 if self.nullified_transition(): 

430 return 

431 

432 # only applies to flow runs with a parent task run id 

433 if ( 

434 context.run.parent_task_run_id is not None 

435 and context.proposed_state is not None 

436 ): 

437 context.proposed_state.state_details.task_run_id = ( 

438 context.run.parent_task_run_id 

439 ) 

440 

441 

442class UpdateStateDetails( 1a

443 BaseUniversalTransform[ 

444 orm_models.Run, Union[core.FlowRunPolicy, core.TaskRunPolicy] 

445 ] 

446): 

447 """ 

448 Update a state's references to a corresponding flow- or task- run. 

449 """ 

450 

451 async def before_transition( 1a

452 self, 

453 context: GenericOrchestrationContext, 

454 ) -> None: 

455 if self.nullified_transition(): 

456 return 

457 

458 if isinstance(context, FlowOrchestrationContext): 

459 flow_run = await context.flow_run() 

460 context.proposed_state.state_details.flow_run_id = flow_run.id 

461 

462 elif isinstance(context, TaskOrchestrationContext): 

463 task_run = await context.task_run() 

464 context.proposed_state.state_details.flow_run_id = task_run.flow_run_id 

465 context.proposed_state.state_details.task_run_id = task_run.id