Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/flow_run.py: 21%

173 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for working with flow runs 

3""" 

4 

5from __future__ import annotations 1a

6 

7import logging 1a

8import os 1a

9import signal 1a

10import threading 1a

11import webbrowser 1a

12from types import FrameType 1a

13from typing import List, Optional 1a

14from uuid import UUID 1a

15 

16import httpx 1a

17import orjson 1a

18import typer 1a

19from rich.markup import escape 1a

20from rich.pretty import Pretty 1a

21from rich.table import Table 1a

22from starlette import status 1a

23 

24from prefect.cli._types import PrefectTyper 1a

25from prefect.cli._utilities import exit_with_error, exit_with_success 1a

26from prefect.cli.root import app, is_interactive 1a

27from prefect.client.orchestration import get_client 1a

28from prefect.client.schemas.filters import FlowFilter, FlowRunFilter, LogFilter 1a

29from prefect.client.schemas.objects import StateType 1a

30from prefect.client.schemas.responses import SetStateStatus 1a

31from prefect.client.schemas.sorting import FlowRunSort, LogSort 1a

32from prefect.exceptions import ObjectNotFound 1a

33from prefect.logging import get_logger 1a

34from prefect.runner import Runner 1a

35from prefect.states import State 1a

36from prefect.types._datetime import human_friendly_diff 1a

37from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

38from prefect.utilities.urls import url_for 1a

39 

# Typer sub-application that groups the flow run commands defined in this module.
flow_run_app: PrefectTyper = PrefectTyper(
    name="flow-run",
    help="Interact with flow runs.",
)
# Attach the group to the root CLI app; "flow-runs" is registered as an alias.
app.add_typer(flow_run_app, aliases=["flow-runs"])

# Maximum number of logs requested per page by the `logs` command.
LOGS_DEFAULT_PAGE_SIZE = 200
# Number of logs shown by `--head` / `--tail` when `--num-logs` is not given.
LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS = 20

# Module-level logger used for warnings and shutdown messages below.
logger: "logging.Logger" = get_logger(__name__)

49 

50 

@flow_run_app.command()
async def inspect(
    id: UUID,
    web: bool = typer.Option(
        False,
        "--web",
        help="Open the flow run in a web browser.",
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    View details about a flow run.
    """
    # Parameters:
    #   id:     UUID of the flow run to look up.
    #   web:    when set, open the run's UI page in a browser instead of printing.
    #   output: optional output format; only "json" (case-insensitive) is accepted.
    json_requested = bool(output) and output.lower() == "json"
    if output and not json_requested:
        exit_with_error("Only 'json' output format is supported.")

    async with get_client() as client:
        try:
            flow_run = await client.read_flow_run(id)
        except ObjectNotFound:
            # The `logs` command in this module handles a missing run from this
            # same client call via ObjectNotFound, so handle it here as well
            # instead of letting it escape as an unhandled exception.
            exit_with_error(f"Flow run {id!r} not found!")
        except httpx.HTTPStatusError as exc:
            # Kept as a fallback in case a raw 404 response is surfaced.
            if exc.response.status_code == status.HTTP_404_NOT_FOUND:
                exit_with_error(f"Flow run {id!r} not found!")
            else:
                raise

    if web:
        flow_run_url = url_for("flow-run", obj_id=id)
        if not flow_run_url:
            exit_with_error(
                "Failed to generate URL for flow run. Make sure PREFECT_UI_URL is configured."
            )

        # webbrowser.open_new_tab is a blocking call; run it off the event loop.
        await run_sync_in_worker_thread(webbrowser.open_new_tab, flow_run_url)
        exit_with_success(f"Opened flow run {id!r} in browser.")
    elif json_requested:
        flow_run_json = flow_run.model_dump(mode="json")
        json_output = orjson.dumps(
            flow_run_json, option=orjson.OPT_INDENT_2
        ).decode()
        app.console.print(json_output)
    else:
        app.console.print(Pretty(flow_run))

