Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/concurrency_limits_v2.py: 68%

149 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from datetime import datetime, timedelta, timezone 1b

2from typing import List, Literal, Optional, Union 1b

3from uuid import UUID 1b

4 

5from fastapi import Body, Depends, HTTPException, Path 1b

6from sqlalchemy.ext.asyncio import AsyncSession 1b

7 

8import prefect.server.models as models 1b

9import prefect.server.schemas as schemas 1b

10from prefect._internal.compatibility.starlette import status 1b

11from prefect.server.api.dependencies import LimitBody 1b

12from prefect.server.concurrency.lease_storage import ( 1b

13 ConcurrencyLimitLeaseMetadata, 

14 get_concurrency_lease_storage, 

15) 

16from prefect.server.database import PrefectDBInterface, provide_database_interface 1b

17from prefect.server.schemas import actions 1b

18from prefect.server.utilities.schemas import PrefectBaseModel 1b

19from prefect.server.utilities.server import PrefectRouter 1b

20from prefect.settings.context import get_current_settings 1b

21from prefect.types._concurrency import ConcurrencyLeaseHolder 1b

22from prefect.utilities.math import clamped_poisson_interval 1b

23 

# Router for the v2 concurrency-limit endpoints; routes registered on it are
# served under the /v2/concurrency_limits prefix.
router: PrefectRouter = PrefectRouter(
    prefix="/v2/concurrency_limits", tags=["Concurrency Limits V2"]
)

27 

28 

@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Create,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimitV2:
    """
    Create a task run concurrency limit.

    For more information, see https://docs.prefect.io/v3/how-to-guides/workflows/global-concurrency-limits.
    """
    # The insert runs in a session opened with begin_transaction=True.
    async with db.session_context(begin_transaction=True) as session:
        created = await models.concurrency_limits_v2.create_concurrency_limit(
            concurrency_limit=concurrency_limit, session=session
        )

    # Validate the returned model into the response schema.
    return schemas.core.ConcurrencyLimitV2.model_validate(created)

45 

46 

@router.get("/{id_or_name}")
async def read_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.GlobalConcurrencyLimitResponse:
    # Fetch one concurrency limit by ID or by name; 404 when nothing matches.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        # A value that parses as a UUID is looked up by ID, anything else by name.
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass

    lookup = (
        {"concurrency_limit_id": id_or_name}
        if isinstance(id_or_name, UUID)
        else {"name": id_or_name}
    )

    async with db.session_context() as session:
        found = await models.concurrency_limits_v2.read_concurrency_limit(
            session, **lookup
        )

    if found:
        return schemas.responses.GlobalConcurrencyLimitResponse.model_validate(found)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
    )

75 

76 

@router.post("/filter")
async def read_all_concurrency_limits_v2(
    limit: int = LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.GlobalConcurrencyLimitResponse]:
    # Return one page of concurrency limits, selected by `limit` and `offset`.
    async with db.session_context() as session:
        page = await models.concurrency_limits_v2.read_all_concurrency_limits(
            session=session, limit=limit, offset=offset
        )

    # Validate each row into the response schema.
    to_response = schemas.responses.GlobalConcurrencyLimitResponse.model_validate
    return [to_response(row) for row in page]

96 

97 

@router.patch("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Update,
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Apply an update to the limit identified by ID or name; 404 when the model
    # layer reports that nothing was updated.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        # A value that parses as a UUID is matched by ID, anything else by name.
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass

    target = (
        {"concurrency_limit_id": id_or_name}
        if isinstance(id_or_name, UUID)
        else {"name": id_or_name}
    )

    async with db.session_context(begin_transaction=True) as session:
        updated = await models.concurrency_limits_v2.update_concurrency_limit(
            session, concurrency_limit=concurrency_limit, **target
        )

    if updated:
        return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
    )

127 

128 

@router.delete("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Delete the limit identified by ID or name; 404 when the model layer
    # reports that nothing was deleted.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        # A value that parses as a UUID is matched by ID, anything else by name.
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass

    target = (
        {"concurrency_limit_id": id_or_name}
        if isinstance(id_or_name, UUID)
        else {"name": id_or_name}
    )

    async with db.session_context(begin_transaction=True) as session:
        deleted = await models.concurrency_limits_v2.delete_concurrency_limit(
            session, **target
        )

    if deleted:
        return

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
    )

156 

157 

# Reduced view of a concurrency limit exposing only its id, name and limit.
# Documented with comments rather than a docstring so the generated schema
# description is unchanged.
class MinimalConcurrencyLimitResponse(PrefectBaseModel):
    id: UUID  # identifier of the concurrency limit
    name: str  # name of the concurrency limit
    limit: int  # the concurrency limit's `limit` value

162 

163 

# Pairs a lease id with the reduced views of the limits that lease covers.
# Documented with comments rather than a docstring so the generated schema
# description is unchanged.
class ConcurrencyLimitWithLeaseResponse(PrefectBaseModel):
    lease_id: UUID  # identifier of the lease
    limits: list[MinimalConcurrencyLimitResponse]  # limits associated with the lease

