Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/work_queues.py: 0%
58 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources import construct_migratable_resource
9from prefect.cli.transfer._migratable_resources.base import (
10 MigratableProtocol,
11 MigratableResource,
12)
13from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool
14from prefect.client.orchestration import get_client
15from prefect.client.schemas.filters import WorkQueueFilter, WorkQueueFilterName
16from prefect.client.schemas.objects import (
17 WorkQueue,
18)
19from prefect.exceptions import (
20 ObjectAlreadyExists,
21)
class MigratableWorkQueue(MigratableResource[WorkQueue]):
    """Migratable wrapper around a single ``WorkQueue``.

    Holds the source work queue and, once it has been created or located by
    ``migrate``, the matching destination work queue. Instances are cached per
    source queue ID in the class-level ``_instances`` registry so the same
    source queue always maps to the same wrapper object.
    """

    # Registry of already-constructed wrappers, keyed by source work queue ID.
    # Shared at class level on purpose: ``construct`` and ``get_instance``
    # both consult it.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, work_queue: WorkQueue):
        """Wrap ``work_queue`` as the source side of a transfer."""
        self.source_work_queue = work_queue
        # Populated by ``migrate`` once the destination queue is known.
        self.destination_work_queue: WorkQueue | None = None
        # Cache filled lazily by ``get_dependencies``.
        self._dependencies: list[MigratableProtocol] = []

    @property
    def source_id(self) -> uuid.UUID:
        """ID of the source work queue."""
        return self.source_work_queue.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """ID of the destination work queue, or ``None`` if not yet known."""
        return self.destination_work_queue.id if self.destination_work_queue else None

    @classmethod
    async def construct(cls, obj: WorkQueue) -> Self:
        """Return the cached wrapper for ``obj``, creating it on first use."""
        if obj.id in cls._instances:
            return cls._instances[obj.id]
        instance = cls(obj)
        cls._instances[obj.id] = instance
        return instance

    @classmethod
    async def get_instance(
        cls, id: uuid.UUID
    ) -> "MigratableResource[WorkQueue] | None":
        """Return the cached wrapper for the given source ID, or ``None``."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return the resources this work queue depends on.

        The only dependency resolved here is the queue's work pool (when
        ``work_pool_name`` is set). A non-empty result is cached on the
        instance and returned directly on subsequent calls.
        """
        if self._dependencies:
            return self._dependencies

        async with get_client() as client:
            if self.source_work_queue.work_pool_name is not None:
                if dependency := await MigratableWorkPool.get_instance_by_name(
                    name=self.source_work_queue.work_pool_name
                ):
                    # Always include the pool as a dependency
                    # If it's push/managed, it will be skipped and this queue will be skipped too
                    self._dependencies.append(dependency)
                else:
                    # No wrapper registered for this pool yet: read the pool
                    # through the client and wrap it.
                    work_pool = await client.read_work_pool(
                        self.source_work_queue.work_pool_name
                    )
                    self._dependencies.append(
                        await construct_migratable_resource(work_pool)
                    )

        return self._dependencies

    async def migrate(self) -> None:
        """Create this work queue through the client, or skip it.

        On success ``destination_work_queue`` holds the newly created queue.

        Raises:
            TransferSkipped: If the queue is named ``"default"`` (those are
                created together with their work pool), or if a queue with
                the same name already exists in the work pool. In both cases
                ``destination_work_queue`` is set to the existing queue when
                the lookup finds one.
            RuntimeError: If creation reported a conflict but no queue with
                that name could be read back.
        """
        async with get_client() as client:
            # Skip default work queues as they are created when work pools are transferred
            if self.source_work_queue.name == "default":
                work_queues = await client.read_work_queues(
                    work_pool_name=self.source_work_queue.work_pool_name,
                    work_queue_filter=WorkQueueFilter(
                        name=WorkQueueFilterName(any_=["default"]),
                    ),
                )
                # Record the existing default queue so ``destination_id``
                # resolves for it, mirroring the already-exists path below.
                # Previously this lookup result was discarded.
                if work_queues:
                    self.destination_work_queue = work_queues[0]
                raise TransferSkipped("Default work queues are created with work pools")

            try:
                self.destination_work_queue = await client.create_work_queue(
                    name=self.source_work_queue.name,
                    description=self.source_work_queue.description,
                    priority=self.source_work_queue.priority,
                    concurrency_limit=self.source_work_queue.concurrency_limit,
                    work_pool_name=self.source_work_queue.work_pool_name,
                )
            except ObjectAlreadyExists:
                # Work queue already exists, read it by work pool and name
                work_queues = await client.read_work_queues(
                    work_pool_name=self.source_work_queue.work_pool_name,
                    work_queue_filter=WorkQueueFilter(
                        name=WorkQueueFilterName(any_=[self.source_work_queue.name]),
                    ),
                )
                if not work_queues:
                    raise RuntimeError(
                        "Transfer failed due to conflict, but no existing queue found."
                    )
                self.destination_work_queue = work_queues[0]
                raise TransferSkipped("Already exists")