Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/primitives.py: 59%

31 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Thread-safe async synchronization primitives. 

3""" 

4 

5import asyncio 1a

6import collections 1a

7import threading 1a

8from typing import TypeVar 1a

9 

10from typing_extensions import Literal 1a

11 

12from prefect._internal.concurrency.event_loop import call_soon_in_loop 1a

13 

14T = TypeVar("T") 1a

15 

16 

17class Event: 1a

18 """ 

19 A thread-safe async event. 

20 

21 Unlike `asyncio.Event` this implementation does not bind to a loop on creation. This 

22 matches the behavior of `asyncio.Event` in Python 3.10+, but differs from earlier 

23 versions. 

24 

25 This event also does not support a `clear()` operation. This matches the behavior of 

26 `anyio.Event` types and prevents sneaky bugs; create a new event instead. 

27 """ 

28 

29 def __init__(self) -> None: 1a

30 self._waiters: collections.deque[asyncio.Future[bool]] = collections.deque() 

31 self._value = False 

32 self._lock = threading.Lock() 

33 

34 def set(self) -> None: 1a

35 """ 

36 Set the flag, notifying all waiters. 

37 

38 Unlike `asyncio.Event`, waiters may not be notified immediately when this is 

39 called; instead, notification will be placed on the owning loop of each waiter 

40 for thread safety. 

41 """ 

42 with self._lock: 

43 if not self._value: 

44 self._value = True 

45 

46 # We freeze the waiters queue during iteration so removal in `wait()` 

47 # does not change the size during iteration. The lock ensures that no 

48 # waiters are added until after we finish here. 

49 for fut in tuple(self._waiters): 

50 if not fut.done(): 

51 # The `asyncio.Future.set_result` method is not thread-safe 

52 # and must be run in the loop that owns the future 

53 call_soon_in_loop(fut._loop, fut.set_result, True) 

54 

55 def is_set(self): 1a

56 return self._value 1bdec

57 

58 async def wait(self) -> Literal[True]: 1a

59 """ 

60 Block until the internal flag is true. 

61 

62 If the internal flag is true on entry, return True immediately. 

63 Otherwise, block until another `set()` is called, then return True. 

64 """ 

65 # Taking a sync lock in an async context is generally not recommended, but this 

66 # lock should only ever be held very briefly and we need to prevent race 

67 # conditions during between `set()` and `wait()` 

68 with self._lock: 1b

69 if self._value: 69 ↛ 70line 69 didn't jump to line 70 because the condition on line 69 was never true1b

70 return True 

71 

72 fut: asyncio.Future[bool] = asyncio.get_running_loop().create_future() 1b

73 self._waiters.append(fut) 1b

74 

75 try: 1b

76 await fut 1b

77 return True 

78 finally: 

79 self._waiters.remove(fut)