167 

168 

async def _acquire_concurrency_slots(
    session: AsyncSession,
    names: List[str],
    slots: int,
    mode: Literal["concurrency", "rate_limit"],
) -> tuple[list[schemas.core.ConcurrencyLimitV2], bool]:
    """
    Try to take `slots` slots on every active limit named in `names`.

    Returns every limit that was read (active or not) together with the result
    of the bulk increment. The session is rolled back when the increment
    reports failure.

    Raises a 422 HTTPException when `slots` exceeds the `limit` of any active
    limit, or when `mode` is "rate_limit" and at least one active limit has a
    `slot_decay_per_second` of zero.
    """
    rows = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
        session=session, names=names
    )
    limits = [schemas.core.ConcurrencyLimitV2.model_validate(row) for row in rows]

    # Only active limits take part in validation and in the increment.
    active_limits = [candidate for candidate in limits if bool(candidate.active)]

    for candidate in active_limits:
        if candidate.limit < slots:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="Slots requested is greater than the limit",
            )

    if mode == "rate_limit":
        non_decaying = [
            str(candidate.name)
            for candidate in active_limits
            if candidate.slot_decay_per_second == 0.0
        ]
        if non_decaying:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=(
                    "Only concurrency limits with slot decay can be used for "
                    "rate limiting. The following limits do not have a decay "
                    f"configured: {','.join(non_decaying)!r}"
                ),
            )

    acquired = await models.concurrency_limits_v2.bulk_increment_active_slots(
        session=session,
        concurrency_limit_ids=[candidate.id for candidate in active_limits],
        slots=slots,
    )

    if not acquired:
        await session.rollback()

    return limits, acquired

215 

216 

async def _generate_concurrency_locked_response(
    session: AsyncSession,
    limits: list[schemas.core.ConcurrencyLimitV2],
    slots: int,
) -> HTTPException:
    """
    Build the 423 Locked error used when concurrency slots cannot be acquired.

    The denied request is first recorded against every active limit. A
    Retry-After value is then derived from the limit that scores highest on
    requested-plus-denied slots:

    - Limits without slot decay wait `avg_slot_occupancy_seconds` per slot,
      capped so that long-running tasks cannot produce excessive retry delays.
      The cap is `tag_concurrency_slot_wait_seconds` for tag-based limits (name
      starts with "tag:") and `maximum_concurrency_slot_wait_seconds` for all
      others. Occupancies below the cap are used as-is.
    - Limits with slot decay wait `1 / slot_decay_per_second` per slot, with no
      cap applied.

    The result is passed through clamped_poisson_interval to add jitter, so that
    many blocked callers do not all retry at the same moment.

    The exception is returned rather than raised; the caller raises it.
    """
    active_limits = [entry for entry in limits if bool(entry.active)]

    await models.concurrency_limits_v2.bulk_update_denied_slots(
        session=session,
        concurrency_limit_ids=[entry.id for entry in active_limits],
        slots=slots,
    )

    def num_blocking_slots(limit: schemas.core.ConcurrencyLimitV2) -> float:
        # Decaying limits are scored by the raw slot count; the others by that
        # count divided by the limit's size.
        pending = slots + limit.denied_slots
        if limit.slot_decay_per_second > 0:
            return pending
        return pending / limit.limit

    blocking_limit = max(active_limits, key=num_blocking_slots)
    blocking_slots = num_blocking_slots(blocking_limit)

    if blocking_limit.slot_decay_per_second != 0.0:
        wait_time_per_slot = 1.0 / blocking_limit.slot_decay_per_second
    else:
        settings = get_current_settings()
        if blocking_limit.name.startswith("tag:"):
            max_wait = settings.server.tasks.tag_concurrency_slot_wait_seconds
        else:
            max_wait = settings.server.concurrency.maximum_concurrency_slot_wait_seconds
        wait_time_per_slot = min(blocking_limit.avg_slot_occupancy_seconds, max_wait)

    retry_after = clamped_poisson_interval(
        average_interval=wait_time_per_slot * blocking_slots
    )

    return HTTPException(
        status_code=status.HTTP_423_LOCKED,
        headers={"Retry-After": str(retry_after)},
    )

278 

279 

