Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/stream.py: 38%

104 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4from asyncio import Queue 1a

5from contextlib import asynccontextmanager 1a

6from typing import ( 1a

7 TYPE_CHECKING, 

8 AsyncGenerator, 

9 AsyncIterable, 

10 Dict, 

11 NoReturn, 

12 Optional, 

13 Set, 

14) 

15 

16from prefect.logging import get_logger 1a

17from prefect.server.events.filters import EventFilter 1a

18from prefect.server.events.schemas.events import ReceivedEvent 1a

19from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a

20from prefect.server.utilities import messaging 1a

21from prefect.settings.context import get_current_settings 1a

22from prefect.settings.models.server.services import ServicesBaseSetting 1a

23 

24if TYPE_CHECKING: 24 ↛ 25line 24 didn't jump to line 25 because the condition on line 24 was never true1a

25 import logging 

26 

27logger: "logging.Logger" = get_logger(__name__) 1a

28 

# Registry of live subscriptions: the set of subscriber queues, and for each queue the
# filter that decides which events are placed on it.
subscribers: Set["Queue[ReceivedEvent]"] = set()
filters: Dict["Queue[ReceivedEvent]", EventFilter] = dict()

# The maximum number of messages that can be waiting for one subscriber, after which
# new messages will be dropped
SUBSCRIPTION_BACKLOG = 256

35 

36 

@asynccontextmanager
async def subscribed(
    filter: EventFilter,
) -> AsyncGenerator["Queue[ReceivedEvent]", None]:
    """Register a bounded subscriber queue for the duration of the context.

    A new queue holding at most SUBSCRIPTION_BACKLOG events is added to the
    module-level `subscribers` set and paired with `filter` in `filters`. Both
    registrations are removed again when the context exits, whether normally or
    through an exception.
    """
    backlog: "Queue[ReceivedEvent]" = Queue(maxsize=SUBSCRIPTION_BACKLOG)

    # There is no await between these two statements, so the queue is never
    # observable as a subscriber without its filter.
    filters[backlog] = filter
    subscribers.add(backlog)

    try:
        yield backlog
    finally:
        subscribers.remove(backlog)
        filters.pop(backlog)

51 

52 

@asynccontextmanager
async def events(
    filter: EventFilter,
) -> AsyncGenerator[AsyncIterable[Optional[ReceivedEvent]], None]:
    """Subscribe with `filter` and yield an async iterator over the matching events.

    The iterator never finishes on its own. It produces each event taken from the
    subscription queue, and produces None whenever one second passes without an
    event, so the caller regains control periodically.
    """
    async with subscribed(filter) as backlog:

        async def consume() -> AsyncGenerator[Optional[ReceivedEvent], None]:
            while True:
                # Wait with a short timeout rather than indefinitely. Without it a
                # consumer could block forever on an empty queue and never notice
                # that its client (like a websocket) has disconnected.
                try:
                    received = await asyncio.wait_for(backlog.get(), timeout=1)
                except asyncio.TimeoutError:
                    # Nothing arrived in time: hand back None so the caller can
                    # decide what to do next, e.g. check that the outbound
                    # websocket client is still connected.
                    yield None
                else:
                    yield received

        yield consume()

78 

79 

@asynccontextmanager
async def distributor() -> AsyncGenerator[messaging.MessageHandler, None]:
    """Provide the message handler that fans incoming events out to subscribers.

    The yielded handler parses a message's data into a `ReceivedEvent` and puts it
    on the queue of every subscriber whose filter does not exclude it. Messages
    without attributes are ignored, and a subscriber whose queue is full simply
    misses the event.
    """

    async def message_handler(message: messaging.Message):
        # These checks are explicit rather than `assert` statements so that they
        # still run when Python is started with -O (which strips asserts).
        if not message.data:
            # AssertionError is kept as the exception type for compatibility.
            raise AssertionError("message has no data")

        if not message.attributes:
            return

        # Nobody is listening, so skip the cost of parsing the event.
        if not subscribers:
            return

        event = ReceivedEvent.model_validate_json(message.data)
        for queue in subscribers:
            if filters[queue].excludes(event):
                continue

            try:
                queue.put_nowait(event)
            except asyncio.QueueFull:
                # This subscriber's backlog is full; drop the event for it only.
                continue

    yield message_handler

103 

104 

# Background task running the distributor; None while no distributor is registered.
_distributor_task: Optional[asyncio.Task[None]] = None
# Event passed to the distributor task when it is created; None while no distributor
# is registered.
_distributor_started: Optional[asyncio.Event] = None

107 

108 

async def start_distributor() -> None:
    """Starts the distributor consumer as a global background task

    Does nothing when a distributor task is already registered. Otherwise a task
    running `run_distributor` is created and this coroutine waits until that task
    sets its "started" event.

    Raises:
        Whatever ended the distributor task (including asyncio.CancelledError) if
        the task finished before it signalled that it had started.
    """
    global _distributor_task
    global _distributor_started
    if _distributor_task:
        return

    started = asyncio.Event()
    task = asyncio.create_task(run_distributor(started))
    _distributor_started = started
    _distributor_task = task

    # Waiting on the event alone would hang forever if the task died before setting
    # it, so wait for whichever happens first: the event or the end of the task.
    started_waiter = asyncio.create_task(started.wait())
    try:
        await asyncio.wait(
            {task, started_waiter}, return_when=asyncio.FIRST_COMPLETED
        )
    finally:
        started_waiter.cancel()

    if started.is_set():
        return

    # The task ended without ever starting. Forget it so a later call can try
    # again, then surface the reason to the caller.
    if _distributor_task is task:
        _distributor_task = None
        _distributor_started = None
    task.result()

119 

120 

async def stop_distributor() -> None:
    """Stops the distributor consumer global background task

    Does nothing when no distributor task is registered. Otherwise the module-level
    task and event references are cleared, the task is cancelled, and this
    coroutine waits for it to finish.
    """
    global _distributor_task
    global _distributor_started

    running = _distributor_task
    if not running:
        return

    # Clear the globals before awaiting anything.
    _distributor_task = None
    _distributor_started = None

    running.cancel()
    try:
        await asyncio.shield(running)
    except asyncio.CancelledError:
        pass

137 

138 

class Distributor(RunInEphemeralServers, RunInWebservers, Service):
    """Service that runs the event distributor background task."""

    name: str = "Distributor"

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Always raises NotImplementedError: this service has no settings."""
        raise NotImplementedError("Distributor does not have settings")

    @classmethod
    def environment_variable_name(cls) -> str:
        """Return the name of the environment variable for this service."""
        return "PREFECT_API_EVENTS_STREAM_OUT_ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        """Return the current `server.events.stream_out_enabled` setting."""
        return get_current_settings().server.events.stream_out_enabled

    async def start(self) -> None:
        """Start the global distributor task and wait for it to finish.

        Cancellation of the distributor task is swallowed, so this returns
        normally when the task is cancelled.
        """
        await start_distributor()
        try:
            if TYPE_CHECKING:
                # start_distributor should have set _distributor_task
                assert _distributor_task
            await _distributor_task
        except asyncio.CancelledError:
            pass

    async def stop(self) -> None:
        """Stop the global distributor task."""
        await stop_distributor()

166 

167 

async def run_distributor(started: asyncio.Event) -> NoReturn:
    """Runs the distributor consumer forever until it is cancelled

    Args:
        started: event that is set once the ephemeral subscription to the "events"
            topic has been opened, before the consumer is created and run.
    """
    async with messaging.ephemeral_subscription(
        topic="events",
    ) as create_consumer_kwargs:
        started.set()
        async with distributor() as handler:
            consumer = messaging.create_consumer(**create_consumer_kwargs)
            await consumer.run(
                handler=handler,
            )