Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/logs.py: 48%

48 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with log objects. 

3""" 

4 

5from typing import Optional, Sequence 1a

6 

7from fastapi import Body, Depends, WebSocket, status 1a

8from pydantic import TypeAdapter 1a

9from starlette.status import WS_1002_PROTOCOL_ERROR 1a

10 

11import prefect.server.api.dependencies as dependencies 1a

12import prefect.server.models as models 1a

13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

14from prefect.server.logs import stream 1a

15from prefect.server.schemas.actions import LogCreate 1a

16from prefect.server.schemas.core import Log 1a

17from prefect.server.schemas.filters import LogFilter 1a

18from prefect.server.schemas.sorting import LogSort 1a

19from prefect.server.utilities import subscriptions 1a

20from prefect.server.utilities.server import PrefectRouter 1a

21 

22router: PrefectRouter = PrefectRouter(prefix="/logs", tags=["Logs"]) 1a

23 

24 

25@router.post("/", status_code=status.HTTP_201_CREATED) 1a

26async def create_logs( 1a

27 logs: Sequence[LogCreate], 

28 db: PrefectDBInterface = Depends(provide_database_interface), 

29) -> None: 

30 """ 

31 Create new logs from the provided schema. 

32 

33 For more information, see https://docs.prefect.io/v3/how-to-guides/workflows/add-logging. 

34 """ 

35 for batch in models.logs.split_logs_into_batches(logs): 1b

36 async with db.session_context(begin_transaction=True) as session: 1b

37 await models.logs.create_logs(session=session, logs=batch) 1b

38 

39 

40logs_adapter: TypeAdapter[Sequence[Log]] = TypeAdapter(Sequence[Log]) 1a

41 

42 

43@router.post("/filter") 1a

44async def read_logs( 1a

45 limit: int = dependencies.LimitBody(), 

46 offset: int = Body(0, ge=0), 

47 logs: Optional[LogFilter] = None, 

48 sort: LogSort = Body(LogSort.TIMESTAMP_ASC), 

49 db: PrefectDBInterface = Depends(provide_database_interface), 

50) -> Sequence[Log]: 

51 """ 

52 Query for logs. 

53 """ 

54 async with db.session_context() as session: 1bc

55 return logs_adapter.validate_python( 1bc

56 await models.logs.read_logs( 

57 session=session, log_filter=logs, offset=offset, limit=limit, sort=sort 

58 ) 

59 ) 

60 

61 

62@router.websocket("/out") 1a

63async def stream_logs_out(websocket: WebSocket) -> None: 1a

64 """Serve a WebSocket to stream live logs""" 

65 websocket = await subscriptions.accept_prefect_socket(websocket) 

66 if not websocket: 

67 return 

68 

69 try: 

70 # After authentication, the next message is expected to be a filter message, any 

71 # other type of message will close the connection. 

72 message = await websocket.receive_json() 

73 

74 if message["type"] != "filter": 

75 return await websocket.close( 

76 WS_1002_PROTOCOL_ERROR, reason="Expected 'filter' message" 

77 ) 

78 

79 try: 

80 filter = LogFilter.model_validate(message["filter"]) 

81 except Exception as e: 

82 return await websocket.close( 

83 WS_1002_PROTOCOL_ERROR, reason=f"Invalid filter: {e}" 

84 ) 

85 

86 # No backfill support for logs - only live streaming 

87 # Subscribe to the ongoing log stream 

88 async with stream.logs(filter) as log_stream: 

89 async for log in log_stream: 

90 if not log: 

91 if await subscriptions.still_connected(websocket): 

92 continue 

93 break 

94 

95 await websocket.send_json( 

96 {"type": "log", "log": log.model_dump(mode="json")} 

97 ) 

98 

99 except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS: # pragma: no cover 

100 pass # it's fine if a client disconnects either normally or abnormally 

101 

102 return None