Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/urls.py: 13%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import inspect 1a

2import ipaddress 1a

3import socket 1a

4import urllib.parse 1a

5from logging import Logger 1a

6from string import Formatter 1a

7from typing import TYPE_CHECKING, Any, Literal, Optional, Union 1a

8from urllib.parse import urlparse 1a

9from uuid import UUID 1a

10 

11from pydantic import BaseModel 1a

12 

13from prefect import settings 1a

14from prefect.logging.loggers import get_logger 1a

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true1a

17 from prefect.blocks.core import Block 

18 from prefect.events.schemas.automations import Automation 

19 from prefect.events.schemas.events import ReceivedEvent, Resource 

20 from prefect.futures import PrefectFuture 

21 from prefect.variables import Variable 

22 

# Module-level logger. `url_for` uses it to emit debug messages whenever it
# cannot build a URL (unknown object name, missing base URL, missing ID).
logger: Logger = get_logger("utilities.urls")

24 

# Path templates for UI URLs, keyed by object name. `url_for` looks a name up
# here when `url_type == "ui"`, fills the template with `str.format`, and joins
# the result onto the UI base URL.
#
# `{obj_id}` is always supplied by `url_for`. `{occurred}` is filled from the
# event's `occurred` date when the object is a ReceivedEvent. Any other
# placeholder (e.g. `{work_pool_name}`) must be passed by the caller as an
# additional keyword argument to `url_for`.
#
# The following objects are excluded from UI URL generation because we lack a
# directly-addressable URL:
#   artifact
#   variable
#   saved-search
UI_URL_FORMATS = {
    "flow": "flows/flow/{obj_id}",
    "flow-run": "runs/flow-run/{obj_id}",
    "flow-run-response": "runs/flow-run/{obj_id}",
    "task-run": "runs/task-run/{obj_id}",
    "block": "blocks/block/{obj_id}",
    "block-document": "blocks/block/{obj_id}",
    "work-pool": "work-pools/work-pool/{obj_id}",
    "work-queue": "work-queues/work-queue/{obj_id}",
    "concurrency-limit": "concurrency-limits/concurrency-limit/{obj_id}",
    "deployment": "deployments/deployment/{obj_id}",
    "automation": "automations/automation/{obj_id}",
    "received-event": "events/event/{occurred}/{obj_id}",
    "worker": "work-pools/work-pool/{work_pool_name}/worker/{obj_id}",
}

45 

# Path templates for API URLs, keyed by object name. `url_for` looks a name up
# here when `url_type == "api"`, fills `{obj_id}` with `str.format`, and joins
# the result onto the API base URL.
#
# For "variable" (and "work-pool") `url_for` uses the object's `name`
# attribute as `obj_id` when no explicit ID is given.
#
# The following objects are excluded from API URL generation because we lack a
# directly-addressable URL:
#   worker
#   artifact
#   saved-search
#   received-event
API_URL_FORMATS = {
    "flow": "flows/{obj_id}",
    "flow-run": "flow_runs/{obj_id}",
    "task-run": "task_runs/{obj_id}",
    "variable": "variables/name/{obj_id}",
    "block": "blocks/{obj_id}",
    "work-pool": "work_pools/{obj_id}",
    "work-queue": "work_queues/{obj_id}",
    "concurrency-limit": "concurrency_limits/{obj_id}",
    "deployment": "deployments/{obj_id}",
    "automation": "automations/{obj_id}",
}

64 

# Accepted values for the `url_type` argument of `url_for`.
URLType = Literal["ui", "api"]

# Object names that denote runs.
# NOTE(review): not referenced by any function visible in this module;
# presumably consumed by importers -- confirm before changing.
RUN_TYPES = {"flow-run", "task-run"}

67 

68 

