Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/api.py: 11%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for making direct API requests. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import json 1a

8import sys 1a

9from pathlib import Path 1a

10from typing import Any, Optional 1a

11 

12import httpx 1a

13import typer 1a

14from rich.console import Console 1a

15from rich.syntax import Syntax 1a

16 

17from prefect.cli._utilities import exit_with_error 1a

18from prefect.cli.root import app 1a

19from prefect.client.cloud import get_cloud_client 1a

20from prefect.client.orchestration import get_client 1a

21from prefect.settings import get_current_settings 1a

22 

23console: Console = Console() 1a

24console_err: Console = Console(stderr=True) 1a

25 

26 

27def parse_headers(header_list: list[str]) -> dict[str, str]: 1a

28 """Parse header strings in format 'Key: Value' into a dict.""" 

29 headers = {} 

30 for header in header_list: 

31 if ":" not in header: 

32 exit_with_error( 

33 f"Invalid header format: {header!r}. Use 'Key: Value' format." 

34 ) 

35 key, value = header.split(":", 1) 

36 headers[key.strip()] = value.strip() 

37 return headers 

38 

39 

40def parse_data(data: str | None) -> dict[str, Any] | str | None: 1a

41 """Parse data input - can be JSON string, @filename, or None.""" 

42 if data is None: 

43 return None 

44 

45 if data.startswith("@"): 

46 filepath = Path(data[1:]) 

47 if not filepath.exists(): 

48 exit_with_error(f"File not found: {filepath}") 

49 try: 

50 content = filepath.read_text() 

51 return json.loads(content) 

52 except json.JSONDecodeError as e: 

53 exit_with_error(f"Invalid JSON in file {filepath}: {e}") 

54 else: 

55 try: 

56 return json.loads(data) 

57 except json.JSONDecodeError as e: 

58 exit_with_error(f"Invalid JSON data: {e}") 

59 

60 

61def read_stdin_data() -> dict[str, Any] | None: 1a

62 """Read and parse JSON data from stdin if available.""" 

63 if not sys.stdin.isatty(): 

64 try: 

65 content = sys.stdin.read() 

66 if content.strip(): 

67 return json.loads(content) 

68 except json.JSONDecodeError as e: 

69 exit_with_error(f"Invalid JSON from stdin: {e}") 

70 return None 

71 

72 

73def format_output(response: httpx.Response, verbose: bool) -> None: 1a

74 """Format and print the response using rich.""" 

75 if verbose: 

76 req = response.request 

77 console.print(f"[dim]> {req.method} {req.url}[/dim]") 

78 for key, value in req.headers.items(): 

79 if key.lower() == "authorization": 

80 value = "Bearer ***" 

81 console.print(f"[dim]> {key}: {value}[/dim]") 

82 console.print() 

83 

84 console.print(f"[dim]< {response.status_code} {response.reason_phrase}[/dim]") 

85 for key, value in response.headers.items(): 

86 console.print(f"[dim]< {key}: {value}[/dim]") 

87 console.print() 

88 

89 if response.text: 

90 try: 

91 data = response.json() 

92 json_str = json.dumps(data, indent=2) 

93 if sys.stdout.isatty(): 

94 syntax = Syntax(json_str, "json", theme="monokai", word_wrap=True) 

95 console.print(syntax) 

96 else: 

97 print(json.dumps(data)) 

98 except (json.JSONDecodeError, ValueError): 

99 console.print(response.text) 

100 

101 

102def get_exit_code(error: Exception) -> int: 1a

103 """Determine the appropriate exit code for an error.""" 

104 if isinstance(error, httpx.HTTPStatusError): 

105 status = error.response.status_code 

106 if status in (401, 403): 

107 return 3 

108 elif 400 <= status < 500: 

109 return 4 

110 elif 500 <= status < 600: 

111 return 5 

112 elif isinstance( 

113 error, (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) 

114 ): 

115 return 7 

116 

117 return 1 

118 

119 

120@app.command(name="api") 1a

121async def api_request( 1a

122 method: str = typer.Argument( 

123 ..., help="HTTP method (GET, POST, PUT, PATCH, DELETE)" 

124 ), 

125 path: str = typer.Argument(..., help="API path (e.g., /flows, /flows/filter)"), 

126 data: Optional[str] = typer.Option( 

127 None, 

128 "--data", 

129 help="Request body as JSON string or @filename", 

130 ), 

131 headers: list[str] = typer.Option( 

132 None, 

133 "-H", 

134 "--header", 

135 help="Custom header in 'Key: Value' format", 

136 ), 

137 verbose: bool = typer.Option( 

138 False, 

139 "-v", 

140 "--verbose", 

141 help="Show request/response headers", 

142 ), 

143 root: bool = typer.Option( 

144 False, 

145 "--root", 

146 help="Access API root level (e.g., /api/me)", 

147 ), 

148 account: bool = typer.Option( 

149 False, 

150 "--account", 

151 help="Access account level (Cloud only)", 

152 ), 

153): 

