Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/_migratable_resources/blocks.py: 0%

181 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 

2 

3import uuid 

4from typing import Any, cast 

5 

6from typing_extensions import Self 

7 

8from prefect.cli.transfer._exceptions import TransferSkipped 

9from prefect.cli.transfer._migratable_resources import construct_migratable_resource 

10from prefect.cli.transfer._migratable_resources.base import ( 

11 MigratableProtocol, 

12 MigratableResource, 

13) 

14from prefect.client.orchestration import get_client 

15from prefect.client.schemas.actions import ( 

16 BlockDocumentCreate, 

17 BlockSchemaCreate, 

18 BlockTypeCreate, 

19) 

20from prefect.client.schemas.objects import ( 

21 BlockDocument, 

22 BlockSchema, 

23 BlockType, 

24) 

25from prefect.exceptions import ( 

26 ObjectAlreadyExists, 

27) 

28 

29 

30class MigratableBlockType(MigratableResource[BlockType]): 

31 _instances: dict[uuid.UUID, Self] = {} 

32 

33 def __init__(self, block_type: BlockType): 

34 self.source_block_type = block_type 

35 self.destination_block_type: BlockType | None = None 

36 

37 @property 

38 def source_id(self) -> uuid.UUID: 

39 return self.source_block_type.id 

40 

41 @property 

42 def destination_id(self) -> uuid.UUID | None: 

43 return self.destination_block_type.id if self.destination_block_type else None 

44 

45 @classmethod 

46 async def construct(cls, obj: BlockType) -> Self: 

47 if obj.id in cls._instances: 

48 return cls._instances[obj.id] 

49 instance = cls(obj) 

50 cls._instances[obj.id] = instance 

51 return instance 

52 

53 @classmethod 

54 async def get_instance( 

55 cls, id: uuid.UUID 

56 ) -> "MigratableResource[BlockType] | None": 

57 if id in cls._instances: 

58 return cls._instances[id] 

59 return None 

60 

61 async def get_dependencies(self) -> "list[MigratableProtocol]": 

62 return [] 

63 

64 async def migrate(self) -> None: 

65 async with get_client() as client: 

66 try: 

67 block_type = await client.create_block_type( 

68 block_type=BlockTypeCreate( 

69 name=self.source_block_type.name, 

70 slug=self.source_block_type.slug, 

71 ), 

72 ) 

73 self.destination_block_type = block_type 

74 except ObjectAlreadyExists: 

75 self.destination_block_type = await client.read_block_type_by_slug( 

76 self.source_block_type.slug 

77 ) 

78 raise TransferSkipped("Already exists") 

79 

80 

81class MigratableBlockSchema(MigratableResource[BlockSchema]): 

82 _instances: dict[uuid.UUID, Self] = {} 

83 

84 def __init__(self, block_schema: BlockSchema): 

85 self.source_block_schema = block_schema 

86 self.destination_block_schema: BlockSchema | None = None 

87 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {} 

88 

89 @property 

90 def source_id(self) -> uuid.UUID: 

91 return self.source_block_schema.id 

92 

93 @property 

94 def destination_id(self) -> uuid.UUID | None: 

95 return ( 

96 self.destination_block_schema.id if self.destination_block_schema else None 

97 ) 

98 

99 @classmethod 

100 async def construct(cls, obj: BlockSchema) -> Self: 

101 if obj.id in cls._instances: 

102 return cls._instances[obj.id] 

103 instance = cls(obj) 

104 cls._instances[obj.id] = instance 

105 return instance 

106 

107 @classmethod 

108 async def get_instance( 

109 cls, id: uuid.UUID 

110 ) -> "MigratableResource[BlockSchema] | None": 

111 if id in cls._instances: 

112 return cls._instances[id] 

113 return None 

114 

115 async def get_dependencies(self) -> "list[MigratableProtocol]": 

116 if self._dependencies: 

117 return list(self._dependencies.values()) 

118 

119 async with get_client() as client: 

120 if self.source_block_schema.block_type is not None: 

121 if dependency := await MigratableBlockType.get_instance( 

122 id=self.source_block_schema.block_type.id 

123 ): 

124 self._dependencies[self.source_block_schema.block_type.id] = ( 

125 dependency 

126 ) 

127 else: 

128 self._dependencies[ 

129 self.source_block_schema.block_type.id 

130 ] = await construct_migratable_resource( 

131 self.source_block_schema.block_type 

132 ) 

133 elif self.source_block_schema.block_type_id is not None: 

134 if dependency := await MigratableBlockType.get_instance( 

135 id=self.source_block_schema.block_type_id 

136 ): 

