1"""
2The event persister moves event messages from the event bus to storage
3storage as fast as it can. Never gets tired.
4"""

from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from datetime import timedelta
from typing import TYPE_CHECKING, Any, AsyncGenerator, List, NoReturn, TypeVar

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession

from prefect.logging import get_logger
from prefect.server.database import provide_database_interface
from prefect.server.events.schemas.events import ReceivedEvent
from prefect.server.events.storage.database import write_events
from prefect.server.services.base import RunInEphemeralServers, Service
from prefect.server.utilities.messaging import (
    Consumer,
    Message,
    MessageHandler,
    create_consumer,
)
from prefect.server.utilities.messaging._consumer_names import (
    generate_unique_consumer_name,
)
from prefect.settings import (
    PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE,
    PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL,
    PREFECT_EVENTS_RETENTION_PERIOD,
    PREFECT_SERVER_SERVICES_EVENT_PERSISTER_BATCH_SIZE_DELETE,
)
from prefect.settings.context import get_current_settings
from prefect.settings.models.server.services import ServicesBaseSetting
from prefect.types._datetime import now

if TYPE_CHECKING:
    import logging

logger: "logging.Logger" = get_logger(__name__)

T = TypeVar("T")


async def batch_delete(
    session: AsyncSession,
    model: type[T],
    condition: Any,
    batch_size: int = 10_000,
) -> int:
54 """
55 Perform a batch deletion of database records using a subquery with LIMIT. Works with both PostgreSQL and
56 SQLite. Compared to a basic delete(...).where(...), a batch deletion is more robust against timeouts
57 when handling large tables, which is especially the case if we first delete old entries from long
58 existing tables.
60 Returns:
61 Total number of deleted records
62 """
    total_deleted = 0

    while True:
        subquery = (
            sa.select(model.id).where(condition).limit(batch_size).scalar_subquery()
        )
        delete_stmt = sa.delete(model).where(model.id.in_(subquery))

        result = await session.execute(delete_stmt)
        batch_deleted = result.rowcount

        if batch_deleted == 0:
            break

        total_deleted += batch_deleted
        await session.commit()

    return total_deleted
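

# A minimal usage sketch (hypothetical, not part of this module's API): calling
# batch_delete directly to trim Event rows older than a made-up 7-day cutoff.
# The trim() helper inside create_handler below does the same thing using the
# configured retention settings; the cutoff and batch size here are assumptions
# chosen for illustration.
async def _example_trim_old_events() -> int:
    db = provide_database_interface()
    cutoff = now("UTC") - timedelta(days=7)  # hypothetical retention window
    async with db.session_context() as session:
        # Deletes matching rows in LIMITed batches, committing between batches
        return await batch_delete(
            session,
            db.Event,
            db.Event.occurred < cutoff,
            batch_size=1_000,
        )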


class EventPersister(RunInEphemeralServers, Service):
    """A service that persists events to the database as they arrive."""

    consumer_task: asyncio.Task[None] | None = None

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        return get_current_settings().server.services.event_persister

    def __init__(self):
        super().__init__()
        self._started_event: asyncio.Event | None = None

    @property
    def started_event(self) -> asyncio.Event:
        if self._started_event is None:
            self._started_event = asyncio.Event()
        return self._started_event

    @started_event.setter
    def started_event(self, value: asyncio.Event) -> None:
        self._started_event = value

    async def start(self) -> NoReturn:
        assert self.consumer_task is None, "Event persister already started"
        self.consumer: Consumer = create_consumer(
            "events",
            group="event-persister",
            name=generate_unique_consumer_name("event-persister"),
        )

        async with create_handler(
            batch_size=PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE.value(),
            flush_every=timedelta(
                seconds=PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL.value()
            ),
        ) as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("Event persister started")
            self.started_event.set()

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                pass

    async def stop(self) -> None:
        assert self.consumer_task is not None, "Event persister not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            pass
        finally:
            await self.consumer.cleanup()
            self.consumer_task = None
            if self.started_event:
                self.started_event.clear()
            logger.debug("Event persister stopped")
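

# Illustrative driver (hypothetical; the function name is made up for this
# sketch): one way to exercise the persister's start/stop lifecycle from an
# asyncio program, using started_event to wait until the consumer is running.
async def _example_run_persister_briefly() -> None:
    persister = EventPersister()
    # start() blocks until its consumer task is cancelled, so run it as a task
    runner = asyncio.create_task(persister.start())
    await persister.started_event.wait()
    # ...the service is now consuming events; later, shut it down cleanly.
    # stop() cancels the consumer task, which also lets start() return.
    await persister.stop()
    await runner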


@asynccontextmanager
async def create_handler(
    batch_size: int = 20,
    flush_every: timedelta = timedelta(seconds=5),
    trim_every: timedelta = timedelta(minutes=15),
) -> AsyncGenerator[MessageHandler, None]:
    """
    Set up a message handler that accumulates incoming events and writes them
    to the database once `batch_size` messages have arrived, flushing any
    remaining messages every `flush_every` interval. Events older than the
    configured retention period are trimmed every `trim_every` interval.
    """
    db = provide_database_interface()

    queue: asyncio.Queue[ReceivedEvent] = asyncio.Queue()

    async def flush() -> None:
        logger.debug(f"Persisting {queue.qsize()} events...")

        batch: List[ReceivedEvent] = []

        while queue.qsize() > 0:
            batch.append(await queue.get())

        try:
            async with db.session_context() as session:
                await write_events(session=session, events=batch)
                await session.commit()
                logger.debug("Finished persisting events.")
        except Exception:
            logger.debug("Error flushing events, restoring to queue", exc_info=True)
            for event in batch:
                queue.put_nowait(event)

    async def trim() -> None:
        older_than = now("UTC") - PREFECT_EVENTS_RETENTION_PERIOD.value()
        delete_batch_size = (
            PREFECT_SERVER_SERVICES_EVENT_PERSISTER_BATCH_SIZE_DELETE.value()
        )
        try:
            async with db.session_context() as session:
                resource_count = await batch_delete(
                    session,
                    db.EventResource,
                    db.EventResource.updated < older_than,
                    batch_size=delete_batch_size,
                )

                event_count = await batch_delete(
                    session,
                    db.Event,
                    db.Event.occurred < older_than,
                    batch_size=delete_batch_size,
                )

                if resource_count or event_count:
                    logger.debug(
                        "Trimmed %s events and %s event resources older than %s.",
                        event_count,
                        resource_count,
                        older_than,
                    )
        except Exception:
            # logger.exception already records the traceback; no exc_info needed
            logger.exception("Error trimming events and resources")

    async def flush_periodically():
        try:
            while True:
                await asyncio.sleep(flush_every.total_seconds())
                if queue.qsize():
                    await flush()
        except asyncio.CancelledError:
            return

    async def trim_periodically():
        try:
            while True:
                await asyncio.sleep(trim_every.total_seconds())
                await trim()
        except asyncio.CancelledError:
            return

    async def message_handler(message: Message):
        if not message.data:
            return

        event = ReceivedEvent.model_validate_json(message.data)

        logger.debug(
            "Received event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )

        await queue.put(event)

        if queue.qsize() >= batch_size:
            await flush()

    periodic_flush = asyncio.create_task(flush_periodically())
    periodic_trim = asyncio.create_task(trim_periodically())

    try:
        yield message_handler
    finally:
        periodic_flush.cancel()
        periodic_trim.cancel()
        if queue.qsize():
            await flush()
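

# Illustrative wiring (hypothetical, mirroring EventPersister.start above): how
# create_handler can be used directly with a consumer. The name passed to
# generate_unique_consumer_name is made up for this sketch.
async def _example_consume_events() -> None:
    consumer = create_consumer(
        "events",
        group="event-persister",
        name=generate_unique_consumer_name("example-event-persister"),
    )
    # The context manager owns the periodic flush/trim tasks and performs a
    # final flush on exit, so events still queued by the handler are not lost.
    async with create_handler(batch_size=50) as handler:
        await consumer.run(handler)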