Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/flows.py: 0%
47 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations
3import uuid
5from typing_extensions import Self
7from prefect.cli.transfer._exceptions import TransferSkipped
8from prefect.cli.transfer._migratable_resources.base import (
9 MigratableProtocol,
10 MigratableResource,
11)
12from prefect.client.orchestration import get_client
13from prefect.client.schemas.actions import FlowCreate
14from prefect.client.schemas.filters import FlowFilter, FlowFilterName
15from prefect.client.schemas.objects import Flow
16from prefect.exceptions import (
17 ObjectAlreadyExists,
18)
class MigratableFlow(MigratableResource[Flow]):
    """Migratable wrapper around a single source ``Flow``.

    Wrappers are cached per source flow ID in the class-level ``_instances``
    mapping, so ``construct`` hands back the same wrapper for the same flow.
    """

    # Cache of wrappers keyed by the source flow's ID (shared by the class).
    _instances: dict[uuid.UUID, Self] = {}

    def __init__(self, flow: Flow):
        """Store *flow* as the source; no destination flow is known yet."""
        self.source_flow = flow
        self.destination_flow: Flow | None = None

    @property
    def source_id(self) -> uuid.UUID:
        """ID of the wrapped source flow."""
        return self.source_flow.id

    @property
    def destination_id(self) -> uuid.UUID | None:
        """ID of the destination flow, or ``None`` while none is set."""
        migrated = self.destination_flow
        if not migrated:
            return None
        return migrated.id

    @classmethod
    async def construct(cls, obj: Flow) -> Self:
        """Return the cached wrapper for *obj*, creating and caching it if absent."""
        registry = cls._instances
        if obj.id not in registry:
            registry[obj.id] = cls(obj)
        return registry[obj.id]

    @classmethod
    async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[Flow] | None":
        """Look up a cached wrapper by source flow ID; ``None`` when not cached."""
        return cls._instances.get(id)

    async def get_dependencies(self) -> "list[MigratableProtocol]":
        """Return an empty list: this resource declares no dependencies."""
        return []

    async def migrate(self) -> None:
        """Create the flow through the client and record it as the destination.

        Raises:
            TransferSkipped: when creation raised ``ObjectAlreadyExists`` and
                exactly one flow with the same name was read back (it is
                stored as the destination flow before raising).
            RuntimeError: when creation raised ``ObjectAlreadyExists`` but the
                lookup by name did not return exactly one flow.
        """
        source = self.source_flow
        async with get_client() as client:
            try:
                create_action = FlowCreate(
                    name=source.name,
                    tags=source.tags,
                    labels=source.labels,
                )
                payload = create_action.model_dump(mode="json")
                # No pre-built client method accepts tags and labels, so the
                # create request is issued directly.
                response = await client.request("POST", "/flows/", json=payload)
                self.destination_flow = Flow.model_validate(response.json())
            except ObjectAlreadyExists:
                # NOTE(review): confirm that client.request() actually raises
                # ObjectAlreadyExists for an existing name; if it does not,
                # this handler is unreachable.
                # The flow already exists, so read it back by name.
                by_name = FlowFilter(name=FlowFilterName(any_=[source.name]))
                matches = await client.read_flows(flow_filter=by_name)
                if not matches or len(matches) != 1:
                    raise RuntimeError("Unable to find destination flow")
                self.destination_flow = matches[0]
                raise TransferSkipped("Already exists")