Coverage for /usr/local/lib/python3.12/site-packages/prefect/telemetry/run_telemetry.py: 29%

105 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import time 1a

4from dataclasses import dataclass, field 1a

5from typing import TYPE_CHECKING, Any, Union 1a

6 

7from opentelemetry import propagate, trace 1a

8from opentelemetry.context import Context 1a

9from opentelemetry.propagators.textmap import Setter 1a

10from opentelemetry.trace import ( 1a

11 Span, 

12 Status, 

13 StatusCode, 

14 get_tracer, 

15) 

16from typing_extensions import TypeAlias 1a

17 

18import prefect 1a

19import prefect.settings 1a

20from prefect.client.orchestration import PrefectClient, SyncPrefectClient 1a

21from prefect.client.schemas import FlowRun, TaskRun 1a

22from prefect.client.schemas.objects import State 1a

23from prefect.context import FlowRunContext, TaskRunContext 1a

24from prefect.types import KeyValueLabels 1a

25 

26if TYPE_CHECKING: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true1a

27 from opentelemetry.trace import Tracer 

28 

# Key under which a run's serialized trace context is stored in its labels.
# The double-underscore prefix marks it as an internal label; labels with
# that prefix are excluded when run labels are copied onto span attributes.
LABELS_TRACEPARENT_KEY = "__OTEL_TRACEPARENT"

# Carrier key read and written through `opentelemetry.propagate` when a
# span's context is injected into / extracted from a plain dict.
TRACEPARENT_KEY = "traceparent"

31 

# Either kind of run that telemetry can be recorded for.
FlowOrTaskRun: TypeAlias = Union[FlowRun, TaskRun]

33 

34 

class OTELSetter(Setter[KeyValueLabels]):
    """
    OpenTelemetry text-map setter whose carrier is a Prefect labels mapping.
    """

    def set(self, carrier: KeyValueLabels, key: str, value: str) -> None:
        """
        Store `value` under `key` in `carrier`, replacing any existing entry.

        Args:
            carrier: The labels mapping to write into; mutated in place.
            key: The entry name to set.
            value: The string to store for `key`.
        """
        carrier[key] = value

42 

43 