@router.post("/increment", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before acquiring concurrency slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    # Acquire `slots` on each named limit. On success the limits that were read
    # are returned; otherwise the 423 built by
    # _generate_concurrency_locked_response is raised.
    # `create_if_missing` is accepted but not used.
    # NOTE(review): Pydantic v2 names the list-length constraint `min_length`;
    # confirm that `min_items=1` is still enforced on `names` here.
    async with db.session_context(begin_transaction=True) as session:
        acquired_limits, acquired = await _acquire_concurrency_slots(
            session=session, names=names, slots=slots, mode=mode
        )

    if not acquired:
        # The denied request is recorded in a separate session.
        async with db.session_context(begin_transaction=True) as session:
            raise await _generate_concurrency_locked_response(
                session=session, limits=acquired_limits, slots=slots
            )

    return [
        MinimalConcurrencyLimitResponse(
            id=entry.id, name=str(entry.name), limit=entry.limit
        )
        for entry in acquired_limits
    ]

313 

314 

@router.post("/increment-with-lease", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots_with_lease(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
    ),
    holder: Optional[ConcurrencyLeaseHolder] = Body(
        None,
        description="The holder of the lease with type (flow_run, task_run, or deployment) and id.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> ConcurrencyLimitWithLeaseResponse:
    # Acquire `slots` on each named limit and create a lease over the limits
    # that were read. When acquisition fails, the 423 built by
    # _generate_concurrency_locked_response is raised instead.
    # NOTE(review): Pydantic v2 names the list-length constraint `min_length`;
    # confirm that `min_items=1` is still enforced on `names` here.
    async with db.session_context(begin_transaction=True) as session:
        acquired_limits, acquired = await _acquire_concurrency_slots(
            session=session, names=names, slots=slots, mode=mode
        )

    if not acquired:
        # The denied request is recorded in a separate session.
        async with db.session_context(begin_transaction=True) as session:
            raise await _generate_concurrency_locked_response(
                session=session, limits=acquired_limits, slots=slots
            )

    # The lease stores the slot count and holder as metadata.
    lease_metadata = ConcurrencyLimitLeaseMetadata(slots=slots, holder=holder)
    lease = await get_concurrency_lease_storage().create_lease(
        resource_ids=[entry.id for entry in acquired_limits],
        ttl=timedelta(seconds=lease_duration),
        metadata=lease_metadata,
    )

    summaries = [
        MinimalConcurrencyLimitResponse(
            id=entry.id, name=str(entry.name), limit=entry.limit
        )
        for entry in acquired_limits
    ]
    return ConcurrencyLimitWithLeaseResponse(lease_id=lease.id, limits=summaries)

367 

368 

@router.post("/decrement", status_code=status.HTTP_200_OK)
async def bulk_decrement_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    occupancy_seconds: Optional[float] = Body(None, gt=0.0),
    # Declared Optional because the default is None; this also matches the
    # parallel parameter on the /increment route.
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before decrementing active slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    # Release `slots` on every active limit named in `names`.
    #
    # Returns a reduced view of every limit that was read (active or not), or an
    # empty list when none of the names match. `occupancy_seconds` is forwarded
    # unchanged to the model layer. `create_if_missing` is accepted but not used.
    # NOTE(review): Pydantic v2 names the list-length constraint `min_length`;
    # confirm that `min_items=1` is still enforced on `names` here.
    async with db.session_context(begin_transaction=True) as session:
        limits = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
            session=session, names=names
        )

        if not limits:
            return []

        # Inactive limits are left out of the decrement.
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=[limit.id for limit in limits if bool(limit.active)],
            slots=slots,
            occupancy_seconds=occupancy_seconds,
        )

    return [
        MinimalConcurrencyLimitResponse(
            id=limit.id, name=str(limit.name), limit=limit.limit
        )
        for limit in limits
    ]

401 

402 

@router.post("/decrement-with-lease", status_code=status.HTTP_204_NO_CONTENT)
async def bulk_decrement_active_slots_with_lease(
    lease_id: UUID = Body(
        ...,
        description="The ID of the lease corresponding to the concurrency limits to decrement.",
        embed=True,
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Release the slots recorded on a lease and then revoke the lease.
    # Returns without doing anything when the lease cannot be read.
    storage = get_concurrency_lease_storage()
    lease = await storage.read_lease(lease_id)
    if not lease:
        return

    # Occupancy is the time elapsed since the lease was created.
    held_for = datetime.now(timezone.utc) - lease.created_at
    # A lease without metadata releases zero slots.
    released_slots = lease.metadata.slots if lease.metadata else 0

    async with db.session_context(begin_transaction=True) as session:
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=lease.resource_ids,
            slots=released_slots,
            occupancy_seconds=held_for.total_seconds(),
        )
    await storage.revoke_lease(lease_id)

427 

428 

@router.post("/leases/{lease_id}/renew", status_code=status.HTTP_204_NO_CONTENT)
async def renew_concurrency_lease(
    lease_id: UUID = Path(..., description="The ID of the lease to renew"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
        embed=True,
    ),
) -> None:
    # Extend a lease by `lease_duration` seconds; respond 410 Gone when the
    # lease can no longer be renewed.
    storage = get_concurrency_lease_storage()

    # Atomically renew the lease (checks existence and updates index in single operation)
    renewed = await storage.renew_lease(
        lease_id=lease_id,
        ttl=timedelta(seconds=lease_duration),
    )

    # renew_lease has three possible results:
    # - True: lease successfully renewed
    # - False: lease not found or expired
    # - None: legacy implementation (check lease existence to determine success)
    if renewed is None:
        # Legacy implementation returned None - check if lease actually exists
        gone = await storage.read_lease(lease_id) is None
    else:
        gone = renewed is False

    if gone:
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="Lease not found - it may have expired or been revoked",
        )