Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/variables.py: 0%
41 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources.base import (
9 MigratableProtocol,
10 MigratableResource,
11)
12from prefect.client.orchestration import get_client
13from prefect.client.schemas.actions import (
14 VariableCreate,
15)
16from prefect.client.schemas.objects import (
17 Variable,
18)
19from prefect.exceptions import (
20 ObjectAlreadyExists,
21)
class MigratableVariable(MigratableResource[Variable]):
    """Migratable wrapper around a single source ``Variable``.

    Wrappers are registered per source variable id in ``_instances`` so that
    ``construct`` hands back the same wrapper for the same source variable.
    """

    # Class-level registry: source variable id -> wrapper built for it.
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, variable: Variable):
        """Remember the source variable; no destination is known yet."""
        self.source_variable = variable
        self.destination_variable: Variable | None = None

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the wrapped source variable."""
        return self.source_variable.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination variable, or ``None`` while none is set."""
        if not self.destination_variable:
            return None
        return self.destination_variable.id

    @classmethod
    async def construct(cls, obj: Variable) -> Self:
        """Return the registered wrapper for ``obj``, creating it on first use."""
        key = obj.id
        if key not in cls._instances:
            cls._instances[key] = cls(obj)
        return cls._instances[key]

    @classmethod
    async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[Variable] | None":
        """Look up a registered wrapper by source id; ``None`` when absent."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return the resources this one depends on (always an empty list)."""
        return []

    async def migrate(self) -> None:
        """Create the variable through the client obtained from ``get_client``.

        The new variable copies the source's name, value and tags. When the
        client reports ``ObjectAlreadyExists``, the existing variable is read
        back by name and stored as the destination before signalling the skip.

        Raises:
            TransferSkipped: A variable with the same name already exists.
        """
        async with get_client() as client:
            source = self.source_variable
            payload = VariableCreate(
                name=source.name,
                value=source.value,
                tags=source.tags,
            )
            try:
                created = await client.create_variable(variable=payload)
            except ObjectAlreadyExists:
                # Record the pre-existing variable so destination_id resolves,
                # then report that nothing was transferred.
                self.destination_variable = await client.read_variable_by_name(
                    source.name
                )
                raise TransferSkipped("Already exists")
            self.destination_variable = created