Coverage for /usr/local/lib/python3.12/site-packages/prefect/flow_runs.py: 20%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3from typing import ( 1a

4 TYPE_CHECKING, 

5 Any, 

6 Type, 

7 TypeVar, 

8 overload, 

9) 

10from uuid import UUID, uuid4 1a

11 

12import anyio 1a

13 

14from prefect.client.orchestration import PrefectClient, get_client 1a

15from prefect.client.schemas import FlowRun 1a

16from prefect.client.schemas.objects import ( 1a

17 State, 

18 StateType, 

19) 

20from prefect.client.schemas.responses import SetStateStatus 1a

21from prefect.client.utilities import inject_client 1a

22from prefect.context import ( 1a

23 FlowRunContext, 

24 TaskRunContext, 

25) 

26from prefect.events.clients import get_events_subscriber 1a

27from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter 1a

28from prefect.exceptions import ( 1a

29 Abort, 

30 FlowPauseTimeout, 

31 FlowRunWaitTimeout, 

32 NotPausedError, 

33 Pause, 

34) 

35from prefect.input import keyset_from_paused_state 1a

36from prefect.input.run_input import run_input_subclass_from_type 1a

37from prefect.logging import get_logger 1a

38from prefect.logging.loggers import ( 1a

39 get_run_logger, 

40) 

41from prefect.states import ( 1a

42 Paused, 

43 Suspended, 

44) 

45from prefect.utilities.asyncutils import ( 1a

46 sync_compatible, 

47) 

48from prefect.utilities.engine import ( 1a

49 propose_state, 

50) 

51 

52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true1a

53 from prefect.client.orchestration import PrefectClient 

54 

55 

@inject_client
async def wait_for_flow_run(
    flow_run_id: UUID,
    timeout: int | None = 10800,
    poll_interval: int | None = None,
    client: "PrefectClient | None" = None,
    log_states: bool = False,
) -> FlowRun:
    """
    Waits for the prefect flow run to finish and returns the FlowRun

    The function opens an event subscription for `prefect.flow-run` events of
    `flow_run_id` before it reads the run once. If the run is already final,
    it is returned immediately. Otherwise the function waits for an event
    whose `prefect.state-type` is final, then re-reads and returns the run.

    Args:
        flow_run_id: The flow run ID for the flow run to wait for.
        timeout: The wait timeout in seconds. Defaults to 10800 (3 hours).
            `None` disables the timeout.
        poll_interval: Deprecated; polling is no longer used to wait for flow runs.
            Supplying any value only logs a deprecation warning.
        client: Optional Prefect client. If not provided, one will be injected.
        log_states: If True, log state changes. Defaults to False.

    Returns:
        FlowRun: The finished flow run.

    Raises:
        prefect.exceptions.FlowRunWaitTimeout: If the flow run does not reach a
            final state before the timeout elapses.

    Examples:
        Create a flow run for a deployment and wait for it to finish:
        ```python
        import asyncio

        from prefect.client.orchestration import get_client
        from prefect.flow_runs import wait_for_flow_run

        async def main():
            async with get_client() as client:
                flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
                print(flow_run.state)

        if __name__ == "__main__":
            asyncio.run(main())

        ```

        Trigger multiple flow runs and wait for them to finish:
        ```python
        import asyncio

        from prefect.client.orchestration import get_client
        from prefect.flow_runs import wait_for_flow_run

        async def main(num_runs: int):
            async with get_client() as client:
                flow_runs = [
                    await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                    for _
                    in range(num_runs)
                ]
                coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
                finished_flow_runs = await asyncio.gather(*coros)
                print([flow_run.state for flow_run in finished_flow_runs])

        if __name__ == "__main__":
            asyncio.run(main(num_runs=10))

        ```
    """
    logger = get_logger()

    if poll_interval is not None:
        logger.warning(
            "The `poll_interval` argument is deprecated and will be removed in a future release. "
        )

    assert client is not None, "Client injection failed"

    event_filter = EventFilter(
        event=EventNameFilter(prefix=["prefect.flow-run"]),
        resource=EventResourceFilter(id=[f"prefect.flow-run.{flow_run_id}"]),
    )

    with anyio.move_on_after(timeout):
        # The subscription is opened before the initial read so that a state
        # transition happening right after the read can still be observed
        # (assuming the subscriber delivers events from the moment it opens).
        async with get_events_subscriber(filter=event_filter) as subscriber:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                if log_states:
                    logger.info(f"Flow run is in state {flow_run.state.name!r}")
                return flow_run

            async for event in subscriber:
                raw_state_type = event.resource.get("prefect.state-type")
                if not raw_state_type:
                    # Events without a state-type label cannot tell us whether
                    # the run has finished; skip them.
                    logger.debug(f"Received {event.event!r} event")
                    continue
                state = State(type=StateType(raw_state_type))

                if log_states:
                    logger.info(f"Flow run is in state {state.name!r}")

                if state.is_final():
                    # The event only carries the state type; re-read the run so
                    # the caller gets the complete, up-to-date FlowRun.
                    return await client.read_flow_run(flow_run_id)

    raise FlowRunWaitTimeout(
        f"Flow run with ID {flow_run_id} exceeded watch timeout of {timeout} seconds"
    )

