Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/base.py: 80%
35 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import abc 1a
4import uuid 1a
5from typing import Generic, Protocol, TypeVar, Union 1a
7from prefect.client.schemas.objects import ( 1a
8 BlockDocument,
9 BlockSchema,
10 BlockType,
11 Flow,
12 Variable,
13 WorkPool,
14 WorkQueue,
15)
16from prefect.client.schemas.responses import ( 1a
17 DeploymentResponse,
18 GlobalConcurrencyLimitResponse,
19)
20from prefect.events.schemas.automations import Automation 1a
22MigratableType = Union[ 1a
23 WorkPool,
24 WorkQueue,
25 DeploymentResponse,
26 Flow,
27 BlockType,
28 BlockSchema,
29 BlockDocument,
30 Automation,
31 GlobalConcurrencyLimitResponse,
32 Variable,
33]
35T = TypeVar("T", bound=MigratableType) 1a
38class MigratableProtocol(Protocol): 1a
39 @property 1a
40 def source_id(self) -> uuid.UUID: ... 40 ↛ exitline 40 didn't return from function 'source_id' because 1a
42 @property 1a
43 def destination_id(self) -> uuid.UUID | None: ... 43 ↛ exitline 43 didn't return from function 'destination_id' because 1a
45 async def get_dependencies(self) -> list["MigratableProtocol"]: ... 45 ↛ exitline 45 didn't return from function 'get_dependencies' because 1a
46 async def migrate(self) -> None: ... 46 ↛ exitline 46 didn't return from function 'migrate' because 1a
49class MigratableResource(Generic[T], abc.ABC): 1a
50 @property 1a
51 @abc.abstractmethod 1a
52 def source_id(self) -> uuid.UUID: ... 52 ↛ exitline 52 didn't return from function 'source_id' because 1a
54 @property 1a
55 @abc.abstractmethod 1a
56 def destination_id(self) -> uuid.UUID | None: ... 56 ↛ exitline 56 didn't return from function 'destination_id' because 1a
58 # Using this construct method because we may want to persist a serialized version of the object
59 # to disk and reload it later to avoid using too much memory.
60 @classmethod 1a
61 @abc.abstractmethod 1a
62 async def construct(cls, obj: T) -> "MigratableResource[T]": ... 62 ↛ exitline 62 didn't return from function 'construct' because 1a
64 @abc.abstractmethod 1a
65 async def get_dependencies(self) -> "list[MigratableProtocol]": ... 65 ↛ exitline 65 didn't return from function 'get_dependencies' because 1a
67 @classmethod 1a
68 @abc.abstractmethod 1a
69 async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[T] | None": ... 69 ↛ exitline 69 didn't return from function 'get_instance' because 1a
71 @abc.abstractmethod 1a
72 async def migrate(self) -> None: ... 72 ↛ exitline 72 didn't return from function 'migrate' because 1a
74 def __str__(self) -> str: 1a
75 return f"{type(self).__name__}(source_id={self.source_id})"