Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/telemetry.py: 69%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2The Telemetry service. 

3""" 

4 

5import asyncio 1a

6import os 1a

7import platform 1a

8from typing import Any, Optional 1a

9from uuid import uuid4 1a

10 

11import httpx 1a

12 

13import prefect 1a

14from prefect.server.database import PrefectDBInterface 1a

15from prefect.server.database.dependencies import db_injector 1a

16from prefect.server.models import configuration 1a

17from prefect.server.schemas.core import Configuration 1a

18from prefect.server.services.base import ( 1a

19 LoopService, 

20 RunInEphemeralServers, 

21 RunInWebservers, 

22) 

23from prefect.settings import PREFECT_DEBUG_MODE 1a

24from prefect.settings.context import get_current_settings 1a

25from prefect.settings.models.server.services import ServicesBaseSetting 1a

26from prefect.types._datetime import now 1a

27 

28 

29class Telemetry(RunInEphemeralServers, RunInWebservers, LoopService): 1a

30 """ 

31 Sends anonymous data to Prefect to help us improve 

32 

33 It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting. 

34 """ 

35 

36 loop_seconds: float = 600 1a

37 

38 @classmethod 1a

39 def service_settings(cls) -> ServicesBaseSetting: 1a

40 raise NotImplementedError("Telemetry service does not have settings") 

41 

42 @classmethod 1a

43 def environment_variable_name(cls) -> str: 1a

44 return "PREFECT_SERVER_ANALYTICS_ENABLED" 

45 

46 @classmethod 1a

47 def enabled(cls) -> bool: 1a

48 return get_current_settings().server.analytics_enabled 1c

49 

50 def __init__(self, loop_seconds: Optional[int] = None, **kwargs: Any): 1a

51 super().__init__(loop_seconds=loop_seconds, **kwargs) 1c

52 self.telemetry_environment: str = os.environ.get( 1c

53 "PREFECT_API_TELEMETRY_ENVIRONMENT", "production" 

54 ) 

55 

56 @db_injector 1a

57 async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface): 1a

58 """ 

59 This method looks for a telemetry session in the configuration table. If there 

60 isn't one, it sets one. It then sets `self.session_id` and 

61 `self.session_start_timestamp`. 

62 

63 Telemetry sessions last until the database is reset. 

64 """ 

65 async with db.session_context(begin_transaction=True) as session: 1cb

66 telemetry_session = await configuration.read_configuration( 1cb

67 session, "TELEMETRY_SESSION" 

68 ) 

69 

70 if telemetry_session is None: 

71 self.logger.debug("No telemetry session found, setting") 

72 session_id = str(uuid4()) 

73 session_start_timestamp = now("UTC").isoformat() 

74 

75 telemetry_session = Configuration( 

76 key="TELEMETRY_SESSION", 

77 value={ 

78 "session_id": session_id, 

79 "session_start_timestamp": session_start_timestamp, 

80 }, 

81 ) 

82 

83 await configuration.write_configuration(session, telemetry_session) 1b

84 

85 self.session_id = session_id 

86 self.session_start_timestamp = session_start_timestamp 

87 else: 

88 self.logger.debug("Session information retrieved from database") 

89 self.session_id: str = telemetry_session.value["session_id"] 

90 self.session_start_timestamp: str = telemetry_session.value[ 

91 "session_start_timestamp" 

92 ] 

93 self.logger.debug( 1b

94 f"Telemetry Session: {self.session_id}, {self.session_start_timestamp}" 

95 ) 

96 return (self.session_start_timestamp, self.session_id) 1b

97 

98 async def run_once(self) -> None: 1a

99 """ 

100 Sends a heartbeat to the sens-o-matic 

101 """ 

102 from prefect.client.constants import SERVER_API_VERSION 1c

103 

104 if not hasattr(self, "session_id"): 104 ↛ 107line 104 didn't jump to line 107 because the condition on line 104 was always true1c

105 await self._fetch_or_set_telemetry_session() 1cb

106 

107 heartbeat = { 1b

108 "source": "prefect_server", 

109 "type": "heartbeat", 

110 "payload": { 

111 "platform": platform.system(), 

112 "architecture": platform.machine(), 

113 "python_version": platform.python_version(), 

114 "python_implementation": platform.python_implementation(), 

115 "environment": self.telemetry_environment, 

116 "ephemeral_server": bool(os.getenv("PREFECT__SERVER_EPHEMERAL", False)), 

117 "api_version": SERVER_API_VERSION, 

118 "prefect_version": prefect.__version__, 

119 "session_id": self.session_id, 

120 "session_start_timestamp": self.session_start_timestamp, 

121 }, 

122 } 

123 

124 try: 1b

125 async with httpx.AsyncClient() as client: 1bd

126 result = await client.post( 

127 "https://sens-o-matic.prefect.io/", 

128 json=heartbeat, 

129 headers={"x-prefect-event": "prefect_server"}, 

130 ) 

131 result.raise_for_status() 

132 except Exception as exc: 

133 self.logger.error( 

134 f"Failed to send telemetry: {exc}\nShutting down telemetry service...", 

135 # The traceback is only needed if doing deeper debugging, otherwise 

136 # this looks like an impactful server error 

137 exc_info=PREFECT_DEBUG_MODE.value(), 

138 ) 

139 await self.stop(block=False) 

140 

141 

142if __name__ == "__main__": 142 ↛ 143line 142 didn't jump to line 143 because the condition on line 142 was never true1a

143 asyncio.run(Telemetry(handle_signals=True).start())