Coverage for /usr/local/lib/python3.12/site-packages/prefect/flow_runs.py: 20%
161 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from typing import ( 1a
4 TYPE_CHECKING,
5 Any,
6 Type,
7 TypeVar,
8 overload,
9)
10from uuid import UUID, uuid4 1a
12import anyio 1a
14from prefect.client.orchestration import PrefectClient, get_client 1a
15from prefect.client.schemas import FlowRun 1a
16from prefect.client.schemas.objects import ( 1a
17 State,
18 StateType,
19)
20from prefect.client.schemas.responses import SetStateStatus 1a
21from prefect.client.utilities import inject_client 1a
22from prefect.context import ( 1a
23 FlowRunContext,
24 TaskRunContext,
25)
26from prefect.events.clients import get_events_subscriber 1a
27from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter 1a
28from prefect.exceptions import ( 1a
29 Abort,
30 FlowPauseTimeout,
31 FlowRunWaitTimeout,
32 NotPausedError,
33 Pause,
34)
35from prefect.input import keyset_from_paused_state 1a
36from prefect.input.run_input import run_input_subclass_from_type 1a
37from prefect.logging import get_logger 1a
38from prefect.logging.loggers import ( 1a
39 get_run_logger,
40)
41from prefect.states import ( 1a
42 Paused,
43 Suspended,
44)
45from prefect.utilities.asyncutils import ( 1a
46 sync_compatible,
47)
48from prefect.utilities.engine import ( 1a
49 propose_state,
50)
52if TYPE_CHECKING: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true1a
53 from prefect.client.orchestration import PrefectClient
@inject_client
async def wait_for_flow_run(
    flow_run_id: UUID,
    timeout: int | None = 10800,
    poll_interval: int | None = None,
    client: "PrefectClient | None" = None,
    log_states: bool = False,
) -> FlowRun:
    """
    Waits for the prefect flow run to finish and returns the FlowRun

    The run is read once up front; if it is not already in a final state, the
    function then listens to `prefect.flow-run` events for this run until one
    of them carries a final state type.

    Args:
        flow_run_id: The flow run ID for the flow run to wait for.
        timeout: The wait timeout in seconds. Defaults to 10800 (3 hours).
            The value is handed directly to `anyio.move_on_after`.
        poll_interval: Deprecated; polling is no longer used to wait for flow runs.
            Passing any value only logs a deprecation warning.
        client: Optional Prefect client. If not provided, one will be injected.
        log_states: If True, log state changes. Defaults to False.

    Returns:
        FlowRun: The finished flow run.

    Raises:
        prefect.exceptions.FlowRunWaitTimeout: If flow run goes over the timeout,
            or the event stream ends before a final state was observed.

    Examples:
        Create a flow run for a deployment and wait for it to finish:
            ```python
            import asyncio

            from prefect.client.orchestration import get_client
            from prefect.flow_runs import wait_for_flow_run

            async def main():
                async with get_client() as client:
                    flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                    flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
                    print(flow_run.state)

            if __name__ == "__main__":
                asyncio.run(main())

            ```

        Trigger multiple flow runs and wait for them to finish:
            ```python
            import asyncio

            from prefect.client.orchestration import get_client
            from prefect.flow_runs import wait_for_flow_run

            async def main(num_runs: int):
                async with get_client() as client:
                    flow_runs = [
                        await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                        for _
                        in range(num_runs)
                    ]
                    coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
                    finished_flow_runs = await asyncio.gather(*coros)
                    print([flow_run.state for flow_run in finished_flow_runs])

            if __name__ == "__main__":
                asyncio.run(main(num_runs=10))

            ```
    """
    # One logger is enough for both the deprecation warning and state logging.
    logger = get_logger()

    if poll_interval is not None:
        logger.warning(
            "The `poll_interval` argument is deprecated and will be removed in a future release. "
        )

    assert client is not None, "Client injection failed"

    # Only events named `prefect.flow-run*` about this specific run are relevant.
    event_filter = EventFilter(
        event=EventNameFilter(prefix=["prefect.flow-run"]),
        resource=EventResourceFilter(id=[f"prefect.flow-run.{flow_run_id}"]),
    )

    with anyio.move_on_after(timeout):
        # NOTE(review): the subscription is opened before the initial read,
        # presumably so a final-state event emitted in between is not missed.
        async with get_events_subscriber(filter=event_filter) as subscriber:
            flow_run = await client.read_flow_run(flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                if log_states:
                    logger.info(f"Flow run is in state {flow_run.state.name!r}")
                return flow_run

            async for event in subscriber:
                # Events without a state type carry no state transition.
                if not (state_type := event.resource.get("prefect.state-type")):
                    logger.debug(f"Received {event.event!r} event")
                    continue
                state_type = StateType(state_type)
                state = State(type=state_type)

                if log_states:
                    logger.info(f"Flow run is in state {state.name!r}")

                if state.is_final():
                    # Re-read so the caller gets the full, current flow run.
                    return await client.read_flow_run(flow_run_id)

    # Reached when the block above exits without returning a finished run.
    raise FlowRunWaitTimeout(
        f"Flow run with ID {flow_run_id} exceeded watch timeout of {timeout} seconds"
    )
# Generic type variable; not referenced in the visible part of this module.
R = TypeVar("R")
# Type of the input requested via `wait_for_input` in the pause/suspend
# functions below; it is also the type those functions are annotated to return.
T = TypeVar("T")
@overload
async def pause_flow_run(
    wait_for_input: None = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> None: ...


@overload
async def pause_flow_run(
    wait_for_input: Type[T],
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> T: ...


@sync_compatible
async def pause_flow_run(
    wait_for_input: Type[T] | None = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
) -> T | None:
    """
    Pause the current flow run and block until it has been resumed.

    While paused, the calling code does not advance, so no downstream tasks
    are started; task runs that were already started keep running. All of the
    work is delegated to `_in_process_pause`, which raises `FlowPauseTimeout`
    when the run is not resumed within `timeout` seconds.

    Args:
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. When given, the pause waits for that input; once the
            flow is resumed with it, the input is loaded and returned from
            this function. Resuming without providing the input fails the
            flow.
        timeout: how many seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout
            exceeds any configured flow-level timeout, the flow might fail
            even after resuming.
        poll_interval: how many seconds to wait between checks for whether
            the flow has been resumed. Defaults to 10 seconds.
        key: An optional key to prevent calling pauses more than once. When
            omitted, the number of pauses observed by the flow so far is
            used, which prevents pauses that use the "reschedule" option from
            running the same pause twice. A custom key can be supplied for
            custom pausing behavior.

    Example:
        ```python
        @task
        def task_one():
            for i in range(3):
                sleep(1)

        @flow
        def my_flow():
            terminal_state = task_one.submit(return_state=True)
            if terminal_state.type == StateType.COMPLETED:
                print("Task one succeeded! Pausing flow run..")
                pause_flow_run(timeout=2)
            else:
                print("Task one failed. Skipping pause flow run..")
        ```

    """
    resumed_input = await _in_process_pause(
        wait_for_input=wait_for_input,
        key=key,
        timeout=timeout,
        poll_interval=poll_interval,
    )
    return resumed_input
@inject_client
async def _in_process_pause(
    timeout: int = 3600,
    poll_interval: int = 10,
    key: str | None = None,
    client: "PrefectClient | None" = None,
    wait_for_input: Type[T] | None = None,
) -> T | None:
    """
    Propose a non-rescheduling `Paused` state for the current flow run and
    poll the API until the run is running again.

    Args:
        timeout: seconds to wait for the run to be resumed; also sent to the
            API as the paused state's `timeout_seconds`.
        poll_interval: seconds between reads of the flow run while waiting.
        key: pause key; defaults to the per-context pause counter.
        client: Prefect client, injected when not provided.
        wait_for_input: optional input type; when given, its schema is saved
            while paused and the provided input is loaded on resume.

    Returns:
        The loaded input when `wait_for_input` was given, otherwise `None`.

    Raises:
        RuntimeError: if there is no flow run context, the pause request is
            aborted, or the API answers with a state that is neither running
            nor paused.
        FlowPauseTimeout: if the run was not resumed before `timeout` elapsed.
    """
    if TYPE_CHECKING:
        assert client is not None

    # Get the flow run context - this works even when called from within a task
    # since both FlowRunContext and TaskRunContext are independently active
    context = FlowRunContext.get()
    if not context:
        raise RuntimeError("Flow runs can only be paused from within a flow run.")

    logger = get_run_logger(context=context)

    pause_counter = _observed_flow_pauses(context)
    pause_key = key or str(pause_counter)

    logger.info("Pausing flow, execution will continue when this flow run is resumed.")

    proposed_state = Paused(
        timeout_seconds=timeout, reschedule=False, pause_key=pause_key
    )

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=context.flow_run.id,
        )
    except Abort as exc:
        # Aborted pause requests mean the pause is not allowed
        raise RuntimeError(f"Flow run cannot be paused: {exc}") from exc

    if state.is_running():
        # The orchestrator rejected the paused state which means that this
        # pause has happened before (via reschedule) and the flow run has
        # been resumed.
        if wait_for_input:
            # The flow run wanted input, so we need to load it and return it
            # to the user. (Previously the loaded value was discarded and
            # `None` was returned.)
            return await wait_for_input.load(run_input_keyset)

        return None

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be paused. Received non-paused state from API: {state}"
        )

    if wait_for_input:
        # We're now in a paused state and the flow run is waiting for input.
        # Save the schema of the users `RunInput` subclass, stored in
        # `wait_for_input`, so the UI can display the form and we can validate
        # the input when the flow is resumed.
        await wait_for_input.save(run_input_keyset)

    # Otherwise, block and check for completion on an interval
    with anyio.move_on_after(timeout):
        # attempt to check if a flow has resumed at least once
        initial_sleep = min(timeout / 2, poll_interval)
        await anyio.sleep(initial_sleep)
        while True:
            flow_run = await client.read_flow_run(context.flow_run.id)
            # A run without a state is treated as "not resumed yet".
            if flow_run.state is not None and flow_run.state.is_running():
                logger.info("Resuming flow run execution!")
                if wait_for_input:
                    return await wait_for_input.load(run_input_keyset)
                return None
            await anyio.sleep(poll_interval)

    # check one last time before failing the flow
    flow_run = await client.read_flow_run(context.flow_run.id)
    if flow_run.state is not None and flow_run.state.is_running():
        logger.info("Resuming flow run execution!")
        if wait_for_input:
            return await wait_for_input.load(run_input_keyset)
        return None

    raise FlowPauseTimeout("Flow run was paused and never resumed.")
@overload
async def suspend_flow_run(
    wait_for_input: None = None,
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> None: ...


@overload
async def suspend_flow_run(
    wait_for_input: Type[T],
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> T: ...


@sync_compatible
@inject_client
async def suspend_flow_run(
    wait_for_input: Type[T] | None = None,
    flow_run_id: UUID | None = None,
    timeout: int | None = 3600,
    key: str | None = None,
    client: "PrefectClient | None" = None,
) -> T | None:
    """
    Suspends a flow run by stopping code execution until resumed.

    When suspended, the flow run will continue execution until the NEXT task is
    orchestrated, at which point the flow will exit. Any tasks that have
    already started will run until completion. When resumed, the flow run will
    be rescheduled to finish execution. In order to suspend a flow run in this
    way, the flow needs to have an associated deployment and results need to be
    configured with the `persist_result` option.

    Args:
        flow_run_id: a flow run id. If supplied, this function will attempt to
            suspend the specified flow run. If not supplied will attempt to
            suspend the current flow run.
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout
            exceeds any configured flow-level timeout, the flow might fail even
            after resuming.
        key: An optional key to prevent calling suspend more than once. For the
            current flow run this defaults to the number of pauses observed so
            far; when suspending another flow run it defaults to a random
            UUID string. A custom key can be supplied for custom suspending
            behavior.
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. If provided when the flow suspends, the flow will remain
            suspended until receiving the input before resuming. If the flow is
            resumed without providing the input, the flow will fail. If the flow is
            resumed with the input, the flow will resume and the input will be
            loaded and returned from this function.
        client: Optional Prefect client. If not provided, one will be injected.

    Returns:
        The loaded input when the suspension had already been resumed and
        `wait_for_input` was given; otherwise `None`.

    Raises:
        RuntimeError: if called without `flow_run_id` from inside a task run
            or outside a flow run, if the request is aborted, or if the API
            answers with a state that is neither running nor paused.
        Pause: after the current flow run was successfully suspended, so that
            this process exits and the run can be resubmitted later.
    """
    if TYPE_CHECKING:
        assert client is not None

    context = FlowRunContext.get()

    if flow_run_id is None:
        if TaskRunContext.get():
            raise RuntimeError("Cannot suspend task runs.")

        if context is None or context.flow_run is None:
            raise RuntimeError(
                "Flow runs can only be suspended from within a flow run."
            )

        logger = get_run_logger(context=context)
        logger.info(
            "Suspending flow run, execution will be rescheduled when this flow run is"
            " resumed."
        )
        flow_run_id = context.flow_run.id
        suspending_current_flow_run = True
        pause_counter = _observed_flow_pauses(context)
        pause_key = key or str(pause_counter)
    else:
        # Since we're suspending another flow run we need to generate a pause
        # key that won't conflict with whatever suspends/pauses that flow may
        # have. Since this method won't be called during that flow run it's
        # okay that this is non-deterministic.
        suspending_current_flow_run = False
        pause_key = key or str(uuid4())

    proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=flow_run_id,
        )
    except Abort as exc:
        # Aborted requests mean the suspension is not allowed
        raise RuntimeError(f"Flow run cannot be suspended: {exc}") from exc

    if state.is_running():
        # The orchestrator rejected the suspended state which means that this
        # suspend has happened before and the flow run has been resumed.
        if wait_for_input:
            # The flow run wanted input, so we need to load it and return it
            # to the user.
            return await wait_for_input.load(run_input_keyset)
        return None

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be suspended. Received unexpected state from API: {state}"
        )

    if wait_for_input:
        # Persist the input schema under the keyset attached to the state.
        await wait_for_input.save(run_input_keyset)

    if suspending_current_flow_run:
        # Exit this process so the run can be resubmitted later
        raise Pause(state=state)

    # Another flow run was suspended; there is nothing to wait for here.
    return None
@sync_compatible
async def resume_flow_run(
    flow_run_id: UUID, run_input: dict[str, Any] | None = None
) -> None:
    """
    Resumes a paused flow.

    Args:
        flow_run_id: the flow_run_id to resume
        run_input: a dictionary of inputs to provide to the flow run.

    Raises:
        NotPausedError: if the flow run has no state or is not paused.
        FlowPauseTimeout: if the resume request is rejected and the returned
            state is of type FAILED.
        RuntimeError: if the resume request is rejected for any other reason.
    """
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)

        # A run with no state at all is not paused either; report that with
        # the same error instead of failing with an AttributeError.
        if flow_run.state is None or not flow_run.state.is_paused():
            raise NotPausedError("Cannot resume a run that isn't paused!")

        response = await client.resume_flow_run(flow_run_id, run_input=run_input)

    if response.status == SetStateStatus.REJECT:
        if response.state.type == StateType.FAILED:
            raise FlowPauseTimeout("Flow run can no longer be resumed.")
        else:
            raise RuntimeError(f"Cannot resume this run: {response.details.reason}")
def _observed_flow_pauses(context: FlowRunContext) -> int:
    """
    Bump and return the pause counter kept on the flow run context.

    The counter lives under the "counter" key of
    `context.observed_flow_pauses`. The first call for a context stores and
    returns 1; every later call increases the stored value by one.
    """
    pauses = context.observed_flow_pauses
    # A missing key counts as zero so the first observed pause is number 1.
    pauses["counter"] = pauses.get("counter", 0) + 1
    return pauses["counter"]