Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/blocks.py: 0%
181 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations
3import uuid
4from typing import Any, cast
6from typing_extensions import Self
8from prefect.cli.transfer._exceptions import TransferSkipped
9from prefect.cli.transfer._migratable_resources import construct_migratable_resource
10from prefect.cli.transfer._migratable_resources.base import (
11 MigratableProtocol,
12 MigratableResource,
13)
14from prefect.client.orchestration import get_client
15from prefect.client.schemas.actions import (
16 BlockDocumentCreate,
17 BlockSchemaCreate,
18 BlockTypeCreate,
19)
20from prefect.client.schemas.objects import (
21 BlockDocument,
22 BlockSchema,
23 BlockType,
24)
25from prefect.exceptions import (
26 ObjectAlreadyExists,
27)
30class MigratableBlockType(MigratableResource[BlockType]):
31 _instances: dict[uuid.UUID, Self] = {}
33 def __init__(self, block_type: BlockType):
34 self.source_block_type = block_type
35 self.destination_block_type: BlockType | None = None
37 @property
38 def source_id(self) -> uuid.UUID:
39 return self.source_block_type.id
41 @property
42 def destination_id(self) -> uuid.UUID | None:
43 return self.destination_block_type.id if self.destination_block_type else None
45 @classmethod
46 async def construct(cls, obj: BlockType) -> Self:
47 if obj.id in cls._instances:
48 return cls._instances[obj.id]
49 instance = cls(obj)
50 cls._instances[obj.id] = instance
51 return instance
53 @classmethod
54 async def get_instance(
55 cls, id: uuid.UUID
56 ) -> "MigratableResource[BlockType] | None":
57 if id in cls._instances:
58 return cls._instances[id]
59 return None
61 async def get_dependencies(self) -> "list[MigratableProtocol]":
62 return []
64 async def migrate(self) -> None:
65 async with get_client() as client:
66 try:
67 block_type = await client.create_block_type(
68 block_type=BlockTypeCreate(
69 name=self.source_block_type.name,
70 slug=self.source_block_type.slug,
71 ),
72 )
73 self.destination_block_type = block_type
74 except ObjectAlreadyExists:
75 self.destination_block_type = await client.read_block_type_by_slug(
76 self.source_block_type.slug
77 )
78 raise TransferSkipped("Already exists")
81class MigratableBlockSchema(MigratableResource[BlockSchema]):
82 _instances: dict[uuid.UUID, Self] = {}
84 def __init__(self, block_schema: BlockSchema):
85 self.source_block_schema = block_schema
86 self.destination_block_schema: BlockSchema | None = None
87 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {}
89 @property
90 def source_id(self) -> uuid.UUID:
91 return self.source_block_schema.id
93 @property
94 def destination_id(self) -> uuid.UUID | None:
95 return (
96 self.destination_block_schema.id if self.destination_block_schema else None
97 )
99 @classmethod
100 async def construct(cls, obj: BlockSchema) -> Self:
101 if obj.id in cls._instances:
102 return cls._instances[obj.id]
103 instance = cls(obj)
104 cls._instances[obj.id] = instance
105 return instance
107 @classmethod
108 async def get_instance(
109 cls, id: uuid.UUID
110 ) -> "MigratableResource[BlockSchema] | None":
111 if id in cls._instances:
112 return cls._instances[id]
113 return None
115 async def get_dependencies(self) -> "list[MigratableProtocol]":
116 if self._dependencies:
117 return list(self._dependencies.values())
119 async with get_client() as client:
120 if self.source_block_schema.block_type is not None:
121 if dependency := await MigratableBlockType.get_instance(
122 id=self.source_block_schema.block_type.id
123 ):
124 self._dependencies[self.source_block_schema.block_type.id] = (
125 dependency
126 )
127 else:
128 self._dependencies[
129 self.source_block_schema.block_type.id
130 ] = await construct_migratable_resource(
131 self.source_block_schema.block_type
132 )
133 elif self.source_block_schema.block_type_id is not None:
134 if dependency := await MigratableBlockType.get_instance(
135 id=self.source_block_schema.block_type_id
136 ):
137 self._dependencies[self.source_block_schema.block_type_id] = (
138 dependency
139 )
140 else:
141 response = await client.request(
142 "GET",
143 "/block_types/{id}",
144 params={"id": self.source_block_schema.block_type_id},
145 )
146 block_type = BlockType.model_validate(response.json())
147 self._dependencies[
148 block_type.id
149 ] = await construct_migratable_resource(block_type)
150 else:
151 raise ValueError("Block schema has no associated block type")
153 block_schema_references: dict[str, dict[str, Any]] = (
154 self.source_block_schema.fields.get("block_schema_references", {})
155 )
156 for block_schema_reference in block_schema_references.values():
157 if isinstance(block_schema_reference, list):
158 for nested_block_schema_reference in block_schema_reference:
159 if block_schema_checksum := cast(
160 dict[str, str], nested_block_schema_reference
161 ).get("block_schema_checksum"):
162 block_schema = await client.read_block_schema_by_checksum(
163 block_schema_checksum
164 )
165 if dependency := await MigratableBlockSchema.get_instance(
166 id=block_schema.id
167 ):
168 self._dependencies[block_schema.id] = dependency
169 else:
170 self._dependencies[
171 block_schema.id
172 ] = await construct_migratable_resource(block_schema)
173 else:
174 if block_schema_checksum := block_schema_reference.get(
175 "block_schema_checksum"
176 ):
177 block_schema = await client.read_block_schema_by_checksum(
178 block_schema_checksum
179 )
180 if dependency := await MigratableBlockSchema.get_instance(
181 id=block_schema.id
182 ):
183 self._dependencies[block_schema.id] = dependency
184 else:
185 self._dependencies[
186 block_schema.id
187 ] = await construct_migratable_resource(block_schema)
189 return list(self._dependencies.values())
191 async def migrate(self) -> None:
192 if self.source_block_schema.block_type_id is None:
193 raise ValueError("Block schema has no associated block type")
194 if (
195 destination_block_type := self._dependencies.get(
196 self.source_block_schema.block_type_id
197 )
198 ) is None:
199 raise ValueError("Unable to find destination block type")
200 async with get_client() as client:
201 try:
202 self.destination_block_schema = await client.create_block_schema(
203 block_schema=BlockSchemaCreate(
204 fields=self.source_block_schema.fields,
205 block_type_id=destination_block_type.destination_id,
206 capabilities=self.source_block_schema.capabilities,
207 version=self.source_block_schema.version,
208 ),
209 )
210 except ObjectAlreadyExists:
211 self.destination_block_schema = (
212 await client.read_block_schema_by_checksum(
213 self.source_block_schema.checksum
214 )
215 )
216 raise TransferSkipped("Already exists")
219class MigratableBlockDocument(MigratableResource[BlockDocument]):
220 _instances: dict[uuid.UUID, Self] = {}
222 def __init__(self, block_document: BlockDocument):
223 self.source_block_document = block_document
224 self.destination_block_document: BlockDocument | None = None
225 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {}
227 @property
228 def source_id(self) -> uuid.UUID:
229 return self.source_block_document.id
231 @property
232 def destination_id(self) -> uuid.UUID | None:
233 return (
234 self.destination_block_document.id
235 if self.destination_block_document
236 else None
237 )
239 @classmethod
240 async def construct(cls, obj: BlockDocument) -> Self:
241 if obj.id in cls._instances:
242 return cls._instances[obj.id]
243 instance = cls(obj)
244 cls._instances[obj.id] = instance
245 return instance
247 @classmethod
248 async def get_instance(
249 cls, id: uuid.UUID
250 ) -> "MigratableResource[BlockDocument] | None":
251 if id in cls._instances:
252 return cls._instances[id]
253 return None
255 async def get_dependencies(self) -> "list[MigratableProtocol]":
256 if self._dependencies:
257 return list(self._dependencies.values())
259 # TODO: When we write serialized versions of the objects to disk, we should have a way to
260 # use a client, but read from disk if the object has already been fetched.
261 async with get_client() as client:
262 if self.source_block_document.block_type is not None:
263 if dependency := await MigratableBlockType.get_instance(
264 id=self.source_block_document.block_type.id
265 ):
266 self._dependencies[self.source_block_document.block_type.id] = (
267 dependency
268 )
269 else:
270 self._dependencies[
271 self.source_block_document.block_type.id
272 ] = await construct_migratable_resource(
273 self.source_block_document.block_type
274 )
275 else:
276 if dependency := await MigratableBlockType.get_instance(
277 id=self.source_block_document.block_type_id
278 ):
279 self._dependencies[self.source_block_document.block_type_id] = (
280 dependency
281 )
282 else:
283 response = await client.request(
284 "GET",
285 "/block_types/{id}",
286 params={"id": self.source_block_document.block_type_id},
287 )
288 block_type = BlockType.model_validate(response.json())
289 self._dependencies[
290 block_type.id
291 ] = await construct_migratable_resource(block_type)
293 if self.source_block_document.block_schema is not None:
294 if dependency := await MigratableBlockSchema.get_instance(
295 id=self.source_block_document.block_schema.id
296 ):
297 self._dependencies[self.source_block_document.block_schema.id] = (
298 dependency
299 )
300 else:
301 self._dependencies[
302 self.source_block_document.block_schema.id
303 ] = await construct_migratable_resource(
304 self.source_block_document.block_schema
305 )
306 else:
307 if dependency := await MigratableBlockSchema.get_instance(
308 id=self.source_block_document.block_schema_id
309 ):
310 self._dependencies[self.source_block_document.block_schema_id] = (
311 dependency
312 )
313 else:
314 response = await client.request(
315 "GET",
316 "/block_schemas/{id}",
317 params={"id": self.source_block_document.block_schema_id},
318 )
319 block_schema = BlockSchema.model_validate(response.json())
320 self._dependencies[
321 block_schema.id
322 ] = await construct_migratable_resource(block_schema)
324 if self.source_block_document.block_document_references:
325 for (
326 block_document_reference
327 ) in self.source_block_document.block_document_references.values():
328 if block_document_id := block_document_reference.get(
329 "block_document_id"
330 ):
331 if dependency := await MigratableBlockDocument.get_instance(
332 id=block_document_id
333 ):
334 self._dependencies[block_document_id] = dependency
335 else:
336 block_document = await client.read_block_document(
337 block_document_id
338 )
339 self._dependencies[
340 block_document.id
341 ] = await construct_migratable_resource(block_document)
343 return list(self._dependencies.values())
345 async def migrate(self) -> None:
346 if (
347 destination_block_type := self._dependencies.get(
348 self.source_block_document.block_type_id
349 )
350 ) is None or not destination_block_type.destination_id:
351 raise ValueError("Unable to find destination block type")
352 if (
353 destination_block_schema := self._dependencies.get(
354 self.source_block_document.block_schema_id
355 )
356 ) is None or not destination_block_schema.destination_id:
357 raise ValueError("Unable to find destination block schema")
359 async with get_client() as client:
360 try:
361 # TODO: Check if data needs to be written differently to maintain composition
362 self.destination_block_document = await client.create_block_document(
363 block_document=BlockDocumentCreate(
364 name=self.source_block_document.name,
365 block_type_id=destination_block_type.destination_id,
366 block_schema_id=destination_block_schema.destination_id,
367 data=self.source_block_document.data,
368 ),
369 )
370 except ObjectAlreadyExists:
371 if self.source_block_document.name is None:
372 # This is technically impossible, but our typing thinks it's possible
373 raise ValueError(
374 "Block document has no name, which should be impossible. "
375 "Please report this as a bug."
376 )
378 if self.source_block_document.block_type is not None:
379 block_type_slug = self.source_block_document.block_type.slug
380 else:
381 # TODO: Add real client methods for places where we use `client.request`
382 response = await client.request(
383 "GET",
384 "/block_types/{id}",
385 params={"id": self.source_block_document.block_type_id},
386 )
387 block_type = BlockType.model_validate(response.json())
388 block_type_slug = block_type.slug
390 self.destination_block_document = (
391 await client.read_block_document_by_name(
392 block_type_slug=block_type_slug,
393 name=self.source_block_document.name,
394 )
395 )
396 raise TransferSkipped("Already exists")