Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/work_pools.py: 0%
74 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources import construct_migratable_resource
9from prefect.cli.transfer._migratable_resources.base import (
10 MigratableProtocol,
11 MigratableResource,
12)
13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument
14from prefect.client.orchestration import get_client
15from prefect.client.schemas.actions import (
16 WorkPoolCreate,
17)
18from prefect.client.schemas.objects import (
19 WorkPool,
20 WorkQueue,
21)
22from prefect.exceptions import (
23 ObjectAlreadyExists,
24 ObjectUnsupported,
25)
class MigratableWorkPool(MigratableResource[WorkPool]):
    """Transfer wrapper pairing a source ``WorkPool`` with its default queue.

    Instances are memoized per source work pool id in the class-level
    ``_instances`` registry, which is populated by :meth:`construct`.
    """

    # Registry of constructed instances keyed by the source work pool id.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, work_pool: WorkPool, default_queue: WorkQueue):
        """Store the source pool and its default queue.

        Args:
            work_pool: The work pool read from the source.
            default_queue: The default work queue of ``work_pool``.
        """
        self.source_work_pool = work_pool
        self.source_default_queue = default_queue
        # Set by ``migrate`` once the pool has been created (or found).
        self.destination_work_pool: WorkPool | None = None
        # Filled by ``get_dependencies``.
        self._dependencies: list[MigratableProtocol] = []

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the source work pool."""
        return self.source_work_pool.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination work pool, or ``None`` if it is not set yet."""
        return self.destination_work_pool.id if self.destination_work_pool else None

    @classmethod
    async def construct(cls, obj: WorkPool) -> Self:
        """Return the instance registered for ``obj.id``, creating it if needed.

        A new instance is built by reading the pool's default queue
        (``obj.default_queue_id``) through the API client, and is then stored
        in the registry.
        """
        existing = cls._instances.get(obj.id)
        if existing is not None:
            return existing

        async with get_client() as client:
            default_queue = await client.read_work_queue(obj.default_queue_id)

        instance = cls(obj, default_queue)
        cls._instances[obj.id] = instance
        return instance

    @classmethod
    async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[WorkPool] | None":
        """Return the registered instance for source id ``id``, or ``None``."""
        return cls._instances.get(id)

    @classmethod
    async def get_instance_by_name(
        cls, name: str
    ) -> "MigratableResource[WorkPool] | None":
        """Return the first registered instance whose source pool is named ``name``.

        Returns ``None`` when no registered instance matches.
        """
        return next(
            (
                instance
                for instance in cls._instances.values()
                if instance.source_work_pool.name == name
            ),
            None,
        )

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return the resources this work pool depends on.

        The only dependency resolved today is the default result storage
        block of the pool's storage configuration. A non-empty result is
        reused on later calls.
        """
        if self._dependencies:
            return self._dependencies

        storage_configuration = self.source_work_pool.storage_configuration
        block_id = storage_configuration.default_result_storage_block_id
        if block_id is not None:
            dependency = await MigratableBlockDocument.get_instance(id=block_id)
            if not dependency:
                # Only open an API client when the block document actually
                # has to be fetched.
                async with get_client() as client:
                    result_storage_block = await client.read_block_document(
                        block_id
                    )
                    dependency = await construct_migratable_resource(
                        result_storage_block
                    )
            self._dependencies.append(dependency)

        # TODO: Figure out how to find block document references in bundle upload step
        #       (storage_configuration.bundle_upload_step)
        # TODO: Figure out how to find block document references in bundle download step
        #       (storage_configuration.bundle_execution_step)
        return self._dependencies

    async def migrate(self) -> None:
        """Create this work pool through the API client and sync its default queue.

        On success ``destination_work_pool`` is set and the new pool's default
        queue is updated with the source default queue's description,
        priority and concurrency limit.

        Raises:
            TransferSkipped: If the source pool is a managed pool; if it is a
                push pool and the client's server type is not Cloud; if the
                server reports the pool as unsupported; or if a pool with the
                same name already exists. In the last case
                ``destination_work_pool`` is set to the existing pool and its
                default queue is left untouched.
        """
        # Skip managed pools always - they're cloud-specific infrastructure.
        # This needs no API access, so check before opening a client.
        if self.source_work_pool.is_managed_pool:
            raise TransferSkipped("Skipped managed pool (cloud-specific)")

        async with get_client() as client:
            # Allow push pools only if destination is Cloud
            if self.source_work_pool.is_push_pool:
                from prefect.client.base import ServerType

                if client.server_type != ServerType.CLOUD:
                    raise TransferSkipped("Skipped push pool (requires Prefect Cloud)")

            try:
                self.destination_work_pool = await client.create_work_pool(
                    work_pool=WorkPoolCreate(
                        name=self.source_work_pool.name,
                        type=self.source_work_pool.type,
                        base_job_template=self.source_work_pool.base_job_template,
                        is_paused=self.source_work_pool.is_paused,
                        concurrency_limit=self.source_work_pool.concurrency_limit,
                        storage_configuration=self.source_work_pool.storage_configuration,
                    ),
                )
            except ObjectUnsupported as exc:
                raise TransferSkipped("Destination requires Standard/Pro tier") from exc
            except ObjectAlreadyExists as exc:
                # Record the existing pool so ``destination_id`` resolves for
                # anything that refers to this pool.
                self.destination_work_pool = await client.read_work_pool(
                    self.source_work_pool.name
                )
                raise TransferSkipped("Already exists") from exc

            # Update the default queue after successful creation
            await client.update_work_queue(
                id=self.destination_work_pool.default_queue_id,
                description=self.source_default_queue.description,
                priority=self.source_default_queue.priority,
                concurrency_limit=self.source_default_queue.concurrency_limit,
            )