Coverage for /usr/local/lib/python3.12/site-packages/prefect/futures.py: 20%
338 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import abc 1a
4import concurrent.futures 1a
5import threading 1a
6import uuid 1a
7import warnings 1a
8from collections.abc import Generator, Iterator 1a
9from functools import partial 1a
10from typing import TYPE_CHECKING, Any, Callable, Generic 1a
12from typing_extensions import NamedTuple, Self, TypeVar 1a
14from prefect._waiters import FlowRunWaiter 1a
15from prefect.client.orchestration import get_client 1a
16from prefect.exceptions import ObjectNotFound 1a
17from prefect.logging.loggers import get_logger 1a
18from prefect.states import Pending, State 1a
19from prefect.task_runs import TaskRunWaiter 1a
20from prefect.utilities.annotations import quote 1a
21from prefect.utilities.asyncutils import run_coro_as_sync 1a
22from prefect.utilities.collections import StopVisiting, visit_collection 1a
23from prefect.utilities.timeout import timeout as timeout_context 1a
25F = TypeVar("F") 1a
26R = TypeVar("R") 1a
28if TYPE_CHECKING: 28 ↛ 29line 28 didn't jump to line 29 because the condition on line 28 was never true1a
29 import logging
31logger: "logging.Logger" = get_logger(__name__) 1a
class PrefectFuture(abc.ABC, Generic[R]):
    """
    Abstract base class for Prefect futures. A Prefect future is a handle to the
    asynchronous execution of a run. It provides methods to wait for the run
    to complete and to retrieve the result of the run.
    """

    def __init__(self, task_run_id: uuid.UUID):
        """
        Deprecated initializer; emits a `DeprecationWarning` when called.

        Args:
            task_run_id: The ID of the task run associated with this future.
        """
        warnings.warn(
            "The __init__ method of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the __init__ method in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            # Attribute the warning to the caller instead of this module so it
            # points at the code that needs to change.
            stacklevel=2,
        )
        self._task_run_id = task_run_id
        self._final_state: State[R] | None = None

    @property
    def task_run_id(self) -> uuid.UUID:
        """The ID of the task run associated with this future"""
        warnings.warn(
            "The task_run_id property of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the task_run_id property in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        return self._task_run_id

    @property
    def state(self) -> State:
        """The current state of the task run associated with this future"""
        warnings.warn(
            "The state property of PrefectFuture is deprecated and will be removed in a future release. "
            "If you are subclassing PrefectFuture, please implement the state property in your subclass or "
            "subclass PrefectTaskRunFuture instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        # A cached final state short-circuits the API lookup.
        if self._final_state:
            return self._final_state
        client = get_client(sync_client=True)
        try:
            task_run = client.read_task_run(task_run_id=self.task_run_id)
        except ObjectNotFound:
            # We'll be optimistic and assume this task will eventually start
            # TODO: Consider using task run events to wait for the task to start
            return Pending()
        return task_run.state or Pending()

    @abc.abstractmethod
    def wait(self, timeout: float | None = None) -> None:
        """
        Wait for the task run to complete.

        If the task run has already completed, this method will return immediately.

        Args:
            timeout: The maximum number of seconds to wait for the task run to complete.
                If the task run has not completed after the timeout has elapsed, this method will return.
        """
        ...

    @abc.abstractmethod
    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Get the result of the task run associated with this future.

        If the task run has not completed, this method will wait for the task run to complete.

        Args:
            timeout: The maximum number of seconds to wait for the task run to complete.
                The concrete futures defined in this module raise a `TimeoutError`
                when the run has not completed after the timeout has elapsed.
            raise_on_failure: If `True`, an exception will be raised if the task run fails.

        Returns:
            The result of the task run.
        """
        ...

    @abc.abstractmethod
    def add_done_callback(self, fn: Callable[["PrefectFuture[R]"], None]) -> None:
        """
        Add a callback to be run when the future completes or is cancelled.

        Args:
            fn: A callable that will be called with this future as its only argument when the future completes or is cancelled.
        """
        ...
class PrefectTaskRunFuture(PrefectFuture[R]):
    """
    A Prefect future tied to a single task run, identified by its task run ID.
    """

    def __init__(self, task_run_id: uuid.UUID):
        # No final state is known until a wait/result call records one.
        self._final_state: State[R] | None = None
        self._task_run_id = task_run_id

    @property
    def task_run_id(self) -> uuid.UUID:
        """The ID of the task run this future refers to."""
        return self._task_run_id

    @property
    def state(self) -> State:
        """
        The current state of the task run.

        Returns the cached final state when one has been recorded; otherwise
        reads the task run through a sync client. Falls back to `Pending()`
        when the task run cannot be found or has no state.
        """
        cached = self._final_state
        if cached:
            return cached

        client = get_client(sync_client=True)
        try:
            task_run = client.read_task_run(task_run_id=self.task_run_id)
        except ObjectNotFound:
            # Optimistically assume the task run will show up later.
            # TODO: Consider using task run events to wait for the task to start
            return Pending()

        current = task_run.state
        return current if current else Pending()
class PrefectWrappedFuture(PrefectTaskRunFuture[R], abc.ABC, Generic[R, F]):
    """
    A Prefect future that delegates to another, underlying future object.

    Type Parameters:
        R: The return type of the future
        F: The type of the wrapped future
    """

    def __init__(self, task_run_id: uuid.UUID, wrapped_future: F):
        self._wrapped_future: F = wrapped_future
        super().__init__(task_run_id)

    @property
    def wrapped_future(self) -> F:
        """The underlying future object that this Prefect future wraps."""
        return self._wrapped_future

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once it completes.

        If a final state is already recorded, `fn` is invoked immediately;
        otherwise the callback is attached to the wrapped future.
        """
        if self._final_state:
            fn(self)
            return

        def _forward_to_callback(_completed: F):
            """Invoke the callback with this Prefect future (not the wrapped one) so callers can match it against the futures they are tracking."""
            fn(self)

        self._wrapped_future.add_done_callback(_forward_to_callback)
class PrefectConcurrentFuture(PrefectWrappedFuture[R, concurrent.futures.Future[R]]):
    """
    A Prefect future backed by a `concurrent.futures.Future`. It is used when
    the task run is submitted to a ThreadPoolExecutor.
    """

    def wait(self, timeout: float | None = None) -> None:
        """
        Block until the wrapped future finishes or `timeout` seconds elapse.

        A timeout is swallowed silently. When the wrapped future yields a
        `State`, it is recorded as this future's final state.
        """
        try:
            outcome = self._wrapped_future.result(timeout=timeout)
        except concurrent.futures.TimeoutError:
            pass
        else:
            if isinstance(outcome, State):
                self._final_state = outcome

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the task run, waiting for it if necessary.

        Args:
            timeout: Maximum number of seconds to wait for the wrapped future.
            raise_on_failure: Forwarded to the final state's `result` call.

        Raises:
            TimeoutError: If the wrapped future does not finish within `timeout`.
        """
        if not self._final_state:
            try:
                outcome = self._wrapped_future.result(timeout=timeout)
            except concurrent.futures.TimeoutError as exc:
                raise TimeoutError(
                    f"Task run {self.task_run_id} did not complete within {timeout} seconds"
                ) from exc

            # A non-State value is already the plain result; hand it back as is.
            if not isinstance(outcome, State):
                return outcome
            self._final_state = outcome

        return self._final_state.result(raise_on_failure=raise_on_failure, _sync=True)
class PrefectDistributedFuture(PrefectTaskRunFuture[R]):
    """
    Represents the result of a computation happening anywhere.

    This class is typically used to interact with the result of a task run
    scheduled to run in a Prefect task worker but can be used to interact with
    any task run scheduled in Prefect's API.
    """

    # NOTE(review): class-level mutable list, shared by every instance; it is
    # not referenced by any method of this class. Kept for interface
    # compatibility.
    done_callbacks: list[Callable[[PrefectFuture[R]], None]] = []
    waiter = None

    def wait(self, timeout: float | None = None) -> None:
        """Synchronous wrapper around `wait_async`."""
        return run_coro_as_sync(self.wait_async(timeout=timeout))

    async def wait_async(self, timeout: float | None = None) -> None:
        """
        Wait for the task run to reach a final state.

        Returns immediately when a final state is already recorded. Otherwise
        reads the task run once and, if it is not final yet, waits on
        `TaskRunWaiter` for up to `timeout` seconds. The final state is recorded
        on this future when one is obtained; if the waiter yields nothing the
        method returns without recording a state.

        Args:
            timeout: Passed through to `TaskRunWaiter.wait_for_task_run`.

        Raises:
            RuntimeError: If the task run read from the API has no state.
        """
        if self._final_state:
            logger.debug(
                "Final state already set for %s. Returning...", self.task_run_id
            )
            return

        # Ask for the instance of TaskRunWaiter _now_ so that it's already running and
        # can catch the completion event if it happens before we start listening for it.
        TaskRunWaiter.instance()

        # Read task run to see if it is still running
        async with get_client() as client:
            task_run = await client.read_task_run(task_run_id=self._task_run_id)
            if task_run.state is None:
                raise RuntimeError(
                    f"Task run {self.task_run_id} has no state which means it hasn't started yet."
                )
            if task_run.state.is_final():
                logger.debug(
                    "Task run %s already finished. Returning...",
                    self.task_run_id,
                )
                self._final_state = task_run.state
                return

            # If still running, wait for a completed event from the server
            logger.debug(
                "Waiting for completed event for task run %s...",
                self.task_run_id,
            )
            state_from_event = await TaskRunWaiter.wait_for_task_run(
                self._task_run_id, timeout=timeout
            )

            if state_from_event:
                # We got the final state directly from the event
                self._final_state = state_from_event
                logger.debug(
                    "Task run %s completed with state from event: %s",
                    self.task_run_id,
                    state_from_event.type,
                )
            return

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """Synchronous wrapper around `result_async`."""
        return run_coro_as_sync(
            self.result_async(timeout=timeout, raise_on_failure=raise_on_failure)
        )

    async def result_async(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the task run, waiting for completion if needed.

        Args:
            timeout: Maximum number of seconds to wait for the task run.
            raise_on_failure: Forwarded to the final state's `aresult` call.

        Raises:
            TimeoutError: If no final state is available after waiting and one
                more read of the task run.
        """
        if not self._final_state:
            await self.wait_async(timeout=timeout)
            if not self._final_state:
                # If still no final state after wait, try reading it once more.
                # This should rarely happen since wait_async() now gets state from events.
                async with get_client() as client:
                    task_run = await client.read_task_run(task_run_id=self._task_run_id)
                    if task_run.state and task_run.state.is_final():
                        self._final_state = task_run.state
                    else:
                        raise TimeoutError(
                            f"Task run {self.task_run_id} did not complete within {timeout} seconds"
                        )

        return await self._final_state.aresult(raise_on_failure=raise_on_failure)

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once the task run is final.

        `fn` is invoked immediately when a final state is already known (either
        cached or found by reading the task run); otherwise it is handed to
        `TaskRunWaiter`.
        """
        if self._final_state:
            fn(self)
            return
        TaskRunWaiter.instance()
        with get_client(sync_client=True) as client:
            task_run = client.read_task_run(task_run_id=self._task_run_id)
            # Guard against a task run without a state (treated as "not final
            # yet") instead of failing with AttributeError on None.
            if task_run.state and task_run.state.is_final():
                self._final_state = task_run.state
                fn(self)
                return
            TaskRunWaiter.add_done_callback(self._task_run_id, partial(fn, self))

    def __eq__(self, other: Any) -> bool:
        """Two distributed futures are equal when they share a task run ID."""
        if not isinstance(other, PrefectDistributedFuture):
            return False
        return self.task_run_id == other.task_run_id

    def __hash__(self) -> int:
        """Hash on the task run ID, consistent with `__eq__`."""
        return hash(self.task_run_id)
class PrefectFlowRunFuture(PrefectFuture[R]):
    """
    A Prefect future that represents the eventual execution of a flow run.
    """

    def __init__(self, flow_run_id: uuid.UUID):
        self._flow_run_id = flow_run_id
        self._final_state: State[R] | None = None

    @property
    def flow_run_id(self) -> uuid.UUID:
        """The ID of the flow run associated with this future"""
        return self._flow_run_id

    @property
    def state(self) -> State:
        """The current state of the flow run associated with this future"""
        if self._final_state:
            return self._final_state
        client = get_client(sync_client=True)
        state = Pending()
        try:
            flow_run = client.read_flow_run(flow_run_id=self.flow_run_id)
            if flow_run.state:
                state = flow_run.state
        except ObjectNotFound:
            # We'll be optimistic and assume this flow run will eventually start
            pass
        return state

    def wait(self, timeout: float | None = None) -> None:
        """Synchronous wrapper around `wait_async`."""
        return run_coro_as_sync(self.wait_async(timeout=timeout))

    async def wait_async(self, timeout: float | None = None) -> None:
        """
        Wait for the flow run to reach a final state.

        Returns immediately when a final state is already recorded. Otherwise
        reads the flow run once and, if it is not final yet, waits on
        `FlowRunWaiter` for up to `timeout` seconds and re-reads the flow run.
        The final state is recorded on this future when one is observed.

        Args:
            timeout: Passed through to `FlowRunWaiter.wait_for_flow_run`.

        Raises:
            RuntimeError: If the flow run read from the API has no state.
        """
        if self._final_state:
            # This future only carries a flow run ID; there is no task run ID
            # to log here.
            logger.debug(
                "Final state already set for %s. Returning...", self.flow_run_id
            )
            return

        # Ask for the instance of FlowRunWaiter _now_ so that it's already running and
        # can catch the completion event if it happens before we start listening for it.
        FlowRunWaiter.instance()

        # Read flow run to see if it is still running
        async with get_client() as client:
            flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state is None:
                raise RuntimeError(
                    f"Flow run {self.flow_run_id} has no state which means it hasn't started yet."
                )
            if flow_run.state and flow_run.state.is_final():
                logger.debug(
                    "Flow run %s already finished. Returning...",
                    self.flow_run_id,
                )
                self._final_state = flow_run.state
                return

            # If still running, wait for a completed event from the server
            logger.debug(
                "Waiting for completed event for flow run %s...",
                self.flow_run_id,
            )
            await FlowRunWaiter.wait_for_flow_run(self._flow_run_id, timeout=timeout)
            flow_run = await client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                self._final_state = flow_run.state
            return

    def result(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """Synchronous wrapper around `aresult`."""
        return run_coro_as_sync(
            self.aresult(timeout=timeout, raise_on_failure=raise_on_failure)
        )

    async def aresult(
        self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> R:
        """
        Return the result of the flow run, waiting for completion if needed.

        Args:
            timeout: Maximum number of seconds to wait for the flow run.
            raise_on_failure: Forwarded to the final state's `aresult` call.

        Raises:
            TimeoutError: If no final state is available after waiting.
        """
        if not self._final_state:
            await self.wait_async(timeout=timeout)
            if not self._final_state:
                # Report the flow run ID: this class defines no task run ID, so
                # reading one would fail before the TimeoutError could be raised.
                raise TimeoutError(
                    f"Flow run {self.flow_run_id} did not complete within {timeout} seconds"
                )

        return await self._final_state.aresult(raise_on_failure=raise_on_failure)

    def add_done_callback(self, fn: Callable[[PrefectFuture[R]], None]) -> None:
        """
        Register `fn` to be called with this future once the flow run is final.

        `fn` is invoked immediately when a final state is already known (either
        cached or found by reading the flow run); otherwise it is handed to
        `FlowRunWaiter`.
        """
        if self._final_state:
            fn(self)
            return
        FlowRunWaiter.instance()
        with get_client(sync_client=True) as client:
            flow_run = client.read_flow_run(flow_run_id=self._flow_run_id)
            if flow_run.state and flow_run.state.is_final():
                self._final_state = flow_run.state
                fn(self)
                return
            FlowRunWaiter.add_done_callback(self._flow_run_id, partial(fn, self))

    def __eq__(self, other: Any) -> bool:
        """Two flow run futures are equal when they share a flow run ID."""
        if not isinstance(other, PrefectFlowRunFuture):
            return False
        return self.flow_run_id == other.flow_run_id

    def __hash__(self) -> int:
        """Hash on the flow run ID, consistent with `__eq__`."""
        return hash(self.flow_run_id)
class PrefectFutureList(list[PrefectFuture[R]], Iterator[PrefectFuture[R]]):
    """
    A list of Prefect futures.

    This class provides methods to wait for all futures
    in the list to complete and to retrieve the results of all task runs.
    """

    def wait(self, timeout: float | None = None) -> None:
        """
        Wait for all futures in the list to complete.

        Args:
            timeout: The maximum number of seconds to wait for all futures to
                complete. This method will not raise if the timeout is reached.
        """
        # Resolves to the module-level `wait` function, not this method.
        wait(self, timeout=timeout)

    def result(
        self: Self,
        timeout: float | None = None,
        raise_on_failure: bool = True,
    ) -> list[R]:
        """
        Get the results of all task runs associated with the futures in the list.

        Args:
            timeout: The maximum number of seconds to wait for all futures to
                complete.
            raise_on_failure: If `True`, an exception will be raised if any task run fails.

        Returns:
            A list of results of the task runs.

        Raises:
            TimeoutError: If the timeout is reached before all futures complete.
        """
        try:
            with timeout_context(timeout):
                return [
                    future.result(raise_on_failure=raise_on_failure) for future in self
                ]
        except TimeoutError as exc:
            # timeout came from inside the task
            # The expected message must be interpolated with the actual timeout;
            # a plain string containing a literal "{timeout}" can never match.
            # NOTE(review): assumes timeout_context reports expiry with exactly
            # this message format - confirm against prefect.utilities.timeout.
            if f"Scope timed out after {timeout} second(s)." not in str(exc):
                raise
            raise TimeoutError(
                f"Timed out waiting for all futures to complete within {timeout} seconds"
            ) from exc
def as_completed(
    futures: list[PrefectFuture[R]], timeout: float | None = None
) -> Generator[PrefectFuture[R], None]:
    """
    Yield the given futures as they finish.

    Duplicates are collapsed. Futures that already carry a final state are
    yielded first; the rest are yielded as their done-callbacks fire.

    Args:
        futures: The futures to watch.
        timeout: Value handed to the timeout context that wraps the iteration.

    Raises:
        TimeoutError: If a `TimeoutError` is raised while futures are still
            pending; the message reports how many remain unfinished.
    """
    unique_futures: set[PrefectFuture[R]] = set(futures)
    total_futures = len(unique_futures)
    pending = unique_futures
    try:
        with timeout_context(timeout):
            already_final = {f for f in unique_futures if f._final_state}  # type: ignore[privateUsage]
            pending = unique_futures - already_final
            yield from already_final

            something_finished = threading.Event()
            buffer_lock = threading.Lock()
            completed_buffer: list[PrefectFuture[R]] = []

            def _record_completion(future: PrefectFuture[R]):
                # Runs from the future's done-callback; guard the shared buffer.
                with buffer_lock:
                    completed_buffer.append(future)
                    something_finished.set()

            for waiting in pending:
                waiting.add_done_callback(_record_completion)

            while pending:
                something_finished.wait()
                # Drain the buffer atomically so no completion is lost between
                # copying it and clearing the event.
                with buffer_lock:
                    batch = completed_buffer[:]
                    completed_buffer.clear()
                    something_finished.clear()

                for finished in batch:
                    pending.remove(finished)
                    yield finished

    except TimeoutError:
        raise TimeoutError(
            "%d (of %d) futures unfinished" % (len(pending), total_futures)
        )
class DoneAndNotDoneFutures(NamedTuple, Generic[R]):
    """Pair of future sets: those that are done and those that are not.

    Combining NamedTuple with Generic (multiple inheritance) is supported
    from Python 3.11 on; use typing_extensions.NamedTuple.
    """

    # Futures that completed.
    done: set[PrefectFuture[R]]
    # Futures that did not complete.
    not_done: set[PrefectFuture[R]]
def wait(
    futures: list[PrefectFuture[R]], timeout: float | None = None
) -> DoneAndNotDoneFutures[R]:
    """
    Wait for the futures in the given sequence to complete.

    Args:
        futures: The sequence of Futures to wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures. Duplicate futures given to *futures* are removed and will be
        returned only once.

    Examples:
        ```python
        @task
        def sleep_task(seconds):
            sleep(seconds)
            return 42

        @flow
        def flow():
            futures = sleep_task.map(range(10))
            done, not_done = wait(futures, timeout=5)
            print(f"Done: {len(done)}")
            print(f"Not Done: {len(not_done)}")
        ```
    """
    unique = set(futures)
    done = {f for f in unique if f._final_state}
    not_done = unique - done

    # Everything already carries a final state: nothing to wait for.
    if not not_done:
        return DoneAndNotDoneFutures(done, not_done)

    # Without a timeout, simply block on each remaining future in turn.
    if timeout is None:
        for future in list(not_done):
            future.wait()
            not_done.remove(future)
            done.add(future)
        return DoneAndNotDoneFutures(done, not_done)

    # With a timeout, watch all remaining futures at once via done-callbacks.
    try:
        with timeout_context(timeout):
            completion_signal = threading.Event()
            buffer_lock = threading.Lock()
            completed_buffer: list[PrefectFuture[R]] = []

            def mark_done(future: PrefectFuture[R]):
                # Runs from the future's done-callback; guard the shared buffer.
                with buffer_lock:
                    completed_buffer.append(future)
                    completion_signal.set()

            for future in not_done:
                future.add_done_callback(mark_done)

            while not_done:
                # Block until at least one callback has fired.
                completion_signal.wait()
                # Swap the buffer out under the lock so late arrivals land in
                # the fresh list and are picked up on the next pass.
                with buffer_lock:
                    newly_done, completed_buffer = completed_buffer, []
                    completion_signal.clear()

                for future in newly_done:
                    if future in not_done:
                        not_done.remove(future)
                        done.add(future)

            return DoneAndNotDoneFutures(done, not_done)
    except TimeoutError:
        logger.debug("Timed out waiting for all futures to complete.")
        return DoneAndNotDoneFutures(done, not_done)
def resolve_futures_to_states(
    expr: PrefectFuture[R] | Any,
) -> PrefectFuture[R] | Any:
    """
    Recursively replace every `PrefectFuture` found in `expr` with its state,
    returning a new collection of the same structure. Each future is waited on
    before its state is read, so this call may block until execution completes.

    Unsupported object types will be returned without modification.
    """

    def _wait_then_state(future: PrefectFuture[R]):
        # Wait first so the state read afterwards reflects the finished run.
        future.wait()
        return future.state

    return _resolve_futures(expr, resolve_fn=_wait_then_state)
def resolve_futures_to_results(
    expr: PrefectFuture[R] | Any,
) -> Any:
    """
    Recursively replace every `PrefectFuture` found in `expr` with its result,
    returning a new collection of the same structure. Each future is waited on
    first, so this call may block until execution completes.

    Unsupported object types will be returned without modification.

    Raises:
        Exception: If any future's state is not completed after waiting.
    """

    def _wait_then_result(future: PrefectFuture[R]) -> Any:
        future.wait()
        # Guard clause: anything other than a completed state is an error.
        if not future.state.is_completed():
            raise Exception("At least one result did not complete successfully")
        return future.result()

    return _resolve_futures(expr, resolve_fn=_wait_then_result)
def _resolve_futures(
    expr: PrefectFuture[R] | Any,
    resolve_fn: Callable[[PrefectFuture[R]], Any],
) -> Any:
    """
    Shared two-pass helper: collect the futures in `expr`, resolve each one
    with `resolve_fn`, then rebuild `expr` with the resolved values in place.
    """
    # Pass 1: gather every future reachable in the expression.
    found: set[PrefectFuture[R]] = set()
    collector = partial(_collect_futures, found)
    visit_collection(expr, visit_fn=collector, return_data=False, context={})

    # Nothing to resolve: hand back the original object untouched.
    if not found:
        return expr

    # Resolve each distinct future exactly once.
    resolved_values = {}
    for future in found:
        resolved_values[future] = resolve_fn(future)

    def replace_futures(expr: Any, context: Any) -> Any:
        # Expressions inside quotes should not be modified
        annotation = context.get("annotation")
        if isinstance(annotation, quote):
            raise StopVisiting()

        return resolved_values[expr] if isinstance(expr, PrefectFuture) else expr

    # Pass 2: rebuild the structure, substituting resolved values.
    return visit_collection(
        expr, visit_fn=replace_futures, return_data=True, context={}
    )
def _collect_futures(
    futures: set[PrefectFuture[R]], expr: Any | PrefectFuture[R], context: Any
) -> Any | PrefectFuture[R]:
    """
    Visitor that adds `expr` to `futures` when it is a `PrefectFuture` and
    returns `expr` unchanged. Stops the traversal for quoted expressions.
    """
    # Expressions inside quotes should not be traversed
    annotation = context.get("annotation")
    if isinstance(annotation, quote):
        raise StopVisiting()

    if isinstance(expr, PrefectFuture):
        futures.add(expr)

    return expr