Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/api.py: 11%
142 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for making direct API requests.
3"""
5from __future__ import annotations 1a
7import json 1a
8import sys 1a
9from pathlib import Path 1a
10from typing import Any, Optional 1a
12import httpx 1a
13import typer 1a
14from rich.console import Console 1a
15from rich.syntax import Syntax 1a
17from prefect.cli._utilities import exit_with_error 1a
18from prefect.cli.root import app 1a
19from prefect.client.cloud import get_cloud_client 1a
20from prefect.client.orchestration import get_client 1a
21from prefect.settings import get_current_settings 1a
23console: Console = Console() 1a
24console_err: Console = Console(stderr=True) 1a
27def parse_headers(header_list: list[str]) -> dict[str, str]: 1a
28 """Parse header strings in format 'Key: Value' into a dict."""
29 headers = {}
30 for header in header_list:
31 if ":" not in header:
32 exit_with_error(
33 f"Invalid header format: {header!r}. Use 'Key: Value' format."
34 )
35 key, value = header.split(":", 1)
36 headers[key.strip()] = value.strip()
37 return headers
40def parse_data(data: str | None) -> dict[str, Any] | str | None: 1a
41 """Parse data input - can be JSON string, @filename, or None."""
42 if data is None:
43 return None
45 if data.startswith("@"):
46 filepath = Path(data[1:])
47 if not filepath.exists():
48 exit_with_error(f"File not found: {filepath}")
49 try:
50 content = filepath.read_text()
51 return json.loads(content)
52 except json.JSONDecodeError as e:
53 exit_with_error(f"Invalid JSON in file {filepath}: {e}")
54 else:
55 try:
56 return json.loads(data)
57 except json.JSONDecodeError as e:
58 exit_with_error(f"Invalid JSON data: {e}")
61def read_stdin_data() -> dict[str, Any] | None: 1a
62 """Read and parse JSON data from stdin if available."""
63 if not sys.stdin.isatty():
64 try:
65 content = sys.stdin.read()
66 if content.strip():
67 return json.loads(content)
68 except json.JSONDecodeError as e:
69 exit_with_error(f"Invalid JSON from stdin: {e}")
70 return None
73def format_output(response: httpx.Response, verbose: bool) -> None: 1a
74 """Format and print the response using rich."""
75 if verbose:
76 req = response.request
77 console.print(f"[dim]> {req.method} {req.url}[/dim]")
78 for key, value in req.headers.items():
79 if key.lower() == "authorization":
80 value = "Bearer ***"
81 console.print(f"[dim]> {key}: {value}[/dim]")
82 console.print()
84 console.print(f"[dim]< {response.status_code} {response.reason_phrase}[/dim]")
85 for key, value in response.headers.items():
86 console.print(f"[dim]< {key}: {value}[/dim]")
87 console.print()
89 if response.text:
90 try:
91 data = response.json()
92 json_str = json.dumps(data, indent=2)
93 if sys.stdout.isatty():
94 syntax = Syntax(json_str, "json", theme="monokai", word_wrap=True)
95 console.print(syntax)
96 else:
97 print(json.dumps(data))
98 except (json.JSONDecodeError, ValueError):
99 console.print(response.text)
102def get_exit_code(error: Exception) -> int: 1a
103 """Determine the appropriate exit code for an error."""
104 if isinstance(error, httpx.HTTPStatusError):
105 status = error.response.status_code
106 if status in (401, 403):
107 return 3
108 elif 400 <= status < 500:
109 return 4
110 elif 500 <= status < 600:
111 return 5
112 elif isinstance(
113 error, (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError)
114 ):
115 return 7
117 return 1
120@app.command(name="api") 1a
121async def api_request( 1a
122 method: str = typer.Argument(
123 ..., help="HTTP method (GET, POST, PUT, PATCH, DELETE)"
124 ),
125 path: str = typer.Argument(..., help="API path (e.g., /flows, /flows/filter)"),
126 data: Optional[str] = typer.Option(
127 None,
128 "--data",
129 help="Request body as JSON string or @filename",
130 ),
131 headers: list[str] = typer.Option(
132 None,
133 "-H",
134 "--header",
135 help="Custom header in 'Key: Value' format",
136 ),
137 verbose: bool = typer.Option(
138 False,
139 "-v",
140 "--verbose",
141 help="Show request/response headers",
142 ),
143 root: bool = typer.Option(
144 False,
145 "--root",
146 help="Access API root level (e.g., /api/me)",
147 ),
148 account: bool = typer.Option(
149 False,
150 "--account",
151 help="Access account level (Cloud only)",
152 ),
153):
154 """
155 Make a direct request to the Prefect API.
157 Examples:
158 ```bash
159 # GET request
160 $ prefect api GET /flows/abc-123
162 # POST request with data
163 $ prefect api POST /flows/filter --data '{"limit": 10}'
165 # POST to filter endpoint (defaults to empty object)
166 $ prefect api POST /flows/filter
168 # Custom headers
169 $ prefect api POST /flows/filter -H "X-Custom: value" --data '{}'
171 # Verbose output
172 $ prefect api GET /flows --verbose
174 # Account-level operation (Cloud)
175 $ prefect api GET /workspaces --account
177 # API root level (Cloud only)
178 $ prefect api GET /me --root
179 ```
180 """
181 if headers is None:
182 headers = []
184 http_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
185 http_method = method.upper()
187 if http_method not in http_methods:
188 exit_with_error(
189 f"Invalid HTTP method: {method!r}. "
190 f"Must be one of: {', '.join(sorted(http_methods))}"
191 )
193 settings = get_current_settings()
194 configured_api_url = settings.api.url
195 if not configured_api_url:
196 exit_with_error(
197 "No API URL configured. Set PREFECT_API_URL or run 'prefect cloud login'."
198 )
200 cloud_api_url = settings.cloud.api_url
201 is_cloud = cloud_api_url and configured_api_url.startswith(cloud_api_url)
203 if (root or account) and not is_cloud:
204 exit_with_error(
205 "--root and --account flags are only valid for Prefect Cloud. "
206 "For self-hosted servers, paths are relative to your configured API URL."
207 )
209 body_data = parse_data(data)
211 if body_data is None and data is None:
212 stdin_data = read_stdin_data()
213 if stdin_data is not None:
214 body_data = stdin_data
215 elif http_method in ("POST", "PUT", "PATCH"):
216 body_data = {}
218 custom_headers = parse_headers(headers)
220 try:
221 if root or account:
222 async with get_cloud_client() as client:
223 if account:
224 route = f"{client.account_base_url}{path}"
225 else:
226 route = path
228 response = await client.raw_request(
229 http_method,
230 route,
231 json=body_data if body_data is not None else None,
232 headers=custom_headers if custom_headers else None,
233 )
234 response.raise_for_status()
235 else:
236 async with get_client() as client:
237 response = await client.request(
238 http_method,
239 path,
240 json=body_data if body_data is not None else None,
241 headers=custom_headers if custom_headers else None,
242 )
243 response.raise_for_status()
245 format_output(response, verbose)
247 except httpx.HTTPStatusError as e:
248 if verbose:
249 format_output(e.response, verbose)
250 else:
251 console_err.print(
252 f"[red]Error: {e.response.status_code} {e.response.reason_phrase}[/red]"
253 )
254 if e.response.text:
255 try:
256 error_data = e.response.json()
257 json_str = json.dumps(error_data, indent=2)
258 syntax = Syntax(json_str, "json", theme="monokai")
259 console_err.print(syntax)
260 except (json.JSONDecodeError, ValueError):
261 console_err.print(e.response.text)
263 if e.response.status_code == 401:
264 console_err.print(
265 "\n[yellow]Check your API key configuration with:[/yellow] prefect config view"
266 )
268 raise typer.Exit(get_exit_code(e))
270 except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) as e:
271 console_err.print(f"[red]Error: Network error - {e}[/red]")
272 console_err.print(f"\nCould not connect to API at: {configured_api_url}")
273 raise typer.Exit(7)
275 except Exception as e:
276 console_err.print(f"[red]Error: {e}[/red]")
277 raise typer.Exit(1)