Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/urls.py: 13%
114 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import inspect 1a
2import ipaddress 1a
3import socket 1a
4import urllib.parse 1a
5from logging import Logger 1a
6from string import Formatter 1a
7from typing import TYPE_CHECKING, Any, Literal, Optional, Union 1a
8from urllib.parse import urlparse 1a
9from uuid import UUID 1a
11from pydantic import BaseModel 1a
13from prefect import settings 1a
14from prefect.logging.loggers import get_logger 1a
16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true1a
17 from prefect.blocks.core import Block
18 from prefect.events.schemas.automations import Automation
19 from prefect.events.schemas.events import ReceivedEvent, Resource
20 from prefect.futures import PrefectFuture
21 from prefect.variables import Variable
23logger: Logger = get_logger("utilities.urls") 1a
# Path templates for UI pages, keyed by URL name and relative to the UI base
# URL. Every template takes `obj_id`; "received-event" additionally takes
# `occurred` and "worker" additionally takes `work_pool_name`.
#
# Objects without a directly-addressable UI URL are intentionally absent:
#   - artifact
#   - variable
#   - saved-search
UI_URL_FORMATS: dict[str, str] = {
    "flow": "flows/flow/{obj_id}",
    "flow-run": "runs/flow-run/{obj_id}",
    "flow-run-response": "runs/flow-run/{obj_id}",
    "task-run": "runs/task-run/{obj_id}",
    "block": "blocks/block/{obj_id}",
    "block-document": "blocks/block/{obj_id}",
    "work-pool": "work-pools/work-pool/{obj_id}",
    "work-queue": "work-queues/work-queue/{obj_id}",
    "concurrency-limit": "concurrency-limits/concurrency-limit/{obj_id}",
    "deployment": "deployments/deployment/{obj_id}",
    "automation": "automations/automation/{obj_id}",
    "received-event": "events/event/{occurred}/{obj_id}",
    "worker": "work-pools/work-pool/{work_pool_name}/worker/{obj_id}",
}
# Path templates for API routes, keyed by URL name and relative to the API
# base URL. Every template takes a single `obj_id` field.
#
# Objects without a directly-addressable API URL are intentionally absent:
#   - worker
#   - artifact
#   - saved-search
#   - received-event
API_URL_FORMATS: dict[str, str] = {
    "flow": "flows/{obj_id}",
    "flow-run": "flow_runs/{obj_id}",
    "task-run": "task_runs/{obj_id}",
    "variable": "variables/name/{obj_id}",
    "block": "blocks/{obj_id}",
    "work-pool": "work_pools/{obj_id}",
    "work-queue": "work_queues/{obj_id}",
    "concurrency-limit": "concurrency_limits/{obj_id}",
    "deployment": "deployments/{obj_id}",
    "automation": "automations/{obj_id}",
}
# The two kinds of URL that can be generated: one for the UI, one for the API.
URLType = Literal["ui", "api"]

# URL names that identify runs (task runs and flow runs).
RUN_TYPES = {"task-run", "flow-run"}
def validate_restricted_url(url: str) -> None:
    """
    Validate that the provided URL is safe for outbound requests. This prevents
    attacks like SSRF (Server Side Request Forgery), where an attacker can make
    requests to internal services (like the GCP metadata service, localhost addresses,
    or in-cluster Kubernetes services)

    The URL must use the HTTP or HTTPS scheme, contain a hostname, and resolve
    (via `socket.gethostbyname`, or by being an IP literal) to an address that
    is not private according to `ipaddress`.

    Args:
        url: The URL to validate.

    Raises:
        ValueError: If the URL is a restricted URL.
    """
    try:
        parsed_url = urlparse(url)
    except ValueError as exc:
        raise ValueError(f"{url!r} is not a valid URL.") from exc

    if parsed_url.scheme not in ("http", "https"):
        raise ValueError(
            f"{url!r} is not a valid URL. Only HTTP and HTTPS URLs are allowed."
        )

    hostname = parsed_url.hostname or ""

    # Remove IPv6 brackets if present
    if hostname.startswith("[") and hostname.endswith("]"):
        hostname = hostname[1:-1]

    if not hostname:
        raise ValueError(f"{url!r} is not a valid URL.")

    try:
        ip = ipaddress.ip_address(socket.gethostbyname(hostname))
    except (socket.gaierror, UnicodeError) as resolve_error:
        # `gethostbyname` only handles IPv4 and raises `gaierror` for IPv6
        # literals and unresolvable names. It raises `UnicodeError` when the
        # hostname cannot be IDNA-encoded (e.g. an empty or over-long label).
        # In either case, fall back to treating the hostname as an IP literal.
        try:
            ip = ipaddress.ip_address(hostname)
        except ValueError:
            raise ValueError(
                f"{url!r} is not a valid URL. It could not be resolved."
            ) from resolve_error

    if ip.is_private:
        raise ValueError(
            f"{url!r} is not a valid URL. It resolves to the private address {ip}."
        )
def convert_class_to_name(obj: Any) -> str:
    """
    Turn a CamelCase class name into a lowercase, dash-separated name.

    Accepts either a class or an instance; for an instance, the name of its
    class is used. Every uppercase character is lowercased and preceded by a
    dash, and any leading dashes are stripped from the result.
    """
    klass = obj.__class__ if not inspect.isclass(obj) else obj

    pieces = []
    for char in klass.__name__:
        if char.isupper():
            pieces.append("-" + char.lower())
        else:
            pieces.append(char)

    return "".join(pieces).lstrip("-")
