Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow_runs_watching.py: 15%

104 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Utilities for following flow runs with interleaved events and logs 

3""" 

4 

5from __future__ import annotations 1a

6 

7from datetime import datetime 1a

8from typing import TYPE_CHECKING 1a

9from uuid import UUID 1a

10 

11import anyio 1a

12from rich.console import Console 1a

13 

14from prefect.client.orchestration import get_client 1a

15from prefect.client.schemas.objects import Log, StateType 1a

16from prefect.events import Event 1a

17from prefect.events.subscribers import FlowRunSubscriber 1a

18from prefect.exceptions import FlowRunWaitTimeout 1a

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true1a

21 from prefect.client.schemas.objects import FlowRun 

22 

23 

24# Color mapping for state types 

25STATE_TYPE_COLORS: dict[StateType, str] = { 1a

26 StateType.SCHEDULED: "yellow", 

27 StateType.PENDING: "bright_black", 

28 StateType.RUNNING: "blue", 

29 StateType.COMPLETED: "green", 

30 StateType.FAILED: "red", 

31 StateType.CANCELLED: "bright_black", 

32 StateType.CANCELLING: "bright_black", 

33 StateType.CRASHED: "orange1", 

34 StateType.PAUSED: "bright_black", 

35} 

36 

37 

38async def watch_flow_run( 1a

39 flow_run_id: UUID, console: Console, timeout: int | None = None 

40) -> FlowRun: 

41 """ 

42 Watch a flow run, displaying interleaved events and logs until completion. 

43 

44 Args: 

45 flow_run_id: The ID of the flow run to watch 

46 console: Rich console for output 

47 timeout: Maximum time to wait for flow run completion in seconds. 

48 If None, waits indefinitely. 

49 

50 Returns: 

51 The finished flow run 

52 

53 Raises: 

54 FlowRunWaitTimeout: If the flow run exceeds the timeout 

55 """ 

56 formatter = FlowRunFormatter() 

57 

58 if timeout is not None: 

59 with anyio.move_on_after(timeout) as cancel_scope: 

60 async with FlowRunSubscriber(flow_run_id=flow_run_id) as subscriber: 

61 async for item in subscriber: 

62 console.print(formatter.format(item)) 

63 

64 if cancel_scope.cancelled_caught: 

65 raise FlowRunWaitTimeout( 

66 f"Flow run with ID {flow_run_id} exceeded watch timeout of {timeout} seconds" 

67 ) 

68 else: 

69 async with FlowRunSubscriber(flow_run_id=flow_run_id) as subscriber: 

70 async for item in subscriber: 

71 console.print(formatter.format(item)) 

72 

73 async with get_client() as client: 

74 return await client.read_flow_run(flow_run_id) 

75 

76 

77class FlowRunFormatter: 1a

78 """Handles formatting of logs and events for CLI display""" 

79 

80 def __init__(self): 1a

81 self._last_timestamp_parts = ["", "", "", ""] 

82 self._last_datetime: datetime | None = None 

83 

84 def format_timestamp(self, dt: datetime) -> str: 1a

85 """Format timestamp with incremental display""" 

86 ms = dt.strftime("%f")[:3] 

87 current_parts = [dt.strftime("%H"), dt.strftime("%M"), dt.strftime("%S"), ms] 

88 

89 if self._last_datetime and dt < self._last_datetime: 

90 self._last_timestamp_parts = current_parts[:] 

91 self._last_datetime = dt 

92 return f"{current_parts[0]}:{current_parts[1]}:{current_parts[2]}.{current_parts[3]}" 

93 

94 display_parts = [] 

95 for i, (last, current) in enumerate( 

96 zip(self._last_timestamp_parts, current_parts) 

97 ): 

98 if current != last: 

99 display_parts = current_parts[i:] 

100 break 

101 else: 

102 display_parts = [current_parts[3]] 

103 

104 self._last_timestamp_parts = current_parts[:] 

105 self._last_datetime = dt 

106 

107 if len(display_parts) == 4: 

108 timestamp_str = f"{display_parts[0]}:{display_parts[1]}:{display_parts[2]}.{display_parts[3]}" 

109 elif len(display_parts) == 3: 

110 timestamp_str = f":{display_parts[0]}:{display_parts[1]}.{display_parts[2]}" 

111 elif len(display_parts) == 2: 

112 timestamp_str = f":{display_parts[0]}.{display_parts[1]}" 

113 else: 

114 timestamp_str = f".{display_parts[0]}" 

115 

116 return f"{timestamp_str:>12}" 

117 

118 def format_run_id(self, run_id_short: str) -> str: 1a

119 """Format run ID""" 

120 return f"{run_id_short:>12}" 

121 

122 def format(self, item: Log | Event) -> str: 1a

123 """Format a log or event for display""" 

124 if isinstance(item, Log): 

125 return self.format_log(item) 

126 else: 

127 return self.format_event(item) 

128 

129 def format_log(self, log: Log) -> str: 1a

130 """Format a log entry""" 

131 timestamp = self.format_timestamp(log.timestamp) 

132 

133 run_id = log.task_run_id or log.flow_run_id 

134 run_id_short = str(run_id)[-12:] if run_id else "............" 

135 run_id_display = self.format_run_id(run_id_short) 

136 

137 icon = "▪" 

138 prefix_plain = f"{icon} {timestamp.strip()} {run_id_display.strip()} " 

139 

140 lines = log.message.split("\n") 

141 if len(lines) == 1: 

142 return f"[dim]▪[/dim] {timestamp} [dim]{run_id_display}[/dim] {log.message}" 

143 

144 first_line = f"[dim]▪[/dim] {timestamp} [dim]{run_id_display}[/dim] {lines[0]}" 

145 indent = " " * len(prefix_plain) 

146 continuation_lines = [f"{indent}{line}" for line in lines[1:]] 

147 

148 return first_line + "\n" + "\n".join(continuation_lines) 

149 

150 def format_event(self, event: Event) -> str: 1a

151 """Format an event""" 

152 timestamp = self.format_timestamp(event.occurred) 

153 

154 run_id = None 

155 

156 if event.resource.id.startswith("prefect.task-run."): 

157 run_id = event.resource.id.split(".", 2)[2] 

158 elif event.resource.id.startswith("prefect.flow-run."): 

159 run_id = event.resource.id.split(".", 2)[2] 

160 

161 if not run_id: 

162 for related in event.related: 

163 if related.id.startswith("prefect.task-run."): 

164 run_id = related.id.split(".", 2)[2] 

165 break 

166 elif related.id.startswith("prefect.flow-run."): 

167 run_id = related.id.split(".", 2)[2] 

168 break 

169 

170 run_id_short = run_id[-12:] if run_id else "............" 

171 run_id_display = self.format_run_id(run_id_short) 

172 

173 # Get state type from event resource or payload 

174 state_type_str = event.resource.get("prefect.state-type") 

175 if not state_type_str and "validated_state" in event.payload: 

176 state_type_str = event.payload["validated_state"].get("type") 

177 

178 # Map state type to color 

179 color = "bright_magenta" # default for unknown states 

180 if state_type_str: 

181 try: 

182 state_type = StateType(state_type_str) 

183 color = STATE_TYPE_COLORS.get(state_type, "bright_magenta") 

184 except ValueError: 

185 pass 

186 

187 name = event.resource.get("prefect.resource.name") or event.resource.id 

188 return ( 

189 f"[{color}]●[/{color}] {timestamp} [dim]{run_id_display}[/dim] " 

190 f"{event.event} * [bold cyan]{name}[/bold cyan]" 

191 )