Coverage for polar/logfire.py: 86%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import os 1ba

2from collections.abc import Callable, Sequence 1ba

3from typing import TYPE_CHECKING, Any, Literal 1ba

4 

5import httpx 1ba

6import logfire 1ba

7from fastapi import FastAPI 1ba

8from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor 1ba

9from opentelemetry.sdk.trace.sampling import ( 1ba

10 ALWAYS_OFF, 

11 ALWAYS_ON, 

12 ParentBased, 

13 Sampler, 

14 SamplingResult, 

15) 

16 

17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1ba

18 from opentelemetry.context import Context 

19 from opentelemetry.trace import Link, SpanKind 

20 from opentelemetry.trace.span import TraceState 

21 from opentelemetry.util.types import Attributes 

22 

23from polar.config import settings 1ba

24from polar.kit.db.postgres import Engine 1ba

25 

# A matcher receives a span name and its (optional) attributes and returns
# True when the span matches.
Matcher = Callable[[str, "Attributes | None"], bool]

27 

28 

class IgnoreSampler(Sampler):
    """Sampler that routes matched spans to ``ALWAYS_OFF``.

    A span for which any configured matcher returns a truthy value is
    delegated to ``ALWAYS_OFF``; every other span is delegated to
    ``ALWAYS_ON``.
    """

    def __init__(self, matchers: Sequence[Matcher]) -> None:
        """Store the matchers consulted on every sampling decision."""
        super().__init__()
        self.matchers = matchers

    def should_sample(
        self,
        parent_context: "Context | None",
        trace_id: int,
        name: str,
        kind: "SpanKind | None" = None,
        attributes: "Attributes | None" = None,
        links: Sequence["Link"] | None = None,
        trace_state: "TraceState | None" = None,
    ) -> SamplingResult:
        """Return the sampling result of the delegate chosen for this span.

        Matchers are called in order with ``name`` and ``attributes``;
        evaluation stops at the first one that matches.
        """
        # any() short-circuits on the first truthy matcher result.
        is_ignored = any(matches(name, attributes) for matches in self.matchers)
        delegate = ALWAYS_OFF if is_ignored else ALWAYS_ON

        return delegate.should_sample(
            parent_context,
            trace_id,
            name,
            kind,
            attributes,
            links,
            trace_state,
        )

    def get_description(self) -> str:
        """Return the fixed description string of this sampler."""
        return "IgnoreSampler"

63 

64 

65def _healthz_matcher(name: str, attributes: "Attributes | None") -> bool: 1ba

66 return attributes is not None and attributes.get("http.route") == "/healthz" 1dc

67 

68 

69def _worker_health_matcher(name: str, attributes: "Attributes | None") -> bool: 1ba

70 lower_name = name.lower() 1dc

71 return lower_name.startswith("recording health:") or lower_name.startswith( 1dc

72 "health check successful" 

73 ) 

74 

75 

def _scrubbing_callback(match: logfire.ScrubMatch) -> Any | None:
    """Scrubbing callback that exempts the auth subject attribute.

    Returns ``match.value`` when the match path is exactly
    ``("attributes", "subject")`` and ``None`` for every other match.
    """
    # Don't scrub auth subject in log messages
    is_auth_subject = match.path == ("attributes", "subject")
    return match.value if is_auth_subject else None

81 

82 

def configure_logfire(service_name: Literal["server", "worker"]) -> None:
    """Configure logfire for the given service.

    The token comes from ``settings.LOGFIRE_TOKEN`` and the service version
    from the ``RELEASE_VERSION`` environment variable (falling back to
    ``"development"``). Head sampling uses a ``ParentBased`` sampler wrapping
    an ``IgnoreSampler`` built from the health-check matchers, and scrubbing
    is customised through ``_scrubbing_callback``.
    """
    release_version = os.environ.get("RELEASE_VERSION", "development")
    health_filter = IgnoreSampler((_healthz_matcher, _worker_health_matcher))
    sampling_options = logfire.SamplingOptions(head=ParentBased(health_filter))
    scrubbing_options = logfire.ScrubbingOptions(callback=_scrubbing_callback)

    logfire.configure(
        send_to_logfire="if-token-present",
        token=settings.LOGFIRE_TOKEN,
        service_name=service_name,
        service_version=release_version,
        console=False,
        sampling=sampling_options,
        scrubbing=scrubbing_options,
    )

95 

96 

def instrument_httpx(client: httpx.AsyncClient | httpx.Client | None = None) -> None:
    """Instrument httpx with ``HTTPXClientInstrumentor``.

    When a (truthy) ``client`` is given only that client is instrumented;
    otherwise the instrumentor's global ``instrument()`` is called.
    """
    if not client:
        HTTPXClientInstrumentor().instrument()
        return

    HTTPXClientInstrumentor().instrument_client(client)

102 

103 

def instrument_fastapi(app: FastAPI) -> None:
    """Instrument ``app`` through logfire with header capture enabled."""
    logfire.instrument_fastapi(app, capture_headers=True)

106 

107 

def instrument_sqlalchemy(engines: Sequence[Engine]) -> None:
    """Instrument the given SQLAlchemy ``engines`` through logfire."""
    logfire.instrument_sqlalchemy(engines=engines)

110 

111 

# Public API of this module. ``instrument_httpx`` is listed alongside the
# other public ``instrument_*`` helpers so star-imports expose all of them.
__all__ = [
    "configure_logfire",
    "instrument_fastapi",
    "instrument_httpx",
    "instrument_sqlalchemy",
]