Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py: 18%

114 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Core primitives for running Prefect deployment steps. 

3 

4Deployment steps are YAML representations of Python functions along with their inputs. 

5 

6Whenever a step is run, the following actions are taken: 

7 

8- The step's inputs and block / variable references are resolved (see [the `prefect deploy` documentation](https://docs.prefect.io/v3/how-to-guides/deployments/prefect-yaml#templating-options) for more details) 

9- The step's function is imported; if it cannot be found, the `requires` keyword is used to install the necessary packages 

10- The step's function is called with the resolved inputs 

11- The step's output is returned and used to resolve inputs for subsequent steps 

12""" 

13 

14from __future__ import annotations 1a

15 

16import os 1a

17import re 1a

18import subprocess 1a

19import warnings 1a

20from copy import deepcopy 1a

21from importlib import import_module 1a

22from typing import Any 1a

23from uuid import UUID 1a

24 

25from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning 1a

26from prefect._internal.concurrency.api import Call, from_async 1a

27from prefect._internal.installation import install_packages 1a

28from prefect._internal.integrations import KNOWN_EXTRAS_FOR_PACKAGES 1a

29from prefect.events.clients import get_events_client 1a

30from prefect.events.schemas.events import Event, RelatedResource, Resource 1a

31from prefect.logging.loggers import get_logger 1a

32from prefect.settings import PREFECT_DEBUG_MODE 1a

33from prefect.utilities.importtools import import_object 1a

34from prefect.utilities.templating import ( 1a

35 apply_values, 

36 resolve_block_document_references, 

37 resolve_variables, 

38) 

39 

# Step input keys that the step runner consumes itself; `run_step` pops them
# from the inputs so they are never forwarded to the step function as kwargs.
RESERVED_KEYWORDS = {"requires", "id"}

41 

42 

class StepExecutionError(Exception):
    """
    Error signalling that a deployment step could not be run successfully.

    When raised by `run_steps`, the underlying exception is chained as
    `__cause__`.
    """

47 

48 

49def _strip_version(requirement: str) -> str: 1a

50 """ 

51 Strips the version from a requirement string. 

52 

53 Args: 

54 requirement: A requirement string, e.g. "requests>=2.0.0" 

55 

56 Returns: 

57 The package name, e.g. "requests" 

58 

59 Examples: 

60 ```python 

61 _strip_version("s3fs>=2.0.0<3.0.0") 

62 # "s3fs" 

63 ``` 

64 """ 

65 # split on any of the characters in the set [<>=!~] 

66 # and return the first element which will be the package name 

67 return re.split(r"[<>=!~]", requirement)[0].strip() 

68 

69 

def _get_function_for_step(
    fully_qualified_name: str, requires: str | list[str] | None = None
):
    """
    Import and return the step function named by `fully_qualified_name`.

    Each required package is import-probed before the step function itself.
    If any of these imports raises `ImportError` and `requires` is non-empty:
    - a notice is printed;
    - the required packages are installed, with names mapped through
      `KNOWN_EXTRAS_FOR_PACKAGES`;
    - the step function import is retried once.

    An installation failure (`subprocess.CalledProcessError`) is logged as a
    warning and the retry still happens. Errors from the retry itself are not
    caught.

    Args:
        fully_qualified_name: Dotted import path of the step function.
        requires: A requirement string, a list of requirement strings, or
            `None`.

    Returns:
        The imported step function.

    Raises:
        ImportError: If the import fails and `requires` is empty.
    """
    if isinstance(requires, list):
        requirements = requires
    elif requires:
        requirements = [requires]
    else:
        requirements = []

    try:
        # Probe required packages first so a missing dependency routes us to
        # the install path below.
        for requirement in requirements:
            import_module(_strip_version(requirement).replace("-", "_"))
        return import_object(fully_qualified_name)
    except ImportError:
        if not requires:
            raise
        print(
            f"Unable to load step function: {fully_qualified_name}. Attempting"
            f" install of {requires}."
        )

    try:
        install_packages(
            [
                KNOWN_EXTRAS_FOR_PACKAGES.get(requirement, requirement)
                for requirement in requirements
                if requirement
            ],
            stream_output=True,
        )
    except subprocess.CalledProcessError:
        get_logger("deployments.steps.core").warning(
            "Unable to install required packages for %s", fully_qualified_name
        )
    return import_object(fully_qualified_name)

106 

107 

108async def run_step( 1a

109 step: dict[str, Any], upstream_outputs: dict[str, Any] | None = None 

110) -> dict[str, Any]: 

111 """ 

112 Runs a step, returns the step's output. 

113 

