Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/task_queue.py: 46%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Implements an in-memory task queue for delivering background task runs to TaskWorkers.
3"""
5import asyncio 1a
6from typing import Dict, List, Optional, Tuple 1a
8from typing_extensions import Self 1a
10import prefect.server.schemas as schemas 1a
11from prefect.settings import ( 1a
12 PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE,
13 PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE,
14)
class TaskQueue:
    """In-memory queue of task runs for a single task key.

    Each instance owns two ``asyncio.Queue`` objects: one for newly scheduled
    task runs and one for task runs that are being retried. Consumers always
    drain the retry queue before the scheduled queue.

    Instances are created lazily and cached per task key; use
    :meth:`for_key` rather than calling the constructor directly.
    """

    # Registry of live queues keyed by task key; shared across the class.
    _task_queues: Dict[str, Self] = {}

    # Default capacities, read from settings once when the class body executes.
    default_scheduled_max_size: int = (
        PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE.value()
    )
    default_retry_max_size: int = PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE.value()

    # Per-key capacity overrides stored as (scheduled_size, retry_size).
    _queue_size_configs: Dict[str, Tuple[int, int]] = {}

    task_key: str
    _scheduled_queue: asyncio.Queue
    _retry_queue: asyncio.Queue

    @classmethod
    async def enqueue(cls, task_run: schemas.core.TaskRun) -> None:
        """Put ``task_run`` on the scheduled queue for its ``task_key``.

        The queue for that key is created on first use. Waits while the
        scheduled queue is full.
        """
        await cls.for_key(task_run.task_key).put(task_run)

    @classmethod
    def configure_task_key(
        cls,
        task_key: str,
        scheduled_size: Optional[int] = None,
        retry_size: Optional[int] = None,
    ) -> None:
        """Record the queue capacities to use for ``task_key``.

        The configuration is only consulted when :meth:`for_key` creates the
        queue, so it has no effect on a queue that already exists.

        Args:
            task_key: The task key whose queue sizes are being configured.
            scheduled_size: Capacity of the scheduled queue. ``None`` selects
                ``default_scheduled_max_size``.
            retry_size: Capacity of the retry queue. ``None`` selects
                ``default_retry_max_size``.
        """
        # Compare against None explicitly: a plain `or` would also discard an
        # explicit 0, which asyncio.Queue interprets as "unbounded".
        if scheduled_size is None:
            scheduled_size = cls.default_scheduled_max_size
        if retry_size is None:
            retry_size = cls.default_retry_max_size
        cls._queue_size_configs[task_key] = (scheduled_size, retry_size)

    @classmethod
    def for_key(cls, task_key: str) -> Self:
        """Return the queue for ``task_key``, creating it if necessary.

        A new queue uses the sizes recorded by :meth:`configure_task_key` for
        this key, or the class-level defaults when none were recorded.
        """
        if task_key not in cls._task_queues:
            sizes = cls._queue_size_configs.get(
                task_key, (cls.default_scheduled_max_size, cls.default_retry_max_size)
            )
            cls._task_queues[task_key] = cls(task_key, *sizes)
        return cls._task_queues[task_key]

    @classmethod
    def reset(cls) -> None:
        """A unit testing utility to reset the state of the task queues subsystem"""
        # Only the queue registry is dropped; sizes recorded through
        # configure_task_key() are left in place.
        cls._task_queues.clear()
        # NOTE(review): this flag is not read anywhere in this class; it is
        # kept in case code outside this module relies on it.
        cls._scheduled_tasks_already_restored = False

    def __init__(self, task_key: str, scheduled_queue_size: int, retry_queue_size: int):
        """Create the scheduled and retry queues for ``task_key``.

        Args:
            task_key: The task key this queue serves.
            scheduled_queue_size: ``maxsize`` of the scheduled queue.
            retry_queue_size: ``maxsize`` of the retry queue.
        """
        self.task_key = task_key
        self._scheduled_queue = asyncio.Queue(maxsize=scheduled_queue_size)
        self._retry_queue = asyncio.Queue(maxsize=retry_queue_size)

    async def get(self) -> schemas.core.TaskRun:
        """Return the next task run, waiting if none is available.

        The retry queue is checked once without waiting; if it is empty the
        call waits on the scheduled queue. A task run added to the retry
        queue during that wait is picked up by a later call, not this one.
        """
        # First, check if there's anything in the retry queue
        try:
            return self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            return await self._scheduled_queue.get()

    def get_nowait(self) -> schemas.core.TaskRun:
        """Return the next task run without waiting.

        Retried task runs are returned before scheduled ones.

        Raises:
            asyncio.QueueEmpty: If both the retry and scheduled queues are
                empty.
        """
        # First, check if there's anything in the retry queue
        try:
            return self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            return self._scheduled_queue.get_nowait()

    async def put(self, task_run: schemas.core.TaskRun) -> None:
        """Add ``task_run`` to the scheduled queue, waiting while it is full."""
        await self._scheduled_queue.put(task_run)

    async def retry(self, task_run: schemas.core.TaskRun) -> None:
        """Add ``task_run`` to the retry queue, waiting while it is full."""
        await self._retry_queue.put(task_run)
class MultiQueue:
    """A queue that can pull tasks from any of a number of task queues"""

    _queues: List[TaskQueue]

    def __init__(self, task_keys: List[str]):
        """Look up (or create) the ``TaskQueue`` for each of ``task_keys``."""
        self._queues = list(map(TaskQueue.for_key, task_keys))

    async def get(self) -> schemas.core.TaskRun:
        """Gets the next task_run from any of the given queues"""
        while True:
            # Sweep the queues in the order they were given and hand back the
            # first task run that is immediately available.
            for task_queue in self._queues:
                try:
                    task_run = task_queue.get_nowait()
                except asyncio.QueueEmpty:
                    pass
                else:
                    return task_run
            # Every queue was empty: yield to the event loop briefly before
            # polling again.
            await asyncio.sleep(0.01)