Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/global_concurrency_limit.py: 17%
145 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from pathlib import Path 1a
2from typing import Optional 1a
4import orjson 1a
5import typer 1a
6from pydantic import ValidationError 1a
7from rich.pretty import Pretty 1a
8from rich.table import Table 1a
10from prefect.cli._types import PrefectTyper 1a
11from prefect.cli._utilities import exit_with_error, exit_with_success 1a
12from prefect.cli.root import app, is_interactive 1a
13from prefect.client.orchestration import get_client 1a
14from prefect.client.schemas.actions import ( 1a
15 GlobalConcurrencyLimitCreate,
16 GlobalConcurrencyLimitUpdate,
17)
18from prefect.exceptions import ( 1a
19 ObjectNotFound,
20 PrefectHTTPStatusError,
21)
22from prefect.types._datetime import human_friendly_diff 1a
# Typer sub-application that groups every `global-concurrency-limit` command
# defined in this module.
global_concurrency_limit_app: PrefectTyper = PrefectTyper(
    name="global-concurrency-limit",
    help="Manage global concurrency limits.",
)

# Attach the sub-application to the root CLI; `gcl` is registered as an alias.
app.add_typer(global_concurrency_limit_app, aliases=["gcl"])
@global_concurrency_limit_app.command("ls")
async def list_global_concurrency_limits():
    """
    List all global concurrency limits.
    """
    # The read call takes `limit`/`offset`, so a single request only returns
    # the first page. Keep requesting pages until a short page is returned so
    # the listing is not silently truncated at `page_size` entries.
    page_size = 100
    gcl_limits = []
    async with get_client() as client:
        while True:
            page = await client.read_global_concurrency_limits(
                limit=page_size, offset=len(gcl_limits)
            )
            gcl_limits.extend(page)
            if len(page) < page_size:
                break

    if not gcl_limits:
        exit_with_success("No global concurrency limits found.")

    table = Table(
        title="Global Concurrency Limits",
        caption="List Global Concurrency Limits using `prefect global-concurrency-limit ls`",
        show_header=True,
    )

    table.add_column("ID", justify="right", style="cyan", no_wrap=True, overflow="fold")
    table.add_column("Name", style="blue", no_wrap=True, overflow="fold")
    table.add_column("Active", style="blue", no_wrap=True)
    table.add_column("Limit", style="blue", no_wrap=True)
    table.add_column("Active Slots", style="blue", no_wrap=True)
    table.add_column("Slot Decay Per Second", style="blue", no_wrap=True)
    table.add_column("Created", style="blue", no_wrap=True)
    table.add_column("Updated", style="blue", no_wrap=True)

    # Rows are ordered alphabetically by limit name.
    for gcl_limit in sorted(gcl_limits, key=lambda x: f"{x.name}"):
        # Both timestamps are dereferenced below; the asserts guard against a
        # missing value before `.isoformat()` / `human_friendly_diff` are called.
        assert gcl_limit.created is not None, "created is not None"
        assert gcl_limit.updated is not None, "updated is not None"
        table.add_row(
            str(gcl_limit.id),
            gcl_limit.name,
            str(gcl_limit.active),
            str(gcl_limit.limit),
            str(gcl_limit.active_slots),
            str(gcl_limit.slot_decay_per_second),
            gcl_limit.created.isoformat(),
            human_friendly_diff(gcl_limit.updated),
        )

    app.console.print(table)
@global_concurrency_limit_app.command("inspect")
async def inspect_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to inspect."
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
    file_path: Optional[Path] = typer.Option(
        None,
        "--file",
        "-f",
        help="Path to .json file to write the global concurrency limit output to.",
    ),
):
    """
    Inspect a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to inspect.
        output (Optional[str]): An output format for the command. Currently only supports JSON.
            Required if --file/-f is set.
        file_path (Optional[Path]): A path to .json file to write the global concurrency limit output to.

    Returns:
        id (str): The ID of the global concurrency limit.
        created (str): The created date of the global concurrency limit.
        updated (str): The updated date of the global concurrency limit.
        name (str): The name of the global concurrency limit.
        limit (int): The limit of the global concurrency limit.
        active_slots (int): The number of active slots.
        slot_decay_per_second (float): The slot decay per second.

    """
    # Validate the option combination before talking to the API.
    if output and output.lower() != "json":
        exit_with_error("Only 'json' output format is supported.")

    if file_path and not output:
        exit_with_error("The --file/-f option requires the --output option to be set.")

    async with get_client() as client:
        try:
            gcl_limit = await client.read_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    # Without --output, fall back to the rich pretty-printed representation.
    if not output:
        app.console.print(Pretty(gcl_limit))
        return

    # `output` was validated above, so at this point it can only be "json".
    gcl_limit_json = gcl_limit.model_dump(mode="json")
    json_output = orjson.dumps(gcl_limit_json, option=orjson.OPT_INDENT_2).decode()

    if not file_path:
        app.console.print(json_output)
        return

    # orjson emits UTF-8, so write the file as UTF-8 explicitly instead of
    # relying on the platform default encoding.
    file_path.write_text(json_output, encoding="utf-8")
    exit_with_success(f"Global concurrency limit {name!r} written to {file_path}")
