Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/timeout.py: 24%
28 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from contextlib import contextmanager 1a
2from typing import Optional 1a
4from prefect._internal.concurrency.cancellation import ( 1a
5 CancelledError,
6 cancel_async_after,
7 cancel_sync_after,
8)
def fail_if_not_timeout_error(timeout_exc_type: type[Exception]) -> None:
    """Validate that ``timeout_exc_type`` is a ``TimeoutError`` subclass.

    Args:
        timeout_exc_type: The exception class a timeout scope is asked to raise.

    Raises:
        ValueError: If ``timeout_exc_type`` is not a class, or is a class that
            does not derive from ``TimeoutError``.
    """
    # `issubclass` itself raises TypeError when handed a non-class (None, a
    # string, an exception *instance*, ...). Check for that first so every
    # invalid argument is reported through the same ValueError.
    if not (
        isinstance(timeout_exc_type, type)
        and issubclass(timeout_exc_type, TimeoutError)
    ):
        raise ValueError(
            "The `timeout_exc_type` argument must be a subclass of `TimeoutError`."
        )
@contextmanager
def timeout_async(
    seconds: Optional[float] = None, timeout_exc_type: type[TimeoutError] = TimeoutError
):
    """Run the enclosed block under ``cancel_async_after`` with a time limit.

    This is a regular (synchronous) context manager, so it is entered with
    ``with``, not ``async with``.

    Args:
        seconds: Value passed as ``timeout`` to ``cancel_async_after``. When
            ``None``, the block runs with no cancellation scope at all.
        timeout_exc_type: Exception class raised when the scope is cancelled.
            Must be ``TimeoutError`` or a subclass of it.

    Raises:
        ValueError: If ``timeout_exc_type`` fails validation.
        TimeoutError: An instance of ``timeout_exc_type``, raised when a
            ``CancelledError`` escapes the cancellation scope.
    """
    fail_if_not_timeout_error(timeout_exc_type)

    if seconds is None:
        # No limit requested: hand control to the caller without any scope.
        yield
        return

    try:
        with cancel_async_after(timeout=seconds):
            yield
    except CancelledError as exc:
        # NOTE(review): any CancelledError leaving the block is translated,
        # including one raised by the caller's own code - confirm intended.
        # Chain explicitly so the traceback shows the cancellation as the cause.
        raise timeout_exc_type(f"Scope timed out after {seconds} second(s).") from exc
@contextmanager
def timeout(
    seconds: Optional[float] = None, timeout_exc_type: type[TimeoutError] = TimeoutError
):
    """Run the enclosed block under ``cancel_sync_after`` with a time limit.

    Args:
        seconds: Value passed as ``timeout`` to ``cancel_sync_after``. When
            ``None``, the block runs with no cancellation scope at all.
        timeout_exc_type: Exception class raised when the scope is cancelled.
            Must be ``TimeoutError`` or a subclass of it.

    Raises:
        ValueError: If ``timeout_exc_type`` fails validation.
        TimeoutError: An instance of ``timeout_exc_type``, raised when a
            ``CancelledError`` escapes the cancellation scope.
    """
    fail_if_not_timeout_error(timeout_exc_type)

    if seconds is None:
        # No limit requested: hand control to the caller without any scope.
        yield
        return

    try:
        with cancel_sync_after(timeout=seconds):
            yield
    except CancelledError as exc:
        # NOTE(review): any CancelledError leaving the block is translated,
        # including one raised by the caller's own code - confirm intended.
        # Chain explicitly so the traceback shows the cancellation as the cause.
        raise timeout_exc_type(f"Scope timed out after {seconds} second(s).") from exc