Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/concurrency_limits_v2.py: 68%
149 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from datetime import datetime, timedelta, timezone 1b
2from typing import List, Literal, Optional, Union 1b
3from uuid import UUID 1b
5from fastapi import Body, Depends, HTTPException, Path 1b
6from sqlalchemy.ext.asyncio import AsyncSession 1b
8import prefect.server.models as models 1b
9import prefect.server.schemas as schemas 1b
10from prefect._internal.compatibility.starlette import status 1b
11from prefect.server.api.dependencies import LimitBody 1b
12from prefect.server.concurrency.lease_storage import ( 1b
13 ConcurrencyLimitLeaseMetadata,
14 get_concurrency_lease_storage,
15)
16from prefect.server.database import PrefectDBInterface, provide_database_interface 1b
17from prefect.server.schemas import actions 1b
18from prefect.server.utilities.schemas import PrefectBaseModel 1b
19from prefect.server.utilities.server import PrefectRouter 1b
20from prefect.settings.context import get_current_settings 1b
21from prefect.types._concurrency import ConcurrencyLeaseHolder 1b
22from prefect.utilities.math import clamped_poisson_interval 1b
# Every V2 (global) concurrency-limit endpoint below is mounted under this
# prefix and grouped under one tag in the generated API docs.
router: PrefectRouter = PrefectRouter(
    prefix="/v2/concurrency_limits",
    tags=["Concurrency Limits V2"],
)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Create,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimitV2:
    """
    Create a global concurrency limit.

    For more information, see https://docs.prefect.io/v3/how-to-guides/workflows/global-concurrency-limits.
    """
    # NOTE: this docstring is published as the OpenAPI description; it
    # previously said "task run concurrency limit", which is the V1 API.
    async with db.session_context(begin_transaction=True) as session:
        model = await models.concurrency_limits_v2.create_concurrency_limit(
            session=session, concurrency_limit=concurrency_limit
        )

    # Convert the stored row into the API schema once the transaction has
    # been committed by the context manager.
    return schemas.core.ConcurrencyLimitV2.model_validate(model)
@router.get("/{id_or_name}")
async def read_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.GlobalConcurrencyLimitResponse:
    # Fetch one V2 concurrency limit by ID or by name; 404 if it does not exist.
    # The path segment arrives as text: if it parses as a UUID it is treated
    # as an ID, otherwise it is looked up as a name.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass  # not a UUID, so keep it as a name

    lookup = (
        dict(concurrency_limit_id=id_or_name)
        if isinstance(id_or_name, UUID)
        else dict(name=id_or_name)
    )

    async with db.session_context() as session:
        found = await models.concurrency_limits_v2.read_concurrency_limit(
            session, **lookup
        )

        if not found:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
            )

        return schemas.responses.GlobalConcurrencyLimitResponse.model_validate(found)
@router.post("/filter")
async def read_all_concurrency_limits_v2(
    limit: int = LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.GlobalConcurrencyLimitResponse]:
    # Page through all V2 concurrency limits.
    #   limit  - page size (maximum number of limits returned)
    #   offset - number of limits to skip
    async with db.session_context() as session:
        concurrency_limits = (
            await models.concurrency_limits_v2.read_all_concurrency_limits(
                session=session,
                limit=limit,
                offset=offset,
            )
        )

        # The loop variable is deliberately NOT called `limit`: that name is
        # the page-size parameter above and must not be shadowed. Rows are
        # validated while the session is still open.
        return [
            schemas.responses.GlobalConcurrencyLimitResponse.model_validate(
                concurrency_limit
            )
            for concurrency_limit in concurrency_limits
        ]
@router.patch("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Update,
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Apply a partial update to a V2 concurrency limit identified by ID or
    # name; 404 when no matching limit was updated.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass  # not a UUID, so keep it as a name

    # Select the keyword the model layer expects for this kind of identifier.
    target = (
        {"concurrency_limit_id": id_or_name}
        if isinstance(id_or_name, UUID)
        else {"name": id_or_name}
    )

    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.concurrency_limits_v2.update_concurrency_limit(
            session, concurrency_limit=concurrency_limit, **target
        )

    if not was_updated:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
        )
