Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/stream.py: 38%
104 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4from asyncio import Queue 1a
5from contextlib import asynccontextmanager 1a
6from typing import ( 1a
7 TYPE_CHECKING,
8 AsyncGenerator,
9 AsyncIterable,
10 Dict,
11 NoReturn,
12 Optional,
13 Set,
14)
16from prefect.logging import get_logger 1a
17from prefect.server.events.filters import EventFilter 1a
18from prefect.server.events.schemas.events import ReceivedEvent 1a
19from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a
20from prefect.server.utilities import messaging 1a
21from prefect.settings.context import get_current_settings 1a
22from prefect.settings.models.server.services import ServicesBaseSetting 1a
24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true1a
25 import logging
27logger: "logging.Logger" = get_logger(__name__) 1a
29subscribers: Set["Queue[ReceivedEvent]"] = set() 1a
30filters: Dict["Queue[ReceivedEvent]", EventFilter] = {} 1a
32# The maximum number of message that can be waiting for one subscriber, after which
33# new messages will be dropped
34SUBSCRIPTION_BACKLOG = 256 1a
37@asynccontextmanager 1a
38async def subscribed( 1a
39 filter: EventFilter,
40) -> AsyncGenerator["Queue[ReceivedEvent]", None]:
41 queue: "Queue[ReceivedEvent]" = Queue(maxsize=SUBSCRIPTION_BACKLOG)
43 subscribers.add(queue)
44 filters[queue] = filter
46 try:
47 yield queue
48 finally:
49 subscribers.remove(queue)
50 del filters[queue]
53@asynccontextmanager 1a
54async def events( 1a
55 filter: EventFilter,
56) -> AsyncGenerator[AsyncIterable[Optional[ReceivedEvent]], None]:
57 async with subscribed(filter) as queue:
59 async def consume() -> AsyncGenerator[Optional[ReceivedEvent], None]:
60 while True:
61 # Use a brief timeout to allow for cancellation, especially when a
62 # client disconnects. Without a timeout here, a consumer may block
63 # forever waiting for a message to be put on the queue, and never notice
64 # that their client (like a websocket) has actually disconnected.
65 try:
66 event = await asyncio.wait_for(queue.get(), timeout=1)
67 except asyncio.TimeoutError:
68 # If the queue is empty, we'll yield to the caller with a None in
69 # order to give it control over what happens next. This helps with
70 # the outbound websocket, where we want to check if the client is
71 # still connected periodically.
72 yield None
73 continue
75 yield event
77 yield consume()
80@asynccontextmanager 1a
81async def distributor() -> AsyncGenerator[messaging.MessageHandler, None]: 1a
82 async def message_handler(message: messaging.Message):
83 assert message.data 1bcdefg
85 try: 1bcdefg
86 assert message.attributes 1bcdefg
87 except Exception:
88 return
90 if subscribers: 90 ↛ 91line 90 didn't jump to line 91 because the condition on line 90 was never true1bcdefg
91 event = ReceivedEvent.model_validate_json(message.data)
92 for queue in subscribers:
93 filter = filters[queue]
94 if filter.excludes(event):
95 continue
97 try:
98 queue.put_nowait(event)
99 except asyncio.QueueFull:
100 continue
102 yield message_handler
105_distributor_task: asyncio.Task[None] | None = None 1a
106_distributor_started: asyncio.Event | None = None 1a
109async def start_distributor() -> None: 1a
110 """Starts the distributor consumer as a global background task"""
111 global _distributor_task
112 global _distributor_started
113 if _distributor_task:
114 return
116 _distributor_started = asyncio.Event()
117 _distributor_task = asyncio.create_task(run_distributor(_distributor_started))
118 await _distributor_started.wait()
121async def stop_distributor() -> None: 1a
122 """Stops the distributor consumer global background task"""
123 global _distributor_task
124 global _distributor_started
125 if not _distributor_task:
126 return
128 task = _distributor_task
129 _distributor_task = None
130 _distributor_started = None
132 task.cancel()
133 try:
134 await asyncio.shield(task)
135 except asyncio.CancelledError:
136 pass
139class Distributor(RunInEphemeralServers, RunInWebservers, Service): 1a
140 name: str = "Distributor" 1a
142 @classmethod 1a
143 def service_settings(cls) -> ServicesBaseSetting: 1a
144 raise NotImplementedError("Distributor does not have settings")
146 @classmethod 1a
147 def environment_variable_name(cls) -> dict[str, str]: 1a
148 return "PREFECT_API_EVENTS_STREAM_OUT_ENABLED"
150 @classmethod 1a
151 def enabled(cls) -> bool: 1a
152 return get_current_settings().server.events.stream_out_enabled 1h
154 async def start(self) -> None: 1a
155 await start_distributor()
156 try:
157 if TYPE_CHECKING:
158 # start_distributor should have set _distributor_task
159 assert _distributor_task
160 await _distributor_task
161 except asyncio.CancelledError:
162 pass
164 async def stop(self) -> None: 1a
165 await stop_distributor()
168async def run_distributor(started: asyncio.Event) -> NoReturn: 1a
169 """Runs the distributor consumer forever until it is cancelled"""
170 global _distributor_started
171 async with messaging.ephemeral_subscription(
172 topic="events",
173 ) as create_consumer_kwargs:
174 started.set()
175 async with distributor() as handler:
176 consumer = messaging.create_consumer(**create_consumer_kwargs)
177 await consumer.run(
178 handler=handler,
179 )