Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/task_queue.py: 46%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Implements an in-memory task queue for delivering background task runs to TaskWorkers. 

3""" 

4 

5import asyncio 1a

6from typing import Dict, List, Optional, Tuple 1a

7 

8from typing_extensions import Self 1a

9 

10import prefect.server.schemas as schemas 1a

11from prefect.settings import ( 1a

12 PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE, 

13 PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE, 

14) 

15 

16 

17class TaskQueue: 1a

18 _task_queues: Dict[str, Self] = {} 1a

19 

20 default_scheduled_max_size: int = ( 1a

21 PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE.value() 

22 ) 

23 default_retry_max_size: int = PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE.value() 1a

24 

25 _queue_size_configs: Dict[str, Tuple[int, int]] = {} 1a

26 

27 task_key: str 1a

28 _scheduled_queue: asyncio.Queue 1a

29 _retry_queue: asyncio.Queue 1a

30 

31 @classmethod 1a

32 async def enqueue(cls, task_run: schemas.core.TaskRun) -> None: 1a

33 await cls.for_key(task_run.task_key).put(task_run) 

34 

35 @classmethod 1a

36 def configure_task_key( 1a

37 cls, 

38 task_key: str, 

39 scheduled_size: Optional[int] = None, 

40 retry_size: Optional[int] = None, 

41 ) -> None: 

42 scheduled_size = scheduled_size or cls.default_scheduled_max_size 

43 retry_size = retry_size or cls.default_retry_max_size 

44 cls._queue_size_configs[task_key] = (scheduled_size, retry_size) 

45 

46 @classmethod 1a

47 def for_key(cls, task_key: str) -> Self: 1a

48 if task_key not in cls._task_queues: 

49 sizes = cls._queue_size_configs.get( 

50 task_key, (cls.default_scheduled_max_size, cls.default_retry_max_size) 

51 ) 

52 cls._task_queues[task_key] = cls(task_key, *sizes) 

53 return cls._task_queues[task_key] 

54 

55 @classmethod 1a

56 def reset(cls) -> None: 1a

57 """A unit testing utility to reset the state of the task queues subsystem""" 

58 cls._task_queues.clear() 

59 cls._scheduled_tasks_already_restored = False 

60 

61 def __init__(self, task_key: str, scheduled_queue_size: int, retry_queue_size: int): 1a

62 self.task_key = task_key 

63 self._scheduled_queue = asyncio.Queue(maxsize=scheduled_queue_size) 

64 self._retry_queue = asyncio.Queue(maxsize=retry_queue_size) 

65 

66 async def get(self) -> schemas.core.TaskRun: 1a

67 # First, check if there's anything in the retry queue 

68 try: 

69 return self._retry_queue.get_nowait() 

70 except asyncio.QueueEmpty: 

71 return await self._scheduled_queue.get() 

72 

73 def get_nowait(self) -> schemas.core.TaskRun: 1a

74 # First, check if there's anything in the retry queue 

75 try: 

76 return self._retry_queue.get_nowait() 

77 except asyncio.QueueEmpty: 

78 return self._scheduled_queue.get_nowait() 

79 

80 async def put(self, task_run: schemas.core.TaskRun) -> None: 1a

81 await self._scheduled_queue.put(task_run) 

82 

83 async def retry(self, task_run: schemas.core.TaskRun) -> None: 1a

84 await self._retry_queue.put(task_run) 

85 

86 

87class MultiQueue: 1a

88 """A queue that can pull tasks from from any of a number of task queues""" 

89 

90 _queues: List[TaskQueue] 1a

91 

92 def __init__(self, task_keys: List[str]): 1a

93 self._queues = [TaskQueue.for_key(task_key) for task_key in task_keys] 

94 

95 async def get(self) -> schemas.core.TaskRun: 1a

96 """Gets the next task_run from any of the given queues""" 

97 while True: 

98 for queue in self._queues: 

99 try: 

100 return queue.get_nowait() 

101 except asyncio.QueueEmpty: 

102 continue 

103 await asyncio.sleep(0.01)