137 self._dependencies[self.source_block_schema.block_type_id] = ( 

138 dependency 

139 ) 

140 else: 

141 response = await client.request( 

142 "GET", 

143 "/block_types/{id}", 

144 params={"id": self.source_block_schema.block_type_id}, 

145 ) 

146 block_type = BlockType.model_validate(response.json()) 

147 self._dependencies[ 

148 block_type.id 

149 ] = await construct_migratable_resource(block_type) 

150 else: 

151 raise ValueError("Block schema has no associated block type") 

152 

153 block_schema_references: dict[str, dict[str, Any]] = ( 

154 self.source_block_schema.fields.get("block_schema_references", {}) 

155 ) 

156 for block_schema_reference in block_schema_references.values(): 

157 if isinstance(block_schema_reference, list): 

158 for nested_block_schema_reference in block_schema_reference: 

159 if block_schema_checksum := cast( 

160 dict[str, str], nested_block_schema_reference 

161 ).get("block_schema_checksum"): 

162 block_schema = await client.read_block_schema_by_checksum( 

163 block_schema_checksum 

164 ) 

165 if dependency := await MigratableBlockSchema.get_instance( 

166 id=block_schema.id 

167 ): 

168 self._dependencies[block_schema.id] = dependency 

169 else: 

170 self._dependencies[ 

171 block_schema.id 

172 ] = await construct_migratable_resource(block_schema) 

173 else: 

174 if block_schema_checksum := block_schema_reference.get( 

175 "block_schema_checksum" 

176 ): 

177 block_schema = await client.read_block_schema_by_checksum( 

178 block_schema_checksum 

179 ) 

180 if dependency := await MigratableBlockSchema.get_instance( 

181 id=block_schema.id 

182 ): 

183 self._dependencies[block_schema.id] = dependency 

184 else: 

185 self._dependencies[ 

186 block_schema.id 

187 ] = await construct_migratable_resource(block_schema) 

188 

189 return list(self._dependencies.values()) 

190 

191 async def migrate(self) -> None: 

192 if self.source_block_schema.block_type_id is None: 

193 raise ValueError("Block schema has no associated block type") 

194 if ( 

195 destination_block_type := self._dependencies.get( 

196 self.source_block_schema.block_type_id 

197 ) 

198 ) is None: 

199 raise ValueError("Unable to find destination block type") 

200 async with get_client() as client: 

201 try: 

202 self.destination_block_schema = await client.create_block_schema( 

203 block_schema=BlockSchemaCreate( 

204 fields=self.source_block_schema.fields, 

205 block_type_id=destination_block_type.destination_id, 

206 capabilities=self.source_block_schema.capabilities, 

207 version=self.source_block_schema.version, 

208 ), 

209 ) 

210 except ObjectAlreadyExists: 

211 self.destination_block_schema = ( 

212 await client.read_block_schema_by_checksum( 

213 self.source_block_schema.checksum 

214 ) 

215 ) 

216 raise TransferSkipped("Already exists") 

217 

218 

219class MigratableBlockDocument(MigratableResource[BlockDocument]): 

220 _instances: dict[uuid.UUID, Self] = {} 

221 

222 def __init__(self, block_document: BlockDocument): 

223 self.source_block_document = block_document 

224 self.destination_block_document: BlockDocument | None = None 

225 self._dependencies: dict[uuid.UUID, MigratableProtocol] = {} 

226 

227 @property 

228 def source_id(self) -> uuid.UUID: 

229 return self.source_block_document.id 

230 

231 @property 

232 def destination_id(self) -> uuid.UUID | None: 

233 return ( 

234 self.destination_block_document.id 

235 if self.destination_block_document 

236 else None 

237 ) 

238 

239 @classmethod 

240 async def construct(cls, obj: BlockDocument) -> Self: 

241 if obj.id in cls._instances: 

242 return cls._instances[obj.id] 

243 instance = cls(obj) 

244 cls._instances[obj.id] = instance 

245 return instance 

246 

247 @classmethod 

248 async def get_instance( 

249 cls, id: uuid.UUID 

250 ) -> "MigratableResource[BlockDocument] | None": 

251 if id in cls._instances: 

252 return cls._instances[id] 

253 return None 

254 

255 async def get_dependencies(self) -> "list[MigratableProtocol]": 

256 if self._dependencies: 

257 return list(self._dependencies.values()) 

258 

259 # TODO: When we write serialized versions of the objects to disk, we should have a way to 

260 # use a client, but read from disk if the object has already been fetched. 

261 async with get_client() as client: 

262 if self.source_block_document.block_type is not None: 

263 if dependency := await MigratableBlockType.get_instance( 

264 id=self.source_block_document.block_type.id 

265 ): 

