Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/concurrency_limits_v2.py: 25%
149 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from datetime import datetime, timedelta, timezone 1a
2from typing import List, Literal, Optional, Union 1a
3from uuid import UUID 1a
5from fastapi import Body, Depends, HTTPException, Path 1a
6from sqlalchemy.ext.asyncio import AsyncSession 1a
8import prefect.server.models as models 1a
9import prefect.server.schemas as schemas 1a
10from prefect._internal.compatibility.starlette import status 1a
11from prefect.server.api.dependencies import LimitBody 1a
12from prefect.server.concurrency.lease_storage import ( 1a
13 ConcurrencyLimitLeaseMetadata,
14 get_concurrency_lease_storage,
15)
16from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
17from prefect.server.schemas import actions 1a
18from prefect.server.utilities.schemas import PrefectBaseModel 1a
19from prefect.server.utilities.server import PrefectRouter 1a
20from prefect.settings.context import get_current_settings 1a
21from prefect.types._concurrency import ConcurrencyLeaseHolder 1a
22from prefect.utilities.math import clamped_poisson_interval 1a
# Every global concurrency limit (V2) endpoint below is mounted under this prefix.
router: PrefectRouter = PrefectRouter(
    prefix="/v2/concurrency_limits",
    tags=["Concurrency Limits V2"],
)
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Create,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimitV2:
    """
    Create a global concurrency limit.

    The limit is written in its own transaction; the stored record is returned
    after that transaction block has exited.

    For more information, see https://docs.prefect.io/v3/how-to-guides/workflows/global-concurrency-limits.
    """
    async with db.session_context(begin_transaction=True) as session:
        model = await models.concurrency_limits_v2.create_concurrency_limit(
            session=session, concurrency_limit=concurrency_limit
        )

    return schemas.core.ConcurrencyLimitV2.model_validate(model)
@router.get("/{id_or_name}")
async def read_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.responses.GlobalConcurrencyLimitResponse:
    """
    Read one global concurrency limit, addressed either by its ID or by its name.

    Raises:
        HTTPException: 404 when no limit matches.
    """
    # String path values that parse as a UUID are looked up by ID; anything
    # else is treated as a limit name.
    # TODO: this seems like it shouldn't be necessary
    identifier = id_or_name
    if isinstance(identifier, str):
        try:
            identifier = UUID(identifier)
        except ValueError:
            pass

    async with db.session_context() as session:
        if not isinstance(identifier, UUID):
            found = await models.concurrency_limits_v2.read_concurrency_limit(
                session, name=identifier
            )
        else:
            found = await models.concurrency_limits_v2.read_concurrency_limit(
                session, concurrency_limit_id=identifier
            )

    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
        )

    return schemas.responses.GlobalConcurrencyLimitResponse.model_validate(found)
