Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/ip_allowlist.py: 26%

126 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import asyncio 1a

2from logging import Logger 1a

3from typing import Annotated, Optional 1a

4 

5import typer 1a

6from pydantic import BaseModel, IPvAnyNetwork 1a

7from rich.panel import Panel 1a

8from rich.table import Table 1a

9 

10from prefect.cli._types import PrefectTyper 1a

11from prefect.cli._utilities import exit_with_error, exit_with_success 1a

12from prefect.cli.cloud import cloud_app, confirm_logged_in 1a

13from prefect.cli.root import app 1a

14from prefect.client.cloud import get_cloud_client 1a

15from prefect.client.schemas.objects import IPAllowlist, IPAllowlistEntry 1a

16from prefect.exceptions import PrefectHTTPStatusError 1a

17from prefect.logging.loggers import get_logger 1a

18 

# Typer sub-application that groups the `ip-allowlist` commands defined below.
ip_allowlist_app: PrefectTyper = PrefectTyper(
    name="ip-allowlist", help="Manage Prefect Cloud IP Allowlists"
)
# Attach the group to the cloud app; `ip-allowlists` is registered as an alias.
cloud_app.add_typer(ip_allowlist_app, aliases=["ip-allowlists"])

# Module-level logger, named after this module.
logger: Logger = get_logger(__name__)

25 

26 

@ip_allowlist_app.callback()
def require_access_to_ip_allowlisting(ctx: typer.Context) -> None:
    """Enforce access to IP allowlisting for all subcommands."""
    # The callback itself is synchronous, so the async access check is driven
    # to completion on a fresh event loop before any subcommand runs.
    access_check = _require_access_to_ip_allowlisting(ctx)
    asyncio.run(access_check)

31 

32 

async def _require_access_to_ip_allowlisting(ctx: typer.Context) -> None:
    """Verify that IP allowlisting is available to the logged-in account.

    Reads the account settings from Prefect Cloud. If they carry no
    ``enforce_ip_allowlist`` key, the command exits with an error.
    Otherwise the value of that setting is stored in
    ``ctx.meta["enforce_ip_allowlist"]`` for the subcommands to read.
    """
    confirm_logged_in()

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        settings = await cloud.read_account_settings()

    feature_available = "enforce_ip_allowlist" in settings
    if not feature_available:
        return exit_with_error("IP allowlisting is not available for this account.")

    ctx.meta["enforce_ip_allowlist"] = settings.get("enforce_ip_allowlist", False)

51 

52 

@ip_allowlist_app.command()
async def enable(ctx: typer.Context) -> None:
    """Enable the IP allowlist for your account. When enabled, if the allowlist is non-empty, then access to your Prefect Cloud account will be restricted to only those IP addresses on the allowlist."""
    # The group callback has already stored the account's enforcement flag.
    if ctx.meta["enforce_ip_allowlist"]:
        exit_with_success("IP allowlist is already enabled.")

    confirmation_prompt = (
        "Enabling the IP allowlist will restrict Prefect Cloud API and UI access to only the IP addresses on the list. "
        "Continue?"
    )

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        # Ask the API whether the caller would still be allowed in once
        # enforcement is on; refuse to enable if not.
        access_check = await cloud.check_ip_allowlist_access()
        if not access_check.allowed:
            exit_with_error(f"Error enabling IP allowlist: {access_check.detail}")

        logger.debug(access_check.detail)

        proceed = typer.confirm(confirmation_prompt)
        if not proceed:
            exit_with_error("Aborted.")
        await cloud.update_account_settings({"enforce_ip_allowlist": True})

    exit_with_success("IP allowlist enabled.")

77 

78 

@ip_allowlist_app.command()
async def disable():
    """Disable the IP allowlist for your account. When disabled, all IP addresses will be allowed to access your Prefect Cloud account."""
    # Unconditionally switch enforcement off; no confirmation is requested.
    settings_patch = {"enforce_ip_allowlist": False}
    async with get_cloud_client(infer_cloud_url=True) as cloud:
        await cloud.update_account_settings(settings_patch)

    exit_with_success("IP allowlist disabled.")

86 

87 

@ip_allowlist_app.command()
async def ls(ctx: typer.Context):
    """Fetch and list all IP allowlist entries in your account."""
    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # The enforcement flag comes from the group callback via ctx.meta.
        enforcement_on = ctx.meta["enforce_ip_allowlist"]
        _print_ip_allowlist_table(allowlist, enabled=enforcement_on)

97 

98 

# Pairs a CLI argument's original text with its validated network form, so
# commands can compare on `parsed` while echoing `raw` back to the user.
class IPNetworkArg(BaseModel):
    # The argument string as it was passed in.
    raw: str
    # The same value, validated by pydantic as an IPv4/IPv6 network.
    parsed: IPvAnyNetwork

102 

103 

def parse_ip_network_argument(val: str) -> IPNetworkArg:
    """Wrap a CLI string in an ``IPNetworkArg``.

    The same string is supplied for both fields: ``raw`` keeps it as given,
    while ``parsed`` is validated into a network by the model.
    """
    field_values = {"raw": val, "parsed": val}
    return IPNetworkArg(**field_values)

109 

110 

# Shared Typer argument type for commands that take an IP address or CIDR
# range; the raw string is converted through `parse_ip_network_argument`.
IP_ARGUMENT = Annotated[
    IPNetworkArg,
    typer.Argument(
        parser=parse_ip_network_argument,
        help="An IP address or range in CIDR notation. E.g. 192.168.1.0 or 192.168.1.0/24",
        metavar="IP address or range",
    ),
]