def url_for(
    obj: Union[
        "PrefectFuture[Any]",
        "Block",
        "Variable",
        "Automation",
        "Resource",
        "ReceivedEvent",
        BaseModel,
        str,
    ],
    obj_id: Optional[Union[str, UUID]] = None,
    url_type: URLType = "ui",
    default_base_url: Optional[str] = None,
    **additional_format_kwargs: Any,
) -> Optional[str]:
    """
    Returns the URL for a Prefect object.

    Pass in a supported object directly or provide an object name and ID.

    Args:
        obj (Union[PrefectFuture, Block, Variable, Automation, Resource, ReceivedEvent, BaseModel, str]):
            A Prefect object to get the URL for, or its URL name and ID.
        obj_id (Union[str, UUID], optional):
            The UUID of the object.
        url_type (Literal["ui", "api"], optional):
            Whether to return the URL for the UI (default) or API.
        default_base_url (str, optional):
            The default base URL to use if no URL is configured.
        additional_format_kwargs (Dict[str, Any], optional):
            Additional keyword arguments to pass to the URL format.

    Returns:
        Optional[str]: The URL for the given object. None is returned if the
            object is not supported for the requested URL type or if no base
            URL is configured or provided. An empty string is returned if no
            ID could be determined for the object.

    Raises:
        ValueError: If `url_type` is not "ui" or "api", if an object name is
            passed without an object ID, or if the URL format requires keys
            that were not supplied in `additional_format_kwargs`.

    Examples:
        url_for(my_flow_run)
        url_for(obj=my_flow_run)
        url_for("flow-run", obj_id="123e4567-e89b-12d3-a456-426614174000")
    """
    from prefect.blocks.core import Block
    from prefect.client.schemas.objects import WorkPool
    from prefect.events.schemas.automations import Automation
    from prefect.events.schemas.events import ReceivedEvent, Resource
    from prefect.futures import PrefectFuture

    # Work out the URL name (the key into the *_URL_FORMATS tables).
    if isinstance(obj, PrefectFuture):
        name = "task-run"
    elif isinstance(obj, Block):
        name = "block"
    elif isinstance(obj, Automation):
        name = "automation"
    elif isinstance(obj, ReceivedEvent):
        name = "received-event"
    elif isinstance(obj, Resource):
        if obj.id.startswith("prefect."):
            # The second dot-separated segment of the resource ID is the name.
            name = obj.id.split(".")[1]
        else:
            logger.debug("No URL known for resource with ID: %s", obj.id)
            return None
    elif isinstance(obj, str):
        name = obj
    else:
        name = convert_class_to_name(obj)

    # Can't do an isinstance check here because the client build
    # doesn't have access to that server schema.
    if name == "work-queue-with-status":
        name = "work-queue"

    if url_type != "ui" and url_type != "api":
        raise ValueError(f"Invalid URL type: {url_type}. Use 'ui' or 'api'.")

    if url_type == "ui" and name not in UI_URL_FORMATS:
        logger.debug("No UI URL known for this object: %s", name)
        return None
    elif url_type == "api" and name not in API_URL_FORMATS:
        logger.debug("No API URL known for this object: %s", name)
        return None

    if isinstance(obj, str) and not obj_id:
        raise ValueError(
            "If passing an object name, you must also provide an object ID."
        )

    # Prefer the configured URL; fall back to the caller-supplied default.
    base_url = (
        settings.PREFECT_UI_URL.value()
        if url_type == "ui"
        else settings.PREFECT_API_URL.value()
    )
    base_url = base_url or default_base_url

    if not base_url:
        logger.debug(
            "No URL found for the Prefect %s, and no default base path provided.",
            "UI" if url_type == "ui" else "API",
        )
        return None

    if not obj_id:
        # We treat PrefectFuture as if it was the underlying task run,
        # so we need to check the object type here instead of name.
        if isinstance(obj, PrefectFuture):
            obj_id = getattr(obj, "task_run_id", None)
        elif name == "block":
            # Blocks are client-side objects whose API representation is a
            # BlockDocument.
            obj_id = getattr(obj, "_block_document_id")
        elif name in ("variable", "work-pool"):
            # These objects are addressed by name rather than by ID.
            if TYPE_CHECKING:
                assert isinstance(obj, (Variable, WorkPool))
            obj_id = obj.name
        elif isinstance(obj, Resource):
            # Use the final dot-separated segment of the resource ID.
            obj_id = obj.id.rpartition(".")[2]
        else:
            obj_id = getattr(obj, "id", None)
        if not obj_id:
            logger.debug(
                "An ID is required to build a URL, but object did not have one: %s", obj
            )
            # NOTE: this path returns an empty string rather than None; kept
            # as-is so existing callers observe the same value.
            return ""

    url_format = (
        UI_URL_FORMATS.get(name) if url_type == "ui" else API_URL_FORMATS.get(name)
    )
    assert url_format is not None

    if isinstance(obj, ReceivedEvent):
        url = url_format.format(
            occurred=obj.occurred.strftime("%Y-%m-%d"), obj_id=obj_id
        )
    else:
        # Every replacement field in the template other than `obj_id` must be
        # supplied by the caller through `additional_format_kwargs`.
        obj_keys = [
            fname
            for _, fname, _, _ in Formatter().parse(url_format)
            if fname is not None and fname != "obj_id"
        ]

        # Report only the keys that are actually absent, not every required key.
        missing_keys = [
            key for key in obj_keys if key not in additional_format_kwargs
        ]
        if missing_keys:
            raise ValueError(
                f"Unable to generate URL for {name} because the following keys are missing: {', '.join(missing_keys)}"
            )

        url = url_format.format(obj_id=obj_id, **additional_format_kwargs)

    # Ensure the base ends in "/" so urljoin appends to it instead of replacing
    # its last path segment.
    if not base_url.endswith("/"):
        base_url += "/"
    return urllib.parse.urljoin(base_url, url)