@router.post("/filter")
async def read_all_concurrency_limits_v2(
    limit: int = LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[schemas.responses.GlobalConcurrencyLimitResponse]:
    """
    Return one page of global concurrency limits, selected by ``limit`` and ``offset``.
    """
    async with db.session_context() as session:
        rows = await models.concurrency_limits_v2.read_all_concurrency_limits(
            session=session,
            limit=limit,
            offset=offset,
        )

    # Named ``row`` rather than ``limit`` so the page-size parameter is not shadowed.
    to_response = schemas.responses.GlobalConcurrencyLimitResponse.model_validate
    return [to_response(row) for row in rows]
@router.patch("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def update_concurrency_limit_v2(
    concurrency_limit: actions.ConcurrencyLimitV2Update,
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Apply ``concurrency_limit`` to the global concurrency limit with this ID or name.

    Raises:
        HTTPException: 404 when the model layer reports that nothing was updated.
    """
    # Prefer an ID lookup whenever the string form parses as a UUID.
    # TODO: this seems like it shouldn't be necessary
    identifier = id_or_name
    if isinstance(identifier, str):
        try:
            identifier = UUID(identifier)
        except ValueError:
            pass

    async with db.session_context(begin_transaction=True) as session:
        if not isinstance(identifier, UUID):
            updated = await models.concurrency_limits_v2.update_concurrency_limit(
                session, name=identifier, concurrency_limit=concurrency_limit
            )
        else:
            updated = await models.concurrency_limits_v2.update_concurrency_limit(
                session,
                concurrency_limit_id=identifier,
                concurrency_limit=concurrency_limit,
            )

    if not updated:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
        )
@router.delete("/{id_or_name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_concurrency_limit_v2(
    id_or_name: Union[UUID, str] = Path(
        ..., description="The ID or name of the concurrency limit", alias="id_or_name"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete the global concurrency limit with this ID or name.

    Raises:
        HTTPException: 404 when the model layer reports that nothing was deleted.
    """
    # Prefer an ID lookup whenever the string form parses as a UUID.
    # TODO: this seems like it shouldn't be necessary
    identifier = id_or_name
    if isinstance(identifier, str):
        try:
            identifier = UUID(identifier)
        except ValueError:
            pass

    async with db.session_context(begin_transaction=True) as session:
        if isinstance(identifier, UUID):
            deleted = await models.concurrency_limits_v2.delete_concurrency_limit(
                session, concurrency_limit_id=identifier
            )
        else:
            deleted = await models.concurrency_limits_v2.delete_concurrency_limit(
                session, name=identifier
            )

    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency Limit not found"
        )
class MinimalConcurrencyLimitResponse(PrefectBaseModel):
    """Compact view of a concurrency limit returned by the slot endpoints."""

    id: UUID  # identifier of the concurrency limit
    name: str  # name of the concurrency limit
    limit: int  # configured slot capacity of the limit
class ConcurrencyLimitWithLeaseResponse(PrefectBaseModel):
    """Response for a slot acquisition that is held under a lease."""

    lease_id: UUID  # lease to pass to the renew / decrement-with-lease endpoints
    limits: list[MinimalConcurrencyLimitResponse]  # limits matched by the requested names
async def _acquire_concurrency_slots(
    session: AsyncSession,
    names: List[str],
    slots: int,
    mode: Literal["concurrency", "rate_limit"],
) -> tuple[list[schemas.core.ConcurrencyLimitV2], bool]:
    """
    Try to take ``slots`` slots from every active limit named in ``names``.

    Returns every limit matched by name (active or not) plus a flag saying
    whether the increment succeeded. When it did not, the session is rolled back.

    Raises:
        HTTPException: 422 if ``slots`` exceeds any active limit's capacity, or if
            ``mode`` is ``"rate_limit"`` and an active limit has no slot decay.
    """
    rows = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
        session=session, names=names
    )
    limits = list(map(schemas.core.ConcurrencyLimitV2.model_validate, rows))
    active_limits = [candidate for candidate in limits if bool(candidate.active)]

    # A request larger than a limit's total capacity can never be satisfied.
    if any(slots > candidate.limit for candidate in active_limits):
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Slots requested is greater than the limit",
        )

    if mode == "rate_limit":
        non_decaying = [
            str(candidate.name)
            for candidate in active_limits
            if candidate.slot_decay_per_second == 0.0
        ]
        if non_decaying:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=(
                    "Only concurrency limits with slot decay can be used for "
                    "rate limiting. The following limits do not have a decay "
                    f"configured: {','.join(non_decaying)!r}"
                ),
            )

    acquired = await models.concurrency_limits_v2.bulk_increment_active_slots(
        session=session,
        concurrency_limit_ids=[candidate.id for candidate in active_limits],
        slots=slots,
    )
    if not acquired:
        # Discard whatever this attempt wrote before reporting the failure.
        await session.rollback()

    return limits, acquired
