Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/concurrency_limit.py: 30%
77 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Command line interface for working with concurrency limits.
3"""
5import textwrap 1a
6from typing import Optional 1a
8import orjson 1a
9import typer 1a
10from rich.console import Group 1a
11from rich.panel import Panel 1a
12from rich.pretty import Pretty 1a
13from rich.table import Table 1a
15from prefect.cli._types import PrefectTyper 1a
16from prefect.cli._utilities import exit_with_error, exit_with_success 1a
17from prefect.cli.root import app, is_interactive 1a
18from prefect.client.orchestration import get_client 1a
19from prefect.exceptions import ObjectNotFound 1a
20from prefect.types._datetime import human_friendly_diff 1a
22concurrency_limit_app: PrefectTyper = PrefectTyper( 1a
23 name="concurrency-limit",
24 help="Manage task-level concurrency limits.",
25)
26app.add_typer(concurrency_limit_app, aliases=["concurrency-limits"]) 1a
29@concurrency_limit_app.command() 1a
30async def create(tag: str, concurrency_limit: int): 1a
31 """
32 Create a concurrency limit against a tag.
34 This limit controls how many task runs with that tag may simultaneously be in a
35 Running state.
36 """
38 async with get_client() as client:
39 await client.create_concurrency_limit(
40 tag=tag, concurrency_limit=concurrency_limit
41 )
42 await client.read_concurrency_limit_by_tag(tag)
44 app.console.print(
45 textwrap.dedent(
46 f"""
47 Created concurrency limit with properties:
48 tag - {tag!r}
49 concurrency_limit - {concurrency_limit}
51 Delete the concurrency limit:
52 prefect concurrency-limit delete {tag!r}
54 Inspect the concurrency limit:
55 prefect concurrency-limit inspect {tag!r}
56 """
57 )
58 )
61@concurrency_limit_app.command() 1a
62async def inspect( 1a
63 tag: str,
64 output: Optional[str] = typer.Option(
65 None,
66 "--output",
67 "-o",
68 help="Specify an output format. Currently supports: json",
69 ),
70):
71 """
72 View details about a concurrency limit. `active_slots` shows a list of TaskRun IDs
73 which are currently using a concurrency slot.
74 """
75 if output and output.lower() != "json":
76 exit_with_error("Only 'json' output format is supported.")
78 async with get_client() as client:
79 try:
80 result = await client.read_concurrency_limit_by_tag(tag=tag)
81 except ObjectNotFound:
82 exit_with_error(f"No concurrency limit found for the tag: {tag}")
84 if output and output.lower() == "json":
85 result_json = result.model_dump(mode="json")
86 json_output = orjson.dumps(result_json, option=orjson.OPT_INDENT_2).decode()
87 app.console.print(json_output)
88 else:
89 trid_table = Table()
90 trid_table.add_column("Active Task Run IDs", style="cyan", no_wrap=True)
92 cl_table = Table(title=f"Concurrency Limit ID: [red]{str(result.id)}")
93 cl_table.add_column("Tag", style="green", no_wrap=True)
94 cl_table.add_column("Concurrency Limit", style="blue", no_wrap=True)
95 cl_table.add_column("Created", style="magenta", no_wrap=True)
96 cl_table.add_column("Updated", style="magenta", no_wrap=True)
98 for trid in sorted(result.active_slots):
99 trid_table.add_row(str(trid))
101 cl_table.add_row(
102 str(result.tag),
103 str(result.concurrency_limit),
104 Pretty(human_friendly_diff(result.created) if result.created else ""),
105 Pretty(human_friendly_diff(result.updated) if result.updated else ""),
106 )
108 group = Group(
109 cl_table,
110 trid_table,
111 )
112 app.console.print(Panel(group, expand=False))
115@concurrency_limit_app.command() 1a
116async def ls(limit: int = 15, offset: int = 0): 1a
117 """
118 View all concurrency limits.
119 """
120 table = Table(
121 title="Concurrency Limits",
122 caption="inspect a concurrency limit to show active task run IDs",
123 )
124 table.add_column("Tag", style="green", no_wrap=True)
125 table.add_column("ID", justify="right", style="cyan", no_wrap=True)
126 table.add_column("Concurrency Limit", style="blue", no_wrap=True)
127 table.add_column("Active Task Runs", style="magenta", no_wrap=True)
129 async with get_client() as client:
130 concurrency_limits = await client.read_concurrency_limits(
131 limit=limit, offset=offset
132 )
134 for cl in sorted(
135 concurrency_limits, key=lambda c: c.updated or c.created or "", reverse=True
136 ):
137 table.add_row(
138 str(cl.tag),
139 str(cl.id),
140 str(cl.concurrency_limit),
141 str(len(cl.active_slots)),
142 )
144 app.console.print(table)
147@concurrency_limit_app.command() 1a
148async def reset(tag: str): 1a
149 """
150 Resets the concurrency limit slots set on the specified tag.
151 """
153 async with get_client() as client:
154 try:
155 await client.reset_concurrency_limit_by_tag(tag=tag)
156 except ObjectNotFound:
157 exit_with_error(f"No concurrency limit found for the tag: {tag}")
159 exit_with_success(f"Reset concurrency limit set on the tag: {tag}")
162@concurrency_limit_app.command() 1a
163async def delete(tag: str): 1a
164 """
165 Delete the concurrency limit set on the specified tag.
166 """
168 async with get_client() as client:
169 try:
170 if is_interactive() and not typer.confirm(
171 (
172 f"Are you sure you want to delete concurrency limit with tag {tag!r}?"
173 ),
174 default=False,
175 ):
176 exit_with_error("Deletion aborted.")
177 await client.delete_concurrency_limit_by_tag(tag=tag)
178 except ObjectNotFound:
179 exit_with_error(f"No concurrency limit found for the tag: {tag}")
181 exit_with_success(f"Deleted concurrency limit set on the tag: {tag}")