Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/retries.py: 62%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio 1a
2from collections.abc import Coroutine 1a
3from functools import wraps 1a
4from typing import Any, Callable, Optional, TypeVar 1a
6from typing_extensions import ParamSpec 1a
8from prefect._internal._logging import logger 1a
9from prefect.utilities.math import clamped_poisson_interval 1a
# Parameter specification of the decorated coroutine function; lets the
# retry wrapper expose the same call signature as the function it wraps.
P = ParamSpec("P")
# Type of the value produced by awaiting the decorated coroutine function.
R = TypeVar("R")
def exponential_backoff_with_jitter(
    attempt: int, base_delay: float, max_delay: float
) -> float:
    """Return the delay to wait before the next retry.

    The average delay doubles with every attempt, starting from ``base_delay``
    at ``attempt == 0``, and never exceeds ``max_delay``. The final value is
    produced by ``clamped_poisson_interval`` with a clamping factor of 0.3
    (presumably this is what adds the jitter; see ``prefect.utilities.math``
    for the exact distribution).

    Args:
        attempt: Index of the attempt used as the exponent of 2.
        base_delay: Average delay for ``attempt == 0``.
        max_delay: Upper bound applied to the average delay.

    Returns:
        The value returned by ``clamped_poisson_interval`` for the capped
        average delay.
    """
    uncapped = base_delay * (2**attempt)
    # Equivalent to min(uncapped, max_delay): only switch to the cap when it
    # is strictly smaller than the exponential value.
    mean_delay = max_delay if max_delay < uncapped else uncapped
    return clamped_poisson_interval(mean_delay, clamping_factor=0.3)
def retry_async_fn(
    max_attempts: int = 3,
    backoff_strategy: Callable[
        [int, float, float], float
    ] = exponential_backoff_with_jitter,
    base_delay: float = 1,
    max_delay: float = 10,
    retry_on_exceptions: tuple[type[Exception], ...] = (Exception,),
    operation_name: Optional[str] = None,
) -> Callable[
    [Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]
]:
    """Build a decorator that retries an async function when it raises.

    The decorated coroutine function is awaited up to ``max_attempts`` times in
    total. After each failed call except the last one, a warning is logged and
    the wrapper sleeps for the delay chosen by ``backoff_strategy`` before
    calling the function again. When the last allowed call fails, the failure
    is logged with its traceback and the original exception is re-raised.

    Args:
        max_attempts: Total number of times the function may be called
            (the first call plus any retries).
        backoff_strategy: Callable receiving the zero-based index of the
            attempt that just failed, ``base_delay`` and ``max_delay``, and
            returning the number of seconds to sleep before the next attempt.
            Defaults to ``exponential_backoff_with_jitter``.
        base_delay: Passed through to ``backoff_strategy`` as the base delay.
        max_delay: Passed through to ``backoff_strategy`` as the maximum delay.
        retry_on_exceptions: Exception types that trigger a retry. Exceptions
            of any other type propagate immediately. Defaults to
            ``(Exception,)``.
        operation_name: Name used in log messages. When falsy, the decorated
            function's ``__name__`` is used instead.

    Returns:
        A decorator that wraps a coroutine function with the retry behaviour
        while keeping its signature and metadata (via ``functools.wraps``).

    Raises:
        Exception: The wrapper re-raises the last exception from the decorated
            function once all attempts are used up. A plain ``Exception`` is
            raised without calling the function when ``max_attempts`` is less
            than 1.
    """

    def decorator(
        func: Callable[P, Coroutine[Any, Any, R]],
    ) -> Callable[P, Coroutine[Any, Any, R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            label = operation_name or func.__name__
            for attempt_index in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except retry_on_exceptions as exc:
                    if attempt_index + 1 < max_attempts:
                        # Attempts remain: report the failure, back off, and
                        # go around the loop again.
                        pause = backoff_strategy(
                            attempt_index, base_delay, max_delay
                        )
                        logger.warning(
                            f"Attempt {attempt_index + 1} of function {label!r} failed with {type(exc).__name__}: {str(exc)}. "
                            f"Retrying in {pause:.2f} seconds..."
                        )
                        await asyncio.sleep(pause)
                        continue
                    # Final attempt failed: log with traceback and let the
                    # original exception propagate unchanged.
                    logger.exception(
                        f"Function {label!r} failed after {max_attempts} attempts"
                    )
                    raise
            # Only reached when the loop body never runs (max_attempts < 1).
            # The explicit raise also tells type checkers that this function
            # never returns None.
            raise Exception(f"Function {label!r} failed after {max_attempts} attempts")

        return wrapper

    return decorator