Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/flows.py: 0%

47 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources.base import ( 

9 MigratableProtocol, 

10 MigratableResource, 

11) 

12from prefect.client.orchestration import get_client 

13from prefect.client.schemas.actions import FlowCreate 

14from prefect.client.schemas.filters import FlowFilter, FlowFilterName 

15from prefect.client.schemas.objects import Flow 

16from prefect.exceptions import ( 

17 ObjectAlreadyExists, 

18) 

19 

20 

21class MigratableFlow(MigratableResource[Flow]): 

22 _instances: dict[uuid.UUID, Self] = {} 

23 

24 def __init__(self, flow: Flow): 

25 self.source_flow = flow 

26 self.destination_flow: Flow | None = None 

27 

28 @property 

29 def source_id(self) -> uuid.UUID: 

30 return self.source_flow.id 

31 

32 @property 

33 def destination_id(self) -> uuid.UUID | None: 

34 return self.destination_flow.id if self.destination_flow else None 

35 

36 @classmethod 

37 async def construct(cls, obj: Flow) -> Self: 

38 if obj.id in cls._instances: 

39 return cls._instances[obj.id] 

40 instance = cls(obj) 

41 cls._instances[obj.id] = instance 

42 return instance 

43 

44 @classmethod 

45 async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[Flow] | None": 

46 if id in cls._instances: 

47 return cls._instances[id] 

48 return None 

49 

50 async def get_dependencies(self) -> "list[MigratableProtocol]": 

51 return [] 

52 

53 async def migrate(self) -> None: 

54 async with get_client() as client: 

55 try: 

56 flow_data = FlowCreate( 

57 name=self.source_flow.name, 

58 tags=self.source_flow.tags, 

59 labels=self.source_flow.labels, 

60 ) 

61 # We don't have a pre-built client method that accepts tags and labels 

62 response = await client.request( 

63 "POST", "/flows/", json=flow_data.model_dump(mode="json") 

64 ) 

65 self.destination_flow = Flow.model_validate(response.json()) 

66 except ObjectAlreadyExists: 

67 # Flow already exists, read it by name 

68 flows = await client.read_flows( 

69 flow_filter=FlowFilter( 

70 name=FlowFilterName(any_=[self.source_flow.name]) 

71 ) 

72 ) 

73 if flows and len(flows) == 1: 

74 self.destination_flow = flows[0] 

75 else: 

76 raise RuntimeError("Unable to find destination flow") 

77 raise TransferSkipped("Already exists")