Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/work_queue.py: 13%
250 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Command line interface for working with work queues.
3"""
5import datetime 1a
6import warnings 1a
7from textwrap import dedent 1a
8from typing import Optional, Union 1a
9from uuid import UUID 1a
11import orjson 1a
12import typer 1a
13from rich.pretty import Pretty 1a
14from rich.table import Table 1a
16from prefect.cli._types import PrefectTyper 1a
17from prefect.cli._utilities import exit_with_error, exit_with_success 1a
18from prefect.cli.root import app, is_interactive 1a
19from prefect.client.orchestration import get_client 1a
20from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterId 1a
21from prefect.client.schemas.objects import ( 1a
22 DEFAULT_AGENT_WORK_POOL_NAME,
23 FlowRun,
24 WorkQueue,
25)
26from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a
27from prefect.types._datetime import now as now_fn 1a
# Typer sub-application grouping the work queue commands defined below. It is
# attached to the root CLI `app`, with `work-queues` registered as an alias.
work_app: PrefectTyper = PrefectTyper(
    name="work-queue",
    help="Manage work queues.",
)
app.add_typer(work_app, aliases=["work-queues"])
33async def _get_work_queue_id_from_name_or_id( 1a
34 name_or_id: Union[UUID, str], work_pool_name: Optional[str] = None
35) -> UUID:
36 """
37 For backwards-compatibility, the main argument of the work queue CLI can be
38 either a name (preferred) or an ID (legacy behavior).
39 """
41 if not name_or_id:
42 # hint that we prefer names
43 exit_with_error("Provide a work queue name.")
45 # try parsing as ID
46 try:
47 return UUID(name_or_id)
49 # parse as string
50 except (AttributeError, ValueError):
51 async with get_client() as client:
52 try:
53 work_queue = await client.read_work_queue_by_name(
54 name=name_or_id,
55 work_pool_name=work_pool_name,
56 )
57 return work_queue.id
58 except ObjectNotFound:
59 if not work_pool_name:
60 exit_with_error(f"No work queue named {name_or_id!r} found.")
62 exit_with_error(
63 f"No work queue named {name_or_id!r} found in work pool"
64 f" {work_pool_name!r}."
65 )
@work_app.command()
async def create(
    name: str = typer.Argument(..., help="The unique name to assign this work queue"),
    limit: Optional[int] = typer.Option(
        None, "-l", "--limit", help="The concurrency limit to set on the queue."
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool to create the work queue in.",
    ),
    priority: Optional[int] = typer.Option(
        None,
        "-q",
        "--priority",
        help="The associated priority for the created work queue",
    ),
):
    """
    Create a work queue.
    """
    # Flow: create the queue, optionally apply the concurrency limit with a
    # follow-up update, then print a summary with suggested next commands.
    # `ObjectAlreadyExists` is reported as a duplicate queue name and
    # `ObjectNotFound` as a missing work pool, both through `exit_with_error`.
    async with get_client() as client:
        try:
            result = await client.create_work_queue(
                name=name, work_pool_name=pool, priority=priority
            )
            # The limit is not part of the create call; it is set afterwards.
            if limit is not None:
                await client.update_work_queue(
                    id=result.id,
                    concurrency_limit=limit,
                )
        except ObjectAlreadyExists:
            exit_with_error(f"Work queue with name: {name!r} already exists.")
        except ObjectNotFound:
            exit_with_error(f"Work pool with name: {pool!r} not found.")

    if not pool:
        # specify the default work pool name after work queue creation to allow the server
        # to handle a bunch of logic associated with agents without work pools
        pool = DEFAULT_AGENT_WORK_POOL_NAME

    # The queue name and the pool name are quoted separately so that the
    # suggested `worker start` command can be copied into a shell as-is.
    output_msg = dedent(
        f"""
        Created work queue with properties:
            name - {name!r}
            work pool - {pool!r}
            id - {result.id}
            concurrency limit - {limit}
        Start a worker to pick up flow runs from the work queue:
            prefect worker start -q '{result.name}' -p '{pool}'

        Inspect the work queue:
            prefect work-queue inspect '{result.name}'
        """
    )
    exit_with_success(output_msg)
@work_app.command()
async def set_concurrency_limit(
    name: str = typer.Argument(..., help="The name or ID of the work queue"),
    limit: int = typer.Argument(..., help="The concurrency limit to set on the queue."),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Set a concurrency limit on a work queue.
    """
    # Turn the CLI argument (a name or a legacy ID) into a queue ID first.
    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            await client.update_work_queue(
                id=work_queue_id, concurrency_limit=limit
            )
        except ObjectNotFound:
            # The wording depends on whether a work pool was specified.
            not_found = (
                f"No work queue named {name!r} found in work pool {pool!r}."
                if pool
                else f"No work queue named {name!r} found."
            )
            exit_with_error(not_found)

    confirmation = (
        f"Concurrency limit of {limit} set on work queue {name!r} in work pool"
        f" {pool!r}"
        if pool
        else f"Concurrency limit of {limit} set on work queue {name!r}"
    )
    exit_with_success(confirmation)
