Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/telemetry.py: 69%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2The Telemetry service.
3"""
5import asyncio 1a
6import os 1a
7import platform 1a
8from typing import Any, Optional 1a
9from uuid import uuid4 1a
11import httpx 1a
13import prefect 1a
14from prefect.server.database import PrefectDBInterface 1a
15from prefect.server.database.dependencies import db_injector 1a
16from prefect.server.models import configuration 1a
17from prefect.server.schemas.core import Configuration 1a
18from prefect.server.services.base import ( 1a
19 LoopService,
20 RunInEphemeralServers,
21 RunInWebservers,
22)
23from prefect.settings import PREFECT_DEBUG_MODE 1a
24from prefect.settings.context import get_current_settings 1a
25from prefect.settings.models.server.services import ServicesBaseSetting 1a
26from prefect.types._datetime import now 1a
class Telemetry(RunInEphemeralServers, RunInWebservers, LoopService):
    """
    Sends anonymous data to Prefect to help us improve

    It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting.
    """

    loop_seconds: float = 600

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """
        Always raises: this service has no settings object of its own.

        Raises:
            NotImplementedError: unconditionally.
        """
        raise NotImplementedError("Telemetry service does not have settings")

    @classmethod
    def environment_variable_name(cls) -> str:
        """Return the name of the environment variable that toggles the service."""
        return "PREFECT_SERVER_ANALYTICS_ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        """Return the `server.analytics_enabled` value from the current settings."""
        return get_current_settings().server.analytics_enabled

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any):
        """
        Args:
            loop_seconds: forwarded unchanged to the parent initializer.
            **kwargs: forwarded unchanged to the parent initializer.
        """
        super().__init__(loop_seconds=loop_seconds, **kwargs)
        # Reported in every heartbeat payload; "production" unless overridden
        # through the PREFECT_API_TELEMETRY_ENVIRONMENT environment variable.
        self.telemetry_environment: str = os.environ.get(
            "PREFECT_API_TELEMETRY_ENVIRONMENT", "production"
        )

    @staticmethod
    def _is_ephemeral_server() -> bool:
        """
        Report whether PREFECT__SERVER_EPHEMERAL is set to a truthy value.

        An unset or empty variable, and the explicit spellings "0", "false",
        "no" and "off" (case-insensitive, surrounding whitespace ignored),
        count as not ephemeral. Any other value counts as ephemeral.
        """
        # Environment variables are strings, and bool() of any non-empty string
        # is True -- bool("false") included -- so the value has to be parsed.
        raw_value = os.getenv("PREFECT__SERVER_EPHEMERAL", "")
        return raw_value.strip().lower() not in ("", "0", "false", "no", "off")

    @db_injector
    async def _fetch_or_set_telemetry_session(
        self, db: PrefectDBInterface
    ) -> tuple[str, str]:
        """
        This method looks for a telemetry session in the configuration table. If there
        isn't one, it sets one. It then sets `self.session_id` and
        `self.session_start_timestamp`.

        Telemetry sessions last until the database is reset.

        Returns:
            A `(session_start_timestamp, session_id)` tuple. Note the order:
            the timestamp comes first.
        """
        async with db.session_context(begin_transaction=True) as session:
            telemetry_session = await configuration.read_configuration(
                session, "TELEMETRY_SESSION"
            )

            if telemetry_session is None:
                self.logger.debug("No telemetry session found, setting")
                session_id = str(uuid4())
                session_start_timestamp = now("UTC").isoformat()

                telemetry_session = Configuration(
                    key="TELEMETRY_SESSION",
                    value={
                        "session_id": session_id,
                        "session_start_timestamp": session_start_timestamp,
                    },
                )

                await configuration.write_configuration(session, telemetry_session)

                # Only publish the new session on the instance once the write
                # call has returned without raising.
                self.session_id = session_id
                self.session_start_timestamp = session_start_timestamp
            else:
                self.logger.debug("Session information retrieved from database")
                self.session_id: str = telemetry_session.value["session_id"]
                self.session_start_timestamp: str = telemetry_session.value[
                    "session_start_timestamp"
                ]
        self.logger.debug(
            f"Telemetry Session: {self.session_id}, {self.session_start_timestamp}"
        )
        return (self.session_start_timestamp, self.session_id)

    async def run_once(self) -> None:
        """
        Sends a heartbeat to the sens-o-matic

        The session id and start timestamp are loaded (or created) on the first
        call and reused afterwards. If building the request, sending it, or the
        response status check raises, the error is logged and the service is
        asked to stop; the exception is not propagated to the caller.
        """
        # Imported here rather than at module level, as in the original code.
        from prefect.client.constants import SERVER_API_VERSION

        # The session attributes only exist after the first successful fetch.
        if not hasattr(self, "session_id"):
            await self._fetch_or_set_telemetry_session()

        heartbeat = {
            "source": "prefect_server",
            "type": "heartbeat",
            "payload": {
                "platform": platform.system(),
                "architecture": platform.machine(),
                "python_version": platform.python_version(),
                "python_implementation": platform.python_implementation(),
                "environment": self.telemetry_environment,
                "ephemeral_server": self._is_ephemeral_server(),
                "api_version": SERVER_API_VERSION,
                "prefect_version": prefect.__version__,
                "session_id": self.session_id,
                "session_start_timestamp": self.session_start_timestamp,
            },
        }

        try:
            async with httpx.AsyncClient() as client:
                result = await client.post(
                    "https://sens-o-matic.prefect.io/",
                    json=heartbeat,
                    headers={"x-prefect-event": "prefect_server"},
                )
                result.raise_for_status()
        except Exception as exc:
            self.logger.error(
                f"Failed to send telemetry: {exc}\nShutting down telemetry service...",
                # The traceback is only needed if doing deeper debugging, otherwise
                # this looks like an impactful server error
                exc_info=PREFECT_DEBUG_MODE.value(),
            )
            await self.stop(block=False)
if __name__ == "__main__":
    # Script entry point: build the service with handle_signals=True and drive
    # its start() coroutine to completion on a fresh event loop.
    telemetry_service = Telemetry(handle_signals=True)
    asyncio.run(telemetry_service.start())