Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/task_run.py: 24%
95 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for working with task runs
3"""
5import logging 1a
6import webbrowser 1a
7from datetime import datetime 1a
8from typing import List, Optional, cast 1a
9from uuid import UUID 1a
11import orjson 1a
12import typer 1a
13from rich.pretty import Pretty 1a
14from rich.table import Table 1a
16from prefect.cli._types import PrefectTyper 1a
17from prefect.cli._utilities import exit_with_error, exit_with_success 1a
18from prefect.cli.root import app 1a
19from prefect.client.orchestration import get_client 1a
20from prefect.client.schemas.filters import ( 1a
21 LogFilter,
22 LogFilterTaskRunId,
23 TaskRunFilter,
24 TaskRunFilterName,
25 TaskRunFilterState,
26 TaskRunFilterStateName,
27 TaskRunFilterStateType,
28)
29from prefect.client.schemas.objects import StateType 1a
30from prefect.client.schemas.sorting import LogSort, TaskRunSort 1a
31from prefect.exceptions import ObjectNotFound 1a
32from prefect.types._datetime import ( 1a
33 human_friendly_diff,
34 to_datetime_string,
35)
36from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a
37from prefect.utilities.urls import url_for 1a
# Typer sub-application that groups every `task-run` command defined below.
task_run_app: PrefectTyper = PrefectTyper(
    name="task-run",
    help="View and inspect task runs.",
)
# Mount the group on the root CLI; "task-runs" is registered as an alias.
app.add_typer(task_run_app, aliases=["task-runs"])

# Page size used by the `logs` command when paging through log records.
LOGS_DEFAULT_PAGE_SIZE = 200
# Default for `--num-logs`, also quoted in the `--head` / `--tail` help text.
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20
@task_run_app.command()
async def inspect(
    id: UUID,
    web: bool = typer.Option(
        False,
        "--web",
        help="Open the task run in a web browser.",
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    View details about a task run.
    """
    # The docstring above is kept verbatim: Typer surfaces it as the command's
    # help text, so it is user-facing output rather than developer docs.
    #
    # Parameters:
    #   id     -- UUID of the task run to look up.
    #   web    -- open the run's UI page in a browser instead of printing it.
    #   output -- optional output format; only "json" (case-insensitive) is
    #             accepted. Ignored when --web is given.

    # Normalise once so validation and rendering agree on the same value.
    output_format = output.lower() if output else None
    if output_format and output_format != "json":
        exit_with_error("Only 'json' output format is supported.")

    # The run is fetched even for --web, so an unknown id is reported before
    # a browser tab is opened. exit_with_error is relied on to abort here;
    # otherwise `task_run` would be unbound below.
    async with get_client() as client:
        try:
            task_run = await client.read_task_run(id)
        except ObjectNotFound:
            exit_with_error(f"Task run '{id}' not found!")

    if web:
        task_run_url = url_for("task-run", obj_id=id)
        if not task_run_url:
            exit_with_error(
                "Failed to generate URL for task run. Make sure PREFECT_UI_URL is configured."
            )

        # webbrowser.open_new_tab is a blocking call; run it off the event loop.
        await run_sync_in_worker_thread(webbrowser.open_new_tab, task_run_url)
        # Interpolate the plain UUID: `{id!r}` rendered as "UUID('...')",
        # unlike the quoted form used by the not-found message above.
        exit_with_success(f"Opened task run '{id}' in browser.")
    elif output_format == "json":
        task_run_json = task_run.model_dump(mode="json")
        json_output = orjson.dumps(task_run_json, option=orjson.OPT_INDENT_2).decode()
        app.console.print(json_output)
    else:
        app.console.print(Pretty(task_run))
@task_run_app.command()
async def ls(
    task_run_name: List[str] = typer.Option(None, help="Name of the task"),
    limit: int = typer.Option(15, help="Maximum number of task runs to list"),
    state: List[str] = typer.Option(None, help="Name of the task run's state"),
    state_type: List[StateType] = typer.Option(
        None, help="Type of the task run's state"
    ),
):
    """
    View recent task runs
    """
    # The docstring above is kept verbatim: Typer surfaces it as the command's
    # help text.
    #
    # Flow: build the optional filters, fetch up to `limit` runs sorted by
    # expected start time (descending), then print them as a table ordered by
    # creation time, newest first.

    # A state filter is only built when at least one state criterion is given.
    state_filter = None
    if state or state_type:
        # NOTE(review): str.capitalize() lower-cases everything after the first
        # character, so mixed-case input is flattened -- confirm this matches
        # how state names are stored.
        name_filter = (
            TaskRunFilterStateName(any_=[s.capitalize() for s in state])
            if state
            else None
        )
        type_filter = TaskRunFilterStateType(any_=state_type) if state_type else None
        state_filter = TaskRunFilterState(name=name_filter, type=type_filter)

    async with get_client() as client:
        task_runs = await client.read_task_runs(
            task_run_filter=TaskRunFilter(
                name=TaskRunFilterName(any_=task_run_name) if task_run_name else None,
                state=state_filter,
            ),
            limit=limit,
            sort=TaskRunSort.EXPECTED_START_TIME_DESC,
        )

    if not task_runs:
        app.console.print("No task runs found.")
        return

    table = Table(title="Task Runs")
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Task", style="blue", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("State", no_wrap=True)
    table.add_column("When", style="bold", no_wrap=True)

    newest_first = sorted(
        task_runs, key=lambda run: cast(datetime, run.created), reverse=True
    )
    for run in newest_first:
        if run.state:
            # Scheduled runs show when they are due; other states show when
            # the state was recorded.
            if run.state.is_scheduled():
                when = run.state.state_details.scheduled_time
            else:
                when = run.state.timestamp
            state_label = str(run.state.type.value)
        else:
            when = run.created
            state_label = "Unknown"

        # NOTE(review): the "Task" and "Name" columns are both filled from the
        # run's `name`, so they always show the same value.
        table.add_row(
            str(run.id),
            str(run.name),
            str(run.name),
            state_label,
            human_friendly_diff(when),
        )

    app.console.print(table)
