Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/task_run.py: 24%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Command line interface for working with task runs 

3""" 

4 

5import logging 1a

6import webbrowser 1a

7from datetime import datetime 1a

8from typing import List, Optional, cast 1a

9from uuid import UUID 1a

10 

11import orjson 1a

12import typer 1a

13from rich.pretty import Pretty 1a

14from rich.table import Table 1a

15 

16from prefect.cli._types import PrefectTyper 1a

17from prefect.cli._utilities import exit_with_error, exit_with_success 1a

18from prefect.cli.root import app 1a

19from prefect.client.orchestration import get_client 1a

20from prefect.client.schemas.filters import ( 1a

21 LogFilter, 

22 LogFilterTaskRunId, 

23 TaskRunFilter, 

24 TaskRunFilterName, 

25 TaskRunFilterState, 

26 TaskRunFilterStateName, 

27 TaskRunFilterStateType, 

28) 

29from prefect.client.schemas.objects import StateType 1a

30from prefect.client.schemas.sorting import LogSort, TaskRunSort 1a

31from prefect.exceptions import ObjectNotFound 1a

32from prefect.types._datetime import ( 1a

33 human_friendly_diff, 

34 to_datetime_string, 

35) 

36from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

37from prefect.utilities.urls import url_for 1a

38 

39task_run_app: PrefectTyper = PrefectTyper( 1a

40 name="task-run", help="View and inspect task runs." 

41) 

42app.add_typer(task_run_app, aliases=["task-runs"]) 1a

43 

44LOGS_DEFAULT_PAGE_SIZE = 200 1a

45LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20 1a

46 

47 

48@task_run_app.command() 1a

49async def inspect( 1a

50 id: UUID, 

51 web: bool = typer.Option( 

52 False, 

53 "--web", 

54 help="Open the task run in a web browser.", 

55 ), 

56 output: Optional[str] = typer.Option( 

57 None, 

58 "--output", 

59 "-o", 

60 help="Specify an output format. Currently supports: json", 

61 ), 

62): 

63 """ 

64 View details about a task run. 

65 """ 

66 if output and output.lower() != "json": 

67 exit_with_error("Only 'json' output format is supported.") 

68 

69 async with get_client() as client: 

70 try: 

71 task_run = await client.read_task_run(id) 

72 except ObjectNotFound: 

73 exit_with_error(f"Task run '{id}' not found!") 

74 

75 if web: 

76 task_run_url = url_for("task-run", obj_id=id) 

77 if not task_run_url: 

78 exit_with_error( 

79 "Failed to generate URL for task run. Make sure PREFECT_UI_URL is configured." 

80 ) 

81 

82 await run_sync_in_worker_thread(webbrowser.open_new_tab, task_run_url) 

83 exit_with_success(f"Opened task run {id!r} in browser.") 

84 else: 

85 if output and output.lower() == "json": 

86 task_run_json = task_run.model_dump(mode="json") 

87 json_output = orjson.dumps( 

88 task_run_json, option=orjson.OPT_INDENT_2 

89 ).decode() 

90 app.console.print(json_output) 

91 else: 

92 app.console.print(Pretty(task_run)) 

93 

94 

95@task_run_app.command() 1a

96async def ls( 1a

97 task_run_name: List[str] = typer.Option(None, help="Name of the task"), 

98 limit: int = typer.Option(15, help="Maximum number of task runs to list"), 

99 state: List[str] = typer.Option(None, help="Name of the task run's state"), 

100 state_type: List[StateType] = typer.Option( 

101 None, help="Type of the task run's state" 

102 ), 

103): 

104 """ 

105 View recent task runs 