def validate_restricted_url(url: str) -> None:
    """
    Validate that the provided URL is safe for outbound requests. This prevents
    attacks like SSRF (Server Side Request Forgery), where an attacker can make
    requests to internal services (like the GCP metadata service, localhost
    addresses, or in-cluster Kubernetes services).

    The URL must use the ``http`` or ``https`` scheme, must contain a hostname,
    and that hostname must resolve to a globally routable address. Addresses in
    private ranges (loopback, link-local, RFC 1918, ...) as well as the shared
    address space 100.64.0.0/10 are rejected.

    Args:
        url: The URL to validate.

    Raises:
        ValueError: If the URL is malformed, uses a disallowed scheme, cannot be
            resolved, or resolves to a restricted address.
    """
    try:
        parsed_url = urlparse(url)
    except ValueError as exc:
        raise ValueError(f"{url!r} is not a valid URL.") from exc

    if parsed_url.scheme not in ("http", "https"):
        raise ValueError(
            f"{url!r} is not a valid URL. Only HTTP and HTTPS URLs are allowed."
        )

    hostname = parsed_url.hostname or ""

    # Remove IPv6 brackets if present
    if hostname.startswith("[") and hostname.endswith("]"):
        hostname = hostname[1:-1]

    if not hostname:
        raise ValueError(f"{url!r} is not a valid URL.")

    try:
        ip = ipaddress.ip_address(socket.gethostbyname(hostname))
    except (socket.gaierror, UnicodeError) as resolve_error:
        # `gethostbyname` only handles IPv4 and raises `gaierror` for IPv6
        # literals; it raises `UnicodeError` when the hostname cannot be
        # IDNA-encoded. In both cases fall back to treating the hostname as a
        # literal IP address.
        try:
            ip = ipaddress.ip_address(hostname)
        except ValueError:
            raise ValueError(
                f"{url!r} is not a valid URL. It could not be resolved."
            ) from resolve_error

    # `is_private` alone misses the shared address space 100.64.0.0/10
    # (RFC 6598), which is neither private nor global yet is only reachable
    # from inside a provider's network. Requiring `is_global` closes that gap.
    if ip.is_private or not ip.is_global:
        raise ValueError(
            f"{url!r} is not a valid URL. It resolves to the private address {ip}."
        )

115 

116 

def convert_class_to_name(obj: Any) -> str:
    """
    Turn the CamelCase name of a class into a lowercase, dash-separated name.

    Accepts either a class or an instance; for an instance its class is used.
    Every uppercase character is lowercased and prefixed with a dash, and any
    leading dashes are stripped from the result (``FlowRun`` -> ``flow-run``).
    """
    if inspect.isclass(obj):
        klass = obj
    else:
        klass = obj.__class__

    pieces = []
    for char in klass.__name__:
        if char.isupper():
            pieces.append("-" + char.lower())
        else:
            pieces.append(char)
    return "".join(pieces).lstrip("-")

124 

125 

