Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/transfer/__init__.py: 17%
139 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Command line interface for transferring resources between profiles.
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8from logging import Logger 1a
9from typing import TYPE_CHECKING, Any, Callable, Sequence 1a
10import uuid 1a
12import typer 1a
13from rich.console import Console 1a
14from rich.panel import Panel 1a
15from rich.progress import ( 1a
16 BarColumn,
17 Progress,
18 SpinnerColumn,
19 TaskProgressColumn,
20 TextColumn,
21)
22from rich.table import Table 1a
24from prefect.cli._utilities import exit_with_error, exit_with_success 1a
25from prefect.cli.root import app, is_interactive 1a
26from prefect.cli.transfer._exceptions import TransferSkipped 1a
27from prefect.cli.transfer._migratable_resources import MigratableType 1a
28from prefect.client.orchestration import PrefectClient, get_client 1a
29from prefect.context import use_profile 1a
30from prefect.events.schemas.events import Event, Resource 1a
31from prefect.logging import get_logger 1a
32from prefect.settings import load_profiles 1a
33from prefect.events import get_events_client 1a
35from ._dag import TransferDAG 1a
if TYPE_CHECKING:
    # we use the forward ref and defer this import because that module imports
    # a ton of schemas that we don't want to import here at module load time
    from prefect.cli.transfer._migratable_resources import MigratableProtocol

# Module-level logger, named after this module.
logger: Logger = get_logger(__name__)
@app.command()
async def transfer(
    from_profile: str = typer.Option(
        ..., "--from", help="Source profile to transfer resources from"
    ),
    to_profile: str = typer.Option(
        ..., "--to", help="Target profile to transfer resources to"
    ),
):
    """
    Transfer resources from one Prefect profile to another.

    Automatically handles dependencies between resources and transfers them
    in the correct order.

    \b
    Examples:
        \b
        Transfer all resources from staging to production:
        \b
            $ prefect transfer --from staging --to prod
    """
    # NOTE(review): the block nesting below was reconstructed from a flattened
    # listing; confirm the extent of each `with` block against the real file.
    console = Console()

    # Validate both profile names up front, before any API client is opened.
    profiles = load_profiles(include_defaults=False)

    if from_profile not in profiles:
        exit_with_error(f"Source profile '{from_profile}' not found.")

    if to_profile not in profiles:
        exit_with_error(f"Target profile '{to_profile}' not found.")

    if from_profile == to_profile:
        exit_with_error("Source and target profiles must be different.")

    # Echo the chosen source/target so the user can see what is about to run.
    console.print()
    console.print(
        Panel(
            f"[bold cyan]Source:[/bold cyan] {from_profile}\n"
            f"[bold cyan]Target:[/bold cyan] {to_profile}",
            title="Transfer Configuration",
            expand=False,
            padding=(1, 2),
        )
    )

    # Phase 1: with the source profile active, collect resources and build the
    # dependency graph while a spinner is shown.
    with use_profile(from_profile):
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        ) as progress:
            # total=None: the amount of work is not known at this point.
            task = progress.add_task("Collecting resources...", total=None)
            async with get_client() as client:
                # Remembered only for the event payloads emitted later.
                from_url = client.api_url
                resources = await _collect_resources(client)

                if not resources:
                    console.print("\n[yellow]No resources found to transfer.[/yellow]")
                    return

                progress.update(task, description="Building dependency graph...")
                roots = await _find_root_resources(resources)
                dag = TransferDAG()
                await dag.build_from_roots(roots)

    stats = dag.get_statistics()

    # A cyclic graph is rejected outright rather than partially transferred.
    if stats["has_cycles"]:
        exit_with_error("Cannot transfer resources with circular dependencies.")

    # Only prompt when the session is interactive; otherwise proceed directly.
    console.print()
    if is_interactive() and not typer.confirm(
        f"Transfer {stats['total_nodes']} resource(s) from '{from_profile}' to '{to_profile}'?"
    ):
        exit_with_error("Transfer cancelled.")

    # Phase 2: with the target profile active, run the transfer and bracket it
    # with "started" / "completed" events that share one transfer id.
    console.print()
    with use_profile(to_profile):
        async with get_client() as client:
            # Remembered only for the event payloads.
            to_url = client.api_url
        async with get_events_client() as events_client:
            transfer_id = uuid.uuid4()
            await events_client.emit(
                event=Event(
                    event="prefect.workspace.transfer.started",
                    resource=Resource.model_validate(
                        {
                            "prefect.resource.id": f"prefect.workspace.transfer.{transfer_id}",
                            "prefect.resource.name": f"{from_profile} -> {to_profile} transfer",
                            "prefect.resource.role": "transfer",
                        }
                    ),
                    payload=dict(
                        source_profile=from_profile,
                        target_profile=to_profile,
                        source_url=str(from_url),
                        target_url=str(to_url),
                        total_resources=stats["total_nodes"],
                    ),
                )
            )
            results = await _execute_transfer(dag, console)

            # Tally outcomes: None counts as success, a TransferSkipped
            # instance as skipped, and any other value as a failure.
            succeeded: int = 0
            failed: int = 0
            skipped: int = 0

            for result in results.values():
                if result is None:
                    succeeded += 1
                elif isinstance(result, TransferSkipped):
                    skipped += 1
                else:
                    failed += 1

            # Same resource identity as the "started" event, plus the tallies.
            await events_client.emit(
                event=Event(
                    event="prefect.workspace.transfer.completed",
                    resource=Resource.model_validate(
                        {
                            "prefect.resource.id": f"prefect.workspace.transfer.{transfer_id}",
                            "prefect.resource.name": f"{from_profile} -> {to_profile} transfer",
                            "prefect.resource.role": "transfer",
                        }
                    ),
                    payload=dict(
                        source_profile=from_profile,
                        target_profile=to_profile,
                        source_url=str(from_url),
                        target_url=str(to_url),
                        total_resources=stats["total_nodes"],
                        succeeded=succeeded,
                        failed=failed,
                        skipped=skipped,
                    ),
                )
            )

    # Prints the per-resource table and ends via exit_with_error /
    # exit_with_success depending on the outcome.
    _display_results(results, dag.nodes, console)