@global_concurrency_limit_app.command("delete")
async def delete_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to delete."
    ),
):
    """
    Delete a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to delete.
    """
    async with get_client() as client:
        try:
            # Look the limit up first so a missing name is reported before
            # the user is asked to confirm anything.
            existing = await client.read_global_concurrency_limit_by_name(name=name)

            # The prompt is only shown in interactive sessions; otherwise the
            # deletion proceeds without asking.
            confirmed = not is_interactive() or typer.confirm(
                f"Are you sure you want to delete global concurrency limit with name {existing.name!r}?",
                default=False,
            )
            if not confirmed:
                exit_with_error("Deletion aborted.")

            await client.delete_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Deleted global concurrency limit with name {name!r}.")
@global_concurrency_limit_app.command("enable")
async def enable_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to enable."
    ),
):
    """
    Enable a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to enable.
    """
    async with get_client() as client:
        try:
            current = await client.read_global_concurrency_limit_by_name(name=name)

            # Enabling an already-active limit is reported as an error.
            if current.active:
                exit_with_error(
                    f"Global concurrency limit with name {name!r} is already enabled."
                )

            activation = GlobalConcurrencyLimitUpdate(active=True)
            await client.update_global_concurrency_limit(
                name=name, concurrency_limit=activation
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Enabled global concurrency limit with name {name!r}.")
@global_concurrency_limit_app.command("disable")
async def disable_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to disable."
    ),
):
    """
    Disable a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to disable.
    """
    async with get_client() as client:
        try:
            current = await client.read_global_concurrency_limit_by_name(name=name)

            # Disabling an already-inactive limit is reported as an error.
            if not current.active:
                exit_with_error(
                    f"Global concurrency limit with name {name!r} is already disabled."
                )

            deactivation = GlobalConcurrencyLimitUpdate(active=False)
            await client.update_global_concurrency_limit(
                name=name, concurrency_limit=deactivation
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Disabled global concurrency limit with name {name!r}.")
@global_concurrency_limit_app.command("update")
async def update_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to update."
    ),
    enable: Optional[bool] = typer.Option(
        None, "--enable", help="Enable the global concurrency limit."
    ),
    disable: Optional[bool] = typer.Option(
        None, "--disable", help="Disable the global concurrency limit."
    ),
    limit: Optional[int] = typer.Option(
        None, "--limit", "-l", help="The limit of the global concurrency limit."
    ),
    active_slots: Optional[int] = typer.Option(
        None, "--active-slots", help="The number of active slots."
    ),
    slot_decay_per_second: Optional[float] = typer.Option(
        None, "--slot-decay-per-second", help="The slot decay per second."
    ),
):
    """
    Update a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to update.
        enable (Optional[bool]): Enable the global concurrency limit.
        disable (Optional[bool]): Disable the global concurrency limit.
        limit (Optional[int]): The limit of the global concurrency limit.
        active_slots (Optional[int]): The number of active slots.
        slot_decay_per_second (Optional[float]): The slot decay per second.

    Examples:
        $ prefect global-concurrency-limit update my-gcl --limit 10
        $ prefect gcl update my-gcl --active-slots 5
        $ prefect gcl update my-gcl --slot-decay-per-second 0.5
        $ prefect gcl update my-gcl --enable
        $ prefect gcl update my-gcl --disable --limit 5
    """
    gcl = GlobalConcurrencyLimitUpdate()

    if enable and disable:
        exit_with_error(
            "Cannot enable and disable a global concurrency limit at the same time."
        )

    # Only assign the fields the user actually supplied, so that
    # `exclude_unset=True` below can detect an empty update.
    if enable:
        gcl.active = True
    if disable:
        gcl.active = False

    if limit is not None:
        gcl.limit = limit

    if active_slots is not None:
        gcl.active_slots = active_slots

    if slot_decay_per_second is not None:
        gcl.slot_decay_per_second = slot_decay_per_second

    if not gcl.model_dump(exclude_unset=True):
        exit_with_error("No update arguments provided.")

    # Rebuild the model from its own dump so the values assigned above are
    # run through the model's validation before any request is sent.
    try:
        GlobalConcurrencyLimitUpdate(**gcl.model_dump())
    except ValidationError as exc:
        exit_with_error(f"Invalid arguments provided: {exc}")
    except Exception as exc:
        exit_with_error(f"Error updating global concurrency limit: {exc}")

    async with get_client() as client:
        try:
            await client.update_global_concurrency_limit(
                name=name, concurrency_limit=gcl
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")
        except PrefectHTTPStatusError as exc:
            # Every HTTP failure must stop the command; previously any status
            # other than 422 fell through to the success message below.
            error_message = str(exc)
            if exc.response.status_code == 422:
                try:
                    error_message = exc.response.json()["exception_detail"][0]["msg"]
                except (ValueError, KeyError, IndexError, TypeError):
                    # Unexpected payload shape: keep the generic message.
                    pass

            exit_with_error(
                f"Error updating global concurrency limit: {error_message}"
            )

    exit_with_success(f"Updated global concurrency limit with name {name!r}.")
@global_concurrency_limit_app.command("create")
async def create_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to create."
    ),
    limit: int = typer.Option(
        ..., "--limit", "-l", help="The limit of the global concurrency limit."
    ),
    disable: Optional[bool] = typer.Option(
        None, "--disable", help="Create an inactive global concurrency limit."
    ),
    active_slots: Optional[int] = typer.Option(
        0, "--active-slots", help="The number of active slots."
    ),
    slot_decay_per_second: Optional[float] = typer.Option(
        0.0, "--slot-decay-per-second", help="The slot decay per second."
    ),
):
    """
    Create a global concurrency limit.

    Arguments:

        name (str): The name of the global concurrency limit to create.

        limit (int): The limit of the global concurrency limit.

        disable (Optional[bool]): Create an inactive global concurrency limit.

        active_slots (Optional[int]): The number of active slots.

        slot_decay_per_second (Optional[float]): The slot decay per second.

    Examples:

        $ prefect global-concurrency-limit create my-gcl --limit 10

        $ prefect gcl create my-gcl --limit 5 --active-slots 3

        $ prefect gcl create my-gcl --limit 5 --active-slots 3 --slot-decay-per-second 0.5

        $ prefect gcl create my-gcl --limit 5 --disable
    """
    # Refuse to create a limit whose name is already taken. A successful
    # read means it exists; ObjectNotFound is the expected path.
    async with get_client() as client:
        try:
            await client.read_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            pass
        else:
            exit_with_error(
                f"Global concurrency limit {name!r} already exists. Please try creating with a different name."
            )

    # Build the request model locally so invalid values are reported before
    # the create request is sent.
    try:
        gcl = GlobalConcurrencyLimitCreate(
            name=name,
            limit=limit,
            active=not disable,
            active_slots=active_slots,
            slot_decay_per_second=slot_decay_per_second,
        )
    except ValidationError as exc:
        exit_with_error(f"Invalid arguments provided: {exc}")
    except Exception as exc:
        exit_with_error(f"Error creating global concurrency limit: {exc}")

    async with get_client() as client:
        try:
            gcl_id = await client.create_global_concurrency_limit(concurrency_limit=gcl)
        except PrefectHTTPStatusError as exc:
            # Prefer the server-provided detail, but fall back to the
            # exception text if the response body has an unexpected shape.
            try:
                error_message = exc.response.json()["exception_detail"][0]["msg"]
            except (ValueError, KeyError, IndexError, TypeError):
                error_message = str(exc)

            exit_with_error(
                f"Error creating global concurrency limit: {error_message}"
            )

    exit_with_success(
        f"Created global concurrency limit with name {name!r} and ID '{gcl_id}'. Run `prefect gcl inspect {name}` to view details."
    )