Coverage for /usr/local/lib/python3.12/site-packages/prefect/runtime/flow_run.py: 19%

166 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Access attributes of the current flow run dynamically. 

3 

4Note that if a flow run cannot be discovered, all attributes will return empty values. 

5 

6You can mock the runtime attributes for testing purposes by setting environment variables 

7prefixed with `PREFECT__RUNTIME__FLOW_RUN`. 

8 

9Available attributes: 

10 - `id`: the flow run's unique ID 

11 - `tags`: the flow run's set of tags 

12 - `scheduled_start_time`: the flow run's expected scheduled start time; defaults to now if not present 

13 - `name`: the name of the flow run 

14 - `flow_name`: the name of the flow 

15 - `flow_version`: the version of the flow 

16 - `parameters`: the parameters that were passed to this run; note that these do not necessarily 

17 include default values set on the flow function, only the parameter values explicitly passed for the run 

18 - `parent_flow_run_id`: the ID of the flow run that triggered this run, if any 

19 - `parent_deployment_id`: the ID of the deployment that triggered this run, if any 

20 - `run_count`: the number of times this flow run has been run 

21""" 

22 

23from __future__ import annotations 1a

24 

25import os 1a

26from datetime import datetime 1a

27from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional 1a

28from zoneinfo import ZoneInfo 1a

29 

30from prefect._internal.concurrency.api import create_call, from_sync 1a

31from prefect.client.orchestration import get_client 1a

32from prefect.context import FlowRunContext, TaskRunContext 1a

33from prefect.settings import PREFECT_API_URL, PREFECT_UI_URL 1a

34from prefect.types._datetime import DateTime, now, parse_datetime 1a

35 

36if TYPE_CHECKING: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true1a

37 from prefect.client.schemas.objects import Flow, FlowRun, TaskRun 

38 

# Public attribute names of this module. `__dir__` reports this list (sorted),
# and each name has a matching getter registered in `FIELDS`.
__all__ = [
    # the flow run itself
    "id",
    "tags",
    "scheduled_start_time",
    "name",
    # the flow being run
    "flow_name",
    "flow_version",
    "parameters",
    # lineage
    "parent_flow_run_id",
    "parent_deployment_id",
    "root_flow_run_id",
    "run_count",
    # links
    "api_url",
    "ui_url",
    "job_variables",
]

55 

56 

def _parse_datetime_UTC(dt: str) -> datetime:
    """Parse a datetime string and return the result expressed in UTC.

    Args:
        dt: the textual datetime, handed to `parse_datetime`.

    Returns:
        A timezone-aware datetime in UTC. Naive inputs are assumed to already
        be in UTC and are only tagged with the zone; aware inputs are
        converted to UTC.
    """
    moment = parse_datetime(dt)
    if moment.tzinfo is not None:
        # timezone-aware: convert the value into UTC
        return moment.astimezone(ZoneInfo("UTC"))
    # naive: assume the value is already UTC and attach the zone
    return moment.replace(tzinfo=ZoneInfo("UTC"))

65 

66 

def _cast_bool(raw: Any) -> bool:
    """Interpret a mocked string as a boolean: only "true" (any casing) is True."""
    return raw.lower() == "true"


# Converters applied to environment-variable mocks, keyed by the exact type of
# the real attribute value, so that a mocked value has the same type as the
# value it replaces.
type_cast: dict[
    type[bool] | type[int] | type[float] | type[str] | type[None] | type[DateTime],
    Callable[[Any], Any],
] = {
    bool: _cast_bool,
    int: int,
    float: float,
    str: str,
    datetime: _parse_datetime_UTC,
    # optional attributes whose real value is currently None are mocked as str
    type(None): str,
}

79 

80 

def __getattr__(name: str) -> Any:
    """
    Attribute accessor for this submodule; note that imports also work with this:

        from prefect.runtime.flow_run import id

    Known attributes (keys of `FIELDS`) are computed by calling their getter.
    If the environment variable `PREFECT__RUNTIME__FLOW_RUN__<NAME>` is set,
    its value is returned instead, cast to the type of the real value via
    `type_cast`. The getter is still called in that case because the type of
    the real value selects the converter.

    Unknown attributes return the raw environment variable string when it is
    set.

    Raises:
        AttributeError: `name` is not a known attribute and is not mocked.
        ValueError: the attribute is mocked but the type of its real value has
            no converter in `type_cast`.
    """
    getter = FIELDS.get(name)

    # if `name` is an attribute but it is mocked through environment variable,
    # the mocked type will be str, which might be different from original one.
    # For consistency, cast env var to the same type
    env_key = f"PREFECT__RUNTIME__FLOW_RUN__{name.upper()}"

    if getter is None:
        mocked_value = os.environ.get(env_key)
        if mocked_value is not None:
            return mocked_value
        raise AttributeError(f"{__name__} has no attribute {name!r}")

    real_value = getter()
    mocked_value = os.environ.get(env_key)
    if mocked_value is None:
        return real_value

    # Only the converter lookup decides whether mocking is supported. Errors
    # raised by the converter itself (e.g. int("abc")) propagate unchanged
    # instead of being reported as "cannot be mocked".
    cast_func = type_cast.get(type(real_value))
    if cast_func is None:
        raise ValueError(
            "This runtime context attribute cannot be mocked using an"
            " environment variable. Please use monkeypatch instead."
        )
    return cast_func(mocked_value)

114 

115 

def __dir__() -> List[str]:
    """Return the names listed in `__all__`, in sorted order."""
    names = list(__all__)
    names.sort()
    return names

118 

119 

async def _get_flow_run(flow_run_id: str) -> "FlowRun":
    """Read and return the flow run with the given ID using a client from `get_client()`."""
    async with get_client() as client:
        flow_run = await client.read_flow_run(flow_run_id)
    return flow_run

123 

124 

async def _get_task_run(task_run_id: str) -> "TaskRun":
    """Read and return the task run with the given ID using a client from `get_client()`."""
    async with get_client() as client:
        task_run = await client.read_task_run(task_run_id)
    return task_run

128 

129 

async def _get_flow_from_run(flow_run_id: str) -> "Flow":
    """Read the flow run with the given ID, then read and return the flow it refers to.

    Both reads go through the same client obtained from `get_client()`.
    """
    async with get_client() as client:
        flow_id = (await client.read_flow_run(flow_run_id)).flow_id
        flow = await client.read_flow(flow_id)
    return flow

134 

135 

def get_id() -> Optional[str]:
    """Return the ID of the current flow run as a string, or None if unknown.

    Sources, in order of precedence:
      1. the active `FlowRunContext`;
      2. the `flow_run_id` of the task run in the active `TaskRunContext`;
      3. the `PREFECT__FLOW_RUN_ID` environment variable.
    """
    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()

    if flow_run_ctx is not None:
        return str(flow_run_ctx.flow_run.id)

    if task_run_ctx is not None:
        task_flow_run_id = task_run_ctx.task_run.flow_run_id
        # A task run is not necessarily attached to a flow run (compare the
        # emptiness check in `get_parent_flow_run_id`). Stringifying a missing
        # ID would produce the literal "None", which callers would treat as a
        # real flow run ID, so fall through to the environment instead.
        if task_flow_run_id is not None:
            return str(task_flow_run_id)

    return os.getenv("PREFECT__FLOW_RUN_ID")

145 

146 

def get_tags() -> List[str]:
    """Return the tags of the current flow run.

    The active `FlowRunContext` is used when there is one. Otherwise the flow
    run identified by `get_id()` is read via `_get_flow_run`. An empty list is
    returned when no flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow_run.tags
    if run_id is None:
        return []

    fetch = create_call(_get_flow_run, run_id)
    fetched_run = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_run.tags

