Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/concurrency_limits.py: 34%

260 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with concurrency limit objects. 

3 

4This module provides a V1 API adapter that routes requests to the V2 concurrency 

5system. After the migration, V1 limits are converted to V2, but the V1 API 

6continues to work for backward compatibility. 

7""" 

8 

9from __future__ import annotations 1c

10 

11import logging 1c

12from datetime import timedelta 1c

13from typing import List, Optional, Sequence 1c

14from uuid import UUID 1c

15 

16from fastapi import Body, Depends, HTTPException, Path, Response, status 1c

17 

18import prefect.server.api.dependencies as dependencies 1c

19import prefect.server.models as models 1c

20import prefect.server.schemas as schemas 1c

21from prefect.logging.loggers import get_logger 1c

22from prefect.server.api.concurrency_limits_v2 import MinimalConcurrencyLimitResponse 1c

23from prefect.server.concurrency.lease_storage import ( 1c

24 ConcurrencyLimitLeaseMetadata, 

25 get_concurrency_lease_storage, 

26) 

27from prefect.server.database import PrefectDBInterface, provide_database_interface 1c

28from prefect.server.models import concurrency_limits 1c

29from prefect.server.models import concurrency_limits_v2 as cl_v2_models 1c

30from prefect.server.utilities.server import PrefectRouter 1c

31from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS 1c

32from prefect.types._concurrency import ConcurrencyLeaseHolder 1c

33 

# Every route in this module is mounted under /concurrency_limits.
router: PrefectRouter = PrefectRouter(
    tags=["Concurrency Limits"],
    prefix="/concurrency_limits",
)
logger: logging.Logger = get_logger(__name__)

# Leases created through the V1 API get a very long TTL because V1 clients
# have no way to renew them.
V1_LEASE_TTL = timedelta(days=365 * 100)  # ~100 years

40 

41 

@router.post("/")
async def create_concurrency_limit(
    concurrency_limit: schemas.actions.ConcurrencyLimitCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Create a task run concurrency limit, or update it if the tag already has one.

    The limit is stored as a V2 limit named `tag:<tag>`; no V1 record is written.
    The response status is 201 when a new limit is created and 200 when an
    existing one is updated. The body uses the V1 shape, with `active_slots`
    listing the task run holders reported by the lease storage.

    For more information, see https://docs.prefect.io/v3/concepts/tag-based-concurrency-limits.
    """
    tag = concurrency_limit.tag
    requested_limit = concurrency_limit.concurrency_limit
    v2_name = f"tag:{tag}"

    async with db.session_context(begin_transaction=True) as session:
        model = await cl_v2_models.read_concurrency_limit(
            session=session, name=v2_name
        )

        if not model:
            # No V2 limit for this tag yet: create one.
            model = await cl_v2_models.create_concurrency_limit(
                session=session,
                concurrency_limit=schemas.core.ConcurrencyLimitV2(
                    name=v2_name,
                    limit=requested_limit,
                    active=True,
                ),
            )
            response.status_code = status.HTTP_201_CREATED
        else:
            # Upsert behaviour: overwrite the limit value of the existing row.
            await cl_v2_models.update_concurrency_limit(
                session=session,
                concurrency_limit_id=model.id,
                concurrency_limit=schemas.actions.ConcurrencyLimitV2Update(
                    limit=requested_limit
                ),
            )
            # Mirror the new value on the object used to build the response.
            model.limit = requested_limit
            response.status_code = status.HTTP_200_OK

        # Translate to the V1 response format.
        storage = get_concurrency_lease_storage()
        holders = await storage.list_holders_for_limit(model.id)
        task_run_ids = [
            holder.id for _, holder in holders if holder.type == "task_run"
        ]

        return schemas.core.ConcurrencyLimit(
            id=model.id,
            tag=tag,
            concurrency_limit=model.limit,
            active_slots=task_run_ids,
            created=model.created,
            updated=model.updated,
        )

99 

100 

