Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/event_persister.py: 40%

133 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2The event persister moves event messages from the event bus to storage 

3storage as fast as it can. Never gets tired. 

4""" 

5 

6from __future__ import annotations 1a

7 

8import asyncio 1a

9from contextlib import asynccontextmanager 1a

10from datetime import timedelta 1a

11from typing import TYPE_CHECKING, Any, AsyncGenerator, List, NoReturn, TypeVar 1a

12 

13import sqlalchemy as sa 1a

14from sqlalchemy.ext.asyncio import AsyncSession 1a

15 

16from prefect.logging import get_logger 1a

17from prefect.server.database import provide_database_interface 1a

18from prefect.server.events.schemas.events import ReceivedEvent 1a

19from prefect.server.events.storage.database import write_events 1a

20from prefect.server.services.base import RunInEphemeralServers, Service 1a

21from prefect.server.utilities.messaging import ( 1a

22 Consumer, 

23 Message, 

24 MessageHandler, 

25 create_consumer, 

26) 

27from prefect.server.utilities.messaging._consumer_names import ( 1a

28 generate_unique_consumer_name, 

29) 

30from prefect.settings import ( 1a

31 PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE, 

32 PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL, 

33 PREFECT_EVENTS_RETENTION_PERIOD, 

34 PREFECT_SERVER_SERVICES_EVENT_PERSISTER_BATCH_SIZE_DELETE, 

35) 

36from prefect.settings.context import get_current_settings 1a

37from prefect.settings.models.server.services import ServicesBaseSetting 1a

38from prefect.types._datetime import now 1a

39 

40if TYPE_CHECKING: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true1a

41 import logging 

42 

# Module-level logger; the string annotation works because `logging` is
# only imported under the TYPE_CHECKING guard above.
logger: "logging.Logger" = get_logger(__name__)

# Type variable for the ORM model class passed to `batch_delete`.
T = TypeVar("T")

46 

47 

async def batch_delete(
    session: AsyncSession,
    model: type[T],
    condition: Any,
    batch_size: int = 10_000,
) -> int:
    """
    Delete all rows of `model` matching `condition`, `batch_size` rows at a
    time, committing after each batch.

    Each iteration selects up to `batch_size` matching ids via a LIMIT-ed
    subquery and deletes those rows; this works on both PostgreSQL and SQLite
    and is far more robust against statement timeouts on large tables than a
    single `delete(...).where(...)`, which matters when clearing old entries
    from long-lived tables.

    Returns:
        Total number of deleted records
    """
    deleted_so_far = 0

    while True:
        # Pick at most `batch_size` matching ids to bound the delete's work.
        ids_to_delete = (
            sa.select(model.id).where(condition).limit(batch_size).scalar_subquery()
        )
        outcome = await session.execute(
            sa.delete(model).where(model.id.in_(ids_to_delete))
        )
        rows_removed = outcome.rowcount

        if not rows_removed:
            # Nothing matched anymore; all prior batches are already committed.
            return deleted_so_far

        deleted_so_far += rows_removed
        # Commit per batch so progress survives a later failure.
        await session.commit()

81 

82 

class EventPersister(RunInEphemeralServers, Service):
    """A service that persists events to the database as they arrive."""

    # Task running the message consumer; None whenever the service is stopped.
    consumer_task: asyncio.Task[None] | None = None

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the settings model that enables/configures this service."""
        return get_current_settings().server.services.event_persister

    def __init__(self):
        super().__init__()
        # Created lazily by the `started_event` property so the Event is
        # bound to the event loop that is running at first access.
        self._started_event: asyncio.Event | None = None

    @property
    def started_event(self) -> asyncio.Event:
        """An event that is set once the consumer task has started."""
        if self._started_event is None:
            self._started_event = asyncio.Event()
        return self._started_event

    @started_event.setter
    def started_event(self, value: asyncio.Event) -> None:
        self._started_event = value

    async def start(self) -> NoReturn:
        """Run the persister until cancelled.

        Creates a consumer in the "event-persister" group on the "events"
        topic and runs it with a batching message handler; blocks until the
        consumer task is cancelled (e.g. via `stop`).
        """
        assert self.consumer_task is None, "Event persister already started"
        self.consumer: Consumer = create_consumer(
            "events",
            group="event-persister",
            name=generate_unique_consumer_name("event-persister"),
        )

        async with create_handler(
            batch_size=PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE.value(),
            flush_every=timedelta(
                seconds=PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL.value()
            ),
        ) as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("Event persister started")
            self.started_event.set()

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                # Cancellation is the expected shutdown path.
                pass

    async def stop(self) -> None:
        """Cancel the consumer task and clean up the consumer."""
        assert self.consumer_task is not None, "Event persister not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            pass
        finally:
            await self.consumer.cleanup()
            self.consumer_task = None
            # Fix: previously `if self.started_event:` — the property lazily
            # allocates an Event and returns it, so that check was always
            # true and could create an Event only to clear it. Check the
            # backing attribute instead.
            if self._started_event is not None:
                self._started_event.clear()
            logger.debug("Event persister stopped")

