Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/__init__.py: 33%

65 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import TYPE_CHECKING, overload 1a

2from prefect.cli.transfer._migratable_resources.base import ( 1a

3 MigratableProtocol, 

4 MigratableType, 

5) 

6 

7 

8if TYPE_CHECKING: 8 ↛ 9line 8 didn't jump to line 9 because the condition on line 8 was never true1a

9 from prefect.client.schemas.objects import ( 

10 BlockDocument, 

11 BlockSchema, 

12 BlockType, 

13 Flow, 

14 WorkPool, 

15 WorkQueue, 

16 Variable, 

17 ) 

18 from prefect.client.schemas.responses import ( 

19 DeploymentResponse, 

20 GlobalConcurrencyLimitResponse, 

21 ) 

22 from prefect.events.schemas.automations import Automation 

23 

24 from prefect.cli.transfer._migratable_resources.blocks import ( 

25 MigratableBlockDocument, 

26 MigratableBlockSchema, 

27 MigratableBlockType, 

28 ) 

29 from prefect.cli.transfer._migratable_resources.concurrency_limits import ( 

30 MigratableGlobalConcurrencyLimit, 

31 ) 

32 from prefect.cli.transfer._migratable_resources.deployments import ( 

33 MigratableDeployment, 

34 ) 

35 from prefect.cli.transfer._migratable_resources.flows import MigratableFlow 

36 from prefect.cli.transfer._migratable_resources.variables import MigratableVariable 

37 from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool 

38 from prefect.cli.transfer._migratable_resources.work_queues import ( 

39 MigratableWorkQueue, 

40 ) 

41 from prefect.cli.transfer._migratable_resources.automations import ( 

42 MigratableAutomation, 

43 ) 

44 

45 

46@overload 1a

47async def construct_migratable_resource(obj: "WorkPool") -> "MigratableWorkPool": ... 47 ↛ exitline 47 didn't return from function 'construct_migratable_resource' because 1a

48@overload 1a

49async def construct_migratable_resource(obj: "WorkQueue") -> "MigratableWorkQueue": ... 49 ↛ exitline 49 didn't return from function 'construct_migratable_resource' because 1a

50@overload 1a

51async def construct_migratable_resource( 51 ↛ exitline 51 didn't return from function 'construct_migratable_resource' because 1a

52 obj: "DeploymentResponse", 

53) -> "MigratableDeployment": ... 

54@overload 1a

55async def construct_migratable_resource(obj: "Flow") -> "MigratableFlow": ... 55 ↛ exitline 55 didn't return from function 'construct_migratable_resource' because 1a

56@overload 1a

57async def construct_migratable_resource(obj: "BlockType") -> "MigratableBlockType": ... 57 ↛ exitline 57 didn't return from function 'construct_migratable_resource' because 1a

58@overload 1a

59async def construct_migratable_resource( 59 ↛ exitline 59 didn't return from function 'construct_migratable_resource' because 1a

60 obj: "BlockSchema", 

61) -> "MigratableBlockSchema": ... 

62@overload 1a

63async def construct_migratable_resource( 63 ↛ exitline 63 didn't return from function 'construct_migratable_resource' because 1a

64 obj: "BlockDocument", 

65) -> "MigratableBlockDocument": ... 

66@overload 1a

67async def construct_migratable_resource( 67 ↛ exitline 67 didn't return from function 'construct_migratable_resource' because 1a

68 obj: "Automation", 

69) -> "MigratableAutomation": ... 

70@overload 1a

71async def construct_migratable_resource( 71 ↛ exitline 71 didn't return from function 'construct_migratable_resource' because 1a

72 obj: "GlobalConcurrencyLimitResponse", 

73) -> "MigratableGlobalConcurrencyLimit": ... 

74@overload 1a

75async def construct_migratable_resource(obj: "Variable") -> "MigratableVariable": ... 75 ↛ exitline 75 didn't return from function 'construct_migratable_resource' because 1a

76 

77 

78async def construct_migratable_resource( 1a

79 obj: MigratableType, 

80) -> MigratableProtocol: 

81 from prefect.client.schemas.objects import ( 

82 BlockDocument, 

83 BlockSchema, 

84 BlockType, 

85 Flow, 

86 WorkPool, 

87 WorkQueue, 

88 ) 

89 from prefect.client.schemas.responses import ( 

90 DeploymentResponse, 

91 GlobalConcurrencyLimitResponse, 

92 ) 

93 from prefect.events.schemas.automations import Automation 

94 

95 from prefect.cli.transfer._migratable_resources.blocks import ( 

96 MigratableBlockDocument, 

97 MigratableBlockSchema, 

98 MigratableBlockType, 

99 ) 

100 from prefect.cli.transfer._migratable_resources.concurrency_limits import ( 

101 MigratableGlobalConcurrencyLimit, 

102 ) 

103 from prefect.cli.transfer._migratable_resources.deployments import ( 

104 MigratableDeployment, 

105 ) 

106 from prefect.cli.transfer._migratable_resources.flows import MigratableFlow 

107 from prefect.cli.transfer._migratable_resources.variables import MigratableVariable 

108 from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool 

109 from prefect.cli.transfer._migratable_resources.work_queues import ( 

110 MigratableWorkQueue, 

111 ) 

112 from prefect.cli.transfer._migratable_resources.automations import ( 

113 MigratableAutomation, 

114 ) 

115 

116 if isinstance(obj, WorkPool): 

117 return await MigratableWorkPool.construct(obj) 

118 elif isinstance(obj, WorkQueue): 

119 return await MigratableWorkQueue.construct(obj) 

120 elif isinstance(obj, DeploymentResponse): 

121 return await MigratableDeployment.construct(obj) 

122 elif isinstance(obj, Flow): 

123 return await MigratableFlow.construct(obj) 

124 elif isinstance(obj, BlockType): 

125 return await MigratableBlockType.construct(obj) 

126 elif isinstance(obj, BlockSchema): 

127 return await MigratableBlockSchema.construct(obj) 

128 elif isinstance(obj, BlockDocument): 

129 return await MigratableBlockDocument.construct(obj) 

130 elif isinstance(obj, Automation): 

131 return await MigratableAutomation.construct(obj) 

132 elif isinstance(obj, GlobalConcurrencyLimitResponse): 

133 return await MigratableGlobalConcurrencyLimit.construct(obj) 

134 else: 

135 return await MigratableVariable.construct(obj)