Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/__init__.py: 17%

139 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for transferring resources between profiles. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8from logging import Logger 1a

9from typing import TYPE_CHECKING, Any, Callable, Sequence 1a

10import uuid 1a

11 

12import typer 1a

13from rich.console import Console 1a

14from rich.panel import Panel 1a

15from rich.progress import ( 1a

16 BarColumn, 

17 Progress, 

18 SpinnerColumn, 

19 TaskProgressColumn, 

20 TextColumn, 

21) 

22from rich.table import Table 1a

23 

24from prefect.cli._utilities import exit_with_error, exit_with_success 1a

25from prefect.cli.root import app, is_interactive 1a

26from prefect.cli.transfer._exceptions import TransferSkipped 1a

27from prefect.cli.transfer._migratable_resources import MigratableType 1a

28from prefect.client.orchestration import PrefectClient, get_client 1a

29from prefect.context import use_profile 1a

30from prefect.events.schemas.events import Event, Resource 1a

31from prefect.logging import get_logger 1a

32from prefect.settings import load_profiles 1a

33from prefect.events import get_events_client 1a

34 

35from ._dag import TransferDAG 1a

36 

37if TYPE_CHECKING: 37 ↛ 40line 37 didn't jump to line 40 because the condition on line 37 was never true1a

38 # we use the forward ref and defer this import because that module imports 

39 # a ton of schemas that we don't want to import here at module load time 

40 from prefect.cli.transfer._migratable_resources import MigratableProtocol 

41 

42logger: Logger = get_logger(__name__) 1a

43 

44 

@app.command()
async def transfer(
    from_profile: str = typer.Option(
        ..., "--from", help="Source profile to transfer resources from"
    ),
    to_profile: str = typer.Option(
        ..., "--to", help="Target profile to transfer resources to"
    ),
):
    """
    Transfer resources from one Prefect profile to another.

    Automatically handles dependencies between resources and transfers them
    in the correct order.

    \b
    Examples:
        \b
        Transfer all resources from staging to production:
        \b
        $ prefect transfer --from staging --to prod
    """
    # NOTE(review): the block nesting in this function was reconstructed from
    # a listing that had lost its indentation -- confirm the `with` boundaries
    # against the original file before relying on them.
    console = Console()

    # Profiles are loaded with include_defaults=False, so both endpoints must
    # be present in that result.
    profiles = load_profiles(include_defaults=False)

    if from_profile not in profiles:
        exit_with_error(f"Source profile '{from_profile}' not found.")

    if to_profile not in profiles:
        exit_with_error(f"Target profile '{to_profile}' not found.")

    if from_profile == to_profile:
        exit_with_error("Source and target profiles must be different.")

    # Show the user which direction the transfer will run in.
    console.print()
    console.print(
        Panel(
            f"[bold cyan]Source:[/bold cyan] {from_profile}\n"
            f"[bold cyan]Target:[/bold cyan] {to_profile}",
            title="Transfer Configuration",
            expand=False,
            padding=(1, 2),
        )
    )

    # Phase 1: under the source profile, read everything and build the DAG.
    with use_profile(from_profile):
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            # total=None: the amount of work is not known up front.
            task = progress.add_task("Collecting resources...", total=None)
            async with get_client() as client:
                # Remembered only so it can be reported in the events below.
                from_url = client.api_url
                resources = await _collect_resources(client)

            if not resources:
                console.print("\n[yellow]No resources found to transfer.[/yellow]")
                return

            progress.update(task, description="Building dependency graph...")
            roots = await _find_root_resources(resources)
            dag = TransferDAG()
            await dag.build_from_roots(roots)

    stats = dag.get_statistics()

    if stats["has_cycles"]:
        exit_with_error("Cannot transfer resources with circular dependencies.")

    # Ask for confirmation only when running interactively.
    console.print()
    if is_interactive() and not typer.confirm(
        f"Transfer {stats['total_nodes']} resource(s) from '{from_profile}' to '{to_profile}'?"
    ):
        exit_with_error("Transfer cancelled.")

    # Phase 2: under the target profile, run the transfer and emit a
    # "started" and a "completed" event around it.
    console.print()
    with use_profile(to_profile):
        async with get_client() as client:
            # Remembered only so it can be reported in the events below.
            to_url = client.api_url
        async with get_events_client() as events_client:
            # Both events use a resource id derived from the same transfer_id.
            transfer_id = uuid.uuid4()
            await events_client.emit(
                event=Event(
                    event="prefect.workspace.transfer.started",
                    resource=Resource.model_validate(
                        {
                            "prefect.resource.id": f"prefect.workspace.transfer.{transfer_id}",
                            "prefect.resource.name": f"{from_profile} -> {to_profile} transfer",
                            "prefect.resource.role": "transfer",
                        }
                    ),
                    payload=dict(
                        source_profile=from_profile,
                        target_profile=to_profile,
                        source_url=str(from_url),
                        target_url=str(to_url),
                        total_resources=stats["total_nodes"],
                    ),
                )
            )
            results = await _execute_transfer(dag, console)

            succeeded: int = 0
            failed: int = 0
            skipped: int = 0

            # Result convention used here: None counts as success, a
            # TransferSkipped instance as skipped, anything else as failed.
            for result in results.values():
                if result is None:
                    succeeded += 1
                elif isinstance(result, TransferSkipped):
                    skipped += 1
                else:
                    failed += 1

            # Same payload as the "started" event plus the outcome counts.
            await events_client.emit(
                event=Event(
                    event="prefect.workspace.transfer.completed",
                    resource=Resource.model_validate(
                        {
                            "prefect.resource.id": f"prefect.workspace.transfer.{transfer_id}",
                            "prefect.resource.name": f"{from_profile} -> {to_profile} transfer",
                            "prefect.resource.role": "transfer",
                        }
                    ),
                    payload=dict(
                        source_profile=from_profile,
                        target_profile=to_profile,
                        source_url=str(from_url),
                        target_url=str(to_url),
                        total_resources=stats["total_nodes"],
                        succeeded=succeeded,
                        failed=failed,
                        skipped=skipped,
                    ),
                )
            )

    # Prints the results table and ends via exit_with_error/exit_with_success.
    _display_results(results, dag.nodes, console)

