Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/concurrency_limits.py: 34% of 260 statements

"""
Routes for interacting with concurrency limit objects.

This module provides a V1 API adapter that routes requests to the V2 concurrency
system. After the migration, V1 limits are converted to V2, but the V1 API
continues to work for backward compatibility.
"""

from __future__ import annotations

import logging
from datetime import timedelta
from typing import List, Optional, Sequence
from uuid import UUID

from fastapi import Body, Depends, HTTPException, Path, Response, status

import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.logging.loggers import get_logger
from prefect.server.api.concurrency_limits_v2 import MinimalConcurrencyLimitResponse
from prefect.server.concurrency.lease_storage import (
    ConcurrencyLimitLeaseMetadata,
    get_concurrency_lease_storage,
)
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models import concurrency_limits
from prefect.server.models import concurrency_limits_v2 as cl_v2_models
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
from prefect.types._concurrency import ConcurrencyLeaseHolder

router: PrefectRouter = PrefectRouter(
    prefix="/concurrency_limits", tags=["Concurrency Limits"]
)
logger: logging.Logger = get_logger(__name__)
# V1 clients cannot renew leases; use a long TTL
V1_LEASE_TTL = timedelta(days=100 * 365)  # ~100 years
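
# These long-lived leases are cleaned up explicitly rather than by expiry: the
# /decrement endpoint revokes the calling task run's lease, and the reset and
# delete endpoints below revoke every lease that references the affected limit.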


@router.post("/")
async def create_concurrency_limit(
    concurrency_limit: schemas.actions.ConcurrencyLimitCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Create a task run concurrency limit.

    For more information, see https://docs.prefect.io/v3/concepts/tag-based-concurrency-limits.
    """
    # Always create V2 limits (no V1 record)
    v2_name = f"tag:{concurrency_limit.tag}"

    async with db.session_context(begin_transaction=True) as session:
        # Check if V2 already exists (upsert behavior)
        existing = await cl_v2_models.read_concurrency_limit(
            session=session, name=v2_name
        )

        if existing:
            # Update existing V2 limit
            await cl_v2_models.update_concurrency_limit(
                session=session,
                concurrency_limit_id=existing.id,
                concurrency_limit=schemas.actions.ConcurrencyLimitV2Update(
                    limit=concurrency_limit.concurrency_limit
                ),
            )
            model = existing
            model.limit = concurrency_limit.concurrency_limit
            response.status_code = status.HTTP_200_OK
        else:
            # Create new V2 limit
            model = await cl_v2_models.create_concurrency_limit(
                session=session,
                concurrency_limit=schemas.core.ConcurrencyLimitV2(
                    name=v2_name,
                    limit=concurrency_limit.concurrency_limit,
                    active=True,
                ),
            )
            response.status_code = status.HTTP_201_CREATED

        # Return V1 format
        lease_storage = get_concurrency_lease_storage()
        holders = await lease_storage.list_holders_for_limit(model.id)
        active_slots = [h.id for _, h in holders if h.type == "task_run"]

        return schemas.core.ConcurrencyLimit(
            id=model.id,
            tag=concurrency_limit.tag,
            concurrency_limit=model.limit,
            active_slots=active_slots,
            created=model.created,
            updated=model.updated,
        )
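
# Example (hypothetical request, relative to the server's API root):
#   POST /concurrency_limits/ {"tag": "database", "concurrency_limit": 10}
# creates (201) or updates (200) the V2 limit named "tag:database" and returns it
# in the V1 response shape.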


@router.get("/{id:uuid}")
async def read_concurrency_limit(
    concurrency_limit_id: UUID = Path(
        ..., description="The concurrency limit id", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Get a concurrency limit by id.

    The `active slots` field contains a list of TaskRun IDs currently using a
    concurrency slot for the specified tag.
    """
    # Try V2 first
    async with db.session_context() as session:
        v2_limit = await cl_v2_models.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )

        if v2_limit and v2_limit.name.startswith("tag:"):
            tag = v2_limit.name.removeprefix("tag:")
            lease_storage = get_concurrency_lease_storage()
            holders = await lease_storage.list_holders_for_limit(v2_limit.id)
            active_slots = [h.id for _, h in holders if h.type == "task_run"]

            return schemas.core.ConcurrencyLimit(
                id=v2_limit.id,
                tag=tag,
                concurrency_limit=v2_limit.limit,
                active_slots=active_slots,
                created=v2_limit.created,
                updated=v2_limit.updated,
            )

    # Fall back to V1 (for pre-migration compatibility)
    async with db.session_context() as session:
        model = await models.concurrency_limits.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
        if not model:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Concurrency limit not found"
            )
        return model
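
# Example (hypothetical request):
#   GET /concurrency_limits/<limit-uuid>
# resolves either a V2-backed id or a not-yet-migrated V1 id; 404 otherwise.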


@router.get("/tag/{tag}")
async def read_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name", alias="tag"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.core.ConcurrencyLimit:
    """
    Get a concurrency limit by tag.

    The `active slots` field contains a list of TaskRun IDs currently using a
    concurrency slot for the specified tag.
    """
    # Try V2 first
    v2_name = f"tag:{tag}"

    async with db.session_context() as session:
        model = await cl_v2_models.read_concurrency_limit(session=session, name=v2_name)

        if model:
            lease_storage = get_concurrency_lease_storage()
            holders = await lease_storage.list_holders_for_limit(model.id)
            active_slots = [h.id for _, h in holders if h.type == "task_run"]

            return schemas.core.ConcurrencyLimit(
                id=model.id,
                tag=tag,
                concurrency_limit=model.limit,
                active_slots=active_slots,
                created=model.created,
                updated=model.updated,
            )

    # Fall back to V1 (for pre-migration compatibility)
    async with db.session_context() as session:
        v1_model = await models.concurrency_limits.read_concurrency_limit_by_tag(
            session=session, tag=tag
        )
        if not v1_model:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, detail="Concurrency limit not found"
            )
        return v1_model
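
# Example (hypothetical request):
#   GET /concurrency_limits/tag/database
# returns the V1-shaped limit backed by the V2 limit "tag:database", falling back
# to a not-yet-migrated V1 record; 404 if neither exists.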


@router.post("/filter")
async def read_concurrency_limits(
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> Sequence[schemas.core.ConcurrencyLimit]:
    """
    Query for concurrency limits.

    For each concurrency limit the `active slots` field contains a list of TaskRun IDs
    currently using a concurrency slot for the specified tag.
    """
    # Get both V1 and V2, then merge
    async with db.session_context() as session:
        v1_limits = await models.concurrency_limits.read_concurrency_limits(
            session=session, limit=limit + offset, offset=0
        )
        v2_limits = await cl_v2_models.read_all_concurrency_limits(
            session=session, limit=limit + offset, offset=0
        )

    # Convert V2 to V1 format
    converted_v2: list[schemas.core.ConcurrencyLimit] = []
    lease_storage = get_concurrency_lease_storage()

    for v2_limit in v2_limits:
        if not v2_limit.name.startswith("tag:"):
            continue
        tag = v2_limit.name.removeprefix("tag:")
        holders = await lease_storage.list_holders_for_limit(v2_limit.id)
        active_slots = [h.id for _, h in holders if h.type == "task_run"]

        converted_v2.append(
            schemas.core.ConcurrencyLimit(
                id=v2_limit.id,
                tag=tag,
                concurrency_limit=v2_limit.limit,
                active_slots=active_slots,
                created=v2_limit.created,
                updated=v2_limit.updated,
            )
        )

    # Merge and deduplicate by tag (prefer V2)
    seen_tags = {cl.tag for cl in converted_v2}
    combined = converted_v2 + [cl for cl in v1_limits if cl.tag not in seen_tags]
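
    # Both queries over-fetch limit + offset rows because pagination has to be
    # applied to the merged, deduplicated list rather than to either table alone.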
    return combined[offset : offset + limit]
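
# Example (hypothetical request):
#   POST /concurrency_limits/filter {"limit": 50, "offset": 0}
# returns merged V1 and tag-backed V2 limits in the V1 response shape.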


@router.post("/tag/{tag}/reset")
async def reset_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name"),
    slot_override: Optional[List[UUID]] = Body(
        None,
        embed=True,
        description="Manual override for active concurrency limit slots.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Try V2 first
    v2_name = f"tag:{tag}"

    async with db.session_context(begin_transaction=True) as session:
        model = await cl_v2_models.read_concurrency_limit(session=session, name=v2_name)

        if model:
            # Revoke all existing leases
            lease_storage = get_concurrency_lease_storage()

            # Keep fetching and revoking in batches until all are gone
            batch_size = 100
            offset = 0
            while True:
                active_lease_ids = await lease_storage.read_active_lease_ids(
                    limit=batch_size, offset=offset
                )
                if not active_lease_ids:
                    break

                revoked_any = False
                for lease_id in active_lease_ids:
                    lease = await lease_storage.read_lease(lease_id)
                    if lease and model.id in lease.resource_ids:
                        await lease_storage.revoke_lease(lease_id)
                        revoked_any = True

                # If we didn't revoke any in this batch, we're done with this limit
                if not revoked_any:
                    offset += batch_size
                else:
                    # Start from beginning since we modified the list
                    offset = 0

            # Create new leases for slot_override if provided
            if slot_override:
                for task_run_id in slot_override:
                    await cl_v2_models.bulk_increment_active_slots(
                        session=session,
                        concurrency_limit_ids=[model.id],
                        slots=1,
                    )
                    await lease_storage.create_lease(
                        resource_ids=[model.id],
                        ttl=V1_LEASE_TTL,
                        metadata=ConcurrencyLimitLeaseMetadata(
                            slots=1,
                            holder=ConcurrencyLeaseHolder(
                                type="task_run", id=task_run_id
                            ),
                        ),
                    )
            return

        # Fall back to V1
        model = await models.concurrency_limits.reset_concurrency_limit_by_tag(
            session=session, tag=tag, slot_override=slot_override
        )
        if not model:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Concurrency limit not found",
            )


@router.delete("/{id:uuid}")
async def delete_concurrency_limit(
    concurrency_limit_id: UUID = Path(
        ..., description="The concurrency limit id", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Try V2 first
    async with db.session_context(begin_transaction=True) as session:
        v2 = await cl_v2_models.read_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
        if v2:
            # Clean up leases
            lease_storage = get_concurrency_lease_storage()

            # Keep fetching and revoking in batches until all are gone
            batch_size = 100
            offset = 0
            while True:
                active_lease_ids = await lease_storage.read_active_lease_ids(
                    limit=batch_size, offset=offset
                )
                if not active_lease_ids:
                    break

                revoked_any = False
                for lease_id in active_lease_ids:
                    lease = await lease_storage.read_lease(lease_id)
                    if lease and v2.id in lease.resource_ids:
                        await lease_storage.revoke_lease(lease_id)
                        revoked_any = True

                # If we didn't revoke any in this batch, we're done with this limit
                if not revoked_any:
                    offset += batch_size
                else:
                    # Start from beginning since we modified the list
                    offset = 0

            # Delete V2
            await cl_v2_models.delete_concurrency_limit(
                session=session, concurrency_limit_id=v2.id
            )
            return

    # Try V1
    async with db.session_context(begin_transaction=True) as session:
        v1_deleted = await models.concurrency_limits.delete_concurrency_limit(
            session=session, concurrency_limit_id=concurrency_limit_id
        )
        if not v1_deleted:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Concurrency limit not found",
            )


@router.delete("/tag/{tag}")
async def delete_concurrency_limit_by_tag(
    tag: str = Path(..., description="The tag name"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    # Try V2 first
    v2_name = f"tag:{tag}"
    async with db.session_context(begin_transaction=True) as session:
        model = await cl_v2_models.read_concurrency_limit(session=session, name=v2_name)
        if model:
            # Clean up leases
            lease_storage = get_concurrency_lease_storage()

            # Keep fetching and revoking in batches until all are gone
            batch_size = 100
            offset = 0
            while True:
                active_lease_ids = await lease_storage.read_active_lease_ids(
                    limit=batch_size, offset=offset
                )
                if not active_lease_ids:
                    break

                revoked_any = False
                for lease_id in active_lease_ids:
                    lease = await lease_storage.read_lease(lease_id)
                    if lease and model.id in lease.resource_ids:
                        await lease_storage.revoke_lease(lease_id)
                        revoked_any = True

                # If we didn't revoke any in this batch, we're done with this limit
                if not revoked_any:
                    offset += batch_size
                else:
                    # Start from beginning since we modified the list
                    offset = 0

            # Delete V2
            await cl_v2_models.delete_concurrency_limit(
                session=session, concurrency_limit_id=model.id
            )
            return

    # Try V1
    async with db.session_context(begin_transaction=True) as session:
        v1_deleted = await models.concurrency_limits.delete_concurrency_limit_by_tag(
            session=session, tag=tag
        )
        if not v1_deleted:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Concurrency limit not found",
            )
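
# Example (hypothetical request):
#   DELETE /concurrency_limits/tag/database
# revokes any leases on the backing V2 limit before deleting it; 404 if neither a
# V2 nor a V1 record exists for the tag.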


class Abort(Exception):
    def __init__(self, reason: str):
        self.reason = reason


class Delay(Exception):
    def __init__(self, delay_seconds: float, reason: str):
        self.delay_seconds = delay_seconds
        self.reason = reason
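
# NOTE: Abort and Delay are not raised anywhere in this module; presumably they
# are retained so that existing imports of these exception types from this module
# keep working (an assumption; check callers before removing them).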


@router.post("/increment")
async def increment_concurrency_limits_v1(
    names: List[str] = Body(..., description="The tags to acquire a slot for"),
    task_run_id: UUID = Body(
        ..., description="The ID of the task run acquiring the slot"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Increment concurrency limits for the given tags.

    During migration, this handles both V1 and V2 limits to support mixed states.
    Post-migration, it only uses V2 with lease-based concurrency.
    """
    results = []
    v2_names = [f"tag:{tag}" for tag in names]

    async with db.session_context(begin_transaction=True) as session:
        # Get V2 limits
        v2_limits = await cl_v2_models.bulk_read_concurrency_limits(
            session=session, names=v2_names
        )
        v2_by_name = {limit.name: limit for limit in v2_limits}

        # Get V1 limits (for pre-migration compatibility)
        v1_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                session, tags=names
            )
        )
        v1_by_tag = {limit.tag: limit for limit in v1_limits}

        # Check all zero limits upfront before acquiring any
        for tag in names:
            v2_limit = v2_by_name.get(f"tag:{tag}")
            v1_limit = v1_by_tag.get(tag)

            if v2_limit and v2_limit.limit == 0:
                raise HTTPException(
                    status_code=status.HTTP_423_LOCKED,
                    detail=f'The concurrency limit on tag "{tag}" is 0 and will deadlock if the task tries to run again.',
                )
            elif v1_limit and v1_limit.concurrency_limit == 0:
                raise HTTPException(
                    status_code=status.HTTP_423_LOCKED,
                    detail=f'The concurrency limit on tag "{tag}" is 0 and will deadlock if the task tries to run again.',
                )

        # Collect V2 limits to acquire in bulk
        v2_to_acquire = []
        v2_tags_map = {}  # Map limit IDs to tags for results

        # Check V1 limits availability upfront
        v1_to_acquire = []

        for tag in names:
            v2_limit = v2_by_name.get(f"tag:{tag}")
            v1_limit = v1_by_tag.get(tag)

            if v2_limit and v2_limit.active:
                v2_to_acquire.append(v2_limit.id)
                v2_tags_map[v2_limit.id] = tag
            elif v1_limit:
                # Check V1 limit availability
                if len(v1_limit.active_slots) >= v1_limit.concurrency_limit:
                    raise HTTPException(
                        status_code=status.HTTP_423_LOCKED,
                        detail=f"Concurrency limit for the {tag} tag has been reached",
                        headers={
                            "Retry-After": str(
                                PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS.value()
                            )
                        },
                    )
                v1_to_acquire.append(v1_limit)

        # Bulk acquire all V2 limits at once
        acquired_v2_ids = []
        if v2_to_acquire:
            acquired = await cl_v2_models.bulk_increment_active_slots(
                session=session,
                concurrency_limit_ids=v2_to_acquire,
                slots=1,
            )
            if not acquired:
                # Bulk acquisition failed; report the first requested tag as at capacity
                for lid in v2_to_acquire:
                    tag = v2_tags_map[lid]
                    raise HTTPException(
                        status_code=status.HTTP_423_LOCKED,
                        detail=f"Concurrency limit for the {tag} tag has been reached",
                        headers={
                            "Retry-After": str(
                                PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS.value()
                            )
                        },
                    )
            acquired_v2_ids = v2_to_acquire

        # Add V2 results
        for lid in acquired_v2_ids:
            tag = v2_tags_map[lid]
            limit = v2_by_name[f"tag:{tag}"]
            results.append(
                MinimalConcurrencyLimitResponse(
                    id=limit.id, name=tag, limit=limit.limit
                )
            )

        # Apply V1 increments (already checked availability)
        for v1_limit in v1_to_acquire:
            active_slots = set(v1_limit.active_slots)
            active_slots.add(str(task_run_id))
            v1_limit.active_slots = list(active_slots)
            results.append(
                MinimalConcurrencyLimitResponse(
                    id=v1_limit.id, name=v1_limit.tag, limit=v1_limit.concurrency_limit
                )
            )

        # Create lease for V2 limits
        if acquired_v2_ids:
            lease_storage = get_concurrency_lease_storage()
            await lease_storage.create_lease(
                resource_ids=acquired_v2_ids,
                ttl=V1_LEASE_TTL,
                metadata=ConcurrencyLimitLeaseMetadata(
                    slots=1,
                    holder=ConcurrencyLeaseHolder(type="task_run", id=task_run_id),
                ),
            )

    return results
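
# Example (hypothetical request):
#   POST /concurrency_limits/increment
#   {"names": ["database"], "task_run_id": "<task-run-uuid>"}
# acquires one slot per tag (V2-backed tags also get a long-lived lease held by
# the task run) and returns the limits that were incremented; a full limit
# responds with 423 and a Retry-After header.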


@router.post("/decrement")
async def decrement_concurrency_limits_v1(
    names: List[str] = Body(..., description="The tags to release a slot for"),
    task_run_id: UUID = Body(
        ..., description="The ID of the task run releasing the slot"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[MinimalConcurrencyLimitResponse]:
    """
    Decrement concurrency limits for the given tags.

    Finds and revokes the lease for V2 limits or decrements V1 active slots.
    Returns the list of limits that were decremented.
    """
    results: list[MinimalConcurrencyLimitResponse] = []
    lease_storage = get_concurrency_lease_storage()
    v2_names = [f"tag:{tag}" for tag in names]

    async with db.session_context(begin_transaction=True) as session:
        # Bulk read V2 limits
        v2_limits = await cl_v2_models.bulk_read_concurrency_limits(
            session=session, names=v2_names
        )
        v2_by_name = {limit.name: limit for limit in v2_limits}

        # Find and revoke lease for V2 limits
        if v2_limits:
            leases_ids_to_revoke: set[UUID] = set()

            for limit in v2_limits:
                holders = await lease_storage.list_holders_for_limit(limit.id)
                for lease_id, holder in holders:
                    if holder.id == task_run_id:
                        lease = await lease_storage.read_lease(lease_id)
                        if lease:
                            leases_ids_to_revoke.add(lease.id)

            for lease_id in leases_ids_to_revoke:
                lease = await lease_storage.read_lease(lease_id)
                if lease:
                    await cl_v2_models.bulk_decrement_active_slots(
                        session=session,
                        concurrency_limit_ids=lease.resource_ids,
                        slots=lease.metadata.slots if lease.metadata else 0,
                    )
                    await lease_storage.revoke_lease(lease.id)
                else:
                    logger.warning(f"Lease {lease_id} not found during decrement")

            results.extend(
                [
                    MinimalConcurrencyLimitResponse(
                        id=limit.id, name=limit.name, limit=limit.limit
                    )
                    for limit in v2_limits
                ]
            )

        # Handle V1 decrements (for pre-migration compatibility)
        v1_limits = (
            await concurrency_limits.filter_concurrency_limits_for_orchestration(
                session, tags=names
            )
        )
        for cl in v1_limits:
            # Skip if already handled as V2
            v2_name = f"tag:{cl.tag}"
            if v2_name not in v2_by_name:
                active_slots = set(cl.active_slots)
                if str(task_run_id) in active_slots:
                    active_slots.discard(str(task_run_id))
                    cl.active_slots = list(active_slots)
                    results.append(
                        MinimalConcurrencyLimitResponse(
                            id=cl.id, name=cl.tag, limit=cl.concurrency_limit
                        )
                    )

    return results
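
# Example (hypothetical request):
#   POST /concurrency_limits/decrement
#   {"names": ["database"], "task_run_id": "<task-run-uuid>"}
# revokes the task run's lease(s) on V2-backed tags, releases any V1 slots it
# held, and returns the limits that were released.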