Coverage for /usr/local/lib/python3.12/site-packages/prefect/telemetry/run_telemetry.py: 29%
105 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import time 1a
4from dataclasses import dataclass, field 1a
5from typing import TYPE_CHECKING, Any, Union 1a
7from opentelemetry import propagate, trace 1a
8from opentelemetry.context import Context 1a
9from opentelemetry.propagators.textmap import Setter 1a
10from opentelemetry.trace import ( 1a
11 Span,
12 Status,
13 StatusCode,
14 get_tracer,
15)
16from typing_extensions import TypeAlias 1a
18import prefect 1a
19import prefect.settings 1a
20from prefect.client.orchestration import PrefectClient, SyncPrefectClient 1a
21from prefect.client.schemas import FlowRun, TaskRun 1a
22from prefect.client.schemas.objects import State 1a
23from prefect.context import FlowRunContext, TaskRunContext 1a
24from prefect.types import KeyValueLabels 1a
26if TYPE_CHECKING: 26 ↛ 27line 26 didn't jump to line 27 because the condition on line 26 was never true1a
27 from opentelemetry.trace import Tracer
29LABELS_TRACEPARENT_KEY = "__OTEL_TRACEPARENT" 1a
30TRACEPARENT_KEY = "traceparent" 1a
32FlowOrTaskRun: TypeAlias = Union[FlowRun, TaskRun] 1a
35class OTELSetter(Setter[KeyValueLabels]): 1a
36 """
37 A setter for OpenTelemetry that supports Prefect's custom labels.
38 """
40 def set(self, carrier: KeyValueLabels, key: str, value: str) -> None: 1a
41 carrier[key] = value
44@dataclass 1a
45class RunTelemetry: 1a
46 """
47 A class for managing the telemetry of runs.
48 """
50 _tracer: "Tracer" = field( 1a
51 default_factory=lambda: get_tracer("prefect", prefect.__version__)
52 )
53 span: Span | None = None 1a
54 _enabled: bool = field( 1a
55 default_factory=lambda: prefect.settings.get_current_settings().cloud.enable_orchestration_telemetry
56 )
58 async def async_start_span( 1a
59 self,
60 run: FlowOrTaskRun,
61 client: PrefectClient,
62 parameters: dict[str, Any] | None = None,
63 ) -> Span | None:
64 if not self._enabled:
65 return None
66 traceparent, span = self._start_span(run, parameters)
67 if self._run_type(run) == "flow" and traceparent:
68 # Only explicitly update labels if the run is a flow as task runs
69 # are updated via events.
70 await client.update_flow_run_labels(
71 run.id, {LABELS_TRACEPARENT_KEY: traceparent}
72 )
74 return span
76 def start_span( 1a
77 self,
78 run: FlowOrTaskRun,
79 client: SyncPrefectClient,
80 parameters: dict[str, Any] | None = None,
81 ) -> Span | None:
82 if not self._enabled:
83 return None
85 traceparent, span = self._start_span(run, parameters)
87 if self._run_type(run) == "flow" and traceparent:
88 # Only explicitly update labels if the run is a flow as task runs
89 # are updated via events.
90 client.update_flow_run_labels(run.id, {LABELS_TRACEPARENT_KEY: traceparent})
92 return span
94 def _start_span( 1a
95 self,
96 run: FlowOrTaskRun,
97 parameters: dict[str, Any] | None = None,
98 ) -> tuple[str | None, Span]:
99 """
100 Start a span for a run.
101 """
102 if parameters is None:
103 parameters = {}
105 parameter_attributes = {
106 f"prefect.run.parameter.{k}": type(v).__name__
107 for k, v in parameters.items()
108 }
110 # Use existing trace context if this run already has one (e.g., from
111 # server operations like Late), otherwise use parent's trace context if
112 # available (e.g., nested flow / task runs). If neither exists, this
113 # will be a root span (e.g., a top-level flow run).
114 if LABELS_TRACEPARENT_KEY in run.labels:
115 context = self._trace_context_from_labels(run.labels)
116 else:
117 parent_run = self._parent_run()
118 parent_labels = parent_run.labels if parent_run else {}
119 if LABELS_TRACEPARENT_KEY in parent_labels:
120 context = self._trace_context_from_labels(parent_labels)
121 else:
122 context = None
124 run_type = self._run_type(run)
126 self.span = self._tracer.start_span(
127 name=run.name,
128 context=context,
129 attributes={
130 "prefect.run.name": run.name,
131 "prefect.run.type": run_type,
132 "prefect.run.id": str(run.id),
133 "prefect.tags": run.tags,
134 **parameter_attributes,
135 **{
136 key: value
137 for key, value in run.labels.items()
138 if not key.startswith("__") # exclude internal labels
139 },
140 },
141 )
143 if traceparent := RunTelemetry.traceparent_from_span(self.span):
144 run.labels[LABELS_TRACEPARENT_KEY] = traceparent
146 return traceparent, self.span
148 def _run_type(self, run: FlowOrTaskRun) -> str: 1a
149 return "task" if isinstance(run, TaskRun) else "flow"
151 def _trace_context_from_labels( 1a
152 self, labels: KeyValueLabels | None
153 ) -> Context | None:
154 """Get trace context from run labels if it exists."""
155 if not labels or LABELS_TRACEPARENT_KEY not in labels:
156 return None
157 traceparent = labels[LABELS_TRACEPARENT_KEY]
158 carrier = {TRACEPARENT_KEY: traceparent}
159 return propagate.extract(carrier)
161 @staticmethod 1a
162 def traceparent_from_span(span: Span) -> str | None: 1a
163 carrier: dict[str, Any] = {}
164 propagate.inject(carrier, context=trace.set_span_in_context(span))
165 return carrier.get(TRACEPARENT_KEY)
167 def end_span_on_success(self) -> None: 1a
168 """
169 End a span for a run on success.
170 """
171 if self.span:
172 self.span.set_status(Status(StatusCode.OK))
173 self.span.end(time.time_ns())
174 self.span = None
176 def end_span_on_failure(self, terminal_message: str | None = None) -> None: 1a
177 """
178 End a span for a run on failure.
179 """
180 if self.span:
181 self.span.set_status(
182 Status(StatusCode.ERROR, terminal_message or "Run failed")
183 )
184 self.span.end(time.time_ns())
185 self.span = None
187 def record_exception(self, exc: BaseException) -> None: 1a
188 """
189 Record an exception on a span.
190 """
191 if self.span:
192 self.span.record_exception(exc)
194 def update_state(self, new_state: State) -> None: 1a
195 """
196 Update a span with the state of a run.
197 """
198 if self.span:
199 self.span.add_event(
200 new_state.name or new_state.type,
201 {
202 "prefect.state.message": new_state.message or "",
203 "prefect.state.type": new_state.type,
204 "prefect.state.name": new_state.name or new_state.type,
205 "prefect.state.id": str(new_state.id),
206 },
207 )
209 def update_run_name(self, name: str) -> None: 1a
210 """
211 Update the name of the run.
212 """
213 if self.span:
214 self.span.update_name(name=name)
215 self.span.set_attribute("prefect.run.name", name)
217 def _parent_run(self) -> FlowOrTaskRun | None: 1a
218 """
219 Identify the "parent run" for the current execution context.
221 Both flows and tasks can be nested "infinitely," and each creates a
222 corresponding context when executed. This method determines the most
223 appropriate parent context (either a task run or a flow run) based on
224 their relationship in the current hierarchy.
226 Returns:
227 FlowOrTaskRun: The parent run object (task or flow) if applicable.
228 None: If there is no parent context, implying the current run is the top-level parent.
229 """
230 parent_flow_run_context = FlowRunContext.get()
231 parent_task_run_context = TaskRunContext.get()
233 if parent_task_run_context and parent_flow_run_context:
234 # If both contexts exist, which is common for nested flows or tasks,
235 # check if the task's flow_run_id matches the current flow_run.
236 # If they match, the task is a child of the flow and is the parent of the current run.
237 flow_run_id = getattr(parent_flow_run_context.flow_run, "id", None)
238 if parent_task_run_context.task_run.flow_run_id == flow_run_id:
239 return parent_task_run_context.task_run
240 # Otherwise, assume the flow run is the entry point and is the parent.
241 return parent_flow_run_context.flow_run
242 elif parent_flow_run_context:
243 return parent_flow_run_context.flow_run
244 elif parent_task_run_context:
245 return parent_task_run_context.task_run
247 return None