Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_concurrency_limits/client.py: 16%

231 statements  

coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Literal

from httpx import HTTPStatusError, RequestError

from prefect.client.orchestration.base import BaseAsyncClient, BaseClient
from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound

if TYPE_CHECKING:
    from uuid import UUID

    from httpx import Response

    from prefect.client.schemas.actions import (
        GlobalConcurrencyLimitCreate,
        GlobalConcurrencyLimitUpdate,
    )
    from prefect.client.schemas.objects import ConcurrencyLeaseHolder, ConcurrencyLimit
    from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse


class ConcurrencyLimitClient(BaseClient):
    def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> "UUID":
        """
        Create a tag concurrency limit in the Prefect API. These limits govern concurrently
        running tasks.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a given tag

        Raises:
            httpx.RequestError: if the concurrency limit was not created for any reason

        Returns:
            the ID of the concurrency limit in the backend
        """
        from prefect.client.schemas.actions import ConcurrencyLimitCreate

        concurrency_limit_create = ConcurrencyLimitCreate(
            tag=tag,
            concurrency_limit=concurrency_limit,
        )
        response = self.request(
            "POST",
            "/concurrency_limits/",
            json=concurrency_limit_create.model_dump(mode="json"),
        )

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")
        from uuid import UUID

        return UUID(concurrency_limit_id)

    def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> "ConcurrencyLimit":
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: if the concurrency limit could not be read for any reason

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = self.request(
                "GET",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")
        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate(response.json())

    def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ) -> list["ConcurrencyLimit"]:
        """
        Lists concurrency limits set on task run tags.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """

        body = {
            "limit": limit,
            "offset": offset,
        }

        response = self.request("POST", "/concurrency_limits/filter", json=body)
        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate_list(response.json())

    def reset_concurrency_limit_by_tag(
        self,
        tag: str,
        slot_override: list["UUID | str"] | None = None,
    ) -> None:
        """
        Resets the concurrency limit slots set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to
            slot_override: a list of task run IDs that are currently using a
                concurrency slot. Ensure that any task run IDs included in
                `slot_override` are currently running; otherwise those concurrency
                slots will never be released.

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        """
        if slot_override is not None:
            slot_override = [str(slot) for slot in slot_override]

        try:
            self.request(
                "POST",
                "/concurrency_limits/tag/{tag}/reset",
                path_params={"tag": tag},
                json=dict(slot_override=slot_override),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> None:
        """
        Delete the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        """
        try:
            self.request(
                "DELETE",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    def increment_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
    ) -> "Response":
        """
        Increment concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names for which to increment limits.
            task_run_id: The task run ID incrementing the limits.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
        }

        return self.request(
            "POST",
            "/concurrency_limits/increment",
            json=data,
        )

    def decrement_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
        occupancy_seconds: float,
    ) -> "Response":
        """
        Decrement concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names to decrement.
            task_run_id: The task run ID that incremented the limits.
            occupancy_seconds (float): The duration in seconds that the limits
                were held.

        Returns:
            "Response": The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
            "occupancy_seconds": occupancy_seconds,
        }

        return self.request(
            "POST",
            "/concurrency_limits/decrement",
            json=data,
        )

    def increment_concurrency_slots(
        self,
        names: list[str],
        slots: int,
        mode: str,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/increment",
            json={
                "names": names,
                "slots": slots,
                "mode": mode,
            },
        )

    def increment_concurrency_slots_with_lease(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        lease_duration: float,
        holder: "ConcurrencyLeaseHolder | None" = None,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits with a lease.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
            lease_duration: The duration of the lease in seconds.
            holder: Optional holder information for tracking who holds the slots.
        """
        body: dict[str, Any] = {
            "names": names,
            "slots": slots,
            "mode": mode,
            "lease_duration": lease_duration,
        }
        if holder is not None:
            body["holder"] = holder.model_dump(mode="json")

        return self.request(
            "POST",
            "/v2/concurrency_limits/increment-with-lease",
            json=body,
        )

    def renew_concurrency_lease(
        self,
        lease_id: "UUID",
        lease_duration: float,
    ) -> "Response":
        """
        Renew a concurrency lease.

        Args:
            lease_id: The ID of the lease to renew.
            lease_duration: The new lease duration in seconds.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/leases/{lease_id}/renew",
            path_params={"lease_id": lease_id},
            json={"lease_duration": lease_duration},
        )

    def release_concurrency_slots(
        self, names: list[str], slots: int, occupancy_seconds: float
    ) -> "Response":
        """
        Release concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to release slots.
            slots: The number of concurrency slots to release.
            occupancy_seconds (float): The duration in seconds that the slots
                were occupied.

        Returns:
            "Response": The HTTP response from the server.
        """

        return self.request(
            "POST",
            "/v2/concurrency_limits/decrement",
            json={
                "names": names,
                "slots": slots,
                "occupancy_seconds": occupancy_seconds,
            },
        )

    def release_concurrency_slots_with_lease(
        self,
        lease_id: "UUID",
    ) -> "Response":
        """
        Release concurrency slots for the specified lease.

        Args:
            lease_id: The ID of the lease corresponding to the concurrency limits to release.
        """
        return self.request(
            "POST",
            "/v2/concurrency_limits/decrement-with-lease",
            json={
                "lease_id": str(lease_id),
            },
        )

    def create_global_concurrency_limit(
        self, concurrency_limit: "GlobalConcurrencyLimitCreate"
    ) -> "UUID":
        try:
            response = self.request(
                "POST",
                "/v2/concurrency_limits/",
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            else:
                raise
        from uuid import UUID

        return UUID(response.json()["id"])

    def update_global_concurrency_limit(
        self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate"
    ) -> "Response":
        try:
            response = self.request(
                "PATCH",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
            return response
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    def delete_global_concurrency_limit_by_name(self, name: str) -> "Response":
        try:
            response = self.request(
                "DELETE",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
            return response
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    def read_global_concurrency_limit_by_name(
        self, name: str
    ) -> "GlobalConcurrencyLimitResponse":
        try:
            response = self.request(
                "GET",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
            from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

            return GlobalConcurrencyLimitResponse.model_validate(response.json())
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    def upsert_global_concurrency_limit_by_name(self, name: str, limit: int) -> None:
        """Creates a global concurrency limit with the given name and limit if one does not already exist.

        If one already exists with the given name, its limit is updated if it differs.

        Note: This is not done atomically.
        """
        from prefect.client.schemas.actions import (
            GlobalConcurrencyLimitCreate,
            GlobalConcurrencyLimitUpdate,
        )

        try:
            existing_limit = self.read_global_concurrency_limit_by_name(name)
        except ObjectNotFound:
            existing_limit = None

        if not existing_limit:
            self.create_global_concurrency_limit(
                GlobalConcurrencyLimitCreate(
                    name=name,
                    limit=limit,
                )
            )
        elif existing_limit.limit != limit:
            self.update_global_concurrency_limit(
                name, GlobalConcurrencyLimitUpdate(limit=limit)
            )

    def read_global_concurrency_limits(
        self, limit: int = 10, offset: int = 0
    ) -> list["GlobalConcurrencyLimitResponse"]:
        response = self.request(
            "POST",
            "/v2/concurrency_limits/filter",
            json={
                "limit": limit,
                "offset": offset,
            },
        )

        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        return GlobalConcurrencyLimitResponse.model_validate_list(response.json())
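

# --- Illustrative usage sketch (not part of the original module) --------------
# A minimal, hedged example of how the synchronous methods above are typically
# reached. It assumes the usual `prefect.get_client(sync_client=True)` entry
# point, whose returned client mixes in ConcurrencyLimitClient; the tag name
# "database" is purely illustrative.
def _example_tag_limit_roundtrip_sync() -> None:
    from prefect import get_client

    with get_client(sync_client=True) as client:
        # Create a tag-scoped limit, read it back, then delete it.
        limit_id = client.create_concurrency_limit(tag="database", concurrency_limit=5)
        limit = client.read_concurrency_limit_by_tag(tag="database")
        assert limit.id == limit_id
        client.delete_concurrency_limit_by_tag(tag="database")
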

class ConcurrencyLimitAsyncClient(BaseAsyncClient):
    async def create_concurrency_limit(
        self,
        tag: str,
        concurrency_limit: int,
    ) -> "UUID":
        """
        Create a tag concurrency limit in the Prefect API. These limits govern concurrently
        running tasks.

        Args:
            tag: a tag the concurrency limit is applied to
            concurrency_limit: the maximum number of concurrent task runs for a given tag

        Raises:
            httpx.RequestError: if the concurrency limit was not created for any reason

        Returns:
            the ID of the concurrency limit in the backend
        """
        from prefect.client.schemas.actions import ConcurrencyLimitCreate

        concurrency_limit_create = ConcurrencyLimitCreate(
            tag=tag,
            concurrency_limit=concurrency_limit,
        )
        response = await self.request(
            "POST",
            "/concurrency_limits/",
            json=concurrency_limit_create.model_dump(mode="json"),
        )

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")
        from uuid import UUID

        return UUID(concurrency_limit_id)

    async def read_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> "ConcurrencyLimit":
        """
        Read the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: if the concurrency limit could not be read for any reason

        Returns:
            the concurrency limit set on a specific tag
        """
        try:
            response = await self.request(
                "GET",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

        concurrency_limit_id = response.json().get("id")

        if not concurrency_limit_id:
            raise RequestError(f"Malformed response: {response}")
        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate(response.json())

    async def read_concurrency_limits(
        self,
        limit: int,
        offset: int,
    ) -> list["ConcurrencyLimit"]:
        """
        Lists concurrency limits set on task run tags.

        Args:
            limit: the maximum number of concurrency limits returned
            offset: the concurrency limit query offset

        Returns:
            a list of concurrency limits
        """

        body = {
            "limit": limit,
            "offset": offset,
        }

        response = await self.request("POST", "/concurrency_limits/filter", json=body)
        from prefect.client.schemas.objects import ConcurrencyLimit

        return ConcurrencyLimit.model_validate_list(response.json())

    async def reset_concurrency_limit_by_tag(
        self,
        tag: str,
        slot_override: list["UUID | str"] | None = None,
    ) -> None:
        """
        Resets the concurrency limit slots set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to
            slot_override: a list of task run IDs that are currently using a
                concurrency slot. Ensure that any task run IDs included in
                `slot_override` are currently running; otherwise those concurrency
                slots will never be released.

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        """
        if slot_override is not None:
            slot_override = [str(slot) for slot in slot_override]

        try:
            await self.request(
                "POST",
                "/concurrency_limits/tag/{tag}/reset",
                path_params={"tag": tag},
                json=dict(slot_override=slot_override),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def delete_concurrency_limit_by_tag(
        self,
        tag: str,
    ) -> None:
        """
        Delete the concurrency limit set on a specific tag.

        Args:
            tag: a tag the concurrency limit is applied to

        Raises:
            ObjectNotFound: If request returns 404
            httpx.RequestError: If request fails

        """
        try:
            await self.request(
                "DELETE",
                "/concurrency_limits/tag/{tag}",
                path_params={"tag": tag},
            )
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def increment_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
    ) -> "Response":
        """
        Increment concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names for which to increment limits.
            task_run_id: The task run ID incrementing the limits.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
        }

        return await self.request(
            "POST",
            "/concurrency_limits/increment",
            json=data,
        )

    async def decrement_v1_concurrency_slots(
        self,
        names: list[str],
        task_run_id: "UUID",
        occupancy_seconds: float,
    ) -> "Response":
        """
        Decrement concurrency limit slots for the specified limits.

        Args:
            names: A list of limit names to decrement.
            task_run_id: The task run ID that incremented the limits.
            occupancy_seconds (float): The duration in seconds that the limits
                were held.

        Returns:
            "Response": The HTTP response from the server.
        """
        data: dict[str, Any] = {
            "names": names,
            "task_run_id": str(task_run_id),
            "occupancy_seconds": occupancy_seconds,
        }

        return await self.request(
            "POST",
            "/concurrency_limits/decrement",
            json=data,
        )

    async def increment_concurrency_slots(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/increment",
            json={
                "names": names,
                "slots": slots,
                "mode": mode,
            },
        )

    async def increment_concurrency_slots_with_lease(
        self,
        names: list[str],
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        lease_duration: float,
        holder: "ConcurrencyLeaseHolder | None" = None,
    ) -> "Response":
        """
        Increment concurrency slots for the specified limits with a lease.

        Args:
            names: A list of limit names for which to occupy slots.
            slots: The number of concurrency slots to occupy.
            mode: The mode of the concurrency limits.
            lease_duration: The duration of the lease in seconds.
            holder: Optional holder information for tracking who holds the slots.
        """
        body: dict[str, Any] = {
            "names": names,
            "slots": slots,
            "mode": mode,
            "lease_duration": lease_duration,
        }
        if holder is not None:
            body["holder"] = holder.model_dump(mode="json")

        return await self.request(
            "POST",
            "/v2/concurrency_limits/increment-with-lease",
            json=body,
        )

    async def renew_concurrency_lease(
        self,
        lease_id: "UUID",
        lease_duration: float,
    ) -> "Response":
        """
        Renew a concurrency lease.

        Args:
            lease_id: The ID of the lease to renew.
            lease_duration: The new lease duration in seconds.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/leases/{lease_id}/renew",
            path_params={"lease_id": lease_id},
            json={"lease_duration": lease_duration},
        )

    async def release_concurrency_slots(
        self, names: list[str], slots: int, occupancy_seconds: float
    ) -> "Response":
        """
        Release concurrency slots for the specified limits.

        Args:
            names: A list of limit names for which to release slots.
            slots: The number of concurrency slots to release.
            occupancy_seconds (float): The duration in seconds that the slots
                were occupied.

        Returns:
            "Response": The HTTP response from the server.
        """

        return await self.request(
            "POST",
            "/v2/concurrency_limits/decrement",
            json={
                "names": names,
                "slots": slots,
                "occupancy_seconds": occupancy_seconds,
            },
        )

    async def release_concurrency_slots_with_lease(
        self,
        lease_id: "UUID",
    ) -> "Response":
        """
        Release concurrency slots for the specified lease.

        Args:
            lease_id: The ID of the lease corresponding to the concurrency limits to release.
        """
        return await self.request(
            "POST",
            "/v2/concurrency_limits/decrement-with-lease",
            json={
                "lease_id": str(lease_id),
            },
        )

    async def create_global_concurrency_limit(
        self, concurrency_limit: "GlobalConcurrencyLimitCreate"
    ) -> "UUID":
        try:
            response = await self.request(
                "POST",
                "/v2/concurrency_limits/",
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
        except HTTPStatusError as e:
            if e.response.status_code == 409:
                raise ObjectAlreadyExists(http_exc=e) from e
            else:
                raise

        from uuid import UUID

        return UUID(response.json()["id"])

    async def update_global_concurrency_limit(
        self, name: str, concurrency_limit: "GlobalConcurrencyLimitUpdate"
    ) -> "Response":
        try:
            response = await self.request(
                "PATCH",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
                json=concurrency_limit.model_dump(mode="json", exclude_unset=True),
            )
            return response
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def delete_global_concurrency_limit_by_name(self, name: str) -> "Response":
        try:
            response = await self.request(
                "DELETE",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
            return response
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def read_global_concurrency_limit_by_name(
        self, name: str
    ) -> "GlobalConcurrencyLimitResponse":
        try:
            response = await self.request(
                "GET",
                "/v2/concurrency_limits/{id_or_name}",
                path_params={"id_or_name": name},
            )
            from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

            return GlobalConcurrencyLimitResponse.model_validate(response.json())
        except HTTPStatusError as e:
            if e.response.status_code == 404:
                raise ObjectNotFound(http_exc=e) from e
            else:
                raise

    async def upsert_global_concurrency_limit_by_name(
        self, name: str, limit: int
    ) -> None:
        """Creates a global concurrency limit with the given name and limit if one does not already exist.

        If one already exists with the given name, its limit is updated if it differs.

        Note: This is not done atomically.
        """
        from prefect.client.schemas.actions import (
            GlobalConcurrencyLimitCreate,
            GlobalConcurrencyLimitUpdate,
        )

        try:
            existing_limit = await self.read_global_concurrency_limit_by_name(name)
        except ObjectNotFound:
            existing_limit = None

        if not existing_limit:
            await self.create_global_concurrency_limit(
                GlobalConcurrencyLimitCreate(
                    name=name,
                    limit=limit,
                )
            )
        elif existing_limit.limit != limit:
            await self.update_global_concurrency_limit(
                name, GlobalConcurrencyLimitUpdate(limit=limit)
            )

    async def read_global_concurrency_limits(
        self, limit: int = 10, offset: int = 0
    ) -> list["GlobalConcurrencyLimitResponse"]:
        response = await self.request(
            "POST",
            "/v2/concurrency_limits/filter",
            json={
                "limit": limit,
                "offset": offset,
            },
        )

        from prefect.client.schemas.responses import GlobalConcurrencyLimitResponse

        return GlobalConcurrencyLimitResponse.model_validate_list(response.json())
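

# --- Illustrative usage sketch (not part of the original module) --------------
# A hedged example of the v2 lease-based flow exposed by the async client above:
# occupy slots under a lease, renew the lease while work is in flight, then
# release via the lease ID. The `prefect.get_client()` entry point, the
# "my-global-limit" name, and the assumption that the increment-with-lease
# response body carries a `lease_id` field are illustrative, not canonical.
async def _example_lease_lifecycle() -> None:
    from uuid import UUID

    from prefect import get_client

    async with get_client() as client:
        # Create-or-update a global limit by name (not atomic, per the docstring above).
        await client.upsert_global_concurrency_limit_by_name("my-global-limit", limit=10)

        # Occupy one slot under a 60-second lease.
        response = await client.increment_concurrency_slots_with_lease(
            names=["my-global-limit"],
            slots=1,
            mode="concurrency",
            lease_duration=60,
        )
        lease_id = UUID(response.json()["lease_id"])  # assumed response field

        # ... do the rate-limited work, renewing if it may outlive the lease ...
        await client.renew_concurrency_lease(lease_id=lease_id, lease_duration=60)

        # Release the slots held under the lease.
        await client.release_concurrency_slots_with_lease(lease_id=lease_id)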