@router.delete("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Delete a V2 concurrency limit identified by ID or name; 404 when
    # nothing was deleted.
    if isinstance(id_or_name, str):  # TODO: this seems like it shouldn't be necessary
        try:
            id_or_name = UUID(id_or_name)
        except ValueError:
            pass  # not a UUID, so keep it as a name

    by_id = isinstance(id_or_name, UUID)
    selector = dict(concurrency_limit_id=id_or_name) if by_id else dict(name=id_or_name)

    async with db.session_context(begin_transaction=True) as session:
        # Both identifier kinds assign the result, so no placeholder default
        # is needed.
        removed = await models.concurrency_limits_v2.delete_concurrency_limit(
            session, **selector
        )

    if not removed:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
        )
class MinimalConcurrencyLimitResponse(PrefectBaseModel):
    # Compact view of a V2 concurrency limit, returned by the slot
    # increment/decrement endpoints in this module.
    id: UUID  # ID of the concurrency limit
    name: str  # name of the concurrency limit
    limit: int  # configured slot capacity (requests above it are rejected with 422)
class ConcurrencyLimitWithLeaseResponse(PrefectBaseModel):
    # Result of a successful lease-based acquisition: the lease ID to renew
    # or release later, plus the limits the lease covers.
    lease_id: UUID
    limits: List[MinimalConcurrencyLimitResponse]
async def _acquire_concurrency_slots(
    session: AsyncSession,
    names: List[str],
    slots: int,
    mode: Literal["concurrency", "rate_limit"],
) -> tuple[list[schemas.core.ConcurrencyLimitV2], bool]:
    """
    Try to take `slots` slots on every active limit named in `names`.

    Returns:
        A pair of (every limit found for `names`, active or not, as API
        schemas; whether the increment succeeded). When it did not succeed,
        `session` is rolled back before returning.

    Raises:
        HTTPException: 422 if `slots` exceeds the capacity of any active
            limit, or if `mode` is "rate_limit" and some active limit has no
            slot decay configured.
    """
    rows = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
        session=session, names=names
    )
    limits = [schemas.core.ConcurrencyLimitV2.model_validate(row) for row in rows]
    active_limits = [candidate for candidate in limits if bool(candidate.active)]

    # Asking for more slots than a limit can ever hold is a client error,
    # not something to wait out.
    if any(slots > candidate.limit for candidate in active_limits):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Slots requested is greater than the limit",
        )

    if mode == "rate_limit":
        # Rate limiting only makes sense for limits whose slots decay.
        non_decaying = [
            str(candidate.name)
            for candidate in active_limits
            if candidate.slot_decay_per_second == 0.0
        ]
        if non_decaying:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=(
                    "Only concurrency limits with slot decay can be used for "
                    "rate limiting. The following limits do not have a decay "
                    f"configured: {','.join(non_decaying)!r}"
                ),
            )

    acquired = await models.concurrency_limits_v2.bulk_increment_active_slots(
        session=session,
        concurrency_limit_ids=[candidate.id for candidate in active_limits],
        slots=slots,
    )
    if not acquired:
        # Discard whatever the failed increment attempt did in this transaction.
        await session.rollback()

    return limits, acquired