async def _collect_resources(client: PrefectClient) -> Sequence["MigratableProtocol"]:
    """
    Read every transferable object through ``client`` and wrap each one.

    Work pools, work queues, deployments, block documents, variables, global
    concurrency limits and automations are read concurrently; every object
    returned is then passed to ``construct_migratable_resource``, again
    concurrently.

    Args:
        client: Client used for all of the read calls.

    Returns:
        The constructed resources, in read order (pools first, automations
        last).
    """
    # Imported here rather than at module level, as in the original code.
    from ._migratable_resources import construct_migratable_resource

    readers = (
        client.read_work_pools,
        client.read_work_queues,
        client.read_deployments,
        client.read_block_documents,
        client.read_variables,
        client.read_global_concurrency_limits,
        client.read_automations,
    )
    fetched: list[Sequence[MigratableType]] = await asyncio.gather(
        *(read() for read in readers)
    )

    pending = [
        construct_migratable_resource(obj) for batch in fetched for obj in batch
    ]
    return await asyncio.gather(*pending)
async def _find_root_resources(
    resources: Sequence["MigratableProtocol"],
) -> Sequence["MigratableProtocol"]:
    """
    Select the resources that no other resource lists as a dependency.

    Args:
        resources: Candidate resources; each exposes ``source_id`` and an
            awaitable ``get_dependencies()``.

    Returns:
        The resources whose ``source_id`` is not reported as a dependency by
        any resource, in input order. If every resource is depended upon (or
        the input is empty), ``resources`` itself is returned unchanged.
    """
    referenced: set[uuid.UUID] = set()
    for candidate in resources:
        for dependency in await candidate.get_dependencies():
            referenced.add(dependency.source_id)

    roots = [r for r in resources if r.source_id not in referenced]
    # No root at all: fall back to the full input rather than returning nothing.
    return roots if roots else resources
async def _execute_transfer(
    dag: TransferDAG, console: Console, max_workers: int = 5
) -> dict[uuid.UUID, Any]:
    """
    Run the migration for every node in ``dag`` behind a progress bar.

    Args:
        dag: Graph whose ``nodes`` are migrated via ``execute_concurrent``.
        console: Console the progress bar is rendered on.
        max_workers: Concurrency limit forwarded to
            ``dag.execute_concurrent``. Defaults to 5, the value that was
            previously hard-coded.

    Returns:
        Whatever ``dag.execute_concurrent`` returns; per the annotation, a
        mapping from node id to that node's outcome.
    """
    total = len(dag.nodes)

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TaskProgressColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Transferring resources...", total=total)

        async def migrate_with_progress(resource: "MigratableProtocol"):
            """Migrate one resource, advancing the bar whether it succeeds or fails."""
            try:
                await resource.migrate()
            except Exception:
                # Count the failed resource as processed, then re-raise the
                # active exception unchanged so the DAG executor sees it.
                progress.update(task, advance=1)
                raise
            progress.update(task, advance=1)
            return None

        # NOTE(review): skip_on_failure presumably makes the executor skip
        # dependents of a failed node; confirm against TransferDAG.
        results = await dag.execute_concurrent(
            migrate_with_progress,
            max_workers=max_workers,
            skip_on_failure=True,
        )

    return results