160 

161 

def get_run_count() -> int:
    """Return the `run_count` of the current flow run.

    The active `FlowRunContext` is used when there is one. Otherwise the flow
    run identified by `get_id()` is read via `_get_flow_run`. Returns 0 when
    no flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow_run.run_count
    if run_id is None:
        return 0

    fetch = create_call(_get_flow_run, run_id)
    fetched_run = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_run.run_count

175 

176 

def get_name() -> Optional[str]:
    """Return the name of the current flow run.

    The active `FlowRunContext` is used when there is one. Otherwise the flow
    run identified by `get_id()` is read via `_get_flow_run`. Returns None
    when no flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow_run.name
    if run_id is None:
        return None

    fetch = create_call(_get_flow_run, run_id)
    fetched_run = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_run.name

190 

191 

def get_flow_name() -> Optional[str]:
    """Return the name of the flow that the current flow run belongs to.

    The flow held by the active `FlowRunContext` is used when there is one.
    Otherwise the flow is looked up from the run ID given by `get_id()` via
    `_get_flow_from_run`. Returns None when no flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow.name
    if run_id is None:
        return None

    fetch = create_call(_get_flow_from_run, run_id)
    fetched_flow = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_flow.name

205 

206 

def get_flow_version() -> Optional[str]:
    """Return the version of the flow that the current flow run belongs to.

    The flow held by the active `FlowRunContext` is used when there is one.
    Otherwise the flow is looked up from the run ID given by `get_id()` via
    `_get_flow_from_run`. Returns None when no flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow.version
    if run_id is None:
        return None

    fetch = create_call(_get_flow_from_run, run_id)
    fetched_flow = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_flow.version