159 

160 

# Module-level type variables. `T` parameterizes the `wait_for_input`
# overloads of `pause_flow_run` / `suspend_flow_run` in this module.
# NOTE(review): `R` is not referenced in this module; it may be imported
# elsewhere, so confirm before removing it.
R, T = TypeVar("R"), TypeVar("T")

163 

164 

@overload
async def pause_flow_run(
    wait_for_input: None = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> None: ...


@overload
async def pause_flow_run(
    wait_for_input: Type[T],
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> T: ...


@sync_compatible
async def pause_flow_run(
    wait_for_input: Type[T] | None = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> T | None:
    """
    Pauses the current flow run by blocking execution until resumed.

    Calling this inside a flow run blocks the caller, so no downstream tasks
    run until the flow run is resumed. Task runs that already started keep
    running. If the run is not resumed within `timeout` seconds, waiting stops
    with an error that fails the flow run.

    Args:
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. When given, the paused flow waits for that input to be
            provided before resuming. Resuming without the input fails the
            flow; resuming with it loads the input and returns it from this
            function.
        timeout: seconds to wait for the flow to be resumed before failing.
            Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any
            configured flow-level timeout, the flow might fail even after
            resuming.
        poll_interval: seconds between checks of whether the flow has been
            resumed. Defaults to 10 seconds.
        key: optional key that prevents the same pause from being applied more
            than once. Defaults to the number of pauses observed by the flow so
            far, which keeps pauses using the "reschedule" option from running
            the same pause twice. Supply a custom key for custom pausing
            behavior.

    Returns:
        When `wait_for_input` is given, the input loaded once the run resumes;
        otherwise `None`.

    Raises:
        RuntimeError: If called outside of a flow run, or if the API rejects or
            aborts the pause.
        prefect.exceptions.FlowPauseTimeout: If the flow run is not resumed
            within `timeout` seconds.

    Example:
        ```python
        from time import sleep

        from prefect import flow, task
        from prefect.client.schemas.objects import StateType
        from prefect.flow_runs import pause_flow_run

        @task
        def task_one():
            for i in range(3):
                sleep(1)

        @flow
        def my_flow():
            terminal_state = task_one.submit(return_state=True)
            if terminal_state.type == StateType.COMPLETED:
                print("Task one succeeded! Pausing flow run..")
                pause_flow_run(timeout=2)
            else:
                print("Task one failed. Skipping pause flow run..")
        ```
    """
    # All pausing logic lives in the in-process helper; this public wrapper
    # only exists to provide the sync-compatible, typed entry point.
    return await _in_process_pause(
        wait_for_input=wait_for_input,
        key=key,
        poll_interval=poll_interval,
        timeout=timeout,
    )

239 

240 

@inject_client
async def _in_process_pause(
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
    client: "PrefectClient | None" = None,
    wait_for_input: Type[T] | None = None,
) -> T | None:
    """
    Pause the current flow run in-process and block until it is resumed.

    Proposes a non-rescheduling `Paused` state for the flow run in the active
    `FlowRunContext`. It then polls the API every `poll_interval` seconds until
    the run is RUNNING again or `timeout` seconds elapse.

    Args:
        timeout: Seconds to wait for the flow run to be resumed.
        poll_interval: Seconds between checks of the flow run's state.
        key: Pause key; defaults to the number of pauses observed so far by
            this flow run.
        client: Prefect client; injected by `inject_client` when omitted.
        wait_for_input: Optional type of input to wait for. When given, the
            input schema is saved while paused, and the submitted input is
            loaded and returned once the run is resumed.

    Returns:
        The loaded input when `wait_for_input` is given, otherwise `None`.

    Raises:
        RuntimeError: If called outside a flow run, if the orchestrator aborts
            the pause, or if the API returns a state that is neither RUNNING
            nor PAUSED.
        FlowPauseTimeout: If the flow run is not resumed within `timeout`.
    """
    if TYPE_CHECKING:
        assert client is not None

    # Get the flow run context - this works even when called from within a task
    # since both FlowRunContext and TaskRunContext are independently active
    context = FlowRunContext.get()
    if not context:
        raise RuntimeError("Flow runs can only be paused from within a flow run.")

    logger = get_run_logger(context=context)

    pause_counter = _observed_flow_pauses(context)
    pause_key = key or str(pause_counter)

    logger.info("Pausing flow, execution will continue when this flow run is resumed.")

    proposed_state = Paused(
        timeout_seconds=timeout, reschedule=False, pause_key=pause_key
    )

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=context.flow_run.id,
        )
    except Abort as exc:
        # Aborted pause requests mean the pause is not allowed
        raise RuntimeError(f"Flow run cannot be paused: {exc}") from exc

    if state.is_running():
        # The orchestrator rejected the paused state which means that this
        # pause has happened before (via reschedule) and the flow run has
        # been resumed.
        if wait_for_input:
            # Return the loaded input, exactly as the polling path below does.
            return await wait_for_input.load(run_input_keyset)
        return None

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be paused. Received non-paused state from API: {state}"
        )

    if wait_for_input:
        # We're now in a paused state and the flow run is waiting for input.
        # Save the schema of the users `RunInput` subclass, stored in
        # `wait_for_input`, so the UI can display the form and we can validate
        # the input when the flow is resumed.
        await wait_for_input.save(run_input_keyset)

    async def _is_resumed() -> bool:
        # Re-read the run; a missing state is never treated as resumed.
        flow_run = await client.read_flow_run(context.flow_run.id)
        return flow_run.state is not None and flow_run.state.is_running()

    async def _on_resume() -> T | None:
        # Shared resume handling: log, then hand back the input if requested.
        logger.info("Resuming flow run execution!")
        if wait_for_input:
            return await wait_for_input.load(run_input_keyset)
        return None

    # Block and check for resumption on an interval until the timeout elapses
    with anyio.move_on_after(timeout):
        # First check after at most half the timeout so that short pauses are
        # checked at least once before the deadline.
        await anyio.sleep(min(timeout / 2, poll_interval))
        while True:
            if await _is_resumed():
                return await _on_resume()
            await anyio.sleep(poll_interval)

    # check one last time before failing the flow
    if await _is_resumed():
        return await _on_resume()

    raise FlowPauseTimeout("Flow run was paused and never resumed.")

331 

332 

@overload
async def suspend_flow_run(
    wait_for_input: None = None,
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> None: ...


@overload
async def suspend_flow_run(
    wait_for_input: Type[T],
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> T: ...


@sync_compatible
@inject_client
async def suspend_flow_run(
    wait_for_input: Type[T] | None = None,
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> T | None:
    """
    Suspends a flow run by stopping code execution until resumed.

    When suspended, the flow run will continue execution until the NEXT task is
    orchestrated, at which point the flow will exit. Any tasks that have
    already started will run until completion. When resumed, the flow run will
    be rescheduled to finish execution. In order to suspend a flow run in this
    way, the flow needs to have an associated deployment and results need to be
    configured with the `persist_result` option.

    Args:
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. If provided when the flow suspends, the flow will remain
            suspended until receiving the input before resuming. If the flow is
            resumed without providing the input, the flow will fail. If the flow is
            resumed with the input, the flow will resume and the input will be
            loaded and returned from this function.
        flow_run_id: a flow run id. If supplied, this function will attempt to
            suspend the specified flow run. If not supplied will attempt to
            suspend the current flow run.
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout
            exceeds any configured flow-level timeout, the flow might fail even
            after resuming.
        key: An optional key to prevent calling suspend more than once. When
            suspending the current flow run it defaults to the number of pauses
            observed by that flow run so far. When suspending another flow run
            (`flow_run_id` given) it defaults to a random UUID string, so it
            cannot clash with that run's own pauses. A custom key can be
            supplied for custom suspending behavior.
        client: Optional Prefect client. If not provided, one will be injected.

    Returns:
        The loaded input if the orchestrator reports the run as already
        resumed and `wait_for_input` was given; otherwise `None`. When the
        current flow run is successfully suspended, this function does not
        return; it raises `Pause` instead.

    Raises:
        RuntimeError: If called from a task run or outside a flow run without
            `flow_run_id`, if the orchestrator aborts the suspension, or if the
            API returns a state that is neither RUNNING nor PAUSED.
        prefect.exceptions.Pause: After the current flow run has been
            suspended, so that this process exits and the run can be
            resubmitted later.
    """
    if TYPE_CHECKING:
        assert client is not None

    context = FlowRunContext.get()

    if flow_run_id is None:
        if TaskRunContext.get():
            raise RuntimeError("Cannot suspend task runs.")

        if context is None or context.flow_run is None:
            raise RuntimeError(
                "Flow runs can only be suspended from within a flow run."
            )

        logger = get_run_logger(context=context)
        logger.info(
            "Suspending flow run, execution will be rescheduled when this flow run is"
            " resumed."
        )
        flow_run_id = context.flow_run.id
        suspending_current_flow_run = True
        pause_counter = _observed_flow_pauses(context)
        pause_key = key or str(pause_counter)
    else:
        # Since we're suspending another flow run we need to generate a pause
        # key that won't conflict with whatever suspends/pauses that flow may
        # have. Since this method won't be called during that flow run it's
        # okay that this is non-deterministic.
        suspending_current_flow_run = False
        pause_key = key or str(uuid4())

    proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=flow_run_id,
        )
    except Abort as exc:
        # Aborted requests mean the suspension is not allowed
        raise RuntimeError(f"Flow run cannot be suspended: {exc}") from exc

    if state.is_running():
        # The orchestrator rejected the suspended state which means that this
        # suspend has happened before and the flow run has been resumed.
        if wait_for_input:
            # The flow run wanted input, so we need to load it and return it
            # to the user.
            return await wait_for_input.load(run_input_keyset)
        return None

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be suspended. Received unexpected state from API: {state}"
        )

    if wait_for_input:
        # Persist the input schema so the input can be validated on resume.
        await wait_for_input.save(run_input_keyset)

    if suspending_current_flow_run:
        # Exit this process so the run can be resubmitted later
        raise Pause(state=state)

    # Another flow run was suspended; nothing to hand back to the caller.
    return None

460 

461 

@sync_compatible
async def resume_flow_run(
    flow_run_id: UUID, run_input: dict[str, Any] | None = None
) -> None:
    """
    Resumes a paused flow.

    Args:
        flow_run_id: the flow_run_id to resume
        run_input: a dictionary of inputs to provide to the flow run.

    Raises:
        NotPausedError: If the flow run is not currently paused, including when
            it has no state at all.
        FlowPauseTimeout: If the API rejects the resume and reports the run as
            FAILED (presumably because the pause expired).
        RuntimeError: If the API rejects the resume for any other reason.
    """
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)

        # A run without a state is not paused; report it as such instead of
        # failing with an AttributeError on `None`.
        if flow_run.state is None or not flow_run.state.is_paused():
            raise NotPausedError("Cannot resume a run that isn't paused!")

        response = await client.resume_flow_run(flow_run_id, run_input=run_input)

    if response.status == SetStateStatus.REJECT:
        if response.state is not None and response.state.type == StateType.FAILED:
            raise FlowPauseTimeout("Flow run can no longer be resumed.")
        raise RuntimeError(f"Cannot resume this run: {response.details.reason}")

487 

488 

489def _observed_flow_pauses(context: FlowRunContext) -> int: 1a

490 if "counter" not in context.observed_flow_pauses: 

491 context.observed_flow_pauses["counter"] = 1 

492 else: 

493 context.observed_flow_pauses["counter"] += 1 

494 return context.observed_flow_pauses["counter"]