def _get_resource_display_name(resource: "MigratableProtocol") -> str:
    """
    Build a short ``<kind>/<identifier>`` label for ``resource``.

    The first ``source_*`` attribute found on the resource, checked in the
    order listed below, decides the label. Block schemas are labelled with
    the first eight characters of their id, block types with their slug and
    everything else with its name. A resource with none of these attributes
    is labelled with ``str(resource)``.
    """

    def by_name(obj: Any) -> Any:
        return obj.name

    # (attribute on the resource, label prefix, how to pick the identifier).
    # Order matters: the first attribute present wins.
    label_specs: tuple[tuple[str, str, Callable[[Any], Any]], ...] = (
        ("source_work_pool", "work-pool", by_name),
        ("source_work_queue", "work-queue", by_name),
        ("source_deployment", "deployment", by_name),
        ("source_flow", "flow", by_name),
        ("source_block_document", "block-document", by_name),
        ("source_block_type", "block-type", lambda obj: obj.slug),
        ("source_block_schema", "block-schema", lambda obj: str(obj.id)[:8]),
        ("source_variable", "variable", by_name),
        ("source_automation", "automation", by_name),
        ("source_global_concurrency_limit", "concurrency-limit", by_name),
    )

    for attr_name, prefix, pick in label_specs:
        if not hasattr(resource, attr_name):
            continue
        return f"{prefix}/{pick(getattr(resource, attr_name))}"

    return str(resource)
def _display_results(
    results: dict[uuid.UUID, Any],
    nodes: dict[uuid.UUID, "MigratableProtocol"],
    console: Console,
):
    """
    Print a per-resource results table, then end with a summary message.

    Args:
        results: Outcome per node id. ``None`` is treated as success, a
            ``TransferSkipped`` instance as skipped, and any other value as a
            failure whose ``str()`` is shown as the detail.
        nodes: Resource per node id; must contain every key of ``results``.
        console: Console the table is printed on.

    The summary is reported through ``exit_with_error`` when at least one
    resource failed and through ``exit_with_success`` otherwise.
    """
    # Resource names and error messages are arbitrary text. Rich parses plain
    # strings in table cells as console markup, so bracketed text such as
    # "list[str]" or "[/x]" could be swallowed or raise MarkupError unless it
    # is escaped first.
    from rich.markup import escape

    succeeded: list[str] = []
    failed: list[tuple[str, str]] = []
    skipped: list[tuple[str, str]] = []

    for node_id, result in results.items():
        resource = nodes[node_id]
        resource_name = escape(_get_resource_display_name(resource))

        if result is None:
            succeeded.append(resource_name)
        elif isinstance(result, TransferSkipped):
            skipped.append((resource_name, escape(str(result))))
        else:
            failed.append((resource_name, escape(str(result))))

    if succeeded or failed or skipped:
        results_table = Table(title="Transfer Results", show_header=True)
        results_table.add_column("Resource", style="cyan")
        results_table.add_column("Status", style="white")
        results_table.add_column("Details", style="dim", no_wrap=False)

        # Rows are grouped by outcome: successes, then failures, then skips.
        for name in succeeded:
            results_table.add_row(name, "[green]✓ Success[/green]", "")

        for name, error in failed:
            results_table.add_row(name, "[red]✗ Failed[/red]", error)

        for name, reason in skipped:
            results_table.add_row(name, "[yellow]⊘ Skipped[/yellow]", reason)

        console.print()
        console.print(results_table)

    console.print()
    if failed:
        exit_with_error(
            f"Transfer completed with errors: {len(succeeded)} succeeded, "
            f"{len(failed)} failed, {len(skipped)} skipped"
        )
    elif skipped:
        exit_with_success(
            f"Transfer completed: {len(succeeded)} succeeded, {len(skipped)} skipped"
        )
    else:
        exit_with_success(
            f"Transfer completed successfully: {len(succeeded)} resource(s) transferred"
        )