Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/work_queue.py: 13%

250 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with work queues. 

3""" 

4 

5import datetime 1a

6import warnings 1a

7from textwrap import dedent 1a

8from typing import Optional, Union 1a

9from uuid import UUID 1a

10 

11import orjson 1a

12import typer 1a

13from rich.pretty import Pretty 1a

14from rich.table import Table 1a

15 

16from prefect.cli._types import PrefectTyper 1a

17from prefect.cli._utilities import exit_with_error, exit_with_success 1a

18from prefect.cli.root import app, is_interactive 1a

19from prefect.client.orchestration import get_client 1a

20from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterId 1a

21from prefect.client.schemas.objects import ( 1a

22 DEFAULT_AGENT_WORK_POOL_NAME, 

23 FlowRun, 

24 WorkQueue, 

25) 

26from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a

27from prefect.types._datetime import now as now_fn 1a

28 

# Typer sub-application that groups the `work-queue` commands defined in this module.
work_app: PrefectTyper = PrefectTyper(name="work-queue", help="Manage work queues.")
# Attach the group to the root CLI app; "work-queues" is registered as an alias.
app.add_typer(work_app, aliases=["work-queues"])

31 

32 

async def _get_work_queue_id_from_name_or_id(
    name_or_id: Union[UUID, str], work_pool_name: Optional[str] = None
) -> UUID:
    """
    Resolve the main argument of a work queue CLI command to a work queue ID.

    For backwards-compatibility, the main argument of the work queue CLI can be
    either a name (preferred) or an ID (legacy behavior).

    Args:
        name_or_id: A work queue name, a UUID string, or a `UUID` instance.
        work_pool_name: Optional work pool name forwarded to the by-name lookup.

    Returns:
        `name_or_id` itself when it is a `UUID`, the parsed `UUID` when it is a
        UUID string, otherwise the `id` of the work queue found by name.

    Reports an error through `exit_with_error` when no value is given or when
    the by-name lookup raises `ObjectNotFound`.
    """
    if not name_or_id:
        # hint that we prefer names
        exit_with_error("Provide a work queue name.")

    # An existing UUID needs no parsing. `UUID(<UUID instance>)` raises
    # AttributeError, which would otherwise push a valid ID down the by-name
    # lookup path below.
    if isinstance(name_or_id, UUID):
        return name_or_id

    # try parsing as ID
    try:
        return UUID(name_or_id)
    except (AttributeError, ValueError):
        # not a UUID string; fall through and treat the value as a name
        pass

    async with get_client() as client:
        try:
            work_queue = await client.read_work_queue_by_name(
                name=name_or_id,
                work_pool_name=work_pool_name,
            )
            return work_queue.id
        except ObjectNotFound:
            if not work_pool_name:
                exit_with_error(f"No work queue named {name_or_id!r} found.")

            exit_with_error(
                f"No work queue named {name_or_id!r} found in work pool"
                f" {work_pool_name!r}."
            )

66 

67 

@work_app.command()
async def create(
    name: str = typer.Argument(..., help="The unique name to assign this work queue"),
    limit: Optional[int] = typer.Option(
        None, "-l", "--limit", help="The concurrency limit to set on the queue."
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool to create the work queue in.",
    ),
    priority: Optional[int] = typer.Option(
        None,
        "-q",
        "--priority",
        help="The associated priority for the created work queue",
    ),
):
    """
    Create a work queue.
    """
    # NOTE: the docstring above doubles as the command's help text, so
    # implementation notes live in comments.
    #
    # Flow: create the queue, optionally apply the concurrency limit with a
    # follow-up update call, then print a summary with next-step commands.
    async with get_client() as client:
        try:
            result = await client.create_work_queue(
                name=name, work_pool_name=pool, priority=priority
            )
            if limit is not None:
                await client.update_work_queue(
                    id=result.id,
                    concurrency_limit=limit,
                )
        except ObjectAlreadyExists:
            exit_with_error(f"Work queue with name: {name!r} already exists.")
        except ObjectNotFound:
            exit_with_error(f"Work pool with name: {pool!r} not found.")

    if not pool:
        # specify the default work pool name after work queue creation to allow the server
        # to handle a bunch of logic associated with agents without work pools
        pool = DEFAULT_AGENT_WORK_POOL_NAME

    # The queue name and the pool name are quoted separately so the suggested
    # command can be pasted into a shell as-is.
    output_msg = dedent(
        f"""
        Created work queue with properties:
            name - {name!r}
            work pool - {pool!r}
            id - {result.id}
            concurrency limit - {limit}
        Start a worker to pick up flow runs from the work queue:
            prefect worker start -q '{result.name}' -p '{pool}'

        Inspect the work queue:
            prefect work-queue inspect '{result.name}'
        """
    )
    exit_with_success(output_msg)

125 

126 

@work_app.command()
async def set_concurrency_limit(
    name: str = typer.Argument(..., help="The name or ID of the work queue"),
    limit: int = typer.Argument(..., help="The concurrency limit to set on the queue."),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Set a concurrency limit on a work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    # Resolve the CLI argument to a queue ID, then push the new limit.
    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            await client.update_work_queue(id=work_queue_id, concurrency_limit=limit)
        except ObjectNotFound:
            not_found = (
                f"No work queue named {name!r} found in work pool {pool!r}."
                if pool
                else f"No work queue named {name!r} found."
            )
            exit_with_error(not_found)

    # Mention the work pool in the confirmation only when one was supplied.
    confirmation = f"Concurrency limit of {limit} set on work queue {name!r}"
    if pool:
        confirmation += f" in work pool {pool!r}"
    exit_with_success(confirmation)

169 

170 

@work_app.command()
async def clear_concurrency_limit(
    name: str = typer.Argument(..., help="The name or ID of the work queue to clear"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Clear any concurrency limits from a work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    # Clearing is an update that sets the queue's concurrency limit to None.
    target_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    # Shared " in work pool ..." suffix; empty when no pool was supplied.
    pool_suffix = f" in work pool {pool!r}" if pool else ""

    async with get_client() as client:
        try:
            await client.update_work_queue(id=target_id, concurrency_limit=None)
        except ObjectNotFound:
            exit_with_error(f"No work queue found: {name!r}{pool_suffix}")

    exit_with_success(f"Concurrency limits removed on work queue {name!r}{pool_suffix}")

208 

209 

@work_app.command()
async def pause(
    name: str = typer.Argument(..., help="The name or ID of the work queue to pause"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Pause a work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    # Without an explicit pool, ask the user to confirm before pausing.
    if not pool:
        confirmed = typer.confirm(
            f"You have not specified a work pool. Are you sure you want to pause {name} work queue in `{DEFAULT_AGENT_WORK_POOL_NAME}`?"
        )
        if not confirmed:
            exit_with_error("Work queue pause aborted!")

    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            await client.update_work_queue(id=work_queue_id, is_paused=True)
        except ObjectNotFound:
            failure = f"No work queue found: {name!r}"
            if pool:
                failure = f"No work queue found: {name!r} in work pool {pool!r}"
            exit_with_error(failure)

    exit_with_success(
        f"Work queue {name!r} in work pool {pool!r} paused"
        if pool
        else f"Work queue {name!r} paused"
    )

252 

253 

@work_app.command()
async def resume(
    name: str = typer.Argument(..., help="The name or ID of the work queue to resume"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Resume a paused work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    # Resuming is an update that sets the queue's `is_paused` flag to False.
    resolved_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            await client.update_work_queue(id=resolved_id, is_paused=False)
        except ObjectNotFound:
            exit_with_error(
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )

    # Describe the queue once, then append the outcome.
    if pool:
        queue_label = f"Work queue {name!r} in work pool {pool!r}"
    else:
        queue_label = f"Work queue {name!r}"
    exit_with_success(f"{queue_label} resumed")

290 

291 

@work_app.command()
async def inspect(
    name: Optional[str] = typer.Argument(
        None, help="The name or ID of the work queue to inspect"
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    Inspect a work queue by name or ID.
    """
    # The docstring above is the command's help text; notes go in comments.
    #
    # Default output: pretty-print the work queue, then its status.
    # JSON output: print one document with the status nested under
    # "status_details".
    if output and output.lower() != "json":
        exit_with_error("Only 'json' output format is supported.")

    # Computed once; both the rendering branch and the trailing status print
    # depend on it.
    as_json = bool(output) and output.lower() == "json"

    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name,
        work_pool_name=pool,
    )
    async with get_client() as client:
        try:
            result = await client.read_work_queue(id=queue_id)
            # DeprecationWarnings raised while rendering the result are
            # silenced so they do not end up in the command output.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", category=DeprecationWarning)

                if as_json:
                    result_json = result.model_dump(mode="json")

                    # Try to get status and combine with result for JSON output
                    try:
                        status = await client.read_work_queue_status(id=queue_id)
                        status_json = status.model_dump(mode="json")
                        result_json["status_details"] = status_json
                    except ObjectNotFound:
                        # Best effort: emit the queue without status details.
                        pass

                    json_output = orjson.dumps(
                        result_json, option=orjson.OPT_INDENT_2
                    ).decode()
                    app.console.print(json_output)
                else:
                    app.console.print(Pretty(result))
        except ObjectNotFound:
            if pool:
                error_message = f"No work queue found: {name!r} in work pool {pool!r}"
            else:
                error_message = f"No work queue found: {name!r}"
            exit_with_error(error_message)

        # Only print status separately for non-JSON output
        if not as_json:
            try:
                status = await client.read_work_queue_status(id=queue_id)
                app.console.print(Pretty(status))
            except ObjectNotFound:
                # Best effort: a missing status is not an error here.
                pass