@work_app.command()
async def clear_concurrency_limit(
    name: str = typer.Argument(..., help="The name or ID of the work queue to clear"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Clear any concurrency limits from a work queue.
    """
    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            # Updating the queue with a limit of None removes the limit.
            await client.update_work_queue(
                id=work_queue_id, concurrency_limit=None
            )
        except ObjectNotFound:
            not_found = (
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )
            exit_with_error(not_found)

    confirmation = (
        f"Concurrency limits removed on work queue {name!r} in work pool {pool!r}"
        if pool
        else f"Concurrency limits removed on work queue {name!r}"
    )
    exit_with_success(confirmation)
@work_app.command()
async def pause(
    name: str = typer.Argument(..., help="The name or ID of the work queue to pause"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Pause a work queue.
    """
    # Without an explicit pool the user is asked to confirm before anything is
    # looked up or changed.
    if not pool:
        confirmed = typer.confirm(
            f"You have not specified a work pool. Are you sure you want to pause {name} work queue in `{DEFAULT_AGENT_WORK_POOL_NAME}`?"
        )
        if not confirmed:
            exit_with_error("Work queue pause aborted!")

    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            await client.update_work_queue(id=work_queue_id, is_paused=True)
        except ObjectNotFound:
            not_found = (
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )
            exit_with_error(not_found)

    confirmation = (
        f"Work queue {name!r} in work pool {pool!r} paused"
        if pool
        else f"Work queue {name!r} paused"
    )
    exit_with_success(confirmation)
@work_app.command()
async def resume(
    name: str = typer.Argument(..., help="The name or ID of the work queue to resume"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Resume a paused work queue.
    """
    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            # Resuming is an update that clears the paused flag.
            await client.update_work_queue(id=work_queue_id, is_paused=False)
        except ObjectNotFound:
            not_found = (
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )
            exit_with_error(not_found)

    confirmation = (
        f"Work queue {name!r} in work pool {pool!r} resumed"
        if pool
        else f"Work queue {name!r} resumed"
    )
    exit_with_success(confirmation)
@work_app.command()
async def inspect(
    name: str = typer.Argument(
        None, help="The name or ID of the work queue to inspect"
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    Inspect a work queue by ID.
    """
    # "json" (case-insensitive) is the only accepted --output value.
    wants_json = bool(output) and output.lower() == "json"
    if output and not wants_json:
        exit_with_error("Only 'json' output format is supported.")

    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            work_queue = await client.read_work_queue(id=work_queue_id)
            with warnings.catch_warnings():
                # DeprecationWarnings raised while rendering are silenced.
                warnings.simplefilter("ignore", category=DeprecationWarning)

                if wants_json:
                    payload = work_queue.model_dump(mode="json")

                    # JSON output is one document: the status, when it can be
                    # read, is embedded under "status_details".
                    try:
                        queue_status = await client.read_work_queue_status(
                            id=work_queue_id
                        )
                        payload["status_details"] = queue_status.model_dump(
                            mode="json"
                        )
                    except ObjectNotFound:
                        pass

                    rendered = orjson.dumps(
                        payload, option=orjson.OPT_INDENT_2
                    ).decode()
                    app.console.print(rendered)
                else:
                    app.console.print(Pretty(work_queue))
        except ObjectNotFound:
            not_found = (
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )
            exit_with_error(not_found)

        # Only print status separately for non-JSON output
        if not wants_json:
            try:
                queue_status = await client.read_work_queue_status(id=work_queue_id)
                app.console.print(Pretty(queue_status))
            except ObjectNotFound:
                pass
@work_app.command()
async def ls(
    verbose: bool = typer.Option(
        False, "--verbose", "-v", help="Display more information."
    ),
    work_queue_prefix: str = typer.Option(
        None,
        "--match",
        "-m",
        help=(
            "Will match work queues with names that start with the specified prefix"
            " string"
        ),
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queues to list.",
    ),
):
    """
    View all work queues.
    """
    # Two table layouts exist: with --pool the rows show priority (and the
    # description when verbose); without it they show the owning pool and the
    # queue ID (and the deprecated filter when verbose). --match is only
    # consulted when no pool is given.

    def newest_first(wq: WorkQueue):
        # The key is the queue's age, so the most recently created sorts first.
        assert wq.created is not None, "created is not None"
        return now_fn("UTC") - wq.created

    def name_cell(wq: WorkQueue):
        # Paused queues carry the (**) marker explained in the table caption.
        return f"{wq.name} [red](**)" if wq.is_paused else wq.name

    def limit_cell(wq: WorkQueue):
        if wq.concurrency_limit is None:
            return "[blue]None"
        return f"[red]{wq.concurrency_limit}"

    if pool:
        table = Table(
            title=f"Work Queues in Work Pool {pool!r}",
            caption="(**) denotes a paused queue",
            caption_style="red",
        )
        table.add_column("Name", style="green", no_wrap=True)
        table.add_column("Priority", style="magenta", no_wrap=True)
        table.add_column("Concurrency Limit", style="blue", no_wrap=True)
        if verbose:
            table.add_column("Description", style="cyan", no_wrap=False)

        async with get_client() as client:
            try:
                pool_queues = await client.read_work_queues(work_pool_name=pool)
            except ObjectNotFound:
                exit_with_error(f"No work pool found: {pool!r}")

            for wq in sorted(pool_queues, key=newest_first):
                cells = [name_cell(wq), f"{wq.priority}", limit_cell(wq)]
                if verbose:
                    cells.append(wq.description)
                table.add_row(*cells)
    else:
        table = Table(
            title="Work Queues",
            caption="(**) denotes a paused queue",
            caption_style="red",
        )
        table.add_column("Name", style="green", no_wrap=True)
        table.add_column("Pool", style="magenta", no_wrap=True)
        table.add_column("ID", justify="right", style="cyan", no_wrap=True)
        table.add_column("Concurrency Limit", style="blue", no_wrap=True)
        if verbose:
            table.add_column("Filter (Deprecated)", style="magenta", no_wrap=True)

        async with get_client() as client:
            if work_queue_prefix is None:
                all_queues = await client.read_work_queues()
            else:
                all_queues = await client.match_work_queues([work_queue_prefix])

            # Fetch the referenced pools so each row can show a pool name
            # instead of a pool ID.
            referenced_pool_ids = [wq.work_pool_id for wq in all_queues]
            pool_filter = WorkPoolFilter(
                id=WorkPoolFilterId(any_=referenced_pool_ids)
            )
            work_pools = await client.read_work_pools(work_pool_filter=pool_filter)
            pool_name_by_id = {wp.id: wp.name for wp in work_pools}

            for wq in sorted(all_queues, key=newest_first):
                cells = [
                    name_cell(wq),
                    pool_name_by_id[wq.work_pool_id],
                    str(wq.id),
                    limit_cell(wq),
                ]
                if verbose and wq.filter is not None:
                    cells.append(wq.filter.model_dump_json())
                table.add_row(*cells)

    app.console.print(table)
@work_app.command()
async def preview(
    name: str = typer.Argument(
        None, help="The name or ID of the work queue to preview"
    ),
    hours: int = typer.Option(
        None,
        "-h",
        "--hours",
        help="The number of hours to look ahead; defaults to 1 hour",
    ),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool that the work queue belongs to.",
    ),
):
    """
    Preview a work queue.
    """
    # Flow: build the table, compute the look-ahead window, fetch the runs
    # (per work pool when --pool is given, otherwise per queue ID), then print
    # either the table or a hint to widen the window.
    if pool:
        title = f"Preview of Work Queue {name!r} in Work Pool {pool!r}"
    else:
        title = f"Preview of Work Queue {name!r}"

    table = Table(title=title, caption="(**) denotes a late run", caption_style="red")
    table.add_column(
        "Scheduled Start Time", justify="left", style="yellow", no_wrap=True
    )
    table.add_column("Run ID", justify="left", style="cyan", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("Deployment ID", style="blue", no_wrap=True)

    # `hours or 1` applies the documented one-hour default when --hours is
    # omitted (a value of 0 also falls back to 1).
    window = now_fn("UTC") + datetime.timedelta(hours=hours or 1)

    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )
    async with get_client() as client:
        if pool:
            try:
                # Pass the same look-ahead window as the branch below so that
                # --hours is honoured when --pool is given.
                responses = await client.get_scheduled_flow_runs_for_work_pool(
                    work_pool_name=pool,
                    work_queue_names=[name],
                    scheduled_before=window,
                )
                runs = [response.flow_run for response in responses]
            except ObjectNotFound:
                exit_with_error(f"No work queue found: {name!r} in work pool {pool!r}")
        else:
            try:
                runs = await client.get_runs_in_work_queue(
                    queue_id,
                    limit=10,
                    scheduled_before=window,
                )
            except ObjectNotFound:
                exit_with_error(f"No work queue found: {name!r}")
    now = now_fn("UTC")

    def sort_by_created_key(r: FlowRun):
        # The key is the run's age, so the most recently created sorts first.
        assert r.created is not None, "created is not None"
        return now - r.created

    for run in sorted(runs, key=sort_by_created_key):
        # A run is late when its expected start is already in the past. A
        # missing expected start time is rendered as-is rather than compared.
        is_late = run.expected_start_time is not None and run.expected_start_time < now
        table.add_row(
            (
                f"{run.expected_start_time} [red](**)"
                if is_late
                else f"{run.expected_start_time}"
            ),
            str(run.id),
            run.name,
            str(run.deployment_id),
        )

    if runs:
        app.console.print(table)
    else:
        app.console.print(
            (
                "No runs found - try increasing how far into the future you preview"
                " with the --hours flag"
            ),
            style="yellow",
        )
@work_app.command()
async def delete(
    name: str = typer.Argument(..., help="The name or ID of the work queue to delete"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queue to delete.",
    ),
):
    """
    Delete a work queue by ID.
    """
    work_queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name, work_pool_name=pool
    )

    async with get_client() as client:
        try:
            # Confirmation is only requested in interactive sessions, and it
            # defaults to "no".
            if is_interactive():
                confirmed = typer.confirm(
                    (f"Are you sure you want to delete work queue with name {name!r}?"),
                    default=False,
                )
                if not confirmed:
                    exit_with_error("Deletion aborted.")
            await client.delete_work_queue_by_id(id=work_queue_id)
        except ObjectNotFound:
            not_found = (
                f"No work queue found: {name!r} in work pool {pool!r}"
                if pool
                else f"No work queue found: {name!r}"
            )
            exit_with_error(not_found)

        confirmation = (
            f"Successfully deleted work queue {name!r} in work pool {pool!r}"
            if pool
            else f"Successfully deleted work queue {name!r}"
        )
        exit_with_success(confirmation)
@work_app.command("read-runs")
async def read_wq_runs(
    name: str = typer.Argument(..., help="The name or ID of the work queue to poll"),
    pool: Optional[str] = typer.Option(
        None,
        "-p",
        "--pool",
        help="The name of the work pool containing the work queue to poll.",
    ),
):
    """
    Get runs in a work queue. Note that this will trigger an artificial poll of
    the work queue.
    """
    # Flow: resolve the name or legacy ID to a queue ID, read the queue's runs
    # through the client, and report the count and the runs. `ObjectNotFound`
    # from the client is reported through `exit_with_error`.
    queue_id = await _get_work_queue_id_from_name_or_id(
        name_or_id=name,
        work_pool_name=pool,
    )
    async with get_client() as client:
        try:
            runs = await client.get_runs_in_work_queue(id=queue_id)
        except ObjectNotFound:
            if pool:
                error_message = f"No work queue found: {name!r} in work pool {pool!r}"
            else:
                error_message = f"No work queue found: {name!r}"
            exit_with_error(error_message)

        # Mention the work pool only when one was given; otherwise the message
        # would read "in work pool None".
        if pool:
            success_message = (
                f"Read {len(runs)} runs for work queue {name!r} in work pool {pool}: {runs}"
            )
        else:
            success_message = f"Read {len(runs)} runs for work queue {name!r}: {runs}"
        exit_with_success(success_message)