Coverage for opt/mealie/lib/python3.12/site-packages/mealie/pkgs/safehttp/transport.py: 64%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1import ipaddress 1b

2import logging 1b

3import socket 1b

4 

5import httpx 1b

6 

7 

class ForcedTimeoutException(Exception):
    """Raised when a request takes longer than the timeout value."""

14 

15 

class InvalidDomainError(Exception):
    """Raised when a request is made to a local IP address."""

22 

23 

class AsyncSafeTransport(httpx.AsyncBaseTransport):
    """
    A wrapper around the httpx transport class that enforces a timeout value
    and that the request is not made to a local IP address.

    Every request has its timeout extension overwritten with ``timeout``
    seconds, and the request host is validated before the request is handed
    to the wrapped ``httpx.AsyncHTTPTransport``. Hostnames are resolved and
    *every* resolved address (IPv4 and IPv6) must be non-local.

    NOTE(review): the request is forwarded unchanged after validation, so the
    wrapped transport presumably performs its own name resolution when it
    connects. This check therefore does not pin the validated address and is
    not, on its own, a defence against DNS rebinding -- confirm whether that
    matters for callers.
    """

    timeout: int = 15

    def __init__(self, log: logging.Logger | None = None, **kwargs):
        """
        Args:
            log: Optional logger; when omitted nothing is logged.
            **kwargs: ``timeout`` (seconds) is consumed here; all remaining
                keyword arguments are passed to ``httpx.AsyncHTTPTransport``.
        """
        self.timeout = kwargs.pop("timeout", self.timeout)
        self._wrapper = httpx.AsyncHTTPTransport(**kwargs)
        self._log = log

    @staticmethod
    def _is_local(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
        """Return True when *ip* must not be reached from this transport."""
        # Unwrap IPv4-mapped IPv6 addresses (e.g. ::ffff:127.0.0.1) so the
        # embedded IPv4 address is the one that gets classified.
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            ip = mapped
        return ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_unspecified

    async def _resolve(self, host: str) -> list[ipaddress.IPv4Address | ipaddress.IPv6Address]:
        """
        Resolve *host* without blocking the event loop.

        Returns every distinct address the resolver reports. Raises
        ``socket.gaierror`` when resolution fails and ``InvalidDomainError``
        when the resolver yields nothing that can be validated.
        """
        # Imported here so this class does not depend on a module-level import
        # that the rest of the file does not otherwise need.
        import asyncio

        if self._log:
            self._log.debug(f"resolving IP for domain: {host}")

        loop = asyncio.get_running_loop()
        infos = await loop.getaddrinfo(host, None, type=socket.SOCK_STREAM)

        resolved: list[ipaddress.IPv4Address | ipaddress.IPv6Address] = []
        for info in infos:
            # getaddrinfo entries are (family, type, proto, canonname, sockaddr)
            # and sockaddr[0] is the textual address.
            raw_address = info[4][0]
            try:
                ip = ipaddress.ip_address(raw_address)
            except ValueError as err:
                # Fail closed: an address we cannot classify is not trusted.
                raise InvalidDomainError(
                    f"invalid request on local resource: {host} -> {raw_address}"
                ) from err
            if ip not in resolved:
                resolved.append(ip)

        if not resolved:
            raise InvalidDomainError(f"invalid request on local resource: {host} -> <unresolved>")

        if self._log:
            for ip in resolved:
                self._log.debug(f"resolved IP for domain: {host} -> {ip}")

        return resolved

    async def handle_async_request(self, request) -> httpx.Response:
        """
        Validate *request* and forward it to the wrapped transport.

        Raises:
            InvalidDomainError: the host is missing, or it is (or resolves to)
                a local address.
            socket.gaierror: the hostname could not be resolved.
        """
        # override timeout value for _all_ requests
        request.extensions["timeout"] = httpx.Timeout(self.timeout, pool=self.timeout).as_dict()

        # validate the request is not attempting to connect to a local IP
        # This is a security measure to prevent SSRF attacks

        # Use the parsed host rather than splitting netloc on ":"; splitting
        # mangles bracketed IPv6 literals such as "[::1]:8080".
        host = request.url.raw_host.decode("ascii")
        if not host:
            raise InvalidDomainError(f"invalid request on local resource: {request.url} -> <no host>")

        try:
            addresses = [ipaddress.ip_address(host)]
        except ValueError:
            # Not an IP literal: the request targets a domain or a hostname.
            if self._log:
                self._log.debug(f"failed to parse ip for netloc={host!r} falling back to domain resolution")
            addresses = await self._resolve(host)

        # A single local address among the results is enough to reject.
        for ip in addresses:
            if self._is_local(ip):
                if self._log:
                    self._log.warning(f"invalid request on local resource: {request.url} -> {ip}")
                raise InvalidDomainError(f"invalid request on local resource: {request.url} -> {ip}")

        return await self._wrapper.handle_async_request(request)

    async def aclose(self):
        """Close the wrapped transport."""
        await self._wrapper.aclose()