185 

186 

async def _collect_resources(client: PrefectClient) -> Sequence["MigratableProtocol"]:
    """Read every transferable object through ``client`` and wrap each one.

    Work pools, work queues, deployments, block documents, variables, global
    concurrency limits and automations are read concurrently; every item that
    comes back is then passed to ``construct_migratable_resource``, again
    concurrently.

    Returns:
        The wrapped resources, in the order of the reads above and, within
        each read, in the order the client returned the items.
    """
    # Imported inside the function rather than at module level.
    from ._migratable_resources import construct_migratable_resource

    readers = (
        client.read_work_pools,
        client.read_work_queues,
        client.read_deployments,
        client.read_block_documents,
        client.read_variables,
        client.read_global_concurrency_limits,
        client.read_automations,
    )
    collections: list[Sequence[MigratableType]] = await asyncio.gather(
        *(read() for read in readers)
    )

    # Flatten the per-kind collections into one list of pending wrappers.
    pending = []
    for collection in collections:
        pending.extend(construct_migratable_resource(item) for item in collection)

    return await asyncio.gather(*pending)

212 

213 

async def _find_root_resources(
    resources: Sequence["MigratableProtocol"],
) -> Sequence["MigratableProtocol"]:
    """Return the resources that no other resource lists as a dependency.

    Each resource is asked for its dependencies in turn; any ``source_id``
    reported that way is treated as "referenced". Resources whose own
    ``source_id`` is never referenced are the roots.

    Returns:
        The roots in their original order, or ``resources`` itself when no
        resource qualifies as a root (including when ``resources`` is empty).
    """
    referenced: set[uuid.UUID] = set()
    for candidate in resources:
        for dependency in await candidate.get_dependencies():
            referenced.add(dependency.source_id)

    roots = [
        candidate for candidate in resources if candidate.source_id not in referenced
    ]
    if roots:
        return roots
    # Nothing is unreferenced: fall back to the full input.
    return resources

229 

230 