async def _generate_concurrency_locked_response(
    session: AsyncSession,
    limits: list[schemas.core.ConcurrencyLimitV2],
    slots: int,
) -> HTTPException:
    """
    Generate a 423 Locked response when concurrency slots cannot be acquired.

    Records the denial against every active limit in ``limits`` using
    ``session``, then returns (does not raise) an ``HTTPException`` carrying a
    ``Retry-After`` header. Callers should raise it only after the session's
    transaction block has exited. Raising inside that block would presumably
    roll the denial back.

    Calculates an appropriate Retry-After header value based on the blocking limit's
    characteristics. For limits without slot decay, caps avg_slot_occupancy_seconds
    at a configured maximum to prevent excessive retry delays from long-running tasks:

    - Tag-based limits (name starts with "tag:"): Capped at tag_concurrency_slot_wait_seconds
      (default 30s) to restore V1 behavior that users relied on
    - Other limits: Capped at maximum_concurrency_slot_wait_seconds (default 30s) to allow
      more uniform queues while still preventing astronomical delays

    Low average occupancies are always respected (e.g., 2s stays 2s, not forced higher).
    Limits with slot decay use the decay rate directly without capping.

    The final retry value includes jitter via clamped_poisson_interval to prevent
    thundering herd when many tasks retry simultaneously. When no positive wait
    can be derived (no active limits, or a zero wait), the header is ``0.0`` and
    jitter is skipped.
    """
    active_limits = [limit for limit in limits if bool(limit.active)]

    await models.concurrency_limits_v2.bulk_update_denied_slots(
        session=session,
        concurrency_limit_ids=[limit.id for limit in active_limits],
        slots=slots,
    )

    def has_decay(limit: schemas.core.ConcurrencyLimitV2) -> bool:
        # One predicate shared by the ranking and the wait calculation so the two
        # can never classify a limit differently (e.g. a non-positive decay rate
        # must not reach the ``1.0 / decay`` branch).
        return limit.slot_decay_per_second > 0

    def num_blocking_slots(limit: schemas.core.ConcurrencyLimitV2) -> float:
        # Decaying limits are ranked by raw slot count. Non-decaying limits are
        # normalised by their capacity.
        if has_decay(limit):
            return slots + limit.denied_slots
        return (slots + limit.denied_slots) / limit.limit

    retry_after = 0.0
    # Guard: max() on an empty sequence would raise ValueError (-> HTTP 500).
    if active_limits:
        blocking_limit = max(active_limits, key=num_blocking_slots)
        blocking_slots = num_blocking_slots(blocking_limit)

        if has_decay(blocking_limit):
            wait_time_per_slot = 1.0 / blocking_limit.slot_decay_per_second
        else:
            settings = get_current_settings()
            max_wait = (
                settings.server.tasks.tag_concurrency_slot_wait_seconds
                if blocking_limit.name.startswith("tag:")
                else settings.server.concurrency.maximum_concurrency_slot_wait_seconds
            )
            wait_time_per_slot = min(blocking_limit.avg_slot_occupancy_seconds, max_wait)

        average_interval = wait_time_per_slot * blocking_slots
        # NOTE(review): jitter is only applied to a positive average; a zero
        # average (e.g. a configured max wait of 0) is assumed to be unsafe for
        # clamped_poisson_interval — confirm against prefect.utilities.math.
        if average_interval > 0:
            retry_after = clamped_poisson_interval(average_interval=average_interval)

    return HTTPException(
        status_code=status.HTTP_423_LOCKED,
        headers={
            "Retry-After": str(retry_after),
        },
    )
@router.post("/increment", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before acquiring concurrency slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Take ``slots`` slots from each named limit, without a lease.

    ``create_if_missing`` is deprecated and is not used by this endpoint.

    Raises:
        HTTPException: 422 from the acquisition checks, or 423 Locked (with a
            ``Retry-After`` header) when the slots are not currently available.
    """
    async with db.session_context(begin_transaction=True) as session:
        acquired_limits, acquired = await _acquire_concurrency_slots(
            session=session,
            names=names,
            slots=slots,
            mode=mode,
        )

    if not acquired:
        # Record the denial in a fresh transaction. Build the 423 inside the block
        # but raise it only after the block exits: raising inside a
        # begin_transaction=True context rolls the transaction back, which would
        # discard the denied-slot update.
        async with db.session_context(begin_transaction=True) as session:
            locked = await _generate_concurrency_locked_response(
                session=session,
                limits=acquired_limits,
                slots=slots,
            )
        raise locked

    return [
        MinimalConcurrencyLimitResponse(
            id=limit.id, name=str(limit.name), limit=limit.limit
        )
        for limit in acquired_limits
    ]
@router.post("/increment-with-lease", status_code=status.HTTP_200_OK)
async def bulk_increment_active_slots_with_lease(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    mode: Literal["concurrency", "rate_limit"] = Body("concurrency"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
    ),
    holder: Optional[ConcurrencyLeaseHolder] = Body(
        None,
        description="The holder of the lease with type (flow_run, task_run, or deployment) and id.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> ConcurrencyLimitWithLeaseResponse:
    """
    Take ``slots`` slots from each named limit and hold them under a new lease.

    The lease records the limit IDs, the slot count and the optional ``holder``.
    It expires after ``lease_duration`` seconds unless it is renewed.

    Raises:
        HTTPException: 422 from the acquisition checks, or 423 Locked (with a
            ``Retry-After`` header) when the slots are not currently available.
        Exception: any lease-storage error, re-raised after the slots taken by
            this request have been released again.
    """
    async with db.session_context(begin_transaction=True) as session:
        acquired_limits, acquired = await _acquire_concurrency_slots(
            session=session,
            names=names,
            slots=slots,
            mode=mode,
        )

    if not acquired:
        # Raise only after the block exits. Raising inside a
        # begin_transaction=True context rolls back the denied-slot update.
        async with db.session_context(begin_transaction=True) as session:
            locked = await _generate_concurrency_locked_response(
                session=session,
                limits=acquired_limits,
                slots=slots,
            )
        raise locked

    try:
        lease_storage = get_concurrency_lease_storage()
        lease = await lease_storage.create_lease(
            resource_ids=[limit.id for limit in acquired_limits],
            ttl=timedelta(seconds=lease_duration),
            metadata=ConcurrencyLimitLeaseMetadata(
                slots=slots,
                holder=holder,
            ),
        )
    except Exception:
        # The increment above is already committed. Without a lease, nothing tied
        # to this request would release those slots. Hand them back (for the same
        # active limits that were incremented) before propagating the error.
        async with db.session_context(begin_transaction=True) as session:
            await models.concurrency_limits_v2.bulk_decrement_active_slots(
                session=session,
                concurrency_limit_ids=[
                    limit.id for limit in acquired_limits if bool(limit.active)
                ],
                slots=slots,
                occupancy_seconds=None,
            )
        raise

    return ConcurrencyLimitWithLeaseResponse(
        lease_id=lease.id,
        limits=[
            MinimalConcurrencyLimitResponse(
                id=limit.id, name=str(limit.name), limit=limit.limit
            )
            for limit in acquired_limits
        ],
    )
@router.post("/decrement", status_code=status.HTTP_200_OK)
async def bulk_decrement_active_slots(
    slots: int = Body(..., gt=0),
    names: List[str] = Body(..., min_items=1),
    occupancy_seconds: Optional[float] = Body(None, gt=0.0),
    # Optional[bool] (not bool) so an explicit JSON null is accepted, matching the
    # default of None and the parallel parameter on the increment endpoint.
    create_if_missing: Optional[bool] = Body(
        None,
        deprecated="Limits must be explicitly created before decrementing active slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Release ``slots`` slots on each active limit named in ``names``.

    ``occupancy_seconds``, when given, is forwarded to the model layer alongside
    the decrement. ``create_if_missing`` is deprecated and is not used.

    Returns every limit matched by name (inactive ones included), or an empty
    list when no names match.
    """
    async with db.session_context(begin_transaction=True) as session:
        limits = await models.concurrency_limits_v2.bulk_read_concurrency_limits(
            session=session, names=names
        )

        if not limits:
            return []

        # Only active limits ever had slots taken, so only they are decremented.
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=[limit.id for limit in limits if bool(limit.active)],
            slots=slots,
            occupancy_seconds=occupancy_seconds,
        )

        return [
            MinimalConcurrencyLimitResponse(
                id=limit.id, name=str(limit.name), limit=limit.limit
            )
            for limit in limits
        ]