154 """ 

155 Make a direct request to the Prefect API. 

156 

157 Examples: 

158 ```bash 

159 # GET request 

160 $ prefect api GET /flows/abc-123 

161 

162 # POST request with data 

163 $ prefect api POST /flows/filter --data '{"limit": 10}' 

164 

165 # POST to filter endpoint (defaults to empty object) 

166 $ prefect api POST /flows/filter 

167 

168 # Custom headers 

169 $ prefect api POST /flows/filter -H "X-Custom: value" --data '{}' 

170 

171 # Verbose output 

172 $ prefect api GET /flows --verbose 

173 

174 # Account-level operation (Cloud) 

175 $ prefect api GET /workspaces --account 

176 

177 # API root level (Cloud only) 

178 $ prefect api GET /me --root 

179 ``` 

180 """ 

181 if headers is None: 

182 headers = [] 

183 

184 http_methods = {"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"} 

185 http_method = method.upper() 

186 

187 if http_method not in http_methods: 

188 exit_with_error( 

189 f"Invalid HTTP method: {method!r}. " 

190 f"Must be one of: {', '.join(sorted(http_methods))}" 

191 ) 

192 

193 settings = get_current_settings() 

194 configured_api_url = settings.api.url 

195 if not configured_api_url: 

196 exit_with_error( 

197 "No API URL configured. Set PREFECT_API_URL or run 'prefect cloud login'." 

198 ) 

199 

200 cloud_api_url = settings.cloud.api_url 

201 is_cloud = cloud_api_url and configured_api_url.startswith(cloud_api_url) 

202 

203 if (root or account) and not is_cloud: 

204 exit_with_error( 

205 "--root and --account flags are only valid for Prefect Cloud. " 

206 "For self-hosted servers, paths are relative to your configured API URL." 

207 ) 

208 

209 body_data = parse_data(data) 

210 

211 if body_data is None and data is None: 

212 stdin_data = read_stdin_data() 

213 if stdin_data is not None: 

214 body_data = stdin_data 

215 elif http_method in ("POST", "PUT", "PATCH"): 

216 body_data = {} 

217 

218 custom_headers = parse_headers(headers) 

219 

220 try: 

221 if root or account: 

222 async with get_cloud_client() as client: 

223 if account: 

224 route = f"{client.account_base_url}{path}" 

225 else: 

226 route = path 

227 

228 response = await client.raw_request( 

229 http_method, 

230 route, 

231 json=body_data if body_data is not None else None, 

232 headers=custom_headers if custom_headers else None, 

233 ) 

234 response.raise_for_status() 

235 else: 

236 async with get_client() as client: 

237 response = await client.request( 

238 http_method, 

239 path, 

240 json=body_data if body_data is not None else None, 

241 headers=custom_headers if custom_headers else None, 

242 ) 

243 response.raise_for_status() 

244 

245 format_output(response, verbose) 

246 

247 except httpx.HTTPStatusError as e: 

248 if verbose: 

249 format_output(e.response, verbose) 

250 else: 

251 console_err.print( 

252 f"[red]Error: {e.response.status_code} {e.response.reason_phrase}[/red]" 

253 ) 

254 if e.response.text: 

255 try: 

256 error_data = e.response.json() 

257 json_str = json.dumps(error_data, indent=2) 

258 syntax = Syntax(json_str, "json", theme="monokai") 

259 console_err.print(syntax) 

260 except (json.JSONDecodeError, ValueError): 

261 console_err.print(e.response.text) 

262 

263 if e.response.status_code == 401: 

264 console_err.print( 

265 "\n[yellow]Check your API key configuration with:[/yellow] prefect config view" 

266 ) 

267 

268 raise typer.Exit(get_exit_code(e)) 

269 

270 except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) as e: 

271 console_err.print(f"[red]Error: Network error - {e}[/red]") 

272 console_err.print(f"\nCould not connect to API at: {configured_api_url}") 

273 raise typer.Exit(7) 

274 

275 except Exception as e: 

276 console_err.print(f"[red]Error: {e}[/red]") 

277 raise typer.Exit(1)