async def _execute_transfer(dag: TransferDAG, console: Console) -> dict[uuid.UUID, Any]:
    """Run the migration of every DAG node behind a progress bar.

    Args:
        dag: The dependency graph whose nodes are migrated.
        console: The console the progress bar is drawn on.

    Returns:
        Whatever ``dag.execute_concurrent`` returns for the run.
    """
    node_count = len(dag.nodes)
    bar = Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    )

    with bar:
        ticket = bar.add_task("Transferring resources...", total=node_count)

        async def _migrate_one(resource: "MigratableProtocol"):
            # The bar advances whether the migration succeeds or raises, and
            # the exception is passed on unchanged to the DAG executor.
            try:
                await resource.migrate()
                bar.update(ticket, advance=1)
            except Exception:
                bar.update(ticket, advance=1)
                raise
            return None

        return await dag.execute_concurrent(
            _migrate_one,
            max_workers=5,
            skip_on_failure=True,
        )

259 

260 

def _get_resource_display_name(resource: "MigratableProtocol") -> str:
    """Build a short ``<kind>/<identifier>`` label for ``resource``.

    The first ``source_*`` attribute found on the resource (checked in the
    order listed below) decides the label. If none is present, the result is
    ``str(resource)``.
    """

    def _name(obj) -> str:
        return obj.name

    def _slug(obj) -> str:
        return obj.slug

    def _short_id(obj) -> str:
        # Only the first eight characters of the id are shown.
        return str(obj.id)[:8]

    # (attribute on the resource, label prefix, how to identify the object)
    label_rules = (
        ("source_work_pool", "work-pool", _name),
        ("source_work_queue", "work-queue", _name),
        ("source_deployment", "deployment", _name),
        ("source_flow", "flow", _name),
        ("source_block_document", "block-document", _name),
        ("source_block_type", "block-type", _slug),
        ("source_block_schema", "block-schema", _short_id),
        ("source_variable", "variable", _name),
        ("source_automation", "automation", _name),
        ("source_global_concurrency_limit", "concurrency-limit", _name),
    )

    for attribute, prefix, identify in label_rules:
        if hasattr(resource, attribute):
            return f"{prefix}/{identify(getattr(resource, attribute))}"

    return str(resource)

290 

291 

def _display_results(
    results: dict[uuid.UUID, Any],
    nodes: dict[uuid.UUID, "MigratableProtocol"],
    console: Console,
):
    """Print a per-resource results table, then finish with a summary.

    Args:
        results: Outcome per node id. ``None`` is shown as success, a
            ``TransferSkipped`` instance as skipped, anything else as failed
            (its ``str()`` is used as the detail text).
        nodes: The resource for each node id in ``results``.
        console: The console the table is printed on.

    The function ends by calling ``exit_with_error`` when at least one
    resource failed, and ``exit_with_success`` otherwise.
    """
    # rich is already imported by this module; the import is kept local so
    # this change does not depend on the file's import block.
    from rich.markup import escape

    succeeded: list[str] = []
    failed: list[tuple[str, str]] = []
    skipped: list[tuple[str, str]] = []

    for node_id, result in results.items():
        # Resource names and exception/skip messages are arbitrary text. They
        # are escaped so that any "[...]" they contain is shown literally
        # instead of being parsed as Rich console markup by the table.
        resource_name = escape(_get_resource_display_name(nodes[node_id]))

        if result is None:
            succeeded.append(resource_name)
        elif isinstance(result, TransferSkipped):
            skipped.append((resource_name, escape(str(result))))
        else:
            failed.append((resource_name, escape(str(result))))

    if succeeded or failed or skipped:
        results_table = Table(title="Transfer Results", show_header=True)
        results_table.add_column("Resource", style="cyan")
        results_table.add_column("Status", style="white")
        results_table.add_column("Details", style="dim", no_wrap=False)

        for name in succeeded:
            results_table.add_row(name, "[green]✓ Success[/green]", "")

        for name, error in failed:
            results_table.add_row(name, "[red]✗ Failed[/red]", error)

        for name, reason in skipped:
            results_table.add_row(name, "[yellow]⊘ Skipped[/yellow]", reason)

        console.print()
        console.print(results_table)

    console.print()
    if failed:
        exit_with_error(
            f"Transfer completed with errors: {len(succeeded)} succeeded, "
            f"{len(failed)} failed, {len(skipped)} skipped"
        )
    elif skipped:
        exit_with_success(
            f"Transfer completed: {len(succeeded)} succeeded, {len(skipped)} skipped"
        )
    else:
        exit_with_success(
            f"Transfer completed successfully: {len(succeeded)} resource(s) transferred"
        )