Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/ip_allowlist.py: 26%
126 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio 1a
2from logging import Logger 1a
3from typing import Annotated, Optional 1a
5import typer 1a
6from pydantic import BaseModel, IPvAnyNetwork 1a
7from rich.panel import Panel 1a
8from rich.table import Table 1a
10from prefect.cli._types import PrefectTyper 1a
11from prefect.cli._utilities import exit_with_error, exit_with_success 1a
12from prefect.cli.cloud import cloud_app, confirm_logged_in 1a
13from prefect.cli.root import app 1a
14from prefect.client.cloud import get_cloud_client 1a
15from prefect.client.schemas.objects import IPAllowlist, IPAllowlistEntry 1a
16from prefect.exceptions import PrefectHTTPStatusError 1a
17from prefect.logging.loggers import get_logger 1a
# Typer sub-application holding the IP allowlist commands. It is mounted on
# `cloud_app` below, with "ip-allowlists" accepted as an alias.
ip_allowlist_app: PrefectTyper = PrefectTyper(
    help="Manage Prefect Cloud IP Allowlists",
    name="ip-allowlist",
)
cloud_app.add_typer(ip_allowlist_app, aliases=["ip-allowlists"])

# Module-level logger, named after this module.
logger: Logger = get_logger(__name__)
@ip_allowlist_app.callback()
def require_access_to_ip_allowlisting(ctx: typer.Context) -> None:
    """Enforce access to IP allowlisting for all subcommands."""
    # Registered as the app callback. The actual check is a coroutine, so it
    # is driven to completion here with a fresh event loop.
    access_check = _require_access_to_ip_allowlisting(ctx)
    asyncio.run(access_check)
async def _require_access_to_ip_allowlisting(ctx: typer.Context) -> None:
    """Verify that the current account can use IP allowlisting.

    Reads the account settings from Prefect Cloud. If they contain no
    "enforce_ip_allowlist" key, the CLI exits with an error. Otherwise the
    value of that setting is stored in the Typer context under
    ``ctx.meta["enforce_ip_allowlist"]`` so that subcommands can read it.
    """
    confirm_logged_in()

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        settings = await cloud.read_account_settings()

    setting_key = "enforce_ip_allowlist"
    if setting_key not in settings:
        return exit_with_error("IP allowlisting is not available for this account.")

    ctx.meta[setting_key] = settings.get(setting_key, False)
@ip_allowlist_app.command()
async def enable(ctx: typer.Context) -> None:
    """Enable the IP allowlist for your account. When enabled, if the allowlist is non-empty, then access to your Prefect Cloud account will be restricted to only those IP addresses on the allowlist."""
    # The current enforcement flag is recorded on the context by the app
    # callback before any subcommand runs.
    if ctx.meta["enforce_ip_allowlist"]:
        exit_with_success("IP allowlist is already enabled.")

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        # Pre-flight check: refuse to enable when the API reports that access
        # would not be allowed.
        access = await cloud.check_ip_allowlist_access()
        if not access.allowed:
            exit_with_error(f"Error enabling IP allowlist: {access.detail}")

        logger.debug(access.detail)

        confirmed = typer.confirm(
            "Enabling the IP allowlist will restrict Prefect Cloud API and UI access to only the IP addresses on the list. "
            "Continue?"
        )
        if not confirmed:
            exit_with_error("Aborted.")

        await cloud.update_account_settings({"enforce_ip_allowlist": True})

        exit_with_success("IP allowlist enabled.")
@ip_allowlist_app.command()
async def disable():
    """Disable the IP allowlist for your account. When disabled, all IP addresses will be allowed to access your Prefect Cloud account."""
    # Only the enforcement flag is changed; this command does not modify the
    # allowlist entries.
    settings_patch = {"enforce_ip_allowlist": False}
    async with get_cloud_client(infer_cloud_url=True) as cloud:
        await cloud.update_account_settings(settings_patch)

        exit_with_success("IP allowlist disabled.")
@ip_allowlist_app.command()
async def ls(ctx: typer.Context):
    """Fetch and list all IP allowlist entries in your account."""
    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # The enforcement flag was recorded on the context by the app callback.
        enforcing = ctx.meta["enforce_ip_allowlist"]
        _print_ip_allowlist_table(allowlist, enabled=enforcing)
class IPNetworkArg(BaseModel):
    """A command-line IP argument, kept both as typed and as validated."""

    # The text exactly as supplied; used when echoing the value in messages.
    raw: str
    # The same value validated by pydantic as an IP network; used when
    # comparing against allowlist entries.
    parsed: IPvAnyNetwork
