Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow_runs_watching.py: 15%
104 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Utilities for following flow runs with interleaved events and logs
3"""
5from __future__ import annotations 1a
7from datetime import datetime 1a
8from typing import TYPE_CHECKING 1a
9from uuid import UUID 1a
11import anyio 1a
12from rich.console import Console 1a
14from prefect.client.orchestration import get_client 1a
15from prefect.client.schemas.objects import Log, StateType 1a
16from prefect.events import Event 1a
17from prefect.events.subscribers import FlowRunSubscriber 1a
18from prefect.exceptions import FlowRunWaitTimeout 1a
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true1a
21 from prefect.client.schemas.objects import FlowRun
24# Color mapping for state types
25STATE_TYPE_COLORS: dict[StateType, str] = { 1a
26 StateType.SCHEDULED: "yellow",
27 StateType.PENDING: "bright_black",
28 StateType.RUNNING: "blue",
29 StateType.COMPLETED: "green",
30 StateType.FAILED: "red",
31 StateType.CANCELLED: "bright_black",
32 StateType.CANCELLING: "bright_black",
33 StateType.CRASHED: "orange1",
34 StateType.PAUSED: "bright_black",
35}
38async def watch_flow_run( 1a
39 flow_run_id: UUID, console: Console, timeout: int | None = None
40) -> FlowRun:
41 """
42 Watch a flow run, displaying interleaved events and logs until completion.
44 Args:
45 flow_run_id: The ID of the flow run to watch
46 console: Rich console for output
47 timeout: Maximum time to wait for flow run completion in seconds.
48 If None, waits indefinitely.
50 Returns:
51 The finished flow run
53 Raises:
54 FlowRunWaitTimeout: If the flow run exceeds the timeout
55 """
56 formatter = FlowRunFormatter()
58 if timeout is not None:
59 with anyio.move_on_after(timeout) as cancel_scope:
60 async with FlowRunSubscriber(flow_run_id=flow_run_id) as subscriber:
61 async for item in subscriber:
62 console.print(formatter.format(item))
64 if cancel_scope.cancelled_caught:
65 raise FlowRunWaitTimeout(
66 f"Flow run with ID {flow_run_id} exceeded watch timeout of {timeout} seconds"
67 )
68 else:
69 async with FlowRunSubscriber(flow_run_id=flow_run_id) as subscriber:
70 async for item in subscriber:
71 console.print(formatter.format(item))
73 async with get_client() as client:
74 return await client.read_flow_run(flow_run_id)
77class FlowRunFormatter: 1a
78 """Handles formatting of logs and events for CLI display"""
80 def __init__(self): 1a
81 self._last_timestamp_parts = ["", "", "", ""]
82 self._last_datetime: datetime | None = None
84 def format_timestamp(self, dt: datetime) -> str: 1a
85 """Format timestamp with incremental display"""
86 ms = dt.strftime("%f")[:3]
87 current_parts = [dt.strftime("%H"), dt.strftime("%M"), dt.strftime("%S"), ms]
89 if self._last_datetime and dt < self._last_datetime:
90 self._last_timestamp_parts = current_parts[:]
91 self._last_datetime = dt
92 return f"{current_parts[0]}:{current_parts[1]}:{current_parts[2]}.{current_parts[3]}"
94 display_parts = []
95 for i, (last, current) in enumerate(
96 zip(self._last_timestamp_parts, current_parts)
97 ):
98 if current != last:
99 display_parts = current_parts[i:]
100 break
101 else:
102 display_parts = [current_parts[3]]
104 self._last_timestamp_parts = current_parts[:]
105 self._last_datetime = dt
107 if len(display_parts) == 4:
108 timestamp_str = f"{display_parts[0]}:{display_parts[1]}:{display_parts[2]}.{display_parts[3]}"
109 elif len(display_parts) == 3:
110 timestamp_str = f":{display_parts[0]}:{display_parts[1]}.{display_parts[2]}"
111 elif len(display_parts) == 2:
112 timestamp_str = f":{display_parts[0]}.{display_parts[1]}"
113 else:
114 timestamp_str = f".{display_parts[0]}"
116 return f"{timestamp_str:>12}"
118 def format_run_id(self, run_id_short: str) -> str: 1a
119 """Format run ID"""
120 return f"{run_id_short:>12}"
122 def format(self, item: Log | Event) -> str: 1a
123 """Format a log or event for display"""
124 if isinstance(item, Log):
125 return self.format_log(item)
126 else:
127 return self.format_event(item)
129 def format_log(self, log: Log) -> str: 1a
130 """Format a log entry"""
131 timestamp = self.format_timestamp(log.timestamp)
133 run_id = log.task_run_id or log.flow_run_id
134 run_id_short = str(run_id)[-12:] if run_id else "............"
135 run_id_display = self.format_run_id(run_id_short)
137 icon = "▪"
138 prefix_plain = f"{icon} {timestamp.strip()} {run_id_display.strip()} "
140 lines = log.message.split("\n")
141 if len(lines) == 1:
142 return f"[dim]▪[/dim] {timestamp} [dim]{run_id_display}[/dim] {log.message}"
144 first_line = f"[dim]▪[/dim] {timestamp} [dim]{run_id_display}[/dim] {lines[0]}"
145 indent = " " * len(prefix_plain)
146 continuation_lines = [f"{indent}{line}" for line in lines[1:]]
148 return first_line + "\n" + "\n".join(continuation_lines)
150 def format_event(self, event: Event) -> str: 1a
151 """Format an event"""
152 timestamp = self.format_timestamp(event.occurred)
154 run_id = None
156 if event.resource.id.startswith("prefect.task-run."):
157 run_id = event.resource.id.split(".", 2)[2]
158 elif event.resource.id.startswith("prefect.flow-run."):
159 run_id = event.resource.id.split(".", 2)[2]
161 if not run_id:
162 for related in event.related:
163 if related.id.startswith("prefect.task-run."):
164 run_id = related.id.split(".", 2)[2]
165 break
166 elif related.id.startswith("prefect.flow-run."):
167 run_id = related.id.split(".", 2)[2]
168 break
170 run_id_short = run_id[-12:] if run_id else "............"
171 run_id_display = self.format_run_id(run_id_short)
173 # Get state type from event resource or payload
174 state_type_str = event.resource.get("prefect.state-type")
175 if not state_type_str and "validated_state" in event.payload:
176 state_type_str = event.payload["validated_state"].get("type")
178 # Map state type to color
179 color = "bright_magenta" # default for unknown states
180 if state_type_str:
181 try:
182 state_type = StateType(state_type_str)
183 color = STATE_TYPE_COLORS.get(state_type, "bright_magenta")
184 except ValueError:
185 pass
187 name = event.resource.get("prefect.resource.name") or event.resource.id
188 return (
189 f"[{color}]●[/{color}] {timestamp} [dim]{run_id_display}[/dim] "
190 f"{event.event} * [bold cyan]{name}[/bold cyan]"
191 )