Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/retries.py: 62%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2from collections.abc import Coroutine 1a

3from functools import wraps 1a

4from typing import Any, Callable, Optional, TypeVar 1a

5 

6from typing_extensions import ParamSpec 1a

7 

8from prefect._internal._logging import logger 1a

9from prefect.utilities.math import clamped_poisson_interval 1a

10 

11P = ParamSpec("P") 1a

12R = TypeVar("R") 1a

13 

14 

15def exponential_backoff_with_jitter( 1a

16 attempt: int, base_delay: float, max_delay: float 

17) -> float: 

18 average_interval = min(base_delay * (2**attempt), max_delay) 

19 return clamped_poisson_interval(average_interval, clamping_factor=0.3) 

20 

21 

22def retry_async_fn( 1a

23 max_attempts: int = 3, 

24 backoff_strategy: Callable[ 

25 [int, float, float], float 

26 ] = exponential_backoff_with_jitter, 

27 base_delay: float = 1, 

28 max_delay: float = 10, 

29 retry_on_exceptions: tuple[type[Exception], ...] = (Exception,), 

30 operation_name: Optional[str] = None, 

31) -> Callable[ 

32 [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]] 

33]: 

34 """A decorator for retrying an async function. 

35 

36 Args: 

37 max_attempts: The maximum number of times to retry the function. 

38 backoff_strategy: A function that takes in the number of attempts, the base 

39 delay, and the maximum delay, and returns the delay to use for the next 

40 attempt. Defaults to an exponential backoff with jitter. 

41 base_delay: The base delay to use for the first attempt. 

42 max_delay: The maximum delay to use for the last attempt. 

43 retry_on_exceptions: A tuple of exception types to retry on. Defaults to 

44 retrying on all exceptions. 

45 operation_name: Optional name to use for logging the operation instead of 

46 the function name. If None, uses the function name. 

47 """ 

48 

49 def decorator( 1ab

50 func: Callable[P, Coroutine[Any, Any, R]], 

51 ) -> Callable[P, Coroutine[Any, Any, R]]: 

52 @wraps(func) 1ab

53 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: 1ab

54 name = operation_name or func.__name__ 1c

55 for attempt in range(max_attempts): 55 ↛ 72line 55 didn't jump to line 72 because the loop on line 55 didn't complete1c

56 try: 1c

57 return await func(*args, **kwargs) 1c

58 except retry_on_exceptions as e: 

59 if attempt == max_attempts - 1: 

60 logger.exception( 

61 f"Function {name!r} failed after {max_attempts} attempts" 

62 ) 

63 raise 

64 delay = backoff_strategy(attempt, base_delay, max_delay) 

65 logger.warning( 

66 f"Attempt {attempt + 1} of function {name!r} failed with {type(e).__name__}: {str(e)}. " 

67 f"Retrying in {delay:.2f} seconds..." 

68 ) 

69 await asyncio.sleep(delay) 

70 # Technically unreachable, but this raise helps pyright know that this function 

71 # won't return None. 

72 raise Exception(f"Function {name!r} failed after {max_attempts} attempts") 

73 

74 return wrapper 1ab

75 

76 return decorator 1ab