Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/deployments.py: 0%

87 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 

2 

3import uuid 

4 

5from typing_extensions import Self 

6 

7from prefect.cli.transfer._exceptions import TransferSkipped 

8from prefect.cli.transfer._migratable_resources import construct_migratable_resource 

9from prefect.cli.transfer._migratable_resources.base import ( 

10 MigratableProtocol, 

11 MigratableResource, 

12) 

13from prefect.cli.transfer._migratable_resources.blocks import MigratableBlockDocument 

14from prefect.cli.transfer._migratable_resources.flows import MigratableFlow 

15from prefect.cli.transfer._migratable_resources.work_pools import MigratableWorkPool 

16from prefect.cli.transfer._migratable_resources.work_queues import MigratableWorkQueue 

17from prefect.client.orchestration import get_client 

18from prefect.client.schemas.actions import DeploymentScheduleCreate 

19from prefect.client.schemas.responses import DeploymentResponse 

20from prefect.exceptions import ( 

21 ObjectAlreadyExists, 

22 ObjectLimitReached, 

23) 

24 

25 

26class MigratableDeployment(MigratableResource[DeploymentResponse]): 

27 _instances: dict[uuid.UUID, Self] = {} 

28 

29 def __init__(self, deployment: DeploymentResponse): 

30 self.source_deployment = deployment 

31 self.destination_deployment: DeploymentResponse | None = None 

32 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {} 

33 

34 @property 

35 def source_id(self) -> uuid.UUID: 

36 return self.source_deployment.id 

37 

38 @property 

39 def destination_id(self) -> uuid.UUID | None: 

40 return self.destination_deployment.id if self.destination_deployment else None 

41 

42 @classmethod 

43 async def construct(cls, obj: DeploymentResponse) -> Self: 

44 if obj.id in cls._instances: 

45 return cls._instances[obj.id] 

46 instance = cls(obj) 

47 cls._instances[obj.id] = instance 

48 return instance 

49 

50 @classmethod 

51 async def get_instance( 

52 cls, id: uuid.UUID 

53 ) -> "MigratableResource[DeploymentResponse] | None": 

54 if id in cls._instances: 

55 return cls._instances[id] 

56 return None 

57 

58 async def get_dependencies(self) -> "list[MigratableProtocol]": 

59 if self._dependencies: 

60 return list(self._dependencies.values()) 

61 

62 async with get_client() as client: 

63 if dependency := await MigratableFlow.get_instance( 

64 id=self.source_deployment.flow_id 

65 ): 

66 self._dependencies[self.source_deployment.flow_id] = dependency 

67 else: 

68 flow = await client.read_flow(self.source_deployment.flow_id) 

69 self._dependencies[ 

70 self.source_deployment.flow_id 

71 ] = await construct_migratable_resource(flow) 

72 if self.source_deployment.work_queue_id is not None: 

73 if dependency := await MigratableWorkQueue.get_instance( 

74 id=self.source_deployment.work_queue_id 

75 ): 

76 self._dependencies[self.source_deployment.work_queue_id] = ( 

77 dependency 

78 ) 

79 else: 

80 work_queue = await client.read_work_queue( 

81 self.source_deployment.work_queue_id 

82 ) 

83 self._dependencies[ 

84 work_queue.id 

85 ] = await construct_migratable_resource(work_queue) 

86 if self.source_deployment.work_pool_name is not None: 

87 if dependency := await MigratableWorkPool.get_instance_by_name( 

88 name=self.source_deployment.work_pool_name 

89 ): 

90 self._dependencies[dependency.source_id] = dependency 

91 else: 

92 work_pool = await client.read_work_pool( 

93 self.source_deployment.work_pool_name 

94 ) 

95 self._dependencies[ 

96 work_pool.id 

97 ] = await construct_migratable_resource(work_pool) 

98 if self.source_deployment.storage_document_id is not None: 

99 if dependency := await MigratableBlockDocument.get_instance( 

100 id=self.source_deployment.storage_document_id 

101 ): 

102 self._dependencies[self.source_deployment.storage_document_id] = ( 

103 dependency 

104 ) 

105 else: 

106 storage_document = await client.read_block_document( 

107 self.source_deployment.storage_document_id 

108 ) 

