Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/logs.py: 48%
48 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Routes for interacting with log objects.
3"""
5from typing import Optional, Sequence 1a
7from fastapi import Body, Depends, WebSocket, status 1a
8from pydantic import TypeAdapter 1a
9from starlette.status import WS_1002_PROTOCOL_ERROR 1a
11import prefect.server.api.dependencies as dependencies 1a
12import prefect.server.models as models 1a
13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
14from prefect.server.logs import stream 1a
15from prefect.server.schemas.actions import LogCreate 1a
16from prefect.server.schemas.core import Log 1a
17from prefect.server.schemas.filters import LogFilter 1a
18from prefect.server.schemas.sorting import LogSort 1a
19from prefect.server.utilities import subscriptions 1a
20from prefect.server.utilities.server import PrefectRouter 1a
# Single router for all /logs endpoints; mounted by the server application
# under the "/logs" prefix and grouped under the "Logs" tag in the API docs.
router: PrefectRouter = PrefectRouter(prefix="/logs", tags=["Logs"])
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_logs(
    logs: Sequence[LogCreate],
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Create new logs from the provided schema.

    For more information, see https://docs.prefect.io/v3/how-to-guides/workflows/add-logging.
    """
    # Split the incoming payload into batches and persist each batch inside
    # its own transactional session.
    for log_batch in models.logs.split_logs_into_batches(logs):
        async with db.session_context(begin_transaction=True) as session:
            await models.logs.create_logs(session=session, logs=log_batch)
# Built once at module import so the Sequence[Log] validator is constructed a
# single time rather than on every request that serializes query results.
logs_adapter: TypeAdapter[Sequence[Log]] = TypeAdapter(Sequence[Log])
@router.post("/filter")
async def read_logs(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    logs: Optional[LogFilter] = None,
    sort: LogSort = Body(LogSort.TIMESTAMP_ASC),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Sequence[Log]:
    """
    Query for logs.
    """
    async with db.session_context() as session:
        # Fetch the matching rows, then coerce them to the API Log schema
        # while the session is still open.
        matching_logs = await models.logs.read_logs(
            session=session, log_filter=logs, offset=offset, limit=limit, sort=sort
        )
        return logs_adapter.validate_python(matching_logs)
@router.websocket("/out")
async def stream_logs_out(websocket: WebSocket) -> None:
    """
    Serve a WebSocket to stream live logs.

    Protocol: after the authentication handshake, the client must send one
    message of the form ``{"type": "filter", "filter": {...}}``. Matching
    logs are then pushed as ``{"type": "log", "log": {...}}`` messages until
    either side disconnects. Any other first message, or an invalid filter,
    closes the socket with a protocol-error code.
    """
    # accept_prefect_socket handles the auth handshake; a falsy return means
    # the connection was rejected and there is nothing to stream.
    websocket = await subscriptions.accept_prefect_socket(websocket)
    if not websocket:
        return

    try:
        # After authentication, the next message is expected to be a filter message, any
        # other type of message will close the connection.
        message = await websocket.receive_json()

        if message["type"] != "filter":
            return await websocket.close(
                WS_1002_PROTOCOL_ERROR, reason="Expected 'filter' message"
            )

        try:
            # Named `log_filter` (not `filter`) to avoid shadowing the builtin.
            log_filter = LogFilter.model_validate(message["filter"])
        except Exception as e:
            return await websocket.close(
                WS_1002_PROTOCOL_ERROR, reason=f"Invalid filter: {e}"
            )

        # No backfill support for logs - only live streaming
        # Subscribe to the ongoing log stream
        async with stream.logs(log_filter) as log_stream:
            async for log in log_stream:
                if not log:
                    # A falsy item from the stream is used as an opportunity to
                    # probe the connection — presumably a heartbeat; verify
                    # against stream.logs. Stop once the client is gone.
                    if await subscriptions.still_connected(websocket):
                        continue
                    break

                await websocket.send_json(
                    {"type": "log", "log": log.model_dump(mode="json")}
                )

    except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS:  # pragma: no cover
        pass  # it's fine if a client disconnects either normally or abnormally

    return None