Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/logs.py: 46%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Functions for interacting with log ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import TYPE_CHECKING, Generator, Optional, Sequence, Tuple 1a

7 

8from sqlalchemy import delete, select 1a

9from sqlalchemy.ext.asyncio import AsyncSession 1a

10 

11import prefect.server.schemas as schemas 1a

12from prefect.logging import get_logger 1a

13from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

14from prefect.server.logs import messaging 1a

15from prefect.server.schemas.actions import LogCreate 1a

16from prefect.utilities.collections import batched_iterable 1a

17 

18# We have a limit of 32,767 parameters at a time for a single query... 

19MAXIMUM_QUERY_PARAMETERS = 32_767 1a

20 

21# ...and logs have a certain number of fields... 

22NUMBER_OF_LOG_FIELDS = len(schemas.core.Log.model_fields) 1a

23 

24# ...so we can only INSERT batches of a certain size at a time 

25LOG_BATCH_SIZE = MAXIMUM_QUERY_PARAMETERS // NUMBER_OF_LOG_FIELDS 1a

26 

27if TYPE_CHECKING: 27 ↛ 28line 27 didn't jump to line 28 because the condition on line 27 was never true1a

28 import logging 

29 

30logger: "logging.Logger" = get_logger(__name__) 1a

31 

32 

def split_logs_into_batches(
    logs: Sequence[schemas.actions.LogCreate],
) -> Generator[Tuple[LogCreate, ...], None, None]:
    """Yield the given logs in batches small enough to INSERT in one query.

    Batch size is capped by ``LOG_BATCH_SIZE`` so a single statement never
    exceeds the database's query-parameter limit.
    """
    yield from batched_iterable(logs, LOG_BATCH_SIZE)

38 

39 

@db_injector
async def create_logs(
    db: PrefectDBInterface, session: AsyncSession, logs: Sequence[LogCreate]
) -> None:
    """
    Creates new logs

    Args:
        session: a database session
        logs: a list of log schemas

    Returns:
        None
    """
    # Guard against an empty batch: SQLAlchemy's insert().values([]) raises
    # on an empty VALUES list, and there is nothing to publish either.
    if not logs:
        return

    try:
        # Promote the incoming LogCreate actions to full core Log schemas so
        # they carry server-generated fields (id, created, updated).
        full_logs = [schemas.core.Log(**log.model_dump()) for log in logs]
        await session.execute(
            db.queries.insert(db.Log).values(
                # "created"/"updated" are managed by the ORM defaults, so
                # they are excluded from the explicit INSERT values.
                [log.model_dump(exclude={"created", "updated"}) for log in full_logs]
            )
        )
        # Fan the new logs out to the messaging layer for streaming consumers.
        await messaging.publish_logs(full_logs)

    except RuntimeError as exc:
        if "can't create new thread at interpreter shutdown" in str(exc):
            # Background logs sometimes fail to write when the interpreter is shutting down.
            # This is a known issue in Python 3.12.2 that can be ignored and is fixed in Python 3.12.3.
            # see e.g. https://github.com/python/cpython/issues/113964
            logger.debug("Received event during interpreter shutdown, ignoring")
        else:
            raise

71 

72 

@db_injector
async def read_logs(
    db: PrefectDBInterface,
    session: AsyncSession,
    log_filter: Optional[schemas.filters.LogFilter],
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    sort: schemas.sorting.LogSort = schemas.sorting.LogSort.TIMESTAMP_ASC,
) -> Sequence[orm_models.Log]:
    """
    Read logs.

    Args:
        session: a database session
        db: the database interface
        log_filter: only select logs that match these filters
        offset: Query offset
        limit: Query limit
        sort: Query sort

    Returns:
        List[orm_models.Log]: the matching logs
    """
    # Build the ordered base statement, then layer on the optional filter
    # and pagination; clause order does not affect the emitted SQL.
    stmt = select(db.Log).order_by(*sort.as_sql_sort())
    if log_filter:
        stmt = stmt.where(log_filter.as_sql_filter())
    stmt = stmt.offset(offset).limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

103 

104 

@db_injector
async def delete_logs(
    db: PrefectDBInterface,
    session: AsyncSession,
    log_filter: schemas.filters.LogFilter,
) -> int:
    """
    Delete logs matching a filter.

    Args:
        session: a database session
        log_filter: only delete logs that match this filter

    Returns:
        int: the number of rows deleted
    """
    stmt = delete(db.Log).where(log_filter.as_sql_filter())
    outcome = await session.execute(stmt)
    return outcome.rowcount