220 

221 

def get_scheduled_start_time() -> DateTime:
    """Return the `expected_start_time` of the current flow run.

    The active `FlowRunContext` is used when there is one. Otherwise the flow
    run identified by `get_id()` is read via `_get_flow_run`. When no flow run
    can be identified, the current time in UTC (`now("UTC")`) is returned.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is not None:
        return ctx.flow_run.expected_start_time
    if run_id is None:
        return now("UTC")

    fetch = create_call(_get_flow_run, run_id)
    fetched_run = from_sync.call_soon_in_loop_thread(fetch).result()
    return fetched_run.expected_start_time

235 

236 

def get_parameters() -> Dict[str, Any]:
    """Return the parameters of the current flow run.

    When a `FlowRunContext` is active its (unserialized) `parameters` are
    returned. Otherwise the parameters stored on the flow run identified by
    `get_id()` are read via `_get_flow_run`. Returns an empty dict when no
    flow run can be identified.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is None and run_id is None:
        return {}

    if ctx is None:
        fetch = create_call(_get_flow_run, run_id)
        fetched_run = from_sync.call_soon_in_loop_thread(fetch).result()
        return fetched_run.parameters

    # Prefer the unserialized parameters held by the context
    return ctx.parameters

251 

252 

def get_parent_flow_run_id() -> Optional[str]:
    """Return the ID of the flow run that triggered the current one, if any.

    The current flow run (from the active `FlowRunContext`, or else read via
    `_get_flow_run` using `get_id()`) provides `parent_task_run_id`. That task
    run is read via `_get_task_run`, and its `flow_run_id` is the parent flow
    run.

    Returns None when no flow run can be identified, when the flow run has no
    parent task run, or when the parent task run has no flow run ID.
    """
    ctx = FlowRunContext.get()
    run_id = get_id()

    if ctx is None and run_id is None:
        return None

    if ctx is not None:
        current_run = ctx.flow_run
    else:
        current_run = from_sync.call_soon_in_loop_thread(
            create_call(_get_flow_run, run_id)
        ).result()

    parent_task_run_id = current_run.parent_task_run_id
    if parent_task_run_id is None:
        return None

    parent_task_run = from_sync.call_soon_in_loop_thread(
        create_call(_get_task_run, parent_task_run_id)
    ).result()
    if not parent_task_run.flow_run_id:
        return None
    return str(parent_task_run.flow_run_id)