async def _generate_concurrency_locked_response(
    session: AsyncSession,
    limits: list[schemas.core.ConcurrencyLimitV2],
    slots: int,
) -> HTTPException:
    """
    Build the 423 Locked error returned when slots could not be acquired.

    The denied request is first recorded on every active limit. The active
    limit with the most "pressure" is then chosen as the blocking one, and a
    Retry-After value is derived from it:

    - Limits with slot decay: the wait per slot is 1 / slot_decay_per_second.
    - Limits without slot decay: the wait per slot is the limit's
      avg_slot_occupancy_seconds, capped so that long-running holders cannot
      produce huge delays. Tag-based limits (name starts with "tag:") use
      `server.tasks.tag_concurrency_slot_wait_seconds` as the cap; all other
      limits use `server.concurrency.maximum_concurrency_slot_wait_seconds`.
      Averages below the cap are kept as-is.

    The final value passes through clamped_poisson_interval, which adds
    jitter so that many waiting clients do not retry at the same moment.

    The returned exception is meant to be raised by the caller.
    """
    active_limits = [lim for lim in limits if bool(lim.active)]

    # Record the denial on each active limit. This presumably bumps
    # `denied_slots`, which later calls read back through `lim.denied_slots`.
    await models.concurrency_limits_v2.bulk_update_denied_slots(
        session=session,
        concurrency_limit_ids=[lim.id for lim in active_limits],
        slots=slots,
    )

    def _pressure(lim: schemas.core.ConcurrencyLimitV2) -> float:
        # Slots queued against this limit, including ours. Non-decaying
        # limits are normalized by their capacity.
        queued = slots + lim.denied_slots
        return queued if lim.slot_decay_per_second > 0 else queued / lim.limit

    # NOTE(review): max() raises on an empty sequence; this assumes the caller
    # only gets here after an acquisition attempt failed on at least one
    # active limit.
    blocking_limit = max(active_limits, key=_pressure)
    blocking_slots = _pressure(blocking_limit)

    if blocking_limit.slot_decay_per_second != 0.0:
        wait_time_per_slot = 1.0 / blocking_limit.slot_decay_per_second
    else:
        settings = get_current_settings()
        if blocking_limit.name.startswith("tag:"):
            cap = settings.server.tasks.tag_concurrency_slot_wait_seconds
        else:
            cap = settings.server.concurrency.maximum_concurrency_slot_wait_seconds
        wait_time_per_slot = min(blocking_limit.avg_slot_occupancy_seconds, cap)

    retry_after = clamped_poisson_interval(
        average_interval=wait_time_per_slot * blocking_slots
    )

    return HTTPException(
        status_code=status.HTTP_423_LOCKED,
        headers={"Retry-After": str(retry_after)},
    )
