Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow_run.py: 21%
173 statements
coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Command line interface for working with flow runs
3"""
5from __future__ import annotations 1a
7import logging 1a
8import os 1a
9import signal 1a
10import threading 1a
11import webbrowser 1a
12from types import FrameType 1a
13from typing import List, Optional 1a
14from uuid import UUID 1a
16import httpx 1a
17import orjson 1a
18import typer 1a
19from rich.markup import escape 1a
20from rich.pretty import Pretty 1a
21from rich.table import Table 1a
22from starlette import status 1a
24from prefect.cli._types import PrefectTyper 1a
25from prefect.cli._utilities import exit_with_error, exit_with_success 1a
26from prefect.cli.root import app, is_interactive 1a
27from prefect.client.orchestration import get_client 1a
28from prefect.client.schemas.filters import FlowFilter, FlowRunFilter, LogFilter 1a
29from prefect.client.schemas.objects import StateType 1a
30from prefect.client.schemas.responses import SetStateStatus 1a
31from prefect.client.schemas.sorting import FlowRunSort, LogSort 1a
32from prefect.exceptions import ObjectNotFound 1a
33from prefect.logging import get_logger 1a
34from prefect.runner import Runner 1a
35from prefect.states import State 1a
36from prefect.types._datetime import human_friendly_diff 1a
37from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a
38from prefect.utilities.urls import url_for 1a
40flow_run_app: PrefectTyper = PrefectTyper( 1a
41 name="flow-run", help="Interact with flow runs."
42)
43app.add_typer(flow_run_app, aliases=["flow-runs"]) 1a
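
# LOGS_DEFAULT_PAGE_SIZE mirrors the maximum number of log records the API returns
# per request; LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS is the default number of lines
# shown when --head or --tail is used without --num-logs.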
LOGS_DEFAULT_PAGE_SIZE = 200
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20

logger: "logging.Logger" = get_logger(__name__)


@flow_run_app.command()
async def inspect(
    id: UUID,
    web: bool = typer.Option(
        False,
        "--web",
        help="Open the flow run in a web browser.",
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    View details about a flow run.
    """
    if output and output.lower() != "json":
        exit_with_error("Only 'json' output format is supported.")

    async with get_client() as client:
        try:
            flow_run = await client.read_flow_run(id)
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == status.HTTP_404_NOT_FOUND:
                exit_with_error(f"Flow run {id!r} not found!")
            else:
                raise

        if web:
            flow_run_url = url_for("flow-run", obj_id=id)
            if not flow_run_url:
                exit_with_error(
                    "Failed to generate URL for flow run. Make sure PREFECT_UI_URL is configured."
                )

            await run_sync_in_worker_thread(webbrowser.open_new_tab, flow_run_url)
            exit_with_success(f"Opened flow run {id!r} in browser.")
        else:
            if output and output.lower() == "json":
                flow_run_json = flow_run.model_dump(mode="json")
                json_output = orjson.dumps(
                    flow_run_json, option=orjson.OPT_INDENT_2
                ).decode()
                app.console.print(json_output)
            else:
                app.console.print(Pretty(flow_run))


@flow_run_app.command()
async def ls(
    flow_name: List[str] = typer.Option(None, help="Name of the flow"),
    limit: int = typer.Option(15, help="Maximum number of flow runs to list"),
    state: List[str] = typer.Option(None, help="Name of the flow run's state"),
    state_type: List[str] = typer.Option(None, help="Type of the flow run's state"),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
114 """
115 View recent flow runs or flow runs for specific flows.
117 Arguments:
119 flow_name: Name of the flow
121 limit: Maximum number of flow runs to list. Defaults to 15.
123 state: Name of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', 'PAUSED', 'SUSPENDED', 'AWAITINGRETRY', 'RETRYING', and 'LATE'.
125 state_type: Type of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', 'CRASHED', and 'PAUSED'.
127 Examples:
129 $ prefect flow-runs ls --state Running
131 $ prefect flow-runs ls --state Running --state late
133 $ prefect flow-runs ls --state-type RUNNING
135 $ prefect flow-runs ls --state-type RUNNING --state-type FAILED
136 """
137 if output and output.lower() != "json":
138 exit_with_error("Only 'json' output format is supported.")
140 # Handling `state` and `state_type` argument validity in the function instead of by specifying
141 # List[StateType] and List[StateName] in the type hints, allows users to provide
142 # case-insensitive arguments for `state` and `state_type`.
    prefect_state_names = {
        "SCHEDULED": "Scheduled",
        "PENDING": "Pending",
        "RUNNING": "Running",
        "COMPLETED": "Completed",
        "FAILED": "Failed",
        "CANCELLED": "Cancelled",
        "CRASHED": "Crashed",
        "PAUSED": "Paused",
        "CANCELLING": "Cancelling",
        "SUSPENDED": "Suspended",
        "AWAITINGRETRY": "AwaitingRetry",
        "RETRYING": "Retrying",
        "LATE": "Late",
    }

    state_filter = {}
    formatted_states = []

    if state:
        for s in state:
            uppercased_state = s.upper()
            if uppercased_state in prefect_state_names:
                capitalized_state = prefect_state_names[uppercased_state]
                formatted_states.append(capitalized_state)
            else:
                # Do not change the case of the state name if it is not one of the official Prefect state names
                formatted_states.append(s)
                logger.warning(
                    f"State name {repr(s)} is not one of the official Prefect state names."
                )

        state_filter["name"] = {"any_": formatted_states}

    if state_type:
        upper_cased_states = [s.upper() for s in state_type]
        if not all(s in StateType.__members__ for s in upper_cased_states):
            exit_with_error(
                f"Invalid state type. Options are {', '.join(StateType.__members__)}."
            )

        state_filter["type"] = {
            "any_": [StateType[s].value for s in upper_cased_states]
        }

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None,
            flow_run_filter=FlowRunFilter(state=state_filter) if state_filter else None,
            limit=limit,
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )
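        # Resolve the parent flow of each run so the table output can show flow names;
        # the flow query is filtered to only the flows referenced by the runs above.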
        flows_by_id = {
            flow.id: flow
            for flow in await client.read_flows(
                flow_filter=FlowFilter(id={"any_": [run.flow_id for run in flow_runs]})
            )
        }

    if not flow_runs:
        if output and output.lower() == "json":
            app.console.print("[]")
            return
        exit_with_success("No flow runs found.")

    if output and output.lower() == "json":
        flow_runs_json = [flow_run.model_dump(mode="json") for flow_run in flow_runs]
        json_output = orjson.dumps(flow_runs_json, option=orjson.OPT_INDENT_2).decode()
        app.console.print(json_output)
    else:
        table = Table(title="Flow Runs")
        table.add_column("ID", justify="right", style="cyan", no_wrap=True)
        table.add_column("Flow", style="blue", no_wrap=True)
        table.add_column("Name", style="green", no_wrap=True)
        table.add_column("State", no_wrap=True)
        table.add_column("When", style="bold", no_wrap=True)

        for flow_run in sorted(flow_runs, key=lambda d: d.created, reverse=True):
            flow = flows_by_id[flow_run.flow_id]
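            # Scheduled runs show their scheduled start time in the "When" column;
            # all other runs show when they entered their current state.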
            timestamp = (
                flow_run.state.state_details.scheduled_time
                if flow_run.state.is_scheduled()
                else flow_run.state.timestamp
            )
            table.add_row(
                str(flow_run.id),
                str(flow.name),
                str(flow_run.name),
                str(flow_run.state.type.value),
                human_friendly_diff(timestamp),
            )

        app.console.print(table)


@flow_run_app.command()
async def delete(id: UUID):
    """
    Delete a flow run by ID.
    """
    async with get_client() as client:
        try:
            if is_interactive() and not typer.confirm(
                (f"Are you sure you want to delete flow run with id {id!r}?"),
                default=False,
            ):
                exit_with_error("Deletion aborted.")
            await client.delete_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")

    exit_with_success(f"Successfully deleted flow run '{id}'.")


@flow_run_app.command()
async def cancel(id: UUID):
    """Cancel a flow run by ID."""
    async with get_client() as client:
        cancelling_state = State(type=StateType.CANCELLING)
        try:
            result = await client.set_flow_run_state(
                flow_run_id=id, state=cancelling_state
            )
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")
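
        # The orchestration API can refuse (ABORT) the transition to CANCELLING;
        # if it does, surface its reason to the user and exit with an error.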
        if result.status == SetStateStatus.ABORT:
            exit_with_error(
                f"Flow run '{id}' was unable to be cancelled. Reason:"
                f" '{result.details.reason}'"
            )

    exit_with_success(f"Flow run '{id}' was successfully scheduled for cancellation.")


@flow_run_app.command()
async def logs(
    id: UUID,
    head: bool = typer.Option(
        False,
        "--head",
        "-h",
        help=(
            f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
    num_logs: int = typer.Option(
        None,
        "--num-logs",
        "-n",
        help=(
            "Number of logs to show when using the --head or --tail flag. If None,"
            f" defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}."
        ),
        min=1,
    ),
    reverse: bool = typer.Option(
        False,
        "--reverse",
        "-r",
        help="Reverse the logs order to print the most recent logs first",
    ),
    tail: bool = typer.Option(
        False,
        "--tail",
        "-t",
        help=(
            f"Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
):
    """
    View logs for a flow run.
    """
    # Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time
    offset = 0
    more_logs = True
    num_logs_returned = 0

    # if head and tail flags are being used together
    if head and tail:
        exit_with_error("Please provide either a `head` or `tail` option but not both.")
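
    # If --head, --tail, or --num-logs was given, cap the output at num_logs
    # (defaulting to LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS); otherwise None means
    # "print every log for the run".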
    user_specified_num_logs = (
        num_logs or LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
        if head or tail or num_logs
        else None
    )

    # If using tail, update the offset according to LOGS_DEFAULT_PAGE_SIZE
    if tail:
        offset = max(0, user_specified_num_logs - LOGS_DEFAULT_PAGE_SIZE)
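    # With --tail the logs are read newest-first and the offset starts at the page
    # holding the oldest of the requested logs, then walks back toward 0 so the
    # pages still print in chronological order.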

    log_filter = LogFilter(flow_run_id={"any_": [id]})

    async with get_client() as client:
        # Get the flow run
        try:
            flow_run = await client.read_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run {str(id)!r} not found!")

        while more_logs:
            num_logs_to_return_from_page = (
                LOGS_DEFAULT_PAGE_SIZE
                if user_specified_num_logs is None
                else min(
                    LOGS_DEFAULT_PAGE_SIZE, user_specified_num_logs - num_logs_returned
                )
            )

            # Get the next page of logs
            page_logs = await client.read_logs(
                log_filter=log_filter,
                limit=num_logs_to_return_from_page,
                offset=offset,
                sort=(
                    LogSort.TIMESTAMP_DESC if reverse or tail else LogSort.TIMESTAMP_ASC
                ),
            )

            for log in reversed(page_logs) if tail and not reverse else page_logs:
                # Print following the flow run format (declared in logging.yml)
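                # Example output line (run name and message are illustrative):
                # 2025-01-01 12:30:00.001 | INFO    | Flow run 'my-run' - <log message>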
                timestamp = f"{log.timestamp:%Y-%m-%d %H:%M:%S.%f}"[:-3]
                log_level = f"{logging.getLevelName(log.level):7s}"
                flow_run_info = f"Flow run {flow_run.name!r} - {escape(log.message)}"

                log_message = f"{timestamp} | {log_level} | {flow_run_info}"
                app.console.print(
                    log_message,
                    soft_wrap=True,
                )

            # Update the number of logs retrieved
            num_logs_returned += num_logs_to_return_from_page

            if tail:
                # If the current offset is not 0, update the offset for the next page
                if offset != 0:
                    offset = (
                        0
                        # Reset the offset to 0 if fewer logs than LOGS_DEFAULT_PAGE_SIZE
                        # remain, to get the remaining logs
                        if offset < LOGS_DEFAULT_PAGE_SIZE
                        else offset - LOGS_DEFAULT_PAGE_SIZE
                    )
                else:
                    more_logs = False
            else:
                if len(page_logs) == LOGS_DEFAULT_PAGE_SIZE:
                    offset += LOGS_DEFAULT_PAGE_SIZE
                else:
                    # No more logs to show, exit
                    more_logs = False


@flow_run_app.command()
async def execute(
    id: Optional[UUID] = typer.Argument(None, help="ID of the flow run to execute"),
):
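    """
    Execute a flow run by ID in the current process.
    """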
    if id is None:
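        # Fall back to the flow run ID provided via the PREFECT__FLOW_RUN_ID
        # environment variable when no ID argument is given.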
        environ_flow_id = os.environ.get("PREFECT__FLOW_RUN_ID")
        if environ_flow_id:
            id = UUID(environ_flow_id)

    if id is None:
        exit_with_error("Could not determine the ID of the flow run to execute.")

    runner = Runner()

    def _handle_reschedule_sigterm(_signal: int, _frame: FrameType | None):
        logger.info("SIGTERM received, initiating graceful shutdown...")
        runner.reschedule_current_flow_runs()
        exit_with_success("Flow run successfully rescheduled.")

    # Set up signal handling to reschedule run on SIGTERM
    on_sigterm = os.environ.get("PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR", "").lower()
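    # signal.signal() may only be called from the main thread of the main
    # interpreter, so only install the handler when that is the case.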
    if (
        threading.current_thread() is threading.main_thread()
        and on_sigterm == "reschedule"
    ):
        signal.signal(signal.SIGTERM, _handle_reschedule_sigterm)

    await runner.execute_flow_run(id)