266 self._dependencies[self.source_block_document.block_type.id] = ( 

267 dependency 

268 ) 

269 else: 

270 self._dependencies[ 

271 self.source_block_document.block_type.id 

272 ] = await construct_migratable_resource( 

273 self.source_block_document.block_type 

274 ) 

275 else: 

276 if dependency := await MigratableBlockType.get_instance( 

277 id=self.source_block_document.block_type_id 

278 ): 

279 self._dependencies[self.source_block_document.block_type_id] = ( 

280 dependency 

281 ) 

282 else: 

283 response = await client.request( 

284 "GET", 

285 "/block_types/{id}", 

286 params={"id": self.source_block_document.block_type_id}, 

287 ) 

288 block_type = BlockType.model_validate(response.json()) 

289 self._dependencies[ 

290 block_type.id 

291 ] = await construct_migratable_resource(block_type) 

292 

293 if self.source_block_document.block_schema is not None: 

294 if dependency := await MigratableBlockSchema.get_instance( 

295 id=self.source_block_document.block_schema.id 

296 ): 

297 self._dependencies[self.source_block_document.block_schema.id] = ( 

298 dependency 

299 ) 

300 else: 

301 self._dependencies[ 

302 self.source_block_document.block_schema.id 

303 ] = await construct_migratable_resource( 

304 self.source_block_document.block_schema 

305 ) 

306 else: 

307 if dependency := await MigratableBlockSchema.get_instance( 

308 id=self.source_block_document.block_schema_id 

309 ): 

310 self._dependencies[self.source_block_document.block_schema_id] = ( 

311 dependency 

312 ) 

313 else: 

314 response = await client.request( 

315 "GET", 

316 "/block_schemas/{id}", 

317 params={"id": self.source_block_document.block_schema_id}, 

318 ) 

319 block_schema = BlockSchema.model_validate(response.json()) 

320 self._dependencies[ 

321 block_schema.id 

322 ] = await construct_migratable_resource(block_schema) 

323 

324 if self.source_block_document.block_document_references: 

325 for ( 

326 block_document_reference 

327 ) in self.source_block_document.block_document_references.values(): 

328 if block_document_id := block_document_reference.get( 

329 "block_document_id" 

330 ): 

331 if dependency := await MigratableBlockDocument.get_instance( 

332 id=block_document_id 

333 ): 

334 self._dependencies[block_document_id] = dependency 

335 else: 

336 block_document = await client.read_block_document( 

337 block_document_id 

338 ) 

339 self._dependencies[ 

340 block_document.id 

341 ] = await construct_migratable_resource(block_document) 

342 

343 return list(self._dependencies.values()) 

344 

345 async def migrate(self) -> None: 

346 if ( 

347 destination_block_type := self._dependencies.get( 

348 self.source_block_document.block_type_id 

349 ) 

350 ) is None or not destination_block_type.destination_id: 

351 raise ValueError("Unable to find destination block type") 

352 if ( 

353 destination_block_schema := self._dependencies.get( 

354 self.source_block_document.block_schema_id 

355 ) 

356 ) is None or not destination_block_schema.destination_id: 

357 raise ValueError("Unable to find destination block schema") 

358 

359 async with get_client() as client: 

360 try: 

361 # TODO: Check if data needs to be written differently to maintain composition 

362 self.destination_block_document = await client.create_block_document( 

363 block_document=BlockDocumentCreate( 

364 name=self.source_block_document.name, 

365 block_type_id=destination_block_type.destination_id, 

366 block_schema_id=destination_block_schema.destination_id, 

367 data=self.source_block_document.data, 

368 ), 

369 ) 

370 except ObjectAlreadyExists: 

371 if self.source_block_document.name is None: 

372 # This is technically impossible, but our typing thinks it's possible 

373 raise ValueError( 

374 "Block document has no name, which should be impossible. " 

375 "Please report this as a bug." 

376 ) 

377 

378 if self.source_block_document.block_type is not None: 

379 block_type_slug = self.source_block_document.block_type.slug 

380 else: 

381 # TODO: Add real client methods for places where we use `client.request` 

382 response = await client.request( 

383 "GET", 

384 "/block_types/{id}", 

385 params={"id": self.source_block_document.block_type_id}, 

386 ) 

387 block_type = BlockType.model_validate(response.json()) 

388 block_type_slug = block_type.slug 

389 

390 self.destination_block_document = ( 

391 await client.read_block_document_by_name( 

392 block_type_slug=block_type_slug, 

393 name=self.source_block_document.name, 

394 ) 

395 ) 

396 raise TransferSkipped("Already exists")