@router.get("/{id:uuid}")
async def read_concurrency_limit(
    concurrency_limit_id: UUID = Path(
        ..., description="The concurrency limit id", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Get a concurrency limit by id.

    A V2 limit whose name starts with `tag:` is returned in V1 format; otherwise
    the V1 table is consulted. Responds with 404 when neither lookup matches.

    The `active slots` field contains a list of TaskRun IDs currently using a
    concurrency slot for the specified tag.
    """
    async with db.session_context() as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )

        # Only tag-backed V2 limits are visible through the V1 API.
        if v2_limit and v2_limit.name.startswith("tag:"):
            storage = get_concurrency_lease_storage()
            holders = await storage.list_holders_for_limit(v2_limit.id)
            task_run_ids = [
                holder.id for _, holder in holders if holder.type == "task_run"
            ]

            return schemas.core.ConcurrencyLimit(
                id=v2_limit.id,
                tag=v2_limit.name.removeprefix("tag:"),
                concurrency_limit=v2_limit.limit,
                active_slots=task_run_ids,
                created=v2_limit.created,
                updated=v2_limit.updated,
            )

    # Pre-migration compatibility: look for a V1 record with this id.
    async with db.session_context() as session:
        v1_limit = await models.concurrency_limits.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
    if v1_limit:
        return v1_limit
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency limit not found"
    )

145 

146 

@router.get("/tag/{tag}")
async def read_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name", alias="tag"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Get a concurrency limit by tag.

    The V2 limit named `tag:<tag>` is preferred and returned in V1 format;
    otherwise the V1 table is consulted. Responds with 404 when neither exists.

    The `active slots` field contains a list of TaskRun IDs currently using a
    concurrency slot for the specified tag.
    """
    async with db.session_context() as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, name=f"tag:{tag}"
        )

        if v2_limit:
            storage = get_concurrency_lease_storage()
            holders = await storage.list_holders_for_limit(v2_limit.id)
            task_run_ids = [
                holder.id for _, holder in holders if holder.type == "task_run"
            ]

            return schemas.core.ConcurrencyLimit(
                id=v2_limit.id,
                tag=tag,
                concurrency_limit=v2_limit.limit,
                active_slots=task_run_ids,
                created=v2_limit.created,
                updated=v2_limit.updated,
            )

    # Pre-migration compatibility: look for a V1 record with this tag.
    async with db.session_context() as session:
        v1_limit = await models.concurrency_limits.read_concurrency_limit_by_tag(
            session=session, tag=tag
        )
    if v1_limit:
        return v1_limit
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency limit not found"
    )

188 

189 

