Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_logs/client.py: 34%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING, Any, Iterable, Union 1a
5from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true1a
8 from prefect.client.schemas.actions import (
9 LogCreate,
10 )
11 from prefect.client.schemas.filters import (
12 LogFilter,
13 )
14 from prefect.client.schemas.objects import (
15 Log,
16 )
17 from prefect.client.schemas.sorting import LogSort
class LogClient(BaseClient):
    """Synchronous client methods for creating and reading logs via `self.request`."""

    def create_logs(self, logs: Iterable[Union["LogCreate", dict[str, Any]]]) -> None:
        """
        Create logs for a flow or task run

        Args:
            logs: An iterable of `LogCreate` objects or dicts; `LogCreate`
                instances are dumped in JSON mode, anything else is sent as given
        """
        from prefect.client.schemas.actions import LogCreate

        payload: list[Any] = []
        for entry in logs:
            if isinstance(entry, LogCreate):
                payload.append(entry.model_dump(mode="json"))
            else:
                # Not a LogCreate: forwarded untouched.
                payload.append(entry)

        self.request("POST", "/logs/", json=payload)

    def read_logs(
        self,
        log_filter: "LogFilter | None" = None,
        limit: int | None = None,
        offset: int | None = None,
        sort: "LogSort | None" = None,
    ) -> list["Log"]:
        """
        Read flow and task run logs.

        Args:
            log_filter: Optional filter; when truthy it is dumped in JSON mode
                and sent under the "logs" key, otherwise `None` is sent
            limit: Sent as the "limit" field of the request body
            offset: Sent as the "offset" field of the request body
            sort: Sort order; `LogSort.TIMESTAMP_ASC` is used when falsy

        Returns:
            The response JSON validated into a list of `Log` objects
        """
        from prefect.client.schemas.sorting import LogSort

        serialized_filter = None
        if log_filter:
            serialized_filter = log_filter.model_dump(mode="json")

        effective_sort = sort if sort else LogSort.TIMESTAMP_ASC

        body: dict[str, Any] = {
            "logs": serialized_filter,
            "limit": limit,
            "offset": offset,
            "sort": effective_sort,
        }
        response = self.request("POST", "/logs/filter", json=body)

        from prefect.client.schemas.objects import Log

        records = response.json()
        return Log.model_validate_list(records)
class LogAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for creating and reading logs via `self.request`."""

    async def create_logs(
        self, logs: Iterable[Union["LogCreate", dict[str, Any]]]
    ) -> None:
        """
        Create logs for a flow or task run

        Args:
            logs: An iterable of `LogCreate` objects or already json-compatible dicts
        """
        from prefect.client.schemas.actions import LogCreate

        payload: list[Any] = []
        for entry in logs:
            if isinstance(entry, LogCreate):
                payload.append(entry.model_dump(mode="json"))
            else:
                # Not a LogCreate: forwarded untouched.
                payload.append(entry)

        await self.request("POST", "/logs/", json=payload)

    async def read_logs(
        self,
        log_filter: "LogFilter | None" = None,
        limit: int | None = None,
        offset: int | None = None,
        sort: "LogSort | None" = None,
    ) -> list[Log]:
        """
        Read flow and task run logs.

        Args:
            log_filter: Optional filter; when truthy it is dumped in JSON mode
                and sent under the "logs" key, otherwise `None` is sent
            limit: Sent as the "limit" field of the request body
            offset: Sent as the "offset" field of the request body
            sort: Sort order; `LogSort.TIMESTAMP_ASC` is used when falsy

        Returns:
            The response JSON validated into a list of `Log` objects
        """
        from prefect.client.schemas.sorting import LogSort

        serialized_filter = None
        if log_filter:
            serialized_filter = log_filter.model_dump(mode="json")

        effective_sort = sort if sort else LogSort.TIMESTAMP_ASC

        body: dict[str, Any] = {
            "logs": serialized_filter,
            "limit": limit,
            "offset": offset,
            "sort": effective_sort,
        }

        response = await self.request("POST", "/logs/filter", json=body)

        from prefect.client.schemas.objects import Log

        records = response.json()
        return Log.model_validate_list(records)