@task_run_app.command()
async def logs(
    id: UUID,
    head: bool = typer.Option(
        False,
        "--head",
        "-h",
        help=(
            f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
    num_logs: int = typer.Option(
        LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS,
        "--num-logs",
        "-n",
        help=(
            "Number of logs to show when using the --head or --tail flag. If None,"
            f" defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}."
        ),
        min=1,
    ),
    reverse: bool = typer.Option(
        False,
        "--reverse",
        "-r",
        help="Reverse the logs order to print the most recent logs first",
    ),
    tail: bool = typer.Option(
        False,
        "--tail",
        "-t",
        help=(
            f"Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
):
    """
    View logs for a task run.
    """
    # The docstring above is kept verbatim: Typer surfaces it as the command's
    # help text.
    #
    # Parameters:
    #   id       -- UUID of the task run whose logs are printed.
    #   head     -- only checked for mutual exclusion with --tail; the default
    #               mode already reads from the oldest entry forward.
    #   num_logs -- upper bound on the number of entries requested (>= 1).
    #   reverse  -- print newest entries first.
    #   tail     -- select the newest `num_logs` entries instead of the oldest.
    #
    # NOTE(review): `num_logs` always carries a value (default
    # LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS), so output is capped even without
    # --head/--tail although the help text speaks of "all logs". Left as is
    # because changing it would alter the parameter's default.

    if head and tail:
        exit_with_error("Please provide either a `head` or `tail` option but not both.")

    # Newest-first ordering from the API serves both --reverse and --tail.
    sort = LogSort.TIMESTAMP_DESC if reverse or tail else LogSort.TIMESTAMP_ASC

    # --tail without --reverse has to print the newest `num_logs` entries in
    # chronological order. That is done by fetching newest-first, starting at
    # the window furthest from the newest entry, stepping the offset back to 0
    # and reversing each page before printing.
    # With --reverse the newest-first order is already the display order, so
    # plain forward paging from offset 0 is used; walking backwards there
    # printed pages out of order once `num_logs` exceeded one page.
    walk_backwards = tail and not reverse
    offset = max(0, num_logs - LOGS_DEFAULT_PAGE_SIZE) if walk_backwards else 0

    log_filter = LogFilter(task_run_id=LogFilterTaskRunId(any_=[id]))

    async with get_client() as client:
        # Fetch the run first: it validates the id and supplies the name used
        # in every printed line. exit_with_error is relied on to abort here.
        try:
            task_run = await client.read_task_run(id)
        except ObjectNotFound:
            exit_with_error(f"task run {str(id)!r} not found!")

        # Pagination - logs are requested at most LOGS_DEFAULT_PAGE_SIZE at a
        # time. Stopping once `num_logs` entries have been requested avoids a
        # trailing request with limit=0 when `num_logs` is an exact multiple
        # of the page size.
        num_logs_returned = 0
        while num_logs_returned < num_logs:
            page_limit = min(LOGS_DEFAULT_PAGE_SIZE, num_logs - num_logs_returned)

            page_logs = await client.read_logs(
                log_filter=log_filter,
                limit=page_limit,
                offset=offset,
                sort=sort,
            )

            for log in reversed(page_logs) if walk_backwards else page_logs:
                app.console.print(
                    # Print following the task run format (declared in logging.yml)
                    (
                        f"{to_datetime_string(log.timestamp)}.{log.timestamp.microsecond // 1000:03d} |"
                        f" {logging.getLevelName(log.level):7s} | Task run"
                        f" {task_run.name!r} - {log.message}"
                    ),
                    soft_wrap=True,
                )

            # Counts what was requested, not what came back, so the cap holds
            # even when a page is short.
            num_logs_returned += page_limit

            if walk_backwards:
                if offset == 0:
                    # The page at offset 0 holds the newest entries; done.
                    break
                # Step towards the newest entries, clamping at 0 so the final
                # partial window is still fetched.
                offset = max(0, offset - LOGS_DEFAULT_PAGE_SIZE)
            else:
                if len(page_logs) < page_limit:
                    # Short page: no more logs to show.
                    break
                offset += LOGS_DEFAULT_PAGE_SIZE