Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/base.py: 80%

35 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import uuid 1a

5from typing import Generic, Protocol, TypeVar, Union 1a

6 

7from prefect.client.schemas.objects import ( 1a

8 BlockDocument, 

9 BlockSchema, 

10 BlockType, 

11 Flow, 

12 Variable, 

13 WorkPool, 

14 WorkQueue, 

15) 

16from prefect.client.schemas.responses import ( 1a

17 DeploymentResponse, 

18 GlobalConcurrencyLimitResponse, 

19) 

20from prefect.events.schemas.automations import Automation 1a

21 

22MigratableType = Union[ 1a

23 WorkPool, 

24 WorkQueue, 

25 DeploymentResponse, 

26 Flow, 

27 BlockType, 

28 BlockSchema, 

29 BlockDocument, 

30 Automation, 

31 GlobalConcurrencyLimitResponse, 

32 Variable, 

33] 

34 

35T = TypeVar("T", bound=MigratableType) 1a

36 

37 

38class MigratableProtocol(Protocol): 1a

39 @property 1a

40 def source_id(self) -> uuid.UUID: ... 40 ↛ exitline 40 didn't return from function 'source_id' because 1a

41 

42 @property 1a

43 def destination_id(self) -> uuid.UUID | None: ... 43 ↛ exitline 43 didn't return from function 'destination_id' because 1a

44 

45 async def get_dependencies(self) -> list["MigratableProtocol"]: ... 45 ↛ exitline 45 didn't return from function 'get_dependencies' because 1a

46 async def migrate(self) -> None: ... 46 ↛ exitline 46 didn't return from function 'migrate' because 1a

47 

48 

49class MigratableResource(Generic[T], abc.ABC): 1a

50 @property 1a

51 @abc.abstractmethod 1a

52 def source_id(self) -> uuid.UUID: ... 52 ↛ exitline 52 didn't return from function 'source_id' because 1a

53 

54 @property 1a

55 @abc.abstractmethod 1a

56 def destination_id(self) -> uuid.UUID | None: ... 56 ↛ exitline 56 didn't return from function 'destination_id' because 1a

57 

58 # Using this construct method because we may want to persist a serialized version of the object 

59 # to disk and reload it later to avoid using too much memory. 

60 @classmethod 1a

61 @abc.abstractmethod 1a

62 async def construct(cls, obj: T) -> "MigratableResource[T]": ... 62 ↛ exitline 62 didn't return from function 'construct' because 1a

63 

64 @abc.abstractmethod 1a

65 async def get_dependencies(self) -> "list[MigratableProtocol]": ... 65 ↛ exitline 65 didn't return from function 'get_dependencies' because 1a

66 

67 @classmethod 1a

68 @abc.abstractmethod 1a

69 async def get_instance(cls, id: uuid.UUID) -> "MigratableResource[T] | None": ... 69 ↛ exitline 69 didn't return from function 'get_instance' because 1a

70 

71 @abc.abstractmethod 1a

72 async def migrate(self) -> None: ... 72 ↛ exitline 72 didn't return from function 'migrate' because 1a

73 

74 def __str__(self) -> str: 1a

75 return f"{type(self).__name__}(source_id={self.source_id})"