Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_logs/client.py: 34%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3from typing import TYPE_CHECKING, Any, Iterable, Union 1a

4 

5from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

6 

7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true1a

8 from prefect.client.schemas.actions import ( 

9 LogCreate, 

10 ) 

11 from prefect.client.schemas.filters import ( 

12 LogFilter, 

13 ) 

14 from prefect.client.schemas.objects import ( 

15 Log, 

16 ) 

17 from prefect.client.schemas.sorting import LogSort 

18 

19 

class LogClient(BaseClient):
    """Synchronous client methods for the `/logs/` API routes."""

    def create_logs(self, logs: Iterable[Union["LogCreate", dict[str, Any]]]) -> None:
        """
        Create logs for a flow or task run

        Args:
            logs: An iterable of `LogCreate` objects or already json-compatible dicts
        """
        # Imported lazily; at module level this name is only available to type checkers.
        from prefect.client.schemas.actions import LogCreate

        payload: list[Any] = []
        for entry in logs:
            if isinstance(entry, LogCreate):
                # Models are dumped to JSON-compatible data before sending.
                payload.append(entry.model_dump(mode="json"))
            else:
                # Anything else is forwarded untouched.
                payload.append(entry)

        self.request("POST", "/logs/", json=payload)

    def read_logs(
        self,
        log_filter: "LogFilter | None" = None,
        limit: int | None = None,
        offset: int | None = None,
        sort: "LogSort | None" = None,
    ) -> list["Log"]:
        """
        Read flow and task run logs.

        Args:
            log_filter: Optional filter; when truthy it is dumped to JSON and sent
                under the `logs` key, otherwise `None` is sent.
            limit: Sent unchanged as the `limit` field of the request body.
            offset: Sent unchanged as the `offset` field of the request body.
            sort: Sort order; falls back to `LogSort.TIMESTAMP_ASC` when falsy.

        Returns:
            The response JSON validated through `Log.model_validate_list`.
        """
        from prefect.client.schemas.sorting import LogSort

        if log_filter:
            filter_payload = log_filter.model_dump(mode="json")
        else:
            filter_payload = None

        request_body: dict[str, Any] = {
            "logs": filter_payload,
            "limit": limit,
            "offset": offset,
            "sort": sort if sort else LogSort.TIMESTAMP_ASC,
        }
        result = self.request("POST", "/logs/filter", json=request_body)

        from prefect.client.schemas.objects import Log

        return Log.model_validate_list(result.json())

55 

56 

class LogAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the `/logs/` API routes."""

    async def create_logs(
        self, logs: Iterable[Union["LogCreate", dict[str, Any]]]
    ) -> None:
        """
        Create logs for a flow or task run

        Args:
            logs: An iterable of `LogCreate` objects or already json-compatible dicts
        """
        # Imported lazily; at module level this name is only available to type checkers.
        from prefect.client.schemas.actions import LogCreate

        payload: list[Any] = []
        for entry in logs:
            if isinstance(entry, LogCreate):
                # Models are dumped to JSON-compatible data before sending.
                payload.append(entry.model_dump(mode="json"))
            else:
                # Anything else is forwarded untouched.
                payload.append(entry)

        await self.request("POST", "/logs/", json=payload)

    async def read_logs(
        self,
        log_filter: "LogFilter | None" = None,
        limit: int | None = None,
        offset: int | None = None,
        sort: "LogSort | None" = None,
    ) -> list[Log]:
        """
        Read flow and task run logs.

        Args:
            log_filter: Optional filter; when truthy it is dumped to JSON and sent
                under the `logs` key, otherwise `None` is sent.
            limit: Sent unchanged as the `limit` field of the request body.
            offset: Sent unchanged as the `offset` field of the request body.
            sort: Sort order; falls back to `LogSort.TIMESTAMP_ASC` when falsy.

        Returns:
            The response JSON validated through `Log.model_validate_list`.
        """
        from prefect.client.schemas.sorting import LogSort

        if log_filter:
            filter_payload = log_filter.model_dump(mode="json")
        else:
            filter_payload = None

        request_body: dict[str, Any] = {
            "logs": filter_payload,
            "limit": limit,
            "offset": offset,
            "sort": sort if sort else LogSort.TIMESTAMP_ASC,
        }

        result = await self.request("POST", "/logs/filter", json=request_body)

        from prefect.client.schemas.objects import Log

        return Log.model_validate_list(result.json())