@router.post("/decrement-with-lease", status_code=status.HTTP_204_NO_CONTENT)
async def bulk_decrement_active_slots_with_lease(
    lease_id: UUID = Body(
        ...,
        description="The ID of the lease corresponding to the concurrency limits to decrement.",
        embed=True,
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Release the slots held by ``lease_id`` and revoke the lease.

    An unknown lease is silently ignored. Occupancy is measured from the lease's
    ``created_at`` to now (UTC). A lease without metadata releases 0 slots.
    """
    storage = get_concurrency_lease_storage()
    lease = await storage.read_lease(lease_id)
    if not lease:
        return

    held_for = (datetime.now(timezone.utc) - lease.created_at).total_seconds()
    held_slots = lease.metadata.slots if lease.metadata else 0

    # Decrement and revoke share one transaction block.
    async with db.session_context(begin_transaction=True) as session:
        await models.concurrency_limits_v2.bulk_decrement_active_slots(
            session=session,
            concurrency_limit_ids=lease.resource_ids,
            slots=held_slots,
            occupancy_seconds=held_for,
        )
        await storage.revoke_lease(lease_id)
@router.post("/leases/{lease_id}/renew", status_code=status.HTTP_204_NO_CONTENT)
async def renew_concurrency_lease(
    lease_id: UUID = Path(..., description="The ID of the lease to renew"),
    lease_duration: float = Body(
        300,  # 5 minutes
        ge=60,  # 1 minute
        le=60 * 60 * 24,  # 1 day
        description="The duration of the lease in seconds.",
        embed=True,
    ),
) -> None:
    """
    Extend ``lease_id`` so it expires ``lease_duration`` seconds from now.

    Raises:
        HTTPException: 410 Gone when the lease no longer exists.
    """
    storage = get_concurrency_lease_storage()

    # Atomically renew the lease (checks existence and updates index in single operation)
    outcome = await storage.renew_lease(
        lease_id=lease_id,
        ttl=timedelta(seconds=lease_duration),
    )

    # renew_lease may report:
    #   True  -> renewed
    #   False -> lease not found or expired
    #   None  -> legacy implementation; fall back to checking that the lease exists
    if outcome is None:
        lease_alive = await storage.read_lease(lease_id) is not None
    else:
        lease_alive = outcome is not False

    if not lease_alive:
        raise HTTPException(
            status_code=status.HTTP_410_GONE,
            detail="Lease not found - it may have expired or been revoked",
        )