357 

358 

@work_app.command()
async def ls(
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Display more information."
    ),
    work_queue_prefix: Optional[str] = typer.Option(
        None,
        "--match",
        "-m",
        help=(
            "Will match work queues with names that start with the specified prefix"
            " string"
        ),
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queues to list.",
    ),
):
    """
    View all work queues.
    """
    # The docstring above is the command's help text; notes go in comments.
    #
    # Without --pool: list queues (optionally filtered by --match) with the
    # name of the pool each belongs to.
    # With --pool: list that pool's queues with their priority.
    # In both cases rows are ordered newest-created first.

    # One reference time for the whole sort, instead of one clock read per
    # element.
    now = now_fn("UTC")

    def sort_by_created_key(q: WorkQueue):
        assert q.created is not None, "created is not None"
        return now - q.created

    def name_cell(q: WorkQueue) -> str:
        # Paused queues are flagged with the marker explained in the caption.
        return f"{q.name} [red](**)" if q.is_paused else q.name

    def limit_cell(q: WorkQueue) -> str:
        if q.concurrency_limit is not None:
            return f"[red]{q.concurrency_limit}"
        return "[blue]None"

    if not pool:
        table = Table(
            title="Work Queues",
            caption="(**) denotes a paused queue",
            caption_style="red",
        )
        table.add_column("Name", style="green", no_wrap=True)
        table.add_column("Pool", style="magenta", no_wrap=True)
        table.add_column("ID", justify="right", style="cyan", no_wrap=True)
        table.add_column("Concurrency Limit", style="blue", no_wrap=True)
        if verbose:
            table.add_column("Filter (Deprecated)", style="magenta", no_wrap=True)

        async with get_client() as client:
            if work_queue_prefix is not None:
                queues = await client.match_work_queues([work_queue_prefix])
            else:
                queues = await client.read_work_queues()

            # Look up the pools referenced by the queues to show pool names.
            pool_ids = [q.work_pool_id for q in queues]
            wp_filter = WorkPoolFilter(id=WorkPoolFilterId(any_=pool_ids))
            pools = await client.read_work_pools(work_pool_filter=wp_filter)
            pool_id_name_map = {p.id: p.name for p in pools}

            for queue in sorted(queues, key=sort_by_created_key):
                row = [
                    name_cell(queue),
                    pool_id_name_map[queue.work_pool_id],
                    str(queue.id),
                    limit_cell(queue),
                ]
                if verbose and queue.filter is not None:
                    row.append(queue.filter.model_dump_json())
                table.add_row(*row)

    else:
        table = Table(
            title=f"Work Queues in Work Pool {pool!r}",
            caption="(**) denotes a paused queue",
            caption_style="red",
        )
        table.add_column("Name", style="green", no_wrap=True)
        table.add_column("Priority", style="magenta", no_wrap=True)
        table.add_column("Concurrency Limit", style="blue", no_wrap=True)
        if verbose:
            table.add_column("Description", style="cyan", no_wrap=False)

        async with get_client() as client:
            try:
                queues = await client.read_work_queues(work_pool_name=pool)
            except ObjectNotFound:
                exit_with_error(f"No work pool found: {pool!r}")

            for queue in sorted(queues, key=sort_by_created_key):
                row = [
                    name_cell(queue),
                    f"{queue.priority}",
                    limit_cell(queue),
                ]
                if verbose:
                    row.append(queue.description)
                table.add_row(*row)

    app.console.print(table)

