Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/logs.py: 81%
40 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Functions for interacting with log ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from typing import TYPE_CHECKING, Generator, Optional, Sequence, Tuple 1a
8from sqlalchemy import delete, select 1a
9from sqlalchemy.ext.asyncio import AsyncSession 1a
11import prefect.server.schemas as schemas 1a
12from prefect.logging import get_logger 1a
13from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
14from prefect.server.logs import messaging 1a
15from prefect.server.schemas.actions import LogCreate 1a
16from prefect.utilities.collections import batched_iterable 1a
# We have a limit of 32,767 parameters at a time for a single query...
MAXIMUM_QUERY_PARAMETERS = 32_767

# ...and logs have a certain number of fields...
NUMBER_OF_LOG_FIELDS = len(schemas.core.Log.model_fields)

# ...so we can only INSERT batches of a certain size at a time
LOG_BATCH_SIZE = MAXIMUM_QUERY_PARAMETERS // NUMBER_OF_LOG_FIELDS

if TYPE_CHECKING:
    # Imported only for the annotation on `logger` below; avoids a runtime import.
    import logging

logger: "logging.Logger" = get_logger(__name__)
def split_logs_into_batches(
    logs: Sequence[schemas.actions.LogCreate],
) -> Generator[Tuple[LogCreate, ...], None, None]:
    """Yield the given logs in batches small enough to INSERT in a single query.

    Batch size is derived from the database's maximum query-parameter count
    divided by the number of fields per log (see ``LOG_BATCH_SIZE``).
    """
    yield from batched_iterable(logs, LOG_BATCH_SIZE)
@db_injector
async def create_logs(
    db: PrefectDBInterface, session: AsyncSession, logs: Sequence[LogCreate]
) -> None:
    """
    Creates new logs

    Args:
        session: a database session
        logs: a list of log schemas

    Returns:
        None
    """
    try:
        # Promote the create-action schemas to full core Log schemas.
        log_records = [schemas.core.Log(**entry.model_dump()) for entry in logs]
        insert_values = [
            record.model_dump(exclude={"created", "updated"})
            for record in log_records
        ]
        await session.execute(db.queries.insert(db.Log).values(insert_values))
        # Publish to the log stream only after the rows are written.
        await messaging.publish_logs(log_records)
    except RuntimeError as exc:
        if "can't create new thread at interpreter shutdown" not in str(exc):
            raise
        # Background logs sometimes fail to write when the interpreter is shutting down.
        # This is a known issue in Python 3.12.2 that can be ignored and is fixed in Python 3.12.3.
        # see e.g. https://github.com/python/cpython/issues/113964
        logger.debug("Received event during interpreter shutdown, ignoring")
@db_injector
async def read_logs(
    db: PrefectDBInterface,
    session: AsyncSession,
    log_filter: Optional[schemas.filters.LogFilter],
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    sort: schemas.sorting.LogSort = schemas.sorting.LogSort.TIMESTAMP_ASC,
) -> Sequence[orm_models.Log]:
    """
    Read logs.

    Args:
        session: a database session
        db: the database interface
        log_filter: only select logs that match these filters
        offset: Query offset
        limit: Query limit
        sort: Query sort

    Returns:
        List[orm_models.Log]: the matching logs
    """
    stmt = select(db.Log)
    if log_filter:
        stmt = stmt.where(log_filter.as_sql_filter())
    # Apply ordering and pagination after any filtering.
    stmt = stmt.order_by(*sort.as_sql_sort()).offset(offset).limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@db_injector
async def delete_logs(
    db: PrefectDBInterface,
    session: AsyncSession,
    log_filter: schemas.filters.LogFilter,
) -> int:
    """Delete all logs matching the given filter.

    Returns:
        int: the number of rows deleted
    """
    result = await session.execute(
        delete(db.Log).where(log_filter.as_sql_filter())
    )
    return result.rowcount