Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/work_queues.py: 0%

58 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources import construct_migratable_resource 

9from prefect.cli.transfer._migratable_resources.base import ( 

10 MigratableProtocol, 

11 MigratableResource, 

12) 

13from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool 

14from prefect.client.orchestration import get_client 

15from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName 

16from prefect.client.schemas.objects import ( 

17 WorkQueue, 

18) 

19from prefect.exceptions import ( 

20 ObjectAlreadyExists, 

21) 

22 

23 

24class MigratableWorkQueue(MigratableResource[WorkQueue]): 

25 _instances: dict[uuid.UUID, Self] = {} 

26 

27 def __init__(self, work_queue: WorkQueue): 

28 self.source_work_queue = work_queue 

29 self.destination_work_queue: WorkQueue | None = None 

30 self._dependencies: list[MigratableProtocol] = [] 

31 

32 @property 

33 def source_id(self) -> uuid.UUID: 

34 return self.source_work_queue.id 

35 

36 @property 

37 def destination_id(self) -> uuid.UUID | None: 

38 return self.destination_work_queue.id if self.destination_work_queue else None 

39 

40 @classmethod 

41 async def construct(cls, obj: WorkQueue) -> Self: 

42 if obj.id in cls._instances: 

43 return cls._instances[obj.id] 

44 instance = cls(obj) 

45 cls._instances[obj.id] = instance 

46 return instance 

47 

48 @classmethod 

49 async def get_instance( 

50 cls, id: uuid.UUID 

51 ) -> "MigratableResource[WorkQueue] | None": 

52 if id in cls._instances: 

53 return cls._instances[id] 

54 return None 

55 

56 async def get_dependencies(self) -> "list[MigratableProtocol]": 

57 if self._dependencies: 

58 return self._dependencies 

59 

60 async with get_client() as client: 

61 if self.source_work_queue.work_pool_name is not None: 

62 if dependency := await MigratableWorkPool.get_instance_by_name( 

63 name=self.source_work_queue.work_pool_name 

64 ): 

65 # Always include the pool as a dependency 

66 # If it's push/managed, it will be skipped and this queue will be skipped too 

67 self._dependencies.append(dependency) 

68 else: 

69 work_pool = await client.read_work_pool( 

70 self.source_work_queue.work_pool_name 

71 ) 

72 self._dependencies.append( 

73 await construct_migratable_resource(work_pool) 

74 ) 

75 

76 return self._dependencies 

77 

78 async def migrate(self) -> None: 

79 async with get_client() as client: 

80 # Skip default work queues as they are created when work pools are transferred 

81 if self.source_work_queue.name == "default": 

82 work_queues = await client.read_work_queues( 

83 work_pool_name=self.source_work_queue.work_pool_name, 

84 work_queue_filter=WorkQueueFilter( 

85 name=WorkQueueFilterName(any_=["default"]), 

86 ), 

87 ) 

88 raise TransferSkipped("Default work queues are created with work pools") 

89 

90 try: 

91 self.destination_work_queue = await client.create_work_queue( 

92 name=self.source_work_queue.name, 

93 description=self.source_work_queue.description, 

94 priority=self.source_work_queue.priority, 

95 concurrency_limit=self.source_work_queue.concurrency_limit, 

96 work_pool_name=self.source_work_queue.work_pool_name, 

97 ) 

98 except ObjectAlreadyExists: 

99 # Work queue already exists, read it by work pool and name 

100 work_queues = await client.read_work_queues( 

101 work_pool_name=self.source_work_queue.work_pool_name, 

102 work_queue_filter=WorkQueueFilter( 

103 name=WorkQueueFilterName(any_=[self.source_work_queue.name]), 

104 ), 

105 ) 

106 if work_queues: 

107 self.destination_work_queue = work_queues[0] 

108 raise TransferSkipped("Already exists") 

109 else: 

110 raise RuntimeError( 

111 "Transfer failed due to conflict, but no existing queue found." 

112 )