def parse_ip_network_argument(val: str) -> IPNetworkArg:
    """Build an `IPNetworkArg` from a raw command-line string.

    The same string is supplied for both fields: `raw` keeps it verbatim,
    while `parsed` is validated as an IP network by the model.
    """
    field_values = {"raw": val, "parsed": val}
    return IPNetworkArg(**field_values)
# Shared annotation for the positional IP argument taken by the `add`,
# `remove` and `toggle` commands; values go through
# `parse_ip_network_argument`.
IP_ARGUMENT = Annotated[
    IPNetworkArg,
    typer.Argument(
        metavar="IP address or range",
        help="An IP address or range in CIDR notation. E.g. 192.168.1.0 or 192.168.1.0/24",
        parser=parse_ip_network_argument,
    ),
]
@ip_allowlist_app.command()
async def add(
    ctx: typer.Context,
    ip_address_or_range: IP_ARGUMENT,
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help="A short description to annotate the entry with.",
    ),
):
    """Add a new IP entry to your account IP allowlist."""
    network = ip_address_or_range.parsed
    # New entries are always created enabled.
    new_entry = IPAllowlistEntry(
        ip_network=network, description=description, enabled=True
    )

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # First existing entry for the same network, if there is one.
        duplicate = next(
            (item for item in allowlist.entries if item.ip_network == network),
            None,
        )

        if duplicate:
            overwrite = typer.confirm(
                f"There's already an entry for this IP ({ip_address_or_range.raw}). Do you want to overwrite it?"
            )
            if not overwrite:
                exit_with_error("Aborted.")
            # Overwriting is done by dropping the old entry before appending.
            allowlist.entries.remove(duplicate)

        allowlist.entries.append(new_entry)

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the allowlist back so the table shows what was actually stored.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )
@ip_allowlist_app.command()
async def remove(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT):
    """Remove an IP entry from your account IP allowlist."""
    network = ip_address_or_range.parsed

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # Keep every entry except those for the requested network. If nothing
        # matches, the list is sent back unchanged.
        allowlist.entries = [
            item for item in allowlist.entries if item.ip_network != network
        ]

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the allowlist back so the table shows what was actually stored.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )
@ip_allowlist_app.command()
async def toggle(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT):
    """Toggle the enabled status of an individual IP entry in your account IP allowlist."""
    network = ip_address_or_range.parsed

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # Flip only the first entry matching the network. The `else` branch
        # runs when the loop finishes without finding a match.
        for item in allowlist.entries:
            if item.ip_network == network:
                item.enabled = not item.enabled
                break
        else:
            exit_with_error(
                f"No entry found with IP address `{ip_address_or_range.raw}`."
            )

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the allowlist back so the table shows what was actually stored.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )
def _print_ip_allowlist_table(ip_allowlist: IPAllowlist, enabled: bool) -> None:
    """Print the account IP allowlist to the CLI console.

    An empty allowlist is shown as a short hint panel. Otherwise a table with
    one row per entry is printed, with a caption stating whether enforcement
    is enabled for the account.

    Args:
        ip_allowlist: The allowlist whose entries are rendered.
        enabled: Whether allowlist enforcement is turned on for the account.
    """
    if not ip_allowlist.entries:
        app.console.print(
            Panel(
                "IP allowlist is empty. Add an entry to secure access to your Prefect Cloud account.",
                expand=False,
            )
        )
        return

    # The asterisk marker and the ENABLED/DISABLED caption use the same
    # truthiness test, so a falsy value that is not literally `False`
    # (e.g. `None`) cannot produce a "DISABLED" caption without its marker.
    red_asterisk_if_not_enabled = "" if enabled else "[red]*[/red]"
    enforcement_label = "ENABLED" if enabled else "[red]DISABLED[/red]"

    table = Table(
        title="IP Allowlist " + red_asterisk_if_not_enabled,
        caption=f"{red_asterisk_if_not_enabled} Enforcement is "
        f"[bold]{enforcement_label}[/bold].",
        caption_style="not dim",
    )

    table.add_column("IP Address", style="cyan", no_wrap=True)
    table.add_column("Description", style="blue", no_wrap=False)
    table.add_column("Enabled", style="green", justify="right", no_wrap=True)
    table.add_column("Last Seen", style="magenta", justify="right", no_wrap=True)

    for entry in ip_allowlist.entries:
        table.add_row(
            str(entry.ip_network),
            entry.description,
            str(entry.enabled),
            entry.last_seen or "Never",
            # Individually disabled entries are rendered dimmed.
            style="dim" if not entry.enabled else None,
        )

    app.console.print(table)
253def _handle_update_error(error: PrefectHTTPStatusError): 1a
254 if error.response.status_code == 422 and (
255 details := (
256 error.response.json().get("detail")
257 or error.response.json().get("exception_detail")
258 )
259 ):
260 exit_with_error(f"Error updating allowlist: {details}")
261 else:
262 raise error