114 Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`. 

115 

116 The 'id and 'requires' keywords are reserved for specific purposes and will be removed from the 

117 inputs before passing to the step function: 

118 

119 This keyword is used to specify packages that should be installed before running the step. 

120 """ 

121 fqn, inputs = _get_step_fully_qualified_name_and_inputs(step) 

122 upstream_outputs = upstream_outputs or {} 

123 

124 if len(step.keys()) > 1: 

125 raise ValueError( 

126 f"Step has unexpected additional keys: {', '.join(list(step.keys())[1:])}" 

127 ) 

128 

129 keywords = { 

130 keyword: inputs.pop(keyword) 

131 for keyword in RESERVED_KEYWORDS 

132 if keyword in inputs 

133 } 

134 

135 inputs = apply_values(inputs, upstream_outputs) 

136 inputs = await resolve_block_document_references(inputs) 

137 inputs = await resolve_variables(inputs) 

138 inputs = apply_values(inputs, os.environ) 

139 step_func = _get_function_for_step(fqn, requires=keywords.get("requires")) 

140 result = await from_async.call_soon_in_new_thread( 

141 Call.new(step_func, **inputs) 

142 ).aresult() 

143 return result 

144 

145 

146async def run_steps( 1a

147 steps: list[dict[str, Any]], 

148 upstream_outputs: dict[str, Any] | None = None, 

149 print_function: Any = print, 

150 deployment: Any | None = None, 

151 flow_run: Any | None = None, 

152 logger: Any | None = None, 

153) -> dict[str, Any]: 

154 upstream_outputs = deepcopy(upstream_outputs) if upstream_outputs else {} 

155 for step_index, step in enumerate(steps): 

156 if not step: 

157 continue 

158 fqn, inputs = _get_step_fully_qualified_name_and_inputs(step) 

159 step_name = fqn.split(".")[-1] 

160 print_function(f" > Running {step_name} step...") 

161 

162 # SECURITY: Serialize inputs BEFORE running the step (and thus before templating). 

163 # This ensures that the event payload contains template strings like 

164 # "{{ prefect.blocks.secret.api-key }}" rather than resolved secret values. 

165 # Templating (which resolves blocks, variables, and env vars) happens inside 

166 # run_step(), so by serializing here we prevent secrets from leaking in events. 

167 serialized_step = { 

168 "index": step_index, 

169 "qualified_name": fqn, 

170 "step_name": step_name, 

171 "id": inputs.get("id"), 

172 "inputs": inputs, # Keep all inputs including reserved keywords like 'requires' 

173 } 

174 

175 try: 

176 # catch warnings to ensure deprecation warnings are printed 

177 with warnings.catch_warnings(record=True) as w: 

178 warnings.simplefilter( 

179 "always", 

180 category=PrefectDeprecationWarning, 

181 ) 

182 warnings.simplefilter( 

183 "always", 

184 category=DeprecationWarning, 

185 ) 

186 step_output = await run_step(step, upstream_outputs) 

187 if w: 

188 printed_messages = [] 

189 for warning in w: 

190 message = str(warning.message) 

191 # prevent duplicate warnings from being printed 

192 if message not in printed_messages: 

193 try: 

194 # try using rich styling 

195 print_function(message, style="yellow") 

196 except Exception: 

197 # default to printing without styling 

198 print_function(message) 

199 printed_messages.append(message) 

200 

201 if not isinstance(step_output, dict): 

202 if PREFECT_DEBUG_MODE: 

203 get_logger().warning( 

204 "Step function %s returned unexpected type: %s", 

205 fqn, 

206 type(step_output), 

207 ) 

208 continue 

209 # store step output under step id to prevent clobbering 

210 if inputs.get("id"): 

211 upstream_outputs[inputs.get("id")] = step_output 

212 upstream_outputs.update(step_output) 

213 

214 # Emit success event for this step 

215 await _emit_pull_step_event( 

216 serialized_step, 

217 event_type="prefect.flow-run.pull-step.executed", 

218 deployment=deployment, 

219 flow_run=flow_run, 

220 logger=logger, 

221 ) 

222 except Exception as exc: 

223 # Emit failure event for this step 

224 await _emit_pull_step_event( 

225 serialized_step, 

226 event_type="prefect.flow-run.pull-step.failed", 

227 deployment=deployment, 

228 flow_run=flow_run, 

229 logger=logger, 

230 ) 

231 raise StepExecutionError(f"Encountered error while running {fqn}") from exc 

232 

233 return upstream_outputs 

234 

235 

236def _get_step_fully_qualified_name_and_inputs(step: dict) -> tuple[str, dict]: 1a

237 step = deepcopy(step) 

238 return step.popitem() 

239 

240 

241async def _emit_pull_step_event( 1a

242 serialized_step: dict[str, Any], 

243 *, 

244 event_type: str, 

245 deployment: Any | None = None, 

246 flow_run: Any | None = None, 

247 logger: Any | None = None, 

248) -> None: 

249 # Get flow_run_id from flow_run param or environment 

250 flow_run_id = None 

251 if flow_run: 

252 flow_run_id = flow_run.id 

253 else: 

254 # Read directly from environment variable 

255 flow_run_id_str = os.getenv("PREFECT__FLOW_RUN_ID") 

256 if flow_run_id_str: 

257 flow_run_id = UUID(flow_run_id_str) 

258 

259 if not flow_run_id: 

260 return 

261 

262 # Build related resources 

263 related: list[RelatedResource] = [] 

264 if deployment: 

265 related.append( 

266 RelatedResource( 

267 { 

268 "prefect.resource.id": f"prefect.deployment.{deployment.id}", 

269 "prefect.resource.role": "deployment", 

270 } 

271 ) 

272 ) 

273 

274 try: 

275 # Use events client directly with checkpoint_every=1 to avoid buffering issues 

276 async with get_events_client(checkpoint_every=1) as events_client: 

277 await events_client.emit( 

278 Event( 

279 event=event_type, 

280 resource=Resource( 

281 { 

282 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}", 

283 } 

284 ), 

285 related=related, 

286 payload=serialized_step, 

287 ) 

288 ) 

289 except Exception: 

290 if logger: 

291 logger.warning( 

292 "Failed to emit pull-step event for flow run %s", flow_run_id 

293 ) 

294 else: 

295 get_logger(__name__).warning( 

296 "Failed to emit pull-step event for flow run %s", flow_run_id 

297 )