Coverage for /usr/local/lib/python3.12/site-packages/prefect/runtime/flow_run.py: 19%
166 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Access attributes of the current flow run dynamically.
4Note that if a flow run cannot be discovered, all attributes will return empty values.
6You can mock the runtime attributes for testing purposes by setting environment variables
7prefixed with `PREFECT__RUNTIME__FLOW_RUN`.
9Available attributes:
10 - `id`: the flow run's unique ID
11 - `tags`: the flow run's set of tags
12 - `scheduled_start_time`: the flow run's expected scheduled start time; defaults to now if not present
13 - `name`: the name of the flow run
14 - `flow_name`: the name of the flow
15 - `flow_version`: the version of the flow
16 - `parameters`: the parameters that were passed to this run; note that these do not necessarily
17 include default values set on the flow function, only the parameter values explicitly passed for the run
18 - `parent_flow_run_id`: the ID of the flow run that triggered this run, if any
19 - `parent_deployment_id`: the ID of the deployment that triggered this run, if any
20 - `run_count`: the number of times this flow run has been run
21"""
23from __future__ import annotations 1a
25import os 1a
26from datetime import datetime 1a
27from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional 1a
28from zoneinfo import ZoneInfo 1a
30from prefect._internal.concurrency.api import create_call, from_sync 1a
31from prefect.client.orchestration import get_client 1a
32from prefect.context import FlowRunContext, TaskRunContext 1a
33from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL 1a
34from prefect.types._datetime import DateTime, now, parse_datetime 1a
36if TYPE_CHECKING: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true1a
37 from prefect.client.schemas.objects import Flow, FlowRun, TaskRun
39__all__ = [ 1a
40 "id",
41 "tags",
42 "scheduled_start_time",
43 "name",
44 "flow_name",
45 "flow_version",
46 "parameters",
47 "parent_flow_run_id",
48 "parent_deployment_id",
49 "root_flow_run_id",
50 "run_count",
51 "api_url",
52 "ui_url",
53 "job_variables",
54]
57def _parse_datetime_UTC(dt: str) -> datetime: 1a
58 parsed_dt = parse_datetime(dt)
59 if parsed_dt.tzinfo is None:
60 # if the datetime is naive, assume it is UTCff
61 return parsed_dt.replace(tzinfo=ZoneInfo("UTC"))
62 else:
63 # if the datetime is timezone-aware, convert to UTC
64 return parsed_dt.astimezone(ZoneInfo("UTC"))
67type_cast: dict[ 1a
68 type[bool] | type[int] | type[float] | type[str] | type[None] | type[DateTime],
69 Callable[[Any], Any],
70] = {
71 bool: lambda x: x.lower() == "true",
72 int: int,
73 float: float,
74 str: str,
75 datetime: _parse_datetime_UTC,
76 # for optional defined attributes, when real value is NoneType, use str
77 type(None): str,
78}
81def __getattr__(name: str) -> Any: 1a
82 """
83 Attribute accessor for this submodule; note that imports also work with this:
85 from prefect.runtime.flow_run import id
86 """
88 func = FIELDS.get(name) 1a
90 # if `name` is an attribute but it is mocked through environment variable, the mocked type will be str,
91 # which might be different from original one. For consistency, cast env var to the same type
92 env_key = f"PREFECT__RUNTIME__FLOW_RUN__{name.upper()}" 1a
94 if func is None: 94 ↛ 100line 94 didn't jump to line 100 because the condition on line 94 was always true1a
95 if env_key in os.environ: 95 ↛ 96line 95 didn't jump to line 96 because the condition on line 95 was never true1a
96 return os.environ[env_key]
97 else:
98 raise AttributeError(f"{__name__} has no attribute {name!r}") 1a
100 real_value = func()
101 if env_key in os.environ:
102 mocked_value = os.environ[env_key]
103 # cast `mocked_value` to the same type as `real_value`
104 try:
105 cast_func = type_cast[type(real_value)]
106 return cast_func(mocked_value)
107 except KeyError:
108 raise ValueError(
109 "This runtime context attribute cannot be mocked using an"
110 " environment variable. Please use monkeypatch instead."
111 )
112 else:
113 return real_value
116def __dir__() -> List[str]: 1a
117 return sorted(__all__)
120async def _get_flow_run(flow_run_id: str) -> "FlowRun": 1a
121 async with get_client() as client:
122 return await client.read_flow_run(flow_run_id)
125async def _get_task_run(task_run_id: str) -> "TaskRun": 1a
126 async with get_client() as client:
127 return await client.read_task_run(task_run_id)
130async def _get_flow_from_run(flow_run_id: str) -> "Flow": 1a
131 async with get_client() as client:
132 flow_run = await client.read_flow_run(flow_run_id)
133 return await client.read_flow(flow_run.flow_id)
136def get_id() -> Optional[str]: 1a
137 flow_run_ctx = FlowRunContext.get()
138 task_run_ctx = TaskRunContext.get()
139 if flow_run_ctx is not None:
140 return str(flow_run_ctx.flow_run.id)
141 if task_run_ctx is not None:
142 return str(task_run_ctx.task_run.flow_run_id)
143 else:
144 return os.getenv("PREFECT__FLOW_RUN_ID")
147def get_tags() -> List[str]: 1a
148 flow_run_ctx = FlowRunContext.get()
149 run_id = get_id()
150 if flow_run_ctx is None and run_id is None:
151 return []
152 elif flow_run_ctx is None:
153 flow_run = from_sync.call_soon_in_loop_thread(
154 create_call(_get_flow_run, run_id)
155 ).result()
157 return flow_run.tags
158 else:
159 return flow_run_ctx.flow_run.tags
162def get_run_count() -> int: 1a
163 flow_run_ctx = FlowRunContext.get()
164 run_id = get_id()
165 if flow_run_ctx is None and run_id is None:
166 return 0
167 elif flow_run_ctx is None:
168 flow_run = from_sync.call_soon_in_loop_thread(
169 create_call(_get_flow_run, run_id)
170 ).result()
172 return flow_run.run_count
173 else:
174 return flow_run_ctx.flow_run.run_count
177def get_name() -> Optional[str]: 1a
178 flow_run_ctx = FlowRunContext.get()
179 run_id = get_id()
180 if flow_run_ctx is None and run_id is None:
181 return None
182 elif flow_run_ctx is None:
183 flow_run = from_sync.call_soon_in_loop_thread(
184 create_call(_get_flow_run, run_id)
185 ).result()
187 return flow_run.name
188 else:
189 return flow_run_ctx.flow_run.name
192def get_flow_name() -> Optional[str]: 1a
193 flow_run_ctx = FlowRunContext.get()
194 run_id = get_id()
195 if flow_run_ctx is None and run_id is None:
196 return None
197 elif flow_run_ctx is None:
198 flow = from_sync.call_soon_in_loop_thread(
199 create_call(_get_flow_from_run, run_id)
200 ).result()
202 return flow.name
203 else:
204 return flow_run_ctx.flow.name
207def get_flow_version() -> Optional[str]: 1a
208 flow_run_ctx = FlowRunContext.get()
209 run_id = get_id()
210 if flow_run_ctx is None and run_id is None:
211 return None
212 elif flow_run_ctx is None:
213 flow = from_sync.call_soon_in_loop_thread(
214 create_call(_get_flow_from_run, run_id)
215 ).result()
217 return flow.version
218 else:
219 return flow_run_ctx.flow.version
222def get_scheduled_start_time() -> DateTime: 1a
223 flow_run_ctx = FlowRunContext.get()
224 run_id = get_id()
225 if flow_run_ctx is None and run_id is None:
226 return now("UTC")
227 elif flow_run_ctx is None:
228 flow_run = from_sync.call_soon_in_loop_thread(
229 create_call(_get_flow_run, run_id)
230 ).result()
232 return flow_run.expected_start_time
233 else:
234 return flow_run_ctx.flow_run.expected_start_time
237def get_parameters() -> Dict[str, Any]: 1a
238 flow_run_ctx = FlowRunContext.get()
239 run_id = get_id()
240 if flow_run_ctx is not None:
241 # Use the unserialized parameters from the context if available
242 return flow_run_ctx.parameters
243 elif run_id is not None:
244 flow_run = from_sync.call_soon_in_loop_thread(
245 create_call(_get_flow_run, run_id)
246 ).result()
248 return flow_run.parameters
249 else:
250 return {}
253def get_parent_flow_run_id() -> Optional[str]: 1a
254 flow_run_ctx = FlowRunContext.get()
255 run_id = get_id()
256 if flow_run_ctx is not None:
257 parent_task_run_id = flow_run_ctx.flow_run.parent_task_run_id
258 elif run_id is not None:
259 flow_run = from_sync.call_soon_in_loop_thread(
260 create_call(_get_flow_run, run_id)
261 ).result()
262 parent_task_run_id = flow_run.parent_task_run_id
263 else:
264 parent_task_run_id = None
266 if parent_task_run_id is not None:
267 parent_task_run = from_sync.call_soon_in_loop_thread(
268 create_call(_get_task_run, parent_task_run_id)
269 ).result()
270 return str(parent_task_run.flow_run_id) if parent_task_run.flow_run_id else None
272 return None
275def get_parent_deployment_id() -> Optional[str]: 1a
276 parent_flow_run_id = get_parent_flow_run_id()
277 if parent_flow_run_id is None:
278 return None
280 parent_flow_run = from_sync.call_soon_in_loop_thread(
281 create_call(_get_flow_run, parent_flow_run_id)
282 ).result()
284 if parent_flow_run:
285 return (
286 str(parent_flow_run.deployment_id)
287 if parent_flow_run.deployment_id
288 else None
289 )
291 return None
294def get_root_flow_run_id() -> str: 1a
295 run_id = get_id()
296 parent_flow_run_id = get_parent_flow_run_id()
297 if parent_flow_run_id is None:
298 return run_id
300 def _get_root_flow_run_id(flow_run_id):
301 flow_run = from_sync.call_soon_in_loop_thread(
302 create_call(_get_flow_run, flow_run_id)
303 ).result()
305 if flow_run.parent_task_run_id is None:
306 return str(flow_run_id)
307 else:
308 parent_task_run = from_sync.call_soon_in_loop_thread(
309 create_call(_get_task_run, flow_run.parent_task_run_id)
310 ).result()
311 return _get_root_flow_run_id(parent_task_run.flow_run_id)
313 root_flow_run_id = _get_root_flow_run_id(parent_flow_run_id)
315 return root_flow_run_id
318def get_flow_run_api_url() -> Optional[str]: 1a
319 flow_run_id = get_id()
320 if flow_run_id is None:
321 return None
322 return f"{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run_id}"
325def get_flow_run_ui_url() -> Optional[str]: 1a
326 flow_run_id = get_id()
327 if flow_run_id is None:
328 return None
329 return f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run_id}"
332def get_job_variables() -> Optional[Dict[str, Any]]: 1a
333 flow_run_ctx = FlowRunContext.get()
334 return flow_run_ctx.flow_run.job_variables if flow_run_ctx else None
337FIELDS: dict[str, Callable[[], Any]] = { 1a
338 "id": get_id,
339 "tags": get_tags,
340 "scheduled_start_time": get_scheduled_start_time,
341 "name": get_name,
342 "flow_name": get_flow_name,
343 "parameters": get_parameters,
344 "parent_flow_run_id": get_parent_flow_run_id,
345 "parent_deployment_id": get_parent_deployment_id,
346 "root_flow_run_id": get_root_flow_run_id,
347 "run_count": get_run_count,
348 "api_url": get_flow_run_api_url,
349 "ui_url": get_flow_run_ui_url,
350 "flow_version": get_flow_version,
351 "job_variables": get_job_variables,
352}