Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/websockets.py: 21%

37 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Internal WebSocket proxy utilities for Prefect client connections. 

3 

4This module provides shared WebSocket proxy connection logic and SSL configuration 

5to avoid duplication between events and logs clients. 

6""" 

7 

8import ssl 1a

9import warnings 1a

10from functools import wraps 1a

11from typing import Any, Optional 1a

12from urllib.parse import urlparse 1a

13 

14import certifi 1a

15from websockets.asyncio.client import connect 1a

16 

17from prefect.settings import get_current_settings 1a

18 

19 

def create_ssl_context_for_websocket(uri: str) -> Optional[ssl.SSLContext]:
    """Build the TLS context to use for a WebSocket connection to ``uri``.

    Args:
        uri: The WebSocket URI that is about to be connected to.

    Returns:
        ``None`` when the URI scheme is not ``wss``. Otherwise an
        ``ssl.SSLContext``: with hostname checking and certificate
        verification switched off when the ``api.tls_insecure_skip_verify``
        setting is truthy, or verifying against ``api.ssl_cert_file``
        (falling back to the certifi bundle when that setting is empty).
    """
    # Anything other than wss:// gets no TLS context at all.
    if urlparse(uri).scheme != "wss":
        return None

    if not get_current_settings().api.tls_insecure_skip_verify:
        # Verified path: prefer the configured CA file, else certifi's bundle.
        ca_bundle = get_current_settings().api.ssl_cert_file or certifi.where()
        return ssl.create_default_context(cafile=ca_bundle)

    # Insecure path: check_hostname has to be cleared before verify_mode can
    # be set to CERT_NONE.
    insecure_ctx = ssl.create_default_context()
    insecure_ctx.check_hostname = False
    insecure_ctx.verify_mode = ssl.CERT_NONE
    return insecure_ctx

39 

40 

@wraps(connect)
def websocket_connect(uri: str, **kwargs: Any) -> connect:
    """
    Create a WebSocket connection with proxy and SSL support.

    Proxy support is automatic via HTTP_PROXY/HTTPS_PROXY environment variables.
    The websockets library handles proxy detection and connection automatically.

    Args:
        uri: The WebSocket URI to connect to.
        **kwargs: Forwarded to ``connect``. An ``ssl`` value supplied by the
            caller takes precedence over the context derived from ``uri``.
            When the ``client.custom_headers`` setting is non-empty, its
            non-protected entries are merged into ``additional_headers``; a
            custom header replaces a caller-supplied header of the same name.
            An ``additional_headers`` value that is not a ``dict`` is replaced
            by the merged mapping.

    Returns:
        The result of ``connect(uri, **kwargs)``.
    """
    # A context is only produced for wss:// URIs; setdefault keeps any ``ssl``
    # argument the caller passed explicitly.
    ssl_context = create_ssl_context_for_websocket(uri)
    if ssl_context is not None:
        kwargs.setdefault("ssl", ssl_context)

    # Add custom headers from settings
    custom_headers = get_current_settings().client.custom_headers
    if custom_headers:
        # Custom headers with these names (compared case-insensitively) are
        # skipped with a warning instead of being sent.
        protected_headers = {
            "user-agent",
            "sec-websocket-key",
            "sec-websocket-version",
            "sec-websocket-extensions",
            "sec-websocket-protocol",
            "connection",
            "upgrade",
            "host",
        }

        # Merge into a copy: writing into the caller's own dict would leak the
        # settings headers into an object the caller may reuse elsewhere.
        caller_headers = kwargs.get("additional_headers")
        if isinstance(caller_headers, dict):
            merged_headers = dict(caller_headers)
        else:
            merged_headers = {}

        for header_name, header_value in custom_headers.items():
            if header_name.lower() in protected_headers:
                warnings.warn(
                    f"Custom header '{header_name}' is ignored because it conflicts with "
                    "a protected WebSocket header. Protected headers include: "
                    "User-Agent, Sec-WebSocket-Key, Sec-WebSocket-Version, "
                    "Sec-WebSocket-Extensions, Sec-WebSocket-Protocol, Connection, "
                    "Upgrade, Host",
                    UserWarning,
                    stacklevel=2,
                )
            else:
                merged_headers[header_name] = header_value

        kwargs["additional_headers"] = merged_headers

    return connect(uri, **kwargs)