Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/timeout.py: 24%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from contextlib import contextmanager 1a

2from typing import Optional 1a

3 

4from prefect._internal.concurrency.cancellation import ( 1a

5 CancelledError, 

6 cancel_async_after, 

7 cancel_sync_after, 

8) 

9 

10 

def fail_if_not_timeout_error(timeout_exc_type: type[Exception]) -> None:
    """Validate that `timeout_exc_type` is a subclass of `TimeoutError`.

    Args:
        timeout_exc_type: The exception class to validate.

    Raises:
        ValueError: If `timeout_exc_type` is not a class, or is a class that
            does not derive from `TimeoutError`.
    """
    # `issubclass` raises `TypeError` when handed something that is not a class
    # (for example an exception *instance*). Treat that as "not a subclass" so
    # every invalid argument is reported through the same `ValueError`.
    try:
        is_timeout_type = issubclass(timeout_exc_type, TimeoutError)
    except TypeError:
        is_timeout_type = False

    if not is_timeout_type:
        raise ValueError(
            "The `timeout_exc_type` argument must be a subclass of `TimeoutError`."
        )

16 

17 

@contextmanager
def timeout_async(
    seconds: Optional[float] = None, timeout_exc_type: type[TimeoutError] = TimeoutError
):
    """Run the enclosed scope inside `cancel_async_after` with a timeout.

    Despite the name, this is a regular `@contextmanager`, so it is entered
    with a plain `with` statement.

    Args:
        seconds: Timeout handed to `cancel_async_after`. When `None`, the
            enclosed scope runs without any cancellation scope around it.
        timeout_exc_type: Exception class raised in place of `CancelledError`.
            It is checked with `fail_if_not_timeout_error` when the context is
            entered.

    Raises:
        ValueError: If `fail_if_not_timeout_error` rejects `timeout_exc_type`.
        timeout_exc_type: If a `CancelledError` propagates out of the
            `cancel_async_after` scope.
    """
    fail_if_not_timeout_error(timeout_exc_type)

    if seconds is None:
        yield
        return

    try:
        with cancel_async_after(timeout=seconds):
            yield
    except CancelledError as exc:
        # Chain explicitly so the traceback reports the cancellation as the
        # direct cause instead of as an error raised while handling it.
        raise timeout_exc_type(
            f"Scope timed out after {seconds} second(s)."
        ) from exc

33 

34 

@contextmanager
def timeout(
    seconds: Optional[float] = None, timeout_exc_type: type[TimeoutError] = TimeoutError
):
    """Run the enclosed scope inside `cancel_sync_after` with a timeout.

    Args:
        seconds: Timeout handed to `cancel_sync_after`. When `None`, the
            enclosed scope runs without any cancellation scope around it.
        timeout_exc_type: Exception class raised in place of `CancelledError`.
            It is checked with `fail_if_not_timeout_error` when the context is
            entered.

    Raises:
        ValueError: If `fail_if_not_timeout_error` rejects `timeout_exc_type`.
        timeout_exc_type: If a `CancelledError` propagates out of the
            `cancel_sync_after` scope.
    """
    fail_if_not_timeout_error(timeout_exc_type)

    if seconds is None:
        yield
        return

    try:
        with cancel_sync_after(timeout=seconds):
            yield
    except CancelledError as exc:
        # Chain explicitly so the traceback reports the cancellation as the
        # direct cause instead of as an error raised while handling it.
        raise timeout_exc_type(
            f"Scope timed out after {seconds} second(s)."
        ) from exc