Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_concurrency_limits/client.py: 16%
231 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING, Any, Literal 1a
5from httpx import HTTPStatusError, RequestError 1a
7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
8from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from uuid import UUID
13 from httpx import Response
15 from prefect.client.schemas.actions import (
16 GlobalConcurrencyLimitCreate,
17 GlobalConcurrencyLimitUpdate,
18 )
19 from prefect.client.schemas.objects import ConcurrencyLeaseHolder, ConcurrencyLimit
20 from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse
class ConcurrencyLimitClient(BaseClient):
    """Synchronous client methods for the concurrency limit API routes.

    Covers the tag-based routes under `/concurrency_limits/` and the global
    routes under `/v2/concurrency_limits/`, including lease-based slots.
    """

    def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> "UUID":
        """
        Create a tag concurrency limit in the Prefect API. These limits govern concurrently
        running tasks.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a given tag

        Raises:
            httpx.RequestError: if the response body does not carry an `id`

        Returns:
            the ID of the concurrency limit in the backend
        """
        from uuid import UUID

        from prefect.client.schemas.actions import ConcurrencyLimitCreate

        payload = ConcurrencyLimitCreate(
            tag=tag,
            concurrency_limit=concurrency_limit,
        )
        response = self.request(
            "POST",
            "/concurrency_limits/",
            json=payload.model_dump(mode="json"),
        )

        concurrency_limit_id = response.json().get("id")
        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")

        return UUID(concurrency_limit_id)

    def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> "ConcurrencyLimit":
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: if the response body does not carry an `id`

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = self.request(
                "GET",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        # Decode the body once and reuse it for both the sanity check and
        # the model validation.
        payload = response.json()
        if not payload.get("id"):
            raise RequestError(f"Malformed response: {response}")

        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate(payload)

    def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ) -> list["ConcurrencyLimit"]:
        """
        Lists concurrency limits set on task run tags.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """
        from prefect.client.schemas.objects import ConcurrencyLimit

        response = self.request(
            "POST",
            "/concurrency_limits/filter",
            json={"limit": limit, "offset": offset},
        )
        return ConcurrencyLimit.model_validate_list(response.json())

    def reset_concurrency_limit_by_tag(
        self,
        tag: str,
        slot_override: list["UUID | str"] | None = None,
    ) -> None:
        """
        Resets the concurrency limit slots set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to
            slot_override: a list of task run IDs that are currently using a
                concurrency slot, please check that any task run IDs included in
                `slot_override` are currently running, otherwise those concurrency
                slots will never be released.

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        # IDs are sent as strings; `None` is forwarded unchanged.
        serialized_slots = (
            None if slot_override is None else [str(slot) for slot in slot_override]
        )

        try:
            self.request(
                "POST",
                "/concurrency_limits/tag/{tag}/reset",
                path_params={"tag": tag},
                json={"slot_override": serialized_slots},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> None:
        """
        Delete the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            self.request(
                "DELETE",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    def increment_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
    ) -> "Response":
        """
        Increment concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names for which to increment limits.
            task_run_id: The task run ID incrementing the limits.

        Returns:
            The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
        }

        return self.request(
            "POST",
            "/concurrency_limits/increment",
            json=data,
        )

    def decrement_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
        occupancy_seconds: float,
    ) -> "Response":
        """
        Decrement concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names to decrement.
            task_run_id: The task run ID that incremented the limits.
            occupancy_seconds: The duration in seconds that the limits
                were held.

        Returns:
            The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
            "occupancy_seconds": occupancy_seconds,
        }

        return self.request(
            "POST",
            "/concurrency_limits/decrement",
            json=data,
        )

    def increment_concurrency_slots(
        self,
        names: list[str],
        slots: int,
        mode: str,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits. The lease-based variant
                of this call annotates it as "concurrency" or "rate_limit";
                the value is forwarded to the API unchanged.

        Returns:
            The HTTP response from the server.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/increment",
            json={
                "names": names,
                "slots": slots,
                "mode": mode,
            },
        )

    def increment_concurrency_slots_with_lease(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        lease_duration: float,
        holder: "ConcurrencyLeaseHolder | None" = None,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits with a lease.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
            lease_duration: The duration of the lease in seconds.
            holder: Optional holder information for tracking who holds the slots.

        Returns:
            The HTTP response from the server.
        """
        body: dict[str, Any] = {
            "names": names,
            "slots": slots,
            "mode": mode,
            "lease_duration": lease_duration,
        }
        # The "holder" key is only sent when a holder was supplied.
        if holder is not None:
            body["holder"] = holder.model_dump(mode="json")

        return self.request(
            "POST",
            "/v2/concurrency_limits/increment-with-lease",
            json=body,
        )

    def renew_concurrency_lease(
        self,
        lease_id: "UUID",
        lease_duration: float,
    ) -> "Response":
        """
        Renew a concurrency lease.

        Args:
            lease_id: The ID of the lease to renew.
            lease_duration: The new lease duration in seconds.

        Returns:
            The HTTP response from the server.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/leases/{lease_id}/renew",
            path_params={"lease_id": lease_id},
            json={"lease_duration": lease_duration},
        )

    def release_concurrency_slots(
        self, names: list[str], slots: int, occupancy_seconds: float
    ) -> "Response":
        """
        Release concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to release slots.
            slots: The number of concurrency slots to release.
            occupancy_seconds: The duration in seconds that the slots
                were occupied.

        Returns:
            The HTTP response from the server.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/decrement",
            json={
                "names": names,
                "slots": slots,
                "occupancy_seconds": occupancy_seconds,
            },
        )

    def release_concurrency_slots_with_lease(
        self,
        lease_id: "UUID",
    ) -> "Response":
        """
        Release concurrency slots for the specified lease.

        Args:
            lease_id: The ID of the lease corresponding to the concurrency limits to release.

        Returns:
            The HTTP response from the server.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/decrement-with-lease",
            json={
                "lease_id": str(lease_id),
            },
        )

    def create_global_concurrency_limit(
        self, concurrency_limit: "GlobalConcurrencyLimitCreate"
    ) -> "UUID":
        """
        Create a global concurrency limit.

        Args:
            concurrency_limit: the limit to create; only fields that were
                explicitly set are sent.

        Raises:
            ObjectAlreadyExists: If request returns 409

        Returns:
            the ID reported by the API for the new limit
        """
        from uuid import UUID

        try:
            response = self.request(
                "POST",
                "/v2/concurrency_limits/",
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        return UUID(response.json()["id"])

    def update_global_concurrency_limit(
        self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate"
    ) -> "Response":
        """
        Update the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path
            concurrency_limit: the changes to apply; only fields that were
                explicitly set are sent.

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            The HTTP response from the server.
        """
        try:
            return self.request(
                "PATCH",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    def delete_global_concurrency_limit_by_name(self, name: str) -> "Response":
        """
        Delete the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            The HTTP response from the server.
        """
        try:
            return self.request(
                "DELETE",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    def read_global_concurrency_limit_by_name(
        self, name: str
    ) -> "GlobalConcurrencyLimitResponse":
        """
        Read the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            the response body validated as a `GlobalConcurrencyLimitResponse`
        """
        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        # Only the request itself is guarded by the 404 translation.
        try:
            response = self.request(
                "GET",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        return GlobalConcurrencyLimitResponse.model_validate(response.json())

    def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None:
        """Creates a global concurrency limit with the given name and limit if one does not already exist.

        If one does already exist matching the name then update its limit if it is different.

        Note: This is not done atomically.
        """
        from prefect.client.schemas.actions import (
            GlobalConcurrencyLimitCreate,
            GlobalConcurrencyLimitUpdate,
        )

        try:
            existing_limit = self.read_global_concurrency_limit_by_name(name)
        except ObjectNotFound:
            existing_limit = None

        if not existing_limit:
            self.create_global_concurrency_limit(
                GlobalConcurrencyLimitCreate(
                    name=name,
                    limit=limit,
                )
            )
        elif existing_limit.limit != limit:
            # Skip the PATCH entirely when the stored limit already matches.
            self.update_global_concurrency_limit(
                name, GlobalConcurrencyLimitUpdate(limit=limit)
            )

    def read_global_concurrency_limits(
        self, limit: int = 10, offset: int = 0
    ) -> list["GlobalConcurrencyLimitResponse"]:
        """
        List global concurrency limits.

        Args:
            limit: sent as `limit` in the filter request body
            offset: sent as `offset` in the filter request body

        Returns:
            the response body validated as a list of
            `GlobalConcurrencyLimitResponse`
        """
        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        response = self.request(
            "POST",
            "/v2/concurrency_limits/filter",
            json={
                "limit": limit,
                "offset": offset,
            },
        )
        return GlobalConcurrencyLimitResponse.model_validate_list(response.json())
class ConcurrencyLimitAsyncClient(BaseAsyncClient):
    """Asynchronous client methods for the concurrency limit API routes.

    Covers the tag-based routes under `/concurrency_limits/` and the global
    routes under `/v2/concurrency_limits/`, including lease-based slots.
    """

    async def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> "UUID":
        """
        Create a tag concurrency limit in the Prefect API. These limits govern concurrently
        running tasks.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a given tag

        Raises:
            httpx.RequestError: if the response body does not carry an `id`

        Returns:
            the ID of the concurrency limit in the backend
        """
        from uuid import UUID

        from prefect.client.schemas.actions import ConcurrencyLimitCreate

        payload = ConcurrencyLimitCreate(
            tag=tag,
            concurrency_limit=concurrency_limit,
        )
        response = await self.request(
            "POST",
            "/concurrency_limits/",
            json=payload.model_dump(mode="json"),
        )

        concurrency_limit_id = response.json().get("id")
        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")

        return UUID(concurrency_limit_id)

    async def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> "ConcurrencyLimit":
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: if the response body does not carry an `id`

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = await self.request(
                "GET",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        # Decode the body once and reuse it for both the sanity check and
        # the model validation.
        payload = response.json()
        if not payload.get("id"):
            raise RequestError(f"Malformed response: {response}")

        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate(payload)

    async def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ) -> list["ConcurrencyLimit"]:
        """
        Lists concurrency limits set on task run tags.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """
        from prefect.client.schemas.objects import ConcurrencyLimit

        response = await self.request(
            "POST",
            "/concurrency_limits/filter",
            json={"limit": limit, "offset": offset},
        )
        return ConcurrencyLimit.model_validate_list(response.json())

    async def reset_concurrency_limit_by_tag(
        self,
        tag: str,
        slot_override: list["UUID | str"] | None = None,
    ) -> None:
        """
        Resets the concurrency limit slots set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to
            slot_override: a list of task run IDs that are currently using a
                concurrency slot, please check that any task run IDs included in
                `slot_override` are currently running, otherwise those concurrency
                slots will never be released.

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        # IDs are sent as strings; `None` is forwarded unchanged.
        serialized_slots = (
            None if slot_override is None else [str(slot) for slot in slot_override]
        )

        try:
            await self.request(
                "POST",
                "/concurrency_limits/tag/{tag}/reset",
                path_params={"tag": tag},
                json={"slot_override": serialized_slots},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    async def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> None:
        """
        Delete the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails
        """
        try:
            await self.request(
                "DELETE",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    async def increment_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
    ) -> "Response":
        """
        Increment concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names for which to increment limits.
            task_run_id: The task run ID incrementing the limits.

        Returns:
            The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
        }

        return await self.request(
            "POST",
            "/concurrency_limits/increment",
            json=data,
        )

    async def decrement_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
        occupancy_seconds: float,
    ) -> "Response":
        """
        Decrement concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names to decrement.
            task_run_id: The task run ID that incremented the limits.
            occupancy_seconds: The duration in seconds that the limits
                were held.

        Returns:
            The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
            "occupancy_seconds": occupancy_seconds,
        }

        return await self.request(
            "POST",
            "/concurrency_limits/decrement",
            json=data,
        )

    async def increment_concurrency_slots(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.

        Returns:
            The HTTP response from the server.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/increment",
            json={
                "names": names,
                "slots": slots,
                "mode": mode,
            },
        )

    async def increment_concurrency_slots_with_lease(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        lease_duration: float,
        holder: "ConcurrencyLeaseHolder | None" = None,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits with a lease.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
            lease_duration: The duration of the lease in seconds.
            holder: Optional holder information for tracking who holds the slots.

        Returns:
            The HTTP response from the server.
        """
        body: dict[str, Any] = {
            "names": names,
            "slots": slots,
            "mode": mode,
            "lease_duration": lease_duration,
        }
        # The "holder" key is only sent when a holder was supplied.
        if holder is not None:
            body["holder"] = holder.model_dump(mode="json")

        return await self.request(
            "POST",
            "/v2/concurrency_limits/increment-with-lease",
            json=body,
        )

    async def renew_concurrency_lease(
        self,
        lease_id: "UUID",
        lease_duration: float,
    ) -> "Response":
        """
        Renew a concurrency lease.

        Args:
            lease_id: The ID of the lease to renew.
            lease_duration: The new lease duration in seconds.

        Returns:
            The HTTP response from the server.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/leases/{lease_id}/renew",
            path_params={"lease_id": lease_id},
            json={"lease_duration": lease_duration},
        )

    async def release_concurrency_slots(
        self, names: list[str], slots: int, occupancy_seconds: float
    ) -> "Response":
        """
        Release concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to release slots.
            slots: The number of concurrency slots to release.
            occupancy_seconds: The duration in seconds that the slots
                were occupied.

        Returns:
            The HTTP response from the server.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/decrement",
            json={
                "names": names,
                "slots": slots,
                "occupancy_seconds": occupancy_seconds,
            },
        )

    async def release_concurrency_slots_with_lease(
        self,
        lease_id: "UUID",
    ) -> "Response":
        """
        Release concurrency slots for the specified lease.

        Args:
            lease_id: The ID of the lease corresponding to the concurrency limits to release.

        Returns:
            The HTTP response from the server.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/decrement-with-lease",
            json={
                "lease_id": str(lease_id),
            },
        )

    async def create_global_concurrency_limit(
        self, concurrency_limit: "GlobalConcurrencyLimitCreate"
    ) -> "UUID":
        """
        Create a global concurrency limit.

        Args:
            concurrency_limit: the limit to create; only fields that were
                explicitly set are sent.

        Raises:
            ObjectAlreadyExists: If request returns 409

        Returns:
            the ID reported by the API for the new limit
        """
        from uuid import UUID

        try:
            response = await self.request(
                "POST",
                "/v2/concurrency_limits/",
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            raise

        return UUID(response.json()["id"])

    async def update_global_concurrency_limit(
        self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate"
    ) -> "Response":
        """
        Update the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path
            concurrency_limit: the changes to apply; only fields that were
                explicitly set are sent.

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            The HTTP response from the server.
        """
        try:
            return await self.request(
                "PATCH",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    async def delete_global_concurrency_limit_by_name(self, name: str) -> "Response":
        """
        Delete the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            The HTTP response from the server.
        """
        try:
            return await self.request(
                "DELETE",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

    async def read_global_concurrency_limit_by_name(
        self, name: str
    ) -> "GlobalConcurrencyLimitResponse":
        """
        Read the global concurrency limit addressed by `name`.

        Args:
            name: value substituted for `id_or_name` in the request path

        Raises:
            ObjectNotFound: If request returns 404

        Returns:
            the response body validated as a `GlobalConcurrencyLimitResponse`
        """
        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        # Only the request itself is guarded by the 404 translation.
        try:
            response = await self.request(
                "GET",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            raise

        return GlobalConcurrencyLimitResponse.model_validate(response.json())

    async def upsert_global_concurrency_limit_by_name(
        self, name: str, limit: int
    ) -> None:
        """Creates a global concurrency limit with the given name and limit if one does not already exist.

        If one does already exist matching the name then update its limit if it is different.

        Note: This is not done atomically.
        """
        from prefect.client.schemas.actions import (
            GlobalConcurrencyLimitCreate,
            GlobalConcurrencyLimitUpdate,
        )

        try:
            existing_limit = await self.read_global_concurrency_limit_by_name(name)
        except ObjectNotFound:
            existing_limit = None

        if not existing_limit:
            await self.create_global_concurrency_limit(
                GlobalConcurrencyLimitCreate(
                    name=name,
                    limit=limit,
                )
            )
        elif existing_limit.limit != limit:
            # Skip the PATCH entirely when the stored limit already matches.
            await self.update_global_concurrency_limit(
                name, GlobalConcurrencyLimitUpdate(limit=limit)
            )

    async def read_global_concurrency_limits(
        self, limit: int = 10, offset: int = 0
    ) -> list["GlobalConcurrencyLimitResponse"]:
        """
        List global concurrency limits.

        Args:
            limit: sent as `limit` in the filter request body
            offset: sent as `offset` in the filter request body

        Returns:
            the response body validated as a list of
            `GlobalConcurrencyLimitResponse`
        """
        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        response = await self.request(
            "POST",
            "/v2/concurrency_limits/filter",
            json={
                "limit": limit,
                "offset": offset,
            },
        )
        return GlobalConcurrencyLimitResponse.model_validate_list(response.json())