99 

100 

@flow_run_app.command()
async def ls(
    flow_name: List[str] = typer.Option(None, help="Name of the flow"),
    limit: int = typer.Option(15, help="Maximum number of flow runs to list"),
    state: List[str] = typer.Option(None, help="Name of the flow run's state"),
    state_type: List[str] = typer.Option(None, help="Type of the flow run's state"),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    View recent flow runs or flow runs for specific flows.

    Arguments:

        flow_name: Name of the flow

        limit: Maximum number of flow runs to list. Defaults to 15.

        state: Name of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', 'PAUSED', 'SUSPENDED', 'AWAITINGRETRY', 'RETRYING', and 'LATE'.

        state_type: Type of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', and 'PAUSED'.

    Examples:

    $ prefect flow-runs ls --state Running

    $ prefect flow-runs ls --state Running --state late

    $ prefect flow-runs ls --state-type RUNNING

    $ prefect flow-runs ls --state-type RUNNING --state-type FAILED
    """
    json_requested = bool(output) and output.lower() == "json"
    if output and not json_requested:
        exit_with_error("Only 'json' output format is supported.")

    # Handling `state` and `state_type` argument validity in the function instead of by specifying
    # List[StateType] and List[StateName] in the type hints, allows users to provide
    # case-insensitive arguments for `state` and `state_type`.

    # Maps the upper-cased user input to the canonical state name spelling.
    prefect_state_names = {
        "SCHEDULED": "Scheduled",
        "PENDING": "Pending",
        "RUNNING": "Running",
        "COMPLETED": "Completed",
        "FAILED": "Failed",
        "CANCELLED": "Cancelled",
        "CRASHED": "Crashed",
        "PAUSED": "Paused",
        "CANCELLING": "Cancelling",
        "SUSPENDED": "Suspended",
        "AWAITINGRETRY": "AwaitingRetry",
        "RETRYING": "Retrying",
        "LATE": "Late",
    }

    state_filter = {}

    if state:
        formatted_states = []
        for s in state:
            canonical_name = prefect_state_names.get(s.upper())
            if canonical_name is not None:
                formatted_states.append(canonical_name)
            else:
                # Do not change the case of the state name if it is not one of the official Prefect state names
                formatted_states.append(s)
                logger.warning(
                    f"State name {repr(s)} is not one of the official Prefect state names."
                )

        state_filter["name"] = {"any_": formatted_states}

    if state_type:
        upper_cased_states = [s.upper() for s in state_type]
        if not all(s in StateType.__members__ for s in upper_cased_states):
            exit_with_error(
                f"Invalid state type. Options are {', '.join(StateType.__members__)}."
            )

        state_filter["type"] = {
            "any_": [StateType[s].value for s in upper_cased_states]
        }

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None,
            flow_run_filter=FlowRunFilter(state=state_filter) if state_filter else None,
            limit=limit,
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )

        # Flow names are only shown in the table view, so skip the extra
        # request when there is nothing to render or when JSON was requested.
        flows_by_id = {}
        if flow_runs and not json_requested:
            flows_by_id = {
                flow.id: flow
                for flow in await client.read_flows(
                    flow_filter=FlowFilter(
                        id={"any_": [run.flow_id for run in flow_runs]}
                    )
                )
            }

    if not flow_runs:
        if json_requested:
            # Keep JSON output machine-readable even when there are no results.
            app.console.print("[]")
            return
        exit_with_success("No flow runs found.")

    if json_requested:
        flow_runs_json = [flow_run.model_dump(mode="json") for flow_run in flow_runs]
        json_output = orjson.dumps(flow_runs_json, option=orjson.OPT_INDENT_2).decode()
        app.console.print(json_output)
        return

    table = Table(title="Flow Runs")
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Flow", style="blue", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("State", no_wrap=True)
    table.add_column("When", style="bold", no_wrap=True)

    # Rows are displayed newest-created first.
    for flow_run in sorted(flow_runs, key=lambda d: d.created, reverse=True):
        flow = flows_by_id[flow_run.flow_id]
        # Scheduled runs show when they are due; all others show the time of
        # their current state.
        timestamp = (
            flow_run.state.state_details.scheduled_time
            if flow_run.state.is_scheduled()
            else flow_run.state.timestamp
        )
        table.add_row(
            str(flow_run.id),
            str(flow.name),
            str(flow_run.name),
            str(flow_run.state.type.value),
            human_friendly_diff(timestamp),
        )

    app.console.print(table)

237 

238 

@flow_run_app.command()
async def delete(id: UUID):
    """
    Delete a flow run by ID.
    """
    async with get_client() as client:
        try:
            # Only prompt when attached to an interactive session; otherwise
            # the deletion proceeds without confirmation.
            if is_interactive():
                prompt = f"Are you sure you want to delete flow run with id {id!r}?"
                confirmed = typer.confirm(prompt, default=False)
                if not confirmed:
                    exit_with_error("Deletion aborted.")
            await client.delete_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")

    exit_with_success(f"Successfully deleted flow run '{id}'.")

256 

257 

@flow_run_app.command()
async def cancel(id: UUID):
    """Cancel a flow run by ID."""
    async with get_client() as client:
        # Cancellation is requested by proposing a CANCELLING state for the run.
        target_state = State(type=StateType.CANCELLING)
        try:
            result = await client.set_flow_run_state(
                flow_run_id=id,
                state=target_state,
            )
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")

    # An ABORT status means the state change was rejected; report the reason.
    if result.status == SetStateStatus.ABORT:
        failure_message = (
            f"Flow run '{id}' was unable to be cancelled. "
            f"Reason: '{result.details.reason}'"
        )
        exit_with_error(failure_message)

    exit_with_success(f"Flow run '{id}' was successfully scheduled for cancellation.")

277 

278 

@flow_run_app.command()
async def logs(
    id: UUID,
    head: bool = typer.Option(
        False,
        "--head",
        "-h",
        help=(
            f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
    num_logs: int = typer.Option(
        None,
        "--num-logs",
        "-n",
        help=(
            "Number of logs to show when using the --head or --tail flag. If None,"
            f" defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}."
        ),
        min=1,
    ),
    reverse: bool = typer.Option(
        False,
        "--reverse",
        "-r",
        help="Reverse the logs order to print the most recent logs first",
    ),
    tail: bool = typer.Option(
        False,
        "--tail",
        "-t",
        help=(
            f"Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
):
    """
    View logs for a flow run.
    """
    # Parameters:
    #   id:       UUID of the flow run whose logs are printed.
    #   head:     limit output to the first N logs.
    #   num_logs: N for --head / --tail; also acts as a limit when given alone.
    #   reverse:  print the most recent logs first.
    #   tail:     limit output to the last N logs.

    # if head and tail flags are being used together
    if head and tail:
        exit_with_error("Please provide either a `head` or `tail` option but not both.")

    # None means "no limit": page through every log of the flow run.
    user_specified_num_logs = (
        num_logs or LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
        if head or tail or num_logs
        else None
    )

    # `--tail` without `--reverse` must print the last N logs oldest-first.
    # Logs are requested newest-first, so the pages are walked from the oldest
    # window of those N logs back to offset 0 and each page is flipped before
    # printing. `--tail --reverse` is simply the first N logs of the
    # newest-first listing, so it uses plain forward pagination; walking
    # backwards there would print the pages in the wrong overall order once
    # more than one page is needed.
    paginate_backwards = tail and not reverse

    # Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time
    offset = (
        max(0, user_specified_num_logs - LOGS_DEFAULT_PAGE_SIZE)
        if paginate_backwards
        else 0
    )
    more_logs = True
    num_logs_returned = 0

    sort = LogSort.TIMESTAMP_DESC if reverse or tail else LogSort.TIMESTAMP_ASC
    log_filter = LogFilter(flow_run_id={"any_": [id]})

    async with get_client() as client:
        # Get the flow run
        try:
            flow_run = await client.read_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run {str(id)!r} not found!")

        while more_logs:
            num_logs_to_return_from_page = (
                LOGS_DEFAULT_PAGE_SIZE
                if user_specified_num_logs is None
                else min(
                    LOGS_DEFAULT_PAGE_SIZE, user_specified_num_logs - num_logs_returned
                )
            )

            # Get the next page of logs
            page_logs = await client.read_logs(
                log_filter=log_filter,
                limit=num_logs_to_return_from_page,
                offset=offset,
                sort=sort,
            )

            for log in reversed(page_logs) if paginate_backwards else page_logs:
                # Print following the flow run format (declared in logging.yml)
                timestamp = f"{log.timestamp:%Y-%m-%d %H:%M:%S.%f}"[:-3]
                log_level = f"{logging.getLevelName(log.level):7s}"
                # Escape the message so log text is not interpreted as rich markup.
                flow_run_info = f"Flow run {flow_run.name!r} - {escape(log.message)}"

                log_message = f"{timestamp} | {log_level} | {flow_run_info}"
                app.console.print(
                    log_message,
                    soft_wrap=True,
                )

            # Track the number of logs *requested* so far (not the number
            # received); this keeps the backward page windows aligned even
            # when fewer logs exist than were asked for.
            num_logs_returned += num_logs_to_return_from_page

            if paginate_backwards:
                if offset != 0:
                    # Step one page towards the newest logs, clamping at 0 so
                    # the final page fetches the remaining logs.
                    offset = max(0, offset - LOGS_DEFAULT_PAGE_SIZE)
                else:
                    more_logs = False
            else:
                requested_total_reached = (
                    user_specified_num_logs is not None
                    and num_logs_returned >= user_specified_num_logs
                )
                # A full page means there may be more logs; stop early once the
                # requested total has been fetched to avoid an empty request.
                if (
                    len(page_logs) == LOGS_DEFAULT_PAGE_SIZE
                    and not requested_total_reached
                ):
                    offset += LOGS_DEFAULT_PAGE_SIZE
                else:
                    # No more logs to show, exit
                    more_logs = False

399 

400 

@flow_run_app.command()
async def execute(
    id: Optional[UUID] = typer.Argument(None, help="ID of the flow run to execute"),
):
    """
    Execute a flow run by ID.

    If no ID is given, the PREFECT__FLOW_RUN_ID environment variable is used.
    """
    if id is None:
        environ_flow_id = os.environ.get("PREFECT__FLOW_RUN_ID")
        if environ_flow_id:
            try:
                id = UUID(environ_flow_id)
            except ValueError:
                # A malformed value should produce a clean CLI error rather
                # than an unhandled ValueError.
                exit_with_error(
                    "PREFECT__FLOW_RUN_ID is not a valid flow run ID:"
                    f" {environ_flow_id!r}"
                )

    if id is None:
        exit_with_error("Could not determine the ID of the flow run to execute.")

    runner = Runner()

    def _handle_reschedule_sigterm(_signal: int, _frame: FrameType | None):
        """Reschedule the runner's current flow runs and exit on SIGTERM."""
        logger.info("SIGTERM received, initiating graceful shutdown...")
        runner.reschedule_current_flow_runs()
        exit_with_success("Flow run successfully rescheduled.")

    # Set up signal handling to reschedule run on SIGTERM
    on_sigterm = os.environ.get("PREFECT_FLOW_RUN_EXECUTE_SIGTERM_BEHAVIOR", "").lower()
    # signal.signal may only be called from the main thread, so the handler is
    # installed only there and only when rescheduling was requested.
    if (
        threading.current_thread() is threading.main_thread()
        and on_sigterm == "reschedule"
    ):
        signal.signal(signal.SIGTERM, _handle_reschedule_sigterm)

    await runner.execute_flow_run(id)