273 

274 

def get_parent_deployment_id() -> Optional[str]:
    """Return the deployment ID of the parent flow run, if any.

    The parent is found with `get_parent_flow_run_id()` and read via
    `_get_flow_run`. Returns None when there is no parent flow run, or when
    the parent flow run has no deployment ID.
    """
    parent_id = get_parent_flow_run_id()
    if parent_id is None:
        return None

    fetch = create_call(_get_flow_run, parent_id)
    parent_run = from_sync.call_soon_in_loop_thread(fetch).result()
    if not parent_run:
        return None

    if parent_run.deployment_id:
        return str(parent_run.deployment_id)
    return None

292 

293 

def get_root_flow_run_id() -> Optional[str]:
    """Return the ID of the topmost ancestor of the current flow run.

    Starting from the parent given by `get_parent_flow_run_id()`, the chain
    flow run -> parent task run -> that task run's flow run is followed, using
    `_get_flow_run` and `_get_task_run`, until a flow run without a parent
    task run is reached.

    Returns:
        The root flow run ID as a string. When the current flow run has no
        parent, the result is `get_id()` itself, which is None if no flow run
        can be identified.
    """
    run_id = get_id()
    ancestor_id = get_parent_flow_run_id()
    if ancestor_id is None:
        return run_id

    # Walk upwards iteratively; each pass moves one flow run closer to the root.
    while True:
        flow_run = from_sync.call_soon_in_loop_thread(
            create_call(_get_flow_run, ancestor_id)
        ).result()

        parent_task_run_id = flow_run.parent_task_run_id
        if parent_task_run_id is None:
            return str(ancestor_id)

        parent_task_run = from_sync.call_soon_in_loop_thread(
            create_call(_get_task_run, parent_task_run_id)
        ).result()

        # Consistent with `get_parent_flow_run_id`: a parent task run that is
        # not attached to a flow run means there is no further ancestor, so
        # the flow run reached so far is the root.
        if not parent_task_run.flow_run_id:
            return str(ancestor_id)

        ancestor_id = parent_task_run.flow_run_id

316 

317 

def get_flow_run_api_url() -> Optional[str]:
    """Return the API URL of the current flow run, built from `PREFECT_API_URL`.

    Returns None when `get_id()` cannot identify a flow run.
    """
    flow_run_id = get_id()
    if flow_run_id is not None:
        return f"{PREFECT_API_URL.value()}/flow-runs/flow-run/{flow_run_id}"
    return None

323 

324 

def get_flow_run_ui_url() -> Optional[str]:
    """Return the UI URL of the current flow run, built from `PREFECT_UI_URL`.

    Returns None when `get_id()` cannot identify a flow run.
    """
    flow_run_id = get_id()
    if flow_run_id is not None:
        return f"{PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run_id}"
    return None

330 

331 

def get_job_variables() -> Optional[Dict[str, Any]]:
    """Return the `job_variables` of the flow run in the active `FlowRunContext`.

    Only the context is consulted; returns None when no context is active.
    """
    ctx = FlowRunContext.get()
    if not ctx:
        return None
    return ctx.flow_run.job_variables

335 

336 

# Registry consulted by the module-level `__getattr__`: maps each public
# attribute name to the zero-argument getter that computes its current value.
FIELDS: dict[str, Callable[[], Any]] = {
    # the flow run itself
    "id": get_id,
    "tags": get_tags,
    "scheduled_start_time": get_scheduled_start_time,
    "name": get_name,
    "flow_name": get_flow_name,
    "parameters": get_parameters,
    # lineage
    "parent_flow_run_id": get_parent_flow_run_id,
    "parent_deployment_id": get_parent_deployment_id,
    "root_flow_run_id": get_root_flow_run_id,
    "run_count": get_run_count,
    # links
    "api_url": get_flow_run_api_url,
    "ui_url": get_flow_run_ui_url,
    "flow_version": get_flow_version,
    "job_variables": get_job_variables,
}