119 

120 

@ip_allowlist_app.command()
async def add(
    ctx: typer.Context,
    ip_address_or_range: IP_ARGUMENT,
    description: Optional[str] = typer.Option(
        None,
        "--description",
        "-d",
        help="A short description to annotate the entry with.",
    ),
):
    """Add a new IP entry to your account IP allowlist."""
    network = ip_address_or_range.parsed
    entry_to_add = IPAllowlistEntry(
        ip_network=network, description=description, enabled=True
    )

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # First existing entry for the same network, if there is one.
        duplicate = next(
            (item for item in allowlist.entries if item.ip_network == network),
            None,
        )

        if duplicate:
            overwrite = typer.confirm(
                f"There's already an entry for this IP ({ip_address_or_range.raw}). Do you want to overwrite it?"
            )
            if not overwrite:
                exit_with_error("Aborted.")
            allowlist.entries.remove(duplicate)

        allowlist.entries.append(entry_to_add)

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the list back so the table shows what the server now holds.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )

164 

165 

@ip_allowlist_app.command()
async def remove(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT):
    """Remove an IP entry from your account IP allowlist."""
    network = ip_address_or_range.parsed

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # Keep every entry except those for the requested network. The update
        # is sent even when nothing matched.
        surviving = [
            item for item in allowlist.entries if item.ip_network != network
        ]
        allowlist.entries = surviving

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the list back so the table shows what the server now holds.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )

186 

187 

@ip_allowlist_app.command()
async def toggle(ctx: typer.Context, ip_address_or_range: IP_ARGUMENT):
    """Toggle the enabled status of an individual IP entry in your account IP allowlist."""
    network = ip_address_or_range.parsed

    async with get_cloud_client(infer_cloud_url=True) as cloud:
        allowlist = await cloud.read_account_ip_allowlist()

        # Only the first entry for the requested network is toggled.
        target = next(
            (item for item in allowlist.entries if item.ip_network == network),
            None,
        )

        if target is None:
            exit_with_error(
                f"No entry found with IP address `{ip_address_or_range.raw}`."
            )
        else:
            target.enabled = not target.enabled

        try:
            await cloud.update_account_ip_allowlist(allowlist)
        except PrefectHTTPStatusError as exc:
            _handle_update_error(exc)

        # Read the list back so the table shows what the server now holds.
        refreshed = await cloud.read_account_ip_allowlist()
        _print_ip_allowlist_table(
            refreshed, enabled=ctx.meta["enforce_ip_allowlist"]
        )

215 

216 

def _print_ip_allowlist_table(ip_allowlist: IPAllowlist, enabled: bool):
    """Print the allowlist to the CLI console.

    An empty allowlist is reported with a short panel instead of a table.
    Otherwise each entry becomes one row, disabled entries are dimmed, and the
    caption states whether enforcement is on. A red asterisk is added to the
    title and caption only when ``enabled`` is exactly ``False``.
    """
    if not ip_allowlist.entries:
        empty_notice = Panel(
            "IP allowlist is empty. Add an entry to secure access to your Prefect Cloud account.",
            expand=False,
        )
        app.console.print(empty_notice)
        return

    if enabled is False:
        marker = "[red]*[/red]"
    else:
        marker = ""
    enforcement_label = "ENABLED" if enabled else "[red]DISABLED[/red]"

    table = Table(
        title="IP Allowlist " + marker,
        caption=f"{marker} Enforcement is [bold]{enforcement_label}[/bold].",
        caption_style="not dim",
    )

    # Column header paired with its rendering options, in display order.
    column_specs = (
        ("IP Address", {"style": "cyan", "no_wrap": True}),
        ("Description", {"style": "blue", "no_wrap": False}),
        ("Enabled", {"style": "green", "justify": "right", "no_wrap": True}),
        ("Last Seen", {"style": "magenta", "justify": "right", "no_wrap": True}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)

    for item in ip_allowlist.entries:
        row_style = None if item.enabled else "dim"
        table.add_row(
            str(item.ip_network),
            item.description,
            str(item.enabled),
            item.last_seen or "Never",
            style=row_style,
        )

    app.console.print(table)

251 

252 

def _handle_update_error(error: PrefectHTTPStatusError):
    """Turn a failed allowlist update into a CLI error, or re-raise it.

    For a 422 response whose JSON body is an object with a non-empty
    ``detail`` (or, failing that, ``exception_detail``) value, the command
    exits with that value as the error message. Every other case re-raises
    ``error`` unchanged.

    Args:
        error: The HTTP status error raised by the allowlist update call.

    Raises:
        PrefectHTTPStatusError: The original ``error``, when the response is
            not a 422 or carries no usable detail.
    """
    details = None

    if error.response.status_code == 422:
        # Parse the body once. A body that is not valid JSON must not replace
        # the original HTTP error with a decode error.
        try:
            payload = error.response.json()
        except ValueError:
            payload = None

        # Only a JSON object can carry the detail keys; anything else (list,
        # string, null) is treated as "no detail available".
        if isinstance(payload, dict):
            details = payload.get("detail") or payload.get("exception_detail")

    if details:
        exit_with_error(f"Error updating allowlist: {details}")
    else:
        raise error