@dataclass
class RunTelemetry:
    """
    Manage the OpenTelemetry span that tracks a single flow or task run.

    An instance holds at most one open span at a time (`span`). Every
    mutating method is a no-op while no span is open, so callers do not need
    to check whether telemetry is enabled before calling them.
    """

    # Tracer used to create spans; built lazily from the installed Prefect
    # version when not supplied.
    _tracer: "Tracer" = field(
        default_factory=lambda: get_tracer("prefect", prefect.__version__)
    )
    # The currently open span, or None when no span has been started or the
    # last one has been ended.
    span: Span | None = None
    # Whether spans are started at all; read from the current settings'
    # `cloud.enable_orchestration_telemetry` when not supplied.
    _enabled: bool = field(
        default_factory=lambda: (
            prefect.settings.get_current_settings().cloud.enable_orchestration_telemetry
        )
    )

    async def async_start_span(
        self,
        run: FlowOrTaskRun,
        client: PrefectClient,
        parameters: dict[str, Any] | None = None,
    ) -> Span | None:
        """
        Start a span for `run` using an asynchronous client.

        Args:
            run: The flow or task run to trace. Its `labels` are updated in
                place with the new span's traceparent when one is available.
            client: Client used to persist the traceparent label for flow runs.
            parameters: Run parameters; only their type names are recorded.

        Returns:
            The started span, or None when telemetry is disabled.
        """
        if not self._enabled:
            return None

        traceparent, span = self._start_span(run, parameters)
        label_update = self._flow_run_label_update(run, traceparent)
        if label_update:
            await client.update_flow_run_labels(run.id, label_update)

        return span

    def start_span(
        self,
        run: FlowOrTaskRun,
        client: SyncPrefectClient,
        parameters: dict[str, Any] | None = None,
    ) -> Span | None:
        """
        Start a span for `run` using a synchronous client.

        Args:
            run: The flow or task run to trace. Its `labels` are updated in
                place with the new span's traceparent when one is available.
            client: Client used to persist the traceparent label for flow runs.
            parameters: Run parameters; only their type names are recorded.

        Returns:
            The started span, or None when telemetry is disabled.
        """
        if not self._enabled:
            return None

        traceparent, span = self._start_span(run, parameters)
        label_update = self._flow_run_label_update(run, traceparent)
        if label_update:
            client.update_flow_run_labels(run.id, label_update)

        return span

    def _flow_run_label_update(
        self, run: FlowOrTaskRun, traceparent: str | None
    ) -> dict[str, str] | None:
        """
        Build the label payload to persist through the client, if any.

        Only flow runs are updated explicitly; task runs are updated via
        events. Returns None for task runs or when there is no traceparent.
        """
        if self._run_type(run) == "flow" and traceparent:
            return {LABELS_TRACEPARENT_KEY: traceparent}
        return None

    def _start_span(
        self,
        run: FlowOrTaskRun,
        parameters: dict[str, Any] | None = None,
    ) -> tuple[str | None, Span]:
        """
        Start a span for a run and remember it on `self.span`.

        Returns:
            A `(traceparent, span)` pair. `traceparent` is None when no
            traceparent could be derived from the new span; otherwise it has
            also been written to `run.labels`.
        """
        if parameters is None:
            parameters = {}

        # Record only the type name of each parameter, not its value.
        parameter_attributes = {
            f"prefect.run.parameter.{k}": type(v).__name__
            for k, v in parameters.items()
        }
        # Internal labels (double-underscore prefix) are not exported.
        label_attributes = {
            key: value
            for key, value in run.labels.items()
            if not key.startswith("__")
        }

        self.span = self._tracer.start_span(
            name=run.name,
            context=self._resolve_trace_context(run),
            attributes={
                "prefect.run.name": run.name,
                "prefect.run.type": self._run_type(run),
                "prefect.run.id": str(run.id),
                "prefect.tags": run.tags,
                **parameter_attributes,
                **label_attributes,
            },
        )

        if traceparent := RunTelemetry.traceparent_from_span(self.span):
            run.labels[LABELS_TRACEPARENT_KEY] = traceparent

        return traceparent, self.span

    def _resolve_trace_context(self, run: FlowOrTaskRun) -> Context | None:
        """
        Pick the trace context the new span should be started in.

        Use the existing trace context if this run already has one (e.g.,
        from server operations like Late), otherwise use the parent's trace
        context if available (e.g., nested flow / task runs). If neither
        exists, return None so the span becomes a root span (e.g., a
        top-level flow run).
        """
        if LABELS_TRACEPARENT_KEY in run.labels:
            return self._trace_context_from_labels(run.labels)

        parent_run = self._parent_run()
        parent_labels = parent_run.labels if parent_run else {}
        # `_trace_context_from_labels` already returns None for empty or
        # missing labels and for labels without a traceparent entry.
        return self._trace_context_from_labels(parent_labels)

    def _run_type(self, run: FlowOrTaskRun) -> str:
        """Return "task" for a `TaskRun` and "flow" for anything else."""
        return "task" if isinstance(run, TaskRun) else "flow"

    def _trace_context_from_labels(
        self, labels: KeyValueLabels | None
    ) -> Context | None:
        """Get trace context from run labels if it exists."""
        if not labels or LABELS_TRACEPARENT_KEY not in labels:
            return None
        carrier = {TRACEPARENT_KEY: labels[LABELS_TRACEPARENT_KEY]}
        return propagate.extract(carrier)

    @staticmethod
    def traceparent_from_span(span: Span) -> str | None:
        """
        Serialize `span`'s context and return its traceparent entry.

        Returns None when the propagator wrote no traceparent entry.
        """
        carrier: dict[str, Any] = {}
        propagate.inject(carrier, context=trace.set_span_in_context(span))
        return carrier.get(TRACEPARENT_KEY)

    def _end_span(self, span: Span, status: Status) -> None:
        """Set `status` on `span`, end it now, and clear `self.span`."""
        span.set_status(status)
        span.end(time.time_ns())
        self.span = None

    def end_span_on_success(self) -> None:
        """
        End a span for a run on success.
        """
        if self.span:
            self._end_span(self.span, Status(StatusCode.OK))

    def end_span_on_failure(self, terminal_message: str | None = None) -> None:
        """
        End a span for a run on failure.

        Args:
            terminal_message: Description stored on the error status;
                "Run failed" is used when it is None or empty.
        """
        if self.span:
            self._end_span(
                self.span,
                Status(StatusCode.ERROR, terminal_message or "Run failed"),
            )

    def record_exception(self, exc: BaseException) -> None:
        """
        Record an exception on a span.
        """
        if self.span:
            self.span.record_exception(exc)

    def update_state(self, new_state: State) -> None:
        """
        Update a span with the state of a run.

        The state is added as a span event named after the state, falling
        back to the state type when the state has no name.
        """
        if self.span:
            state_name = new_state.name or new_state.type
            self.span.add_event(
                state_name,
                {
                    "prefect.state.message": new_state.message or "",
                    "prefect.state.type": new_state.type,
                    "prefect.state.name": state_name,
                    "prefect.state.id": str(new_state.id),
                },
            )

    def update_run_name(self, name: str) -> None:
        """
        Update the name of the run.

        Both the span name and its "prefect.run.name" attribute are updated.
        """
        if self.span:
            self.span.update_name(name=name)
            self.span.set_attribute("prefect.run.name", name)

    def _parent_run(self) -> FlowOrTaskRun | None:
        """
        Identify the "parent run" for the current execution context.

        Both flows and tasks can be nested "infinitely," and each creates a
        corresponding context when executed. This method determines the most
        appropriate parent context (either a task run or a flow run) based on
        their relationship in the current hierarchy.

        Returns:
            FlowOrTaskRun: The parent run object (task or flow) if applicable.
            None: If there is no parent context, implying the current run is the top-level parent.
        """
        flow_run_context = FlowRunContext.get()
        task_run_context = TaskRunContext.get()

        if task_run_context and flow_run_context:
            # If both contexts exist, which is common for nested flows or
            # tasks, check if the task's flow_run_id matches the current
            # flow run. If they match, the task is a child of the flow and is
            # the parent of the current run.
            flow_run_id = getattr(flow_run_context.flow_run, "id", None)
            if task_run_context.task_run.flow_run_id == flow_run_id:
                return task_run_context.task_run
            # Otherwise, assume the flow run is the entry point and is the parent.
            return flow_run_context.flow_run

        if flow_run_context:
            return flow_run_context.flow_run
        if task_run_context:
            return task_run_context.task_run

        return None