Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/variables.py: 0%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources.base import ( 

9 MigratableProtocol, 

10 MigratableResource, 

11) 

12from prefect.client.orchestration import get_client 

13from prefect.client.schemas.actions import ( 

14 VariableCreate, 

15) 

16from prefect.client.schemas.objects import ( 

17 Variable, 

18) 

19from prefect.exceptions import ( 

20 ObjectAlreadyExists, 

21) 

22 

23 

class MigratableVariable(MigratableResource[Variable]):
    """Migratable wrapper around a single ``Variable``.

    Holds the source variable and, once ``migrate`` has run, the matching
    destination variable. Wrappers are cached in ``_instances`` keyed by the
    source variable's id, so ``construct`` returns the same wrapper for the
    same id.
    """

    # Cache of wrappers keyed by source variable id (class-level, shared).
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, variable: Variable):
        """Store ``variable`` as the source; no destination is known yet."""
        self.destination_variable: Variable | None = None
        self.source_variable = variable

    @property
    def source_id(self) -> uuid.UUID:
        """Id of the source variable."""
        return self.source_variable.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """Id of the destination variable, or ``None`` if it is not set."""
        migrated = self.destination_variable
        if not migrated:
            return None
        return migrated.id

    @classmethod
    async def construct(cls, obj: Variable) -> Self:
        """Return the cached wrapper for ``obj.id``, creating it on first use."""
        cached = cls._instances.get(obj.id)
        if cached is not None:
            return cached
        wrapper = cls(obj)
        cls._instances[obj.id] = wrapper
        return wrapper

    @classmethod
    async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[Variable] | None":
        """Return the cached wrapper for ``id``, or ``None`` if there is none."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return an empty list: this resource declares no dependencies."""
        return []

    async def migrate(self) -> None:
        """Create a variable with the source's name, value and tags.

        The result of ``client.create_variable`` is stored as
        ``destination_variable``.

        Raises:
            TransferSkipped: if creation raises ``ObjectAlreadyExists``. The
                existing variable is first read by name and stored as
                ``destination_variable``.
        """
        source = self.source_variable
        async with get_client() as client:
            payload = VariableCreate(
                name=source.name,
                value=source.value,
                tags=source.tags,
            )
            try:
                self.destination_variable = await client.create_variable(
                    variable=payload,
                )
            except ObjectAlreadyExists:
                # Record the existing variable so destination_id resolves,
                # then report the transfer as skipped.
                self.destination_variable = await client.read_variable_by_name(
                    source.name
                )
                raise TransferSkipped("Already exists")