109 self._dependencies[ 

110 storage_document.id 

111 ] = await construct_migratable_resource(storage_document) 

112 if self.source_deployment.infrastructure_document_id is not None: 

113 if dependency := await MigratableBlockDocument.get_instance( 

114 id=self.source_deployment.infrastructure_document_id 

115 ): 

116 self._dependencies[ 

117 self.source_deployment.infrastructure_document_id 

118 ] = dependency 

119 else: 

120 infrastructure_document = await client.read_block_document( 

121 self.source_deployment.infrastructure_document_id 

122 ) 

123 self._dependencies[ 

124 infrastructure_document.id 

125 ] = await construct_migratable_resource(infrastructure_document) 

126 if self.source_deployment.pull_steps: 

127 # TODO: Figure out how to find block document references in pull steps 

128 pass 

129 

130 return list(self._dependencies.values()) 

131 

132 async def migrate(self) -> None: 

133 async with get_client() as client: 

134 try: 

135 if ( 

136 destination_flow_id := getattr( 

137 self._dependencies.get(self.source_deployment.flow_id), 

138 "destination_id", 

139 None, 

140 ) 

141 ) is None: 

142 raise ValueError("Unable to find destination flow") 

143 if ( 

144 self.source_deployment.storage_document_id 

145 and ( 

146 destination_storage_document_id := getattr( 

147 self._dependencies.get( 

148 self.source_deployment.storage_document_id 

149 ), 

150 "destination_id", 

151 None, 

152 ) 

153 ) 

154 is None 

155 ): 

156 raise ValueError("Unable to find destination storage document") 

157 else: 

158 destination_storage_document_id = None 

159 if ( 

160 self.source_deployment.infrastructure_document_id 

161 and ( 

162 destination_infrastructure_document_id := getattr( 

163 self._dependencies.get( 

164 self.source_deployment.infrastructure_document_id 

165 ), 

166 "destination_id", 

167 None, 

168 ) 

169 ) 

170 is None 

171 ): 

172 raise ValueError( 

173 "Unable to find destination infrastructure document" 

174 ) 

175 else: 

176 destination_infrastructure_document_id = None 

177 

178 destination_deployment_id = await client.create_deployment( 

179 flow_id=destination_flow_id, 

180 name=self.source_deployment.name, 

181 version=self.source_deployment.version, 

182 version_info=self.source_deployment.version_info, 

183 schedules=[ 

184 DeploymentScheduleCreate( 

185 schedule=schedule.schedule, 

186 active=schedule.active, 

187 max_scheduled_runs=schedule.max_scheduled_runs, 

188 parameters=schedule.parameters, 

189 slug=schedule.slug, 

190 ) 

191 for schedule in self.source_deployment.schedules 

192 ], 

193 concurrency_limit=self.source_deployment.concurrency_limit, 

194 concurrency_options=self.source_deployment.concurrency_options, 

195 parameters=self.source_deployment.parameters, 

196 description=self.source_deployment.description, 

197 work_queue_name=self.source_deployment.work_queue_name, 

198 work_pool_name=self.source_deployment.work_pool_name, 

199 tags=self.source_deployment.tags, 

200 storage_document_id=destination_storage_document_id, 

201 path=self.source_deployment.path, 

202 entrypoint=self.source_deployment.entrypoint, 

203 infrastructure_document_id=destination_infrastructure_document_id, 

204 parameter_openapi_schema=self.source_deployment.parameter_openapi_schema, 

205 paused=self.source_deployment.paused, 

206 pull_steps=self.source_deployment.pull_steps, 

207 enforce_parameter_schema=self.source_deployment.enforce_parameter_schema, 

208 job_variables=self.source_deployment.job_variables, 

209 branch=self.source_deployment.branch, 

210 base=self.source_deployment.base, 

211 root=self.source_deployment.root, 

212 ) 

213 self.destination_deployment = await client.read_deployment( 

214 destination_deployment_id 

215 ) 

216 except ObjectLimitReached: 

217 raise TransferSkipped("Deployment limit reached (upgrade tier)") 

218 except ObjectAlreadyExists: 

219 self.destination_deployment = await client.read_deployment( 

220 self.source_deployment.id 

221 ) 

222 raise TransferSkipped("Already exists")