463 

464 

@work_app.command()
async def preview(
    name: Optional[str] = typer.Argument(
        None, help="The name or ID of the work queue to preview"
    ),
    hours: Optional[int] = typer.Option(
        None,
        "-h",
        "--hours",
        help="The number of hours to look ahead; defaults to 1 hour",
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Preview a work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    #
    # Prints a table of runs scheduled in the queue within the look-ahead
    # window, or a hint to widen the window when none are found.
    if pool:
        title = f"Preview of Work Queue {name!r} in Work Pool {pool!r}"
    else:
        title = f"Preview of Work Queue {name!r}"

    table = Table(title=title, caption="(**) denotes a late run", caption_style="red")
    table.add_column(
        "Scheduled Start Time", justify="left", style="yellow", no_wrap=True
    )
    table.add_column("Run ID", justify="left", style="cyan", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("Deployment ID", style="blue", no_wrap=True)

    # Upper bound of the look-ahead window; a missing (or zero) --hours falls
    # back to one hour.
    window = now_fn("UTC") + datetime.timedelta(hours=hours or 1)

    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )
    async with get_client() as client:
        if pool:
            # NOTE(review): this branch addresses the queue by `name`; if the
            # argument was given as an ID it is still passed as a queue name.
            try:
                responses = await client.get_scheduled_flow_runs_for_work_pool(
                    work_pool_name=pool,
                    work_queue_names=[name],
                    # Apply the look-ahead window here too, so --hours is
                    # honored when a work pool is given.
                    scheduled_before=window,
                )
                runs = [response.flow_run for response in responses]
            except ObjectNotFound:
                exit_with_error(f"No work queue found: {name!r} in work pool {pool!r}")
        else:
            try:
                runs = await client.get_runs_in_work_queue(
                    queue_id,
                    limit=10,
                    scheduled_before=window,
                )
            except ObjectNotFound:
                exit_with_error(f"No work queue found: {name!r}")
    now = now_fn("UTC")

    def sort_by_created_key(r: FlowRun):
        assert r.created is not None, "created is not None"
        return now - r.created

    for run in sorted(runs, key=sort_by_created_key):
        start_time = run.expected_start_time
        # A run without an expected start time cannot be compared against
        # `now`, so it is never flagged as late.
        is_late = start_time is not None and start_time < now
        table.add_row(
            f"{start_time} [red](**)" if is_late else f"{start_time}",
            str(run.id),
            run.name,
            str(run.deployment_id),
        )

    if runs:
        app.console.print(table)
    else:
        app.console.print(
            (
                "No runs found - try increasing how far into the future you preview"
                " with the --hours flag"
            ),
            style="yellow",
        )

551 

552 

@work_app.command()
async def delete(
    name: str = typer.Argument(..., help="The name or ID of the work queue to delete"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queue to delete.",
    ),
):
    """
    Delete a work queue by name or ID.
    """
    # The docstring above is the command's help text; notes go in comments.
    # The argument is resolved to a queue ID first; the confirmation prompt
    # is only shown when `is_interactive()` is true.
    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name,
        work_pool_name=pool,
    )
    async with get_client() as client:
        try:
            if is_interactive() and not typer.confirm(
                (f"Are you sure you want to delete work queue with name {name!r}?"),
                default=False,
            ):
                exit_with_error("Deletion aborted.")
            await client.delete_work_queue_by_id(id=queue_id)
        except ObjectNotFound:
            if pool:
                error_message = f"No work queue found: {name!r} in work pool {pool!r}"
            else:
                error_message = f"No work queue found: {name!r}"
            exit_with_error(error_message)

    if pool:
        success_message = (
            f"Successfully deleted work queue {name!r} in work pool {pool!r}"
        )
    else:
        success_message = f"Successfully deleted work queue {name!r}"
    exit_with_success(success_message)

592 

593 

@work_app.command("read-runs")
async def read_wq_runs(
    name: str = typer.Argument(..., help="The name or ID of the work queue to poll"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queue to poll.",
    ),
):
    """
    Get runs in a work queue. Note that this will trigger an artificial poll of
    the work queue.
    """
    # The docstring above is the command's help text; notes go in comments.
    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name,
        work_pool_name=pool,
    )
    async with get_client() as client:
        try:
            runs = await client.get_runs_in_work_queue(id=queue_id)
        except ObjectNotFound:
            if pool:
                error_message = f"No work queue found: {name!r} in work pool {pool!r}"
            else:
                error_message = f"No work queue found: {name!r}"
            exit_with_error(error_message)

    # Mention the work pool only when one was supplied, so the message never
    # reads "in work pool None".
    if pool:
        success_message = (
            f"Read {len(runs)} runs for work queue {name!r} in work pool {pool}: {runs}"
        )
    else:
        success_message = f"Read {len(runs)} runs for work queue {name!r}: {runs}"
    exit_with_success(success_message)

625 exit_with_success(success_message)