Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/deployments.py: 0%
87 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources import construct_migratable_resource
9from prefect.cli.transfer._migratable_resources.base import (
10 MigratableProtocol,
11 MigratableResource,
12)
13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument
14from prefect.cli.transfer._migratable_resources.flows import MigratableFlow
15from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool
16from prefect.cli.transfer._migratable_resources.work_queues import MigratableWorkQueue
17from prefect.client.orchestration import get_client
18from prefect.client.schemas.actions import DeploymentScheduleCreate
19from prefect.client.schemas.responses import DeploymentResponse
20from prefect.exceptions import (
21 ObjectAlreadyExists,
22 ObjectLimitReached,
23)
class MigratableDeployment(MigratableResource[DeploymentResponse]):
    """Migratable wrapper around a deployment being transferred between Prefect
    instances.

    Tracks the deployment as read from the source (``source_deployment``), the
    copy created on the destination (``destination_deployment``), and the other
    migratable resources this deployment depends on: its flow and, when set,
    its work queue, work pool, storage block document, and infrastructure
    block document.
    """

    # Class-level identity map: one MigratableDeployment per source deployment id.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, deployment: DeploymentResponse):
        self.source_deployment = deployment
        # Populated by migrate() once the deployment exists on the destination.
        self.destination_deployment: DeploymentResponse | None = None
        # Keyed by the *source* id of each dependency.
        self._dependencies: dict[uuid.UUID, MigratableProtocol] = {}

    @property
    def source_id(self) -> uuid.UUID:
        return self.source_deployment.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        return self.destination_deployment.id if self.destination_deployment else None

    @classmethod
    async def construct(cls, obj: DeploymentResponse) -> Self:
        """Return the cached instance for ``obj.id`` or create and cache one."""
        if obj.id in cls._instances:
            return cls._instances[obj.id]
        instance = cls(obj)
        cls._instances[obj.id] = instance
        return instance

    @classmethod
    async def get_instance(
        cls, id: uuid.UUID
    ) -> "MigratableResource[DeploymentResponse] | None":
        """Return the cached instance for ``id``, or None if not yet constructed."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Resolve and memoize this deployment's upstream resources.

        Each dependency is looked up in its resource type's instance cache
        first; on a miss it is read from the source API and wrapped via
        ``construct_migratable_resource``. Results are cached in
        ``self._dependencies`` so subsequent calls skip the API entirely.
        """
        if self._dependencies:
            return list(self._dependencies.values())

        async with get_client() as client:
            # Flow — always present on a deployment.
            flow_id = self.source_deployment.flow_id
            if dependency := await MigratableFlow.get_instance(id=flow_id):
                self._dependencies[flow_id] = dependency
            else:
                flow = await client.read_flow(flow_id)
                self._dependencies[flow_id] = await construct_migratable_resource(flow)

            # Work queue — optional.
            if (work_queue_id := self.source_deployment.work_queue_id) is not None:
                if dependency := await MigratableWorkQueue.get_instance(
                    id=work_queue_id
                ):
                    self._dependencies[work_queue_id] = dependency
                else:
                    work_queue = await client.read_work_queue(work_queue_id)
                    self._dependencies[
                        work_queue.id
                    ] = await construct_migratable_resource(work_queue)

            # Work pool — optional; the deployment only stores its name.
            if (work_pool_name := self.source_deployment.work_pool_name) is not None:
                if dependency := await MigratableWorkPool.get_instance_by_name(
                    name=work_pool_name
                ):
                    self._dependencies[dependency.source_id] = dependency
                else:
                    work_pool = await client.read_work_pool(work_pool_name)
                    self._dependencies[
                        work_pool.id
                    ] = await construct_migratable_resource(work_pool)

            # Storage block document — optional.
            if (storage_id := self.source_deployment.storage_document_id) is not None:
                if dependency := await MigratableBlockDocument.get_instance(
                    id=storage_id
                ):
                    self._dependencies[storage_id] = dependency
                else:
                    storage_document = await client.read_block_document(storage_id)
                    self._dependencies[
                        storage_document.id
                    ] = await construct_migratable_resource(storage_document)

            # Infrastructure block document — optional.
            if (
                infra_id := self.source_deployment.infrastructure_document_id
            ) is not None:
                if dependency := await MigratableBlockDocument.get_instance(
                    id=infra_id
                ):
                    self._dependencies[infra_id] = dependency
                else:
                    infrastructure_document = await client.read_block_document(infra_id)
                    self._dependencies[
                        infrastructure_document.id
                    ] = await construct_migratable_resource(infrastructure_document)

            if self.source_deployment.pull_steps:
                # TODO: Figure out how to find block document references in pull steps
                pass

        return list(self._dependencies.values())

    def _destination_dependency_id(self, source_id: uuid.UUID) -> uuid.UUID | None:
        """Return the destination id of an already-migrated dependency, if any."""
        return getattr(self._dependencies.get(source_id), "destination_id", None)

    async def migrate(self) -> None:
        """Create this deployment on the destination and record the result.

        Raises:
            ValueError: if a required dependency has not been migrated yet
                (no destination id available).
            TransferSkipped: if the destination's deployment limit is reached
                or the deployment already exists there.
        """
        async with get_client() as client:
            try:
                destination_flow_id = self._destination_dependency_id(
                    self.source_deployment.flow_id
                )
                if destination_flow_id is None:
                    raise ValueError("Unable to find destination flow")

                # BUGFIX: the previous flattened form
                #   if src_id and (dest_id := ...) is None: raise ...
                #   else: dest_id = None
                # fell into the `else` branch when the lookup *succeeded*
                # (condition False), resetting the resolved id back to None.
                # Resolve first, then validate.
                destination_storage_document_id = None
                if self.source_deployment.storage_document_id is not None:
                    destination_storage_document_id = self._destination_dependency_id(
                        self.source_deployment.storage_document_id
                    )
                    if destination_storage_document_id is None:
                        raise ValueError("Unable to find destination storage document")

                destination_infrastructure_document_id = None
                if self.source_deployment.infrastructure_document_id is not None:
                    destination_infrastructure_document_id = (
                        self._destination_dependency_id(
                            self.source_deployment.infrastructure_document_id
                        )
                    )
                    if destination_infrastructure_document_id is None:
                        raise ValueError(
                            "Unable to find destination infrastructure document"
                        )

                destination_deployment_id = await client.create_deployment(
                    flow_id=destination_flow_id,
                    name=self.source_deployment.name,
                    version=self.source_deployment.version,
                    version_info=self.source_deployment.version_info,
                    schedules=[
                        DeploymentScheduleCreate(
                            schedule=schedule.schedule,
                            active=schedule.active,
                            max_scheduled_runs=schedule.max_scheduled_runs,
                            parameters=schedule.parameters,
                            slug=schedule.slug,
                        )
                        for schedule in self.source_deployment.schedules
                    ],
                    concurrency_limit=self.source_deployment.concurrency_limit,
                    concurrency_options=self.source_deployment.concurrency_options,
                    parameters=self.source_deployment.parameters,
                    description=self.source_deployment.description,
                    work_queue_name=self.source_deployment.work_queue_name,
                    work_pool_name=self.source_deployment.work_pool_name,
                    tags=self.source_deployment.tags,
                    storage_document_id=destination_storage_document_id,
                    path=self.source_deployment.path,
                    entrypoint=self.source_deployment.entrypoint,
                    infrastructure_document_id=destination_infrastructure_document_id,
                    parameter_openapi_schema=self.source_deployment.parameter_openapi_schema,
                    paused=self.source_deployment.paused,
                    pull_steps=self.source_deployment.pull_steps,
                    enforce_parameter_schema=self.source_deployment.enforce_parameter_schema,
                    job_variables=self.source_deployment.job_variables,
                    branch=self.source_deployment.branch,
                    base=self.source_deployment.base,
                    root=self.source_deployment.root,
                )
                self.destination_deployment = await client.read_deployment(
                    destination_deployment_id
                )
            except ObjectLimitReached:
                raise TransferSkipped("Deployment limit reached (upgrade tier)")
            except ObjectAlreadyExists:
                # NOTE(review): reads by the *source* deployment id against the
                # destination client — assumes ids are preserved across the
                # transfer; confirm before relying on this.
                self.destination_deployment = await client.read_deployment(
                    self.source_deployment.id
                )
                raise TransferSkipped("Already exists")