@router.post("/increment", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before acquiring concurrency slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    # Acquire `slots` slots on each named limit, or fail with 423 Locked and a
    # Retry-After hint. `create_if_missing` is deprecated and ignored here.
    async with db.session_context(begin_transaction=True) as session:
        limits, acquired = await _acquire_concurrency_slots(
            session=session, names=names, slots=slots, mode=mode
        )

    if not acquired:
        # The acquisition session was rolled back, so the denial is recorded
        # in a fresh transaction before the 423 is raised.
        async with db.session_context(begin_transaction=True) as session:
            raise await _generate_concurrency_locked_response(
                session=session, limits=limits, slots=slots
            )

    return [
        MinimalConcurrencyLimitResponse(id=lim.id, name=str(lim.name), limit=lim.limit)
        for lim in limits
    ]
@router.post("/increment-with-lease", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots_with_lease(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
    ),
    holder: Optional[ConcurrencyLeaseHolder] = Body(
        None,
        description="The holder of the lease with type (flow_run, task_run, or deployment) and id.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> ConcurrencyLimitWithLeaseResponse:
    # Acquire `slots` slots on each named limit and create a lease (TTL
    # `lease_duration` seconds) that tracks them. Returns the lease ID and the
    # limits it covers, or fails with 423 Locked and a Retry-After hint.
    async with db.session_context(begin_transaction=True) as session:
        acquired_limits, acquired = await _acquire_concurrency_slots(
            session=session,
            names=names,
            slots=slots,
            mode=mode,
        )

    if not acquired:
        # The acquisition session was rolled back, so the denial is recorded
        # in a fresh transaction before the 423 is raised.
        async with db.session_context(begin_transaction=True) as session:
            raise await _generate_concurrency_locked_response(
                session=session,
                limits=acquired_limits,
                slots=slots,
            )

    lease_storage = get_concurrency_lease_storage()
    try:
        lease = await lease_storage.create_lease(
            resource_ids=[limit.id for limit in acquired_limits],
            ttl=timedelta(seconds=lease_duration),
            metadata=ConcurrencyLimitLeaseMetadata(
                slots=slots,
                holder=holder,
            ),
        )
    except Exception:
        # The slot increment was already committed above. If no lease exists,
        # the client never gets a lease ID to release the slots with, so they
        # would stay occupied. Give them back, then propagate the error.
        async with db.session_context(begin_transaction=True) as session:
            await models.concurrency_limits_v2.bulk_decrement_active_slots(
                session=session,
                concurrency_limit_ids=[
                    limit.id for limit in acquired_limits if bool(limit.active)
                ],
                slots=slots,
                occupancy_seconds=None,
            )
        raise

    return ConcurrencyLimitWithLeaseResponse(
        lease_id=lease.id,
        limits=[
            MinimalConcurrencyLimitResponse(
                id=limit.id, name=str(limit.name), limit=limit.limit
            )
            for limit in acquired_limits
        ],
    )
@router.post("/decrement", status_code=status.HTTP_200_OK)
async def bulk_decrement_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    occupancy_seconds: Optional[float] = Body(None, gt=0.0),
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before decrementing active slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    # Release `slots` slots on every active limit named in `names`, optionally
    # reporting how long they were held (`occupancy_seconds`).
    # `create_if_missing` is deprecated and ignored. It is typed
    # Optional[bool], matching /increment, because its default is None;
    # with a plain `bool`, an explicit `null` from a client is rejected.
    async with db.session_context(begin_transaction=True) as session:
        limits = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
            session=session, names=names
        )

        if not limits:
            return []

        # Only active limits are decremented; inactive ones are still listed
        # in the response.
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=[limit.id for limit in limits if bool(limit.active)],
            slots=slots,
            occupancy_seconds=occupancy_seconds,
        )

        return [
            MinimalConcurrencyLimitResponse(
                id=limit.id, name=str(limit.name), limit=limit.limit
            )
            for limit in limits
        ]
@router.post("/decrement-with-lease", status_code=status.HTTP_204_NO_CONTENT)
async def bulk_decrement_active_slots_with_lease(
    lease_id: UUID = Body(
        ...,
        description="The ID of the lease corresponding to the concurrency limits to decrement.",
        embed=True,
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Release the slots recorded on a lease, then revoke the lease.
    # A lease that storage cannot find is silently treated as a no-op.
    lease_storage = get_concurrency_lease_storage()
    lease = await lease_storage.read_lease(lease_id)
    if not lease:
        return

    # How long the slots were held, measured from lease creation.
    held_for = datetime.now(timezone.utc) - lease.created_at
    # Leases without metadata release zero slots.
    held_slots = lease.metadata.slots if lease.metadata else 0

    async with db.session_context(begin_transaction=True) as session:
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=lease.resource_ids,
            slots=held_slots,
            occupancy_seconds=held_for.total_seconds(),
        )

    # Revoke only after the decrement transaction has closed.
    await lease_storage.revoke_lease(lease_id)
@router.post("/leases/{lease_id}/renew", status_code=status.HTTP_204_NO_CONTENT)
async def renew_concurrency_lease(
    lease_id: UUID = Path(..., description="The ID of the lease to renew"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
        embed=True,
    ),
) -> None:
    # Extend a lease's TTL to `lease_duration` seconds; 410 Gone if the
    # lease no longer exists.
    lease_storage = get_concurrency_lease_storage()

    # Renewal checks existence and updates the lease in a single storage call.
    renewed = await lease_storage.renew_lease(
        lease_id=lease_id,
        ttl=timedelta(seconds=lease_duration),
    )

    # renew_lease reports:
    #   True  -> lease renewed
    #   False -> lease not found or expired
    #   None  -> legacy storage that does not report; probe for the lease
    if renewed is None:
        lease_exists = await lease_storage.read_lease(lease_id) is not None
    else:
        lease_exists = renewed is not False

    if not lease_exists:
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="Lease not found - it may have expired or been revoked",
        )