def url_for(
    obj: Union[
        "PrefectFuture[Any]",
        "Block",
        "Variable",
        "Automation",
        "Resource",
        "ReceivedEvent",
        BaseModel,
        str,
    ],
    obj_id: Optional[Union[str, UUID]] = None,
    url_type: URLType = "ui",
    default_base_url: Optional[str] = None,
    **additional_format_kwargs: Any,
) -> Optional[str]:
    """
    Returns the URL for a Prefect object.

    Pass in a supported object directly or provide an object name and ID.

    Args:
        obj (Union[PrefectFuture, Block, Variable, Automation, Resource, ReceivedEvent, BaseModel, str]):
            A Prefect object to get the URL for, or its URL name and ID.
        obj_id (Union[str, UUID], optional):
            The UUID of the object.
        url_type (Literal["ui", "api"], optional):
            Whether to return the URL for the UI (default) or API.
        default_base_url (str, optional):
            The default base URL to use if no URL is configured.
        additional_format_kwargs (Dict[str, Any], optional):
            Additional keyword arguments to pass to the URL format.

    Returns:
        Optional[str]: The URL for the given object. None if the object is not
            supported for the requested URL type or no base URL is available.
            An empty string if no ID could be determined for the object.

    Raises:
        ValueError: If `url_type` is neither "ui" nor "api", if `obj` is a name
            and no `obj_id` is given, or if the URL format needs keys that are
            not present in `additional_format_kwargs`.

    Examples:
        url_for(my_flow_run)
        url_for(obj=my_flow_run)
        url_for("flow-run", obj_id="123e4567-e89b-12d3-a456-426614174000")
    """
    from prefect.blocks.core import Block
    from prefect.client.schemas.objects import WorkPool
    from prefect.events.schemas.automations import Automation
    from prefect.events.schemas.events import ReceivedEvent, Resource
    from prefect.futures import PrefectFuture

    # Resolve the lookup name used as key into the URL format tables.
    if isinstance(obj, PrefectFuture):
        name = "task-run"
    elif isinstance(obj, Block):
        name = "block"
    elif isinstance(obj, Automation):
        name = "automation"
    elif isinstance(obj, ReceivedEvent):
        name = "received-event"
    elif isinstance(obj, Resource):
        if obj.id.startswith("prefect."):
            # The second dotted segment of the resource ID is the object name.
            name = obj.id.split(".")[1]
        else:
            logger.debug("No URL known for resource with ID: %s", obj.id)
            return None
    elif isinstance(obj, str):
        name = obj
    else:
        name = convert_class_to_name(obj)

    # Can't do an isinstance check here because the client build
    # doesn't have access to that server schema.
    if name == "work-queue-with-status":
        name = "work-queue"

    if url_type not in ("ui", "api"):
        raise ValueError(f"Invalid URL type: {url_type}. Use 'ui' or 'api'.")

    if url_type == "ui" and name not in UI_URL_FORMATS:
        logger.debug("No UI URL known for this object: %s", name)
        return None
    elif url_type == "api" and name not in API_URL_FORMATS:
        logger.debug("No API URL known for this object: %s", name)
        return None

    if isinstance(obj, str) and not obj_id:
        raise ValueError(
            "If passing an object name, you must also provide an object ID."
        )

    base_url = (
        settings.PREFECT_UI_URL.value()
        if url_type == "ui"
        else settings.PREFECT_API_URL.value()
    )
    base_url = base_url or default_base_url

    if not base_url:
        logger.debug(
            "No URL found for the Prefect %s, and no default base path provided.",
            "UI" if url_type == "ui" else "API",
        )
        return None

    if not obj_id:
        # We treat PrefectFuture as if it was the underlying task run,
        # so we need to check the object type here instead of name.
        if isinstance(obj, PrefectFuture):
            obj_id = getattr(obj, "task_run_id", None)
        elif name == "block":
            # Blocks are client-side objects whose API representation is a
            # BlockDocument. Default to None so that an object without the
            # attribute takes the "no ID" path below instead of raising
            # AttributeError.
            obj_id = getattr(obj, "_block_document_id", None)
        elif name in ("variable", "work-pool"):
            if TYPE_CHECKING:
                assert isinstance(obj, (Variable, WorkPool))
            # These objects are addressed by name rather than by UUID.
            obj_id = obj.name
        elif isinstance(obj, Resource):
            # The ID is whatever follows the last dot of the resource ID.
            obj_id = obj.id.rpartition(".")[2]
        else:
            obj_id = getattr(obj, "id", None)
        if not obj_id:
            logger.debug(
                "An ID is required to build a URL, but object did not have one: %s", obj
            )
            return ""

    url_format = (
        UI_URL_FORMATS.get(name) if url_type == "ui" else API_URL_FORMATS.get(name)
    )
    # Membership was checked above; this only narrows the type.
    assert url_format is not None

    if isinstance(obj, ReceivedEvent):
        url = url_format.format(
            occurred=obj.occurred.strftime("%Y-%m-%d"), obj_id=obj_id
        )
    else:
        # Every placeholder other than `obj_id` must be supplied by the caller.
        obj_keys = [
            fname
            for _, fname, _, _ in Formatter().parse(url_format)
            if fname is not None and fname != "obj_id"
        ]
        missing_keys = [
            key for key in obj_keys if key not in additional_format_kwargs
        ]

        if missing_keys:
            # Report only the keys that are actually absent.
            raise ValueError(
                f"Unable to generate URL for {name} because the following keys are missing: {', '.join(missing_keys)}"
            )

        url = url_format.format(obj_id=obj_id, **additional_format_kwargs)

    # `urljoin` replaces the last path segment of a base without a trailing
    # slash, so make sure the base ends with one.
    if not base_url.endswith("/"):
        base_url += "/"
    return urllib.parse.urljoin(base_url, url)