Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/automations.py: 0%

100 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources import construct_migratable_resource 

9from prefect.cli.transfer._migratable_resources.base import ( 

10 MigratableProtocol, 

11 MigratableResource, 

12) 

13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument 

14from prefect.cli.transfer._migratable_resources.deployments import MigratableDeployment 

15from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool 

16from prefect.cli.transfer._migratable_resources.work_queues import MigratableWorkQueue 

17from prefect.client.orchestration import get_client 

18from prefect.client.schemas.filters import ( 

19 WorkPoolFilter, 

20 WorkPoolFilterId, 

21) 

22from prefect.events.actions import ( 

23 AutomationAction, 

24 CallWebhook, 

25 DeploymentAction, 

26 SendNotification, 

27 WorkPoolAction, 

28 WorkQueueAction, 

29) 

30from prefect.events.schemas.automations import Automation, AutomationCore 

31 

32 

33class MigratableAutomation(MigratableResource[Automation]): 

34 _instances: dict[uuid.UUID, Self] = {} 

35 

36 def __init__(self, automation: Automation): 

37 self.source_automation = automation 

38 self.destination_automation: Automation | None = None 

39 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {} 

40 

41 @property 

42 def source_id(self) -> uuid.UUID: 

43 return self.source_automation.id 

44 

45 @property 

46 def destination_id(self) -> uuid.UUID | None: 

47 return self.destination_automation.id if self.destination_automation else None 

48 

49 @classmethod 

50 async def construct(cls, obj: Automation) -> Self: 

51 if obj.id in cls._instances: 

52 return cls._instances[obj.id] 

53 instance = cls(obj) 

54 cls._instances[obj.id] = instance 

55 return instance 

56 

57 @classmethod 

58 async def get_instance( 

59 cls, id: uuid.UUID 

60 ) -> "MigratableResource[Automation] | None": 

61 if id in cls._instances: 

62 return cls._instances[id] 

63 return None 

64 

65 async def get_dependencies(self) -> "list[MigratableProtocol]": 

66 if self._dependencies: 

67 return list(self._dependencies.values()) 

68 

69 async with get_client() as client: 

70 for action in self.source_automation.actions: 

71 if ( 

72 isinstance(action, DeploymentAction) 

73 and action.deployment_id is not None 

74 ): 

75 if dependency := await MigratableDeployment.get_instance( 

76 id=action.deployment_id 

77 ): 

78 self._dependencies[action.deployment_id] = dependency 

79 else: 

80 deployment = await client.read_deployment(action.deployment_id) 

81 self._dependencies[ 

82 deployment.id 

83 ] = await construct_migratable_resource(deployment) 

84 elif ( 

85 isinstance(action, WorkPoolAction) 

86 and action.work_pool_id is not None 

87 ): 

88 # TODO: Find a better way to get a work pool by id 

89 if dependency := await MigratableWorkPool.get_instance( 

90 id=action.work_pool_id 

91 ): 

92 self._dependencies[action.work_pool_id] = dependency 

93 else: 

94 work_pool = await client.read_work_pools( 

95 work_pool_filter=WorkPoolFilter( 

96 id=WorkPoolFilterId(any_=[action.work_pool_id]) 

97 ) 

98 ) 

99 if work_pool: 

100 self._dependencies[ 

101 work_pool[0].id 

102 ] = await construct_migratable_resource(work_pool[0]) 

103 elif ( 

104 isinstance(action, WorkQueueAction) 

105 and action.work_queue_id is not None 

106 ): 

107 if dependency := await MigratableWorkQueue.get_instance( 

108 id=action.work_queue_id 

109 ): 

110 self._dependencies[action.work_queue_id] = dependency 

111 else: 

112 work_queue = await client.read_work_queue(action.work_queue_id) 

113 self._dependencies[ 

114 work_queue.id 

115 ] = await construct_migratable_resource(work_queue) 

116 elif ( 

117 isinstance(action, AutomationAction) 

118 and action.automation_id is not None 

119 ): 

120 if dependency := await MigratableAutomation.get_instance( 

121 id=action.automation_id 

122 ): 

123 self._dependencies[action.automation_id] = dependency 

124 else: 

125 automation = await client.find_automation(action.automation_id) 

126 if automation: 

127 self._dependencies[ 

128 automation.id 

129 ] = await construct_migratable_resource(automation) 

130 elif isinstance(action, CallWebhook): 

131 if dependency := await MigratableBlockDocument.get_instance( 

132 id=action.block_document_id 

133 ): 

134 self._dependencies[action.block_document_id] = dependency 

135 else: 

136 block_document = await client.read_block_document( 

137 action.block_document_id 

138 ) 

139 self._dependencies[ 

140 block_document.id 

141 ] = await construct_migratable_resource(block_document) 

142 elif isinstance(action, SendNotification): 

143 if dependency := await MigratableBlockDocument.get_instance( 

144 id=action.block_document_id 

145 ): 

146 self._dependencies[action.block_document_id] = dependency 

147 else: 

148 block_document = await client.read_block_document( 

149 action.block_document_id 

150 ) 

151 self._dependencies[ 

152 block_document.id 

153 ] = await construct_migratable_resource(block_document) 

154 return list(self._dependencies.values()) 

155 

156 async def migrate(self) -> None: 

157 async with get_client() as client: 

158 automations = await client.read_automations_by_name( 

159 name=self.source_automation.name 

160 ) 

161 if automations: 

162 self.destination_automation = automations[0] 

163 raise TransferSkipped("Already exists") 

164 else: 

165 automation_copy = AutomationCore.model_validate( 

166 self.source_automation.model_dump(mode="json") 

167 ) 

168 for action in automation_copy.actions: 

169 if ( 

170 isinstance(action, DeploymentAction) 

171 and action.deployment_id is not None 

172 ): 

173 action.deployment_id = self._dependencies[ 

174 action.deployment_id 

175 ].destination_id 

176 elif ( 

177 isinstance(action, WorkPoolAction) 

178 and action.work_pool_id is not None 

179 ): 

180 action.work_pool_id = self._dependencies[ 

181 action.work_pool_id 

182 ].destination_id 

183 elif ( 

184 isinstance(action, WorkQueueAction) 

185 and action.work_queue_id is not None 

186 ): 

187 action.work_queue_id = self._dependencies[ 

188 action.work_queue_id 

189 ].destination_id 

190 elif ( 

191 isinstance(action, AutomationAction) 

192 and action.automation_id is not None 

193 ): 

194 action.automation_id = self._dependencies[ 

195 action.automation_id 

196 ].destination_id 

197 elif isinstance(action, CallWebhook): 

198 if destination_block_document_id := getattr( 

199 self._dependencies.get(action.block_document_id), 

200 "destination_id", 

201 None, 

202 ): 

203 action.block_document_id = destination_block_document_id 

204 elif isinstance(action, SendNotification): 

205 if destination_block_document_id := getattr( 

206 self._dependencies.get(action.block_document_id), 

207 "destination_id", 

208 None, 

209 ): 

210 action.block_document_id = destination_block_document_id 

211 

212 automation_id = await client.create_automation( 

213 automation=automation_copy 

214 ) 

215 self.destination_automation = await client.read_automation( 

216 automation_id=automation_id 

217 )