@router.post("/filter")
async def read_concurrency_limits(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Sequence[schemas.core.ConcurrencyLimit]:
    """
    Query for concurrency limits.

    V1 and V2 limits are both read, tag-backed V2 limits are converted to the V1
    format, and V1 rows whose tag also exists in V2 are dropped. Pagination is
    applied to the merged list.

    For each concurrency limit the `active slots` field contains a list of TaskRun IDs
    currently using a concurrency slot for the specified tag.
    """
    # Fetch enough rows from each source to cover the requested page after merging.
    # NOTE(review): this cap is applied before non-tag V2 limits are skipped below,
    # so a page may come back short when such limits exist - confirm this is acceptable.
    window = limit + offset

    async with db.session_context() as session:
        v1_limits = await models.concurrency_limits.read_concurrency_limits(
            session=session, limit=window, offset=0
        )
        v2_limits = await cl_v2_models.read_all_concurrency_limits(
            session=session, limit=window, offset=0
        )

    merged: list[schemas.core.ConcurrencyLimit] = []
    storage = get_concurrency_lease_storage()

    for v2_limit in v2_limits:
        if v2_limit.name.startswith("tag:"):
            holders = await storage.list_holders_for_limit(v2_limit.id)
            task_run_ids = [
                holder.id for _, holder in holders if holder.type == "task_run"
            ]
            merged.append(
                schemas.core.ConcurrencyLimit(
                    id=v2_limit.id,
                    tag=v2_limit.name.removeprefix("tag:"),
                    concurrency_limit=v2_limit.limit,
                    active_slots=task_run_ids,
                    created=v2_limit.created,
                    updated=v2_limit.updated,
                )
            )

    # V2 wins when both versions define a limit for the same tag.
    v2_tags = {converted.tag for converted in merged}
    merged.extend(v1 for v1 in v1_limits if v1.tag not in v2_tags)

    return merged[offset : offset + limit]

238 

239 

@router.post("/tag/{tag}/reset")
async def reset_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name"),
    slot_override: Optional[List[UUID]] = Body(
        None,
        embed=True,
        description="Manual override for active concurrency limit slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Reset the active slots of the concurrency limit for a tag.

    For a V2 limit named `tag:<tag>`, every active lease that references the
    limit is revoked, then one slot and one lease are created per task run id in
    `slot_override`. Without a V2 limit the V1 reset is used; a 404 is returned
    when that finds nothing either.
    """
    async with db.session_context(begin_transaction=True) as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, name=f"tag:{tag}"
        )

        if not v2_limit:
            # Pre-migration compatibility: reset the V1 record instead.
            v1_limit = await models.concurrency_limits.reset_concurrency_limit_by_tag(
                session=session, tag=tag, slot_override=slot_override
            )
            if not v1_limit:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Concurrency limit not found",
                )
            return

        storage = get_concurrency_lease_storage()
        limit_id = v2_limit.id

        # Walk the active leases page by page. After a page in which something
        # was revoked the listing has shifted, so scanning restarts at the
        # beginning; a page with nothing to revoke just advances the offset.
        # NOTE(review): unlike the decrement endpoint, nothing here calls
        # bulk_decrement_active_slots before revoking - confirm the V2 slot
        # counter is reset elsewhere.
        page_size = 100
        cursor = 0
        while page := await storage.read_active_lease_ids(
            limit=page_size, offset=cursor
        ):
            revoked = 0
            for lease_id in page:
                lease = await storage.read_lease(lease_id)
                if not lease or limit_id not in lease.resource_ids:
                    continue
                await storage.revoke_lease(lease_id)
                revoked += 1
            cursor = 0 if revoked else cursor + page_size

        # Re-occupy one slot (with a matching lease) per overriding task run.
        for task_run_id in slot_override or ():
            await cl_v2_models.bulk_increment_active_slots(
                session=session,
                concurrency_limit_ids=[limit_id],
                slots=1,
            )
            await storage.create_lease(
                resource_ids=[limit_id],
                ttl=V1_LEASE_TTL,
                metadata=ConcurrencyLimitLeaseMetadata(
                    slots=1,
                    holder=ConcurrencyLeaseHolder(type="task_run", id=task_run_id),
                ),
            )

313 

314 

@router.delete("/{id:uuid}")
async def delete_concurrency_limit(
    concurrency_limit_id: UUID = Path(
        ..., description="The concurrency limit id", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete a concurrency limit by id.

    If a V2 limit has this id, every active lease referencing it is revoked and
    the limit is deleted. Otherwise the V1 record is deleted; a 404 is returned
    when no V1 record exists either.
    """
    async with db.session_context(begin_transaction=True) as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
        if v2_limit:
            storage = get_concurrency_lease_storage()
            limit_id = v2_limit.id

            # Walk the active leases page by page. After a page in which
            # something was revoked the listing has shifted, so scanning
            # restarts at the beginning; otherwise move on to the next page.
            page_size = 100
            cursor = 0
            while page := await storage.read_active_lease_ids(
                limit=page_size, offset=cursor
            ):
                revoked = 0
                for lease_id in page:
                    lease = await storage.read_lease(lease_id)
                    if not lease or limit_id not in lease.resource_ids:
                        continue
                    await storage.revoke_lease(lease_id)
                    revoked += 1
                cursor = 0 if revoked else cursor + page_size

            await cl_v2_models.delete_concurrency_limit(
                session=session, concurrency_limit_id=limit_id
            )
            return

    # Pre-migration compatibility: try the V1 table.
    async with db.session_context(begin_transaction=True) as session:
        removed = await models.concurrency_limits.delete_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
    if not removed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Concurrency limit not found",
        )

371 

372 

@router.delete("/tag/{tag}")
async def delete_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete the concurrency limit for a tag.

    If a V2 limit named `tag:<tag>` exists, every active lease referencing it is
    revoked and the limit is deleted. Otherwise the V1 record is deleted; a 404
    is returned when no V1 record exists either.
    """
    async with db.session_context(begin_transaction=True) as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, name=f"tag:{tag}"
        )
        if v2_limit:
            storage = get_concurrency_lease_storage()
            limit_id = v2_limit.id

            # Walk the active leases page by page. After a page in which
            # something was revoked the listing has shifted, so scanning
            # restarts at the beginning; otherwise move on to the next page.
            page_size = 100
            cursor = 0
            while page := await storage.read_active_lease_ids(
                limit=page_size, offset=cursor
            ):
                revoked = 0
                for lease_id in page:
                    lease = await storage.read_lease(lease_id)
                    if not lease or limit_id not in lease.resource_ids:
                        continue
                    await storage.revoke_lease(lease_id)
                    revoked += 1
                cursor = 0 if revoked else cursor + page_size

            await cl_v2_models.delete_concurrency_limit(
                session=session, concurrency_limit_id=limit_id
            )
            return

    # Pre-migration compatibility: try the V1 table.
    async with db.session_context(begin_transaction=True) as session:
        removed = await models.concurrency_limits.delete_concurrency_limit_by_tag(
            session=session, tag=tag
        )
    if not removed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Concurrency limit not found",
        )

426 

427 

class Abort(Exception):
    """
    Exception that carries a human-readable `reason`.

    NOTE(review): nothing in the visible part of this module raises or catches
    this exception; presumably it is used by callers elsewhere - confirm before
    changing or removing it.
    """

    def __init__(self, reason: str) -> None:
        # Forward to Exception so `args` and `str(exc)` hold the reason even when
        # the exception is built with a keyword argument (`Abort(reason=...)`).
        super().__init__(reason)
        self.reason = reason

431 

432 

class Delay(Exception):
    """
    Exception that carries a delay in seconds together with a `reason`.

    NOTE(review): nothing in the visible part of this module raises or catches
    this exception; presumably it is used by callers elsewhere - confirm before
    changing or removing it.
    """

    def __init__(self, delay_seconds: float, reason: str) -> None:
        # Forward both values to Exception so `args` is populated the same way
        # for positional and keyword construction.
        super().__init__(delay_seconds, reason)
        self.delay_seconds = delay_seconds
        self.reason = reason

437 

438 

def _tag_limit_reached(tag: str) -> HTTPException:
    """Build the 423 response used when no slot is available for `tag`."""
    return HTTPException(
        status_code=status.HTTP_423_LOCKED,
        detail=f"Concurrency limit for the {tag} tag has been reached",
        headers={
            "Retry-After": str(
                PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS.value()
            )
        },
    )


@router.post("/increment")
async def increment_concurrency_limits_v1(
    names: List[str] = Body(..., description="The tags to acquire a slot for"),
    task_run_id: UUID = Body(
        ..., description="The ID of the task run acquiring the slot"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Increment concurrency limits for the given tags.

    During migration, this handles both V1 and V2 limits to support mixed states.
    Post-migration, it only uses V2 with lease-based concurrency.

    Returns one entry per limit in which a slot was taken. Tags without an
    active V2 limit or a V1 limit are skipped.

    Raises:
        HTTPException: 423 when any requested tag has a limit of 0, or when a
            limit has no free slot (the latter includes a `Retry-After` header).
    """
    results: list[MinimalConcurrencyLimitResponse] = []
    v2_names = [f"tag:{tag}" for tag in names]

    async with db.session_context(begin_transaction=True) as session:
        # Get V2 limits
        v2_limits = await cl_v2_models.bulk_read_concurrency_limits(
            session=session, names=v2_names
        )
        v2_by_name = {limit.name: limit for limit in v2_limits}

        # Get V1 limits (for pre-migration compatibility)
        v1_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                session, tags=names
            )
        )
        v1_by_tag = {limit.tag: limit for limit in v1_limits}

        # Reject zero limits for every tag before acquiring anything.
        for tag in names:
            v2_limit = v2_by_name.get(f"tag:{tag}")
            v1_limit = v1_by_tag.get(tag)

            if (v2_limit and v2_limit.limit == 0) or (
                v1_limit and v1_limit.concurrency_limit == 0
            ):
                raise HTTPException(
                    status_code=status.HTTP_423_LOCKED,
                    detail=f'The concurrency limit on tag "{tag}" is 0 and will deadlock if the task tries to run again.',
                )

        v2_to_acquire: list[UUID] = []
        v2_tags_map: dict[UUID, str] = {}  # Map limit IDs to tags for results
        v1_to_acquire = []

        for tag in names:
            v2_limit = v2_by_name.get(f"tag:{tag}")
            v1_limit = v1_by_tag.get(tag)

            if v2_limit and v2_limit.active:
                v2_to_acquire.append(v2_limit.id)
                v2_tags_map[v2_limit.id] = tag
            elif v1_limit:
                # V1 availability is checked up front so that nothing is
                # modified when any V1 limit is already full.
                if len(v1_limit.active_slots) >= v1_limit.concurrency_limit:
                    raise _tag_limit_reached(tag)
                v1_to_acquire.append(v1_limit)

        # Bulk acquire all V2 limits at once
        acquired_v2_ids: list[UUID] = []
        if v2_to_acquire:
            acquired = await cl_v2_models.bulk_increment_active_slots(
                session=session,
                concurrency_limit_ids=v2_to_acquire,
                slots=1,
            )
            if not acquired:
                # The result is only checked for truthiness here, so the
                # limit that was full is not identified; the first requested
                # V2 tag is the one reported.
                raise _tag_limit_reached(v2_tags_map[v2_to_acquire[0]])
            acquired_v2_ids = v2_to_acquire

        # Add V2 results
        for limit_id in acquired_v2_ids:
            tag = v2_tags_map[limit_id]
            v2_limit = v2_by_name[f"tag:{tag}"]
            results.append(
                MinimalConcurrencyLimitResponse(
                    id=v2_limit.id, name=tag, limit=v2_limit.limit
                )
            )

        # Apply V1 increments (already checked availability)
        for v1_limit in v1_to_acquire:
            active_slots = set(v1_limit.active_slots)
            active_slots.add(str(task_run_id))
            v1_limit.active_slots = list(active_slots)
            results.append(
                MinimalConcurrencyLimitResponse(
                    id=v1_limit.id, name=v1_limit.tag, limit=v1_limit.concurrency_limit
                )
            )

    # One lease covers every V2 limit acquired for this task run.
    if acquired_v2_ids:
        lease_storage = get_concurrency_lease_storage()
        await lease_storage.create_lease(
            resource_ids=acquired_v2_ids,
            ttl=V1_LEASE_TTL,
            metadata=ConcurrencyLimitLeaseMetadata(
                slots=1,
                holder=ConcurrencyLeaseHolder(type="task_run", id=task_run_id),
            ),
        )

    return results