106 """ 

107 

108 if state or state_type: 

109 state_filter = TaskRunFilterState( 

110 name=TaskRunFilterStateName(any_=[s.capitalize() for s in state]) 

111 if state 

112 else None, 

113 type=TaskRunFilterStateType(any_=state_type) if state_type else None, 

114 ) 

115 else: 

116 state_filter = None 

117 

118 async with get_client() as client: 

119 task_runs = await client.read_task_runs( 

120 task_run_filter=TaskRunFilter( 

121 name=TaskRunFilterName(any_=task_run_name) if task_run_name else None, 

122 state=state_filter, 

123 ), 

124 limit=limit, 

125 sort=TaskRunSort.EXPECTED_START_TIME_DESC, 

126 ) 

127 

128 if not task_runs: 

129 app.console.print("No task runs found.") 

130 return 

131 

132 table = Table(title="Task Runs") 

133 table.add_column("ID", justify="right", style="cyan", no_wrap=True) 

134 table.add_column("Task", style="blue", no_wrap=True) 

135 table.add_column("Name", style="green", no_wrap=True) 

136 table.add_column("State", no_wrap=True) 

137 table.add_column("When", style="bold", no_wrap=True) 

138 

139 for task_run in sorted( 

140 task_runs, key=lambda d: cast(datetime, d.created), reverse=True 

141 ): 

142 task = task_run 

143 if task_run.state: 

144 timestamp = ( 

145 task_run.state.state_details.scheduled_time 

146 if task_run.state.is_scheduled() 

147 else task_run.state.timestamp 

148 ) 

149 else: 

150 timestamp = task_run.created 

151 

152 table.add_row( 

153 str(task_run.id), 

154 str(task.name), 

155 str(task_run.name), 

156 str(task_run.state.type.value) if task_run.state else "Unknown", 

157 human_friendly_diff(timestamp), 

158 ) 

159 

160 app.console.print(table) 

161 

162 

163@task_run_app.command() 1a

164async def logs( 1a

165 id: UUID, 

166 head: bool = typer.Option( 

167 False, 

168 "--head", 

169 "-h", 

170 help=( 

171 f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of" 

172 " all logs." 

173 ), 

174 ), 

175 num_logs: int = typer.Option( 

176 LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS, 

177 "--num-logs", 

178 "-n", 

179 help=( 

180 "Number of logs to show when using the --head or --tail flag. If None," 

181 f" defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}." 

182 ), 

183 min=1, 

184 ), 

185 reverse: bool = typer.Option( 

186 False, 

187 "--reverse", 

188 "-r", 

189 help="Reverse the logs order to print the most recent logs first", 

190 ), 

191 tail: bool = typer.Option( 

192 False, 

193 "--tail", 

194 "-t", 

195 help=( 

196 f"Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of" 

197 " all logs." 

198 ), 

199 ), 

200): 

201 """ 

202 View logs for a task run. 

203 """ 

204 # Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time 

205 offset = 0 

206 more_logs = True 

207 num_logs_returned = 0 

208 

209 # if head and tail flags are being used together 

210 if head and tail: 

211 exit_with_error("Please provide either a `head` or `tail` option but not both.") 

212 

213 # if using tail update offset according to LOGS_DEFAULT_PAGE_SIZE 

214 if tail: 

215 offset = max(0, num_logs - LOGS_DEFAULT_PAGE_SIZE) 

216 

217 log_filter = LogFilter(task_run_id=LogFilterTaskRunId(any_=[id])) 

218 

219 async with get_client() as client: 

220 # Get the task run 

221 try: 

222 task_run = await client.read_task_run(id) 

223 except ObjectNotFound: 

224 exit_with_error(f"task run {str(id)!r} not found!") 

225 

226 while more_logs: 

227 num_logs_to_return_from_page = min( 

228 LOGS_DEFAULT_PAGE_SIZE, num_logs - num_logs_returned 

229 ) 

230 

231 # Get the next page of logs 

232 page_logs = await client.read_logs( 

233 log_filter=log_filter, 

234 limit=num_logs_to_return_from_page, 

235 offset=offset, 

236 sort=( 

237 LogSort.TIMESTAMP_DESC if reverse or tail else LogSort.TIMESTAMP_ASC 

238 ), 

239 ) 

240 

241 for log in reversed(page_logs) if tail and not reverse else page_logs: 

242 app.console.print( 

243 # Print following the task run format (declared in logging.yml) 

244 ( 

245 f"{to_datetime_string(log.timestamp)}.{log.timestamp.microsecond // 1000:03d} |" 

246 f" {logging.getLevelName(log.level):7s} | Task run" 

247 f" {task_run.name!r} - {log.message}" 

248 ), 

249 soft_wrap=True, 

250 ) 

251 

252 # Update the number of logs retrieved 

253 num_logs_returned += num_logs_to_return_from_page 

254 

255 if tail: 

256 # If the current offset is not 0, update the offset for the next page 

257 if offset != 0: 

258 offset = ( 

259 0 

260 # Reset the offset to 0 if there are less logs than the LOGS_DEFAULT_PAGE_SIZE to get the remaining log 

261 if offset < LOGS_DEFAULT_PAGE_SIZE 

262 else offset - LOGS_DEFAULT_PAGE_SIZE 

263 ) 

264 else: 

265 more_logs = False 

266 else: 

267 if len(page_logs) == LOGS_DEFAULT_PAGE_SIZE: 

268 offset += LOGS_DEFAULT_PAGE_SIZE 

269 else: 

270 # No more logs to show, exit 

271 more_logs = False