Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/primitives.py: 31%
31 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Thread-safe async synchronization primitives.
3"""
5import asyncio 1a
6import collections 1a
7import threading 1a
8from typing import TypeVar 1a
10from typing_extensions import Literal 1a
12from prefect._internal.concurrency.event_loop import call_soon_in_loop 1a
14T = TypeVar("T") 1a
17class Event: 1a
18 """
19 A thread-safe async event.
21 Unlike `asyncio.Event` this implementation does not bind to a loop on creation. This
22 matches the behavior of `asyncio.Event` in Python 3.10+, but differs from earlier
23 versions.
25 This event also does not support a `clear()` operation. This matches the behavior of
26 `anyio.Event` types and prevents sneaky bugs; create a new event instead.
27 """
29 def __init__(self) -> None: 1a
30 self._waiters: collections.deque[asyncio.Future[bool]] = collections.deque()
31 self._value = False
32 self._lock = threading.Lock()
34 def set(self) -> None: 1a
35 """
36 Set the flag, notifying all waiters.
38 Unlike `asyncio.Event`, waiters may not be notified immediately when this is
39 called; instead, notification will be placed on the owning loop of each waiter
40 for thread safety.
41 """
42 with self._lock:
43 if not self._value:
44 self._value = True
46 # We freeze the waiters queue during iteration so removal in `wait()`
47 # does not change the size during iteration. The lock ensures that no
48 # waiters are added until after we finish here.
49 for fut in tuple(self._waiters):
50 if not fut.done():
51 # The `asyncio.Future.set_result` method is not thread-safe
52 # and must be run in the loop that owns the future
53 call_soon_in_loop(fut._loop, fut.set_result, True)
55 def is_set(self): 1a
56 return self._value
58 async def wait(self) -> Literal[True]: 1a
59 """
60 Block until the internal flag is true.
62 If the internal flag is true on entry, return True immediately.
63 Otherwise, block until another `set()` is called, then return True.
64 """
65 # Taking a sync lock in an async context is generally not recommended, but this
66 # lock should only ever be held very briefly and we need to prevent race
67 # conditions during between `set()` and `wait()`
68 with self._lock:
69 if self._value:
70 return True
72 fut: asyncio.Future[bool] = asyncio.get_running_loop().create_future()
73 self._waiters.append(fut)
75 try:
76 await fut
77 return True
78 finally:
79 self._waiters.remove(fut)