572 

573 

@router.post("/decrement")
async def decrement_concurrency_limits_v1(
    names: List[str] = Body(..., description="The tags to release a slot for"),
    task_run_id: UUID = Body(
        ..., description="The ID of the task run releasing the slot"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Decrement concurrency limits for the given tags.

    Finds and revokes the lease for V2 limits or decrements V1 active slots.
    Every V2 limit matching the requested tags is included in the result; V1
    limits are included only when the task run actually held one of their slots.
    Limit names in the result are the bare tags (without the `tag:` prefix), the
    same form the V1 branch below and the increment endpoint use.
    """
    results: list[MinimalConcurrencyLimitResponse] = []
    lease_storage = get_concurrency_lease_storage()
    v2_names = [f"tag:{tag}" for tag in names]

    async with db.session_context(begin_transaction=True) as session:
        # Bulk read V2 limits
        v2_limits = await cl_v2_models.bulk_read_concurrency_limits(
            session=session, names=v2_names
        )
        v2_by_name = {limit.name: limit for limit in v2_limits}

        # Find and revoke lease for V2 limits
        if v2_limits:
            # A set, so a lease spanning several limits is revoked only once.
            leases_ids_to_revoke: set[UUID] = set()

            for limit in v2_limits:
                holders = await lease_storage.list_holders_for_limit(limit.id)
                for lease_id, holder in holders:
                    if holder.id == task_run_id:
                        lease = await lease_storage.read_lease(lease_id)
                        if lease:
                            leases_ids_to_revoke.add(lease.id)

            for lease_id in leases_ids_to_revoke:
                lease = await lease_storage.read_lease(lease_id)
                if lease:
                    # Release the slots on every limit the lease covers, then
                    # drop the lease itself.
                    await cl_v2_models.bulk_decrement_active_slots(
                        session=session,
                        concurrency_limit_ids=lease.resource_ids,
                        slots=lease.metadata.slots if lease.metadata else 0,
                    )
                    await lease_storage.revoke_lease(lease.id)
                else:
                    logger.warning("Lease %s not found during decrement", lease_id)

            results.extend(
                MinimalConcurrencyLimitResponse(
                    id=limit.id,
                    # Report the tag itself rather than the internal V2 name.
                    name=limit.name.removeprefix("tag:"),
                    limit=limit.limit,
                )
                for limit in v2_limits
            )

        # Handle V1 decrements (for pre-migration compatibility)
        v1_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                session, tags=names
            )
        )
        for cl in v1_limits:
            # Skip if already handled as V2
            v2_name = f"tag:{cl.tag}"
            if v2_name not in v2_by_name:
                active_slots = set(cl.active_slots)
                if str(task_run_id) in active_slots:
                    active_slots.discard(str(task_run_id))
                    cl.active_slots = list(active_slots)
                    results.append(
                        MinimalConcurrencyLimitResponse(
                            id=cl.id, name=cl.tag, limit=cl.concurrency_limit
                        )
                    )

    return results