Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/websockets.py: 21%
37 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Internal WebSocket proxy utilities for Prefect client connections.
4This module provides shared WebSocket proxy connection logic and SSL configuration
5to avoid duplication between events and logs clients.
6"""
8import ssl 1a
9import warnings 1a
10from functools import wraps 1a
11from typing import Any, Optional 1a
12from urllib.parse import urlparse 1a
14import certifi 1a
15from websockets.asyncio.client import connect 1a
17from prefect.settings import get_current_settings 1a
20def create_ssl_context_for_websocket(uri: str) -> Optional[ssl.SSLContext]: 1a
21 """Create SSL context for WebSocket connections based on URI scheme."""
22 u = urlparse(uri)
24 if u.scheme != "wss":
25 return None
27 if get_current_settings().api.tls_insecure_skip_verify:
28 # Create an unverified context for insecure connections
29 ctx = ssl.create_default_context()
30 ctx.check_hostname = False
31 ctx.verify_mode = ssl.CERT_NONE
32 return ctx
33 else:
34 # Create a verified context with the certificate file
35 cert_file = get_current_settings().api.ssl_cert_file
36 if not cert_file:
37 cert_file = certifi.where()
38 return ssl.create_default_context(cafile=cert_file)
41@wraps(connect) 1a
42def websocket_connect(uri: str, **kwargs: Any) -> connect: 1a
43 """
44 Create a WebSocket connection with proxy and SSL support.
46 Proxy support is automatic via HTTP_PROXY/HTTPS_PROXY environment variables.
47 The websockets library handles proxy detection and connection automatically.
48 """
49 # Configure SSL context for HTTPS connections
50 ssl_context = create_ssl_context_for_websocket(uri)
51 if ssl_context:
52 kwargs.setdefault("ssl", ssl_context)
54 # Add custom headers from settings
55 custom_headers = get_current_settings().client.custom_headers
56 if custom_headers:
57 # Get existing additional_headers or create new dict
58 additional_headers = kwargs.get("additional_headers", {})
59 if not isinstance(additional_headers, dict):
60 additional_headers = {}
62 for header_name, header_value in custom_headers.items():
63 # Check for protected headers that shouldn't be overridden
64 if header_name.lower() in {
65 "user-agent",
66 "sec-websocket-key",
67 "sec-websocket-version",
68 "sec-websocket-extensions",
69 "sec-websocket-protocol",
70 "connection",
71 "upgrade",
72 "host",
73 }:
74 warnings.warn(
75 f"Custom header '{header_name}' is ignored because it conflicts with "
76 f"a protected WebSocket header. Protected headers include: "
77 f"User-Agent, Sec-WebSocket-Key, Sec-WebSocket-Version, "
78 f"Sec-WebSocket-Extensions, Sec-WebSocket-Protocol, Connection, "
79 f"Upgrade, Host",
80 UserWarning,
81 stacklevel=2,
82 )
83 else:
84 additional_headers[header_name] = header_value
86 kwargs["additional_headers"] = additional_headers
88 return connect(uri, **kwargs)