142 

143 

@asynccontextmanager
async def create_handler(
    batch_size: int = 20,
    flush_every: timedelta = timedelta(seconds=5),
    trim_every: timedelta = timedelta(minutes=15),
) -> AsyncGenerator[MessageHandler, None]:
    """
    Set up a message handler that will accumulate and send events to
    the database every `batch_size` messages, or every `flush_every` interval to flush
    any remaining messages

    Args:
        batch_size: number of queued events that triggers an immediate flush
        flush_every: interval of the periodic background flush
        trim_every: interval of the periodic retention-based trim

    Yields:
        A message handler suitable for passing to `Consumer.run`
    """
    db = provide_database_interface()

    # Events accumulate here between flushes.
    queue: asyncio.Queue[ReceivedEvent] = asyncio.Queue()

    async def flush() -> None:
        # Drain the queue into a single batch and write it in one
        # transaction; on failure, requeue the batch so a later flush
        # retries it.
        logger.debug(f"Persisting {queue.qsize()} events...")

        batch: List[ReceivedEvent] = []

        while queue.qsize() > 0:
            batch.append(await queue.get())

        try:
            async with db.session_context() as session:
                await write_events(session=session, events=batch)
                await session.commit()
                logger.debug("Finished persisting events.")
        except Exception:
            logger.debug("Error flushing events, restoring to queue", exc_info=True)
            for event in batch:
                queue.put_nowait(event)

    async def trim() -> None:
        # Delete events and event resources older than the retention period,
        # in batches to stay robust against timeouts on large tables.
        older_than = now("UTC") - PREFECT_EVENTS_RETENTION_PERIOD.value()
        delete_batch_size = (
            PREFECT_SERVER_SERVICES_EVENT_PERSISTER_BATCH_SIZE_DELETE.value()
        )
        try:
            async with db.session_context() as session:
                resource_count = await batch_delete(
                    session,
                    db.EventResource,
                    db.EventResource.updated < older_than,
                    batch_size=delete_batch_size,
                )

                event_count = await batch_delete(
                    session,
                    db.Event,
                    db.Event.occurred < older_than,
                    batch_size=delete_batch_size,
                )

                if resource_count or event_count:
                    logger.debug(
                        "Trimmed %s events and %s event resources older than %s.",
                        event_count,
                        resource_count,
                        older_than,
                    )
        except Exception:
            # Fix: `logger.exception` already records the active exception,
            # so the previous redundant `exc_info=True` argument was dropped.
            logger.exception("Error trimming events and resources")

    async def flush_periodically():
        # Background task: flush queued events on a fixed interval.
        try:
            while True:
                await asyncio.sleep(flush_every.total_seconds())
                if queue.qsize():
                    await flush()
        except asyncio.CancelledError:
            return

    async def trim_periodically():
        # Background task: trim old events on a fixed interval.
        try:
            while True:
                await asyncio.sleep(trim_every.total_seconds())
                await trim()
        except asyncio.CancelledError:
            return

    async def message_handler(message: Message):
        # Validate and enqueue each incoming event; flush as soon as a full
        # batch has accumulated.
        if not message.data:
            return

        event = ReceivedEvent.model_validate_json(message.data)

        logger.debug(
            "Received event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )

        await queue.put(event)

        if queue.qsize() >= batch_size:
            await flush()

    periodic_flush = asyncio.create_task(flush_periodically())
    periodic_trim = asyncio.create_task(trim_periodically())

    try:
        yield message_handler
    finally:
        # Stop the background tasks, then flush any remaining events so
        # nothing is lost on shutdown.
        periodic_flush.cancel()
        periodic_trim.cancel()
        if queue.qsize():
            await flush()