Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/core.py: 18%
114 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Core primitives for running Prefect deployment steps.
4Deployment steps are YAML representations of Python functions along with their inputs.
6Whenever a step is run, the following actions are taken:
8- The step's inputs and block / variable references are resolved (see [the `prefect deploy` documentation](https://docs.prefect.io/v3/how-to-guides/deployments/prefect-yaml#templating-options) for more details)
9- The step's function is imported; if it cannot be found, the `requires` keyword is used to install the necessary packages
10- The step's function is called with the resolved inputs
11- The step's output is returned and used to resolve inputs for subsequent steps
12"""
14from __future__ import annotations 1a
16import os 1a
17import re 1a
18import subprocess 1a
19import warnings 1a
20from copy import deepcopy 1a
21from importlib import import_module 1a
22from typing import Any 1a
23from uuid import UUID 1a
25from prefect._internal.compatibility.deprecated import PrefectDeprecationWarning 1a
26from prefect._internal.concurrency.api import Call, from_async 1a
27from prefect._internal.installation import install_packages 1a
28from prefect._internal.integrations import KNOWN_EXTRAS_FOR_PACKAGES 1a
29from prefect.events.clients import get_events_client 1a
30from prefect.events.schemas.events import Event, RelatedResource, Resource 1a
31from prefect.logging.loggers import get_logger 1a
32from prefect.settings import PREFECT_DEBUG_MODE 1a
33from prefect.utilities.importtools import import_object 1a
34from prefect.utilities.templating import ( 1a
35 apply_values,
36 resolve_block_document_references,
37 resolve_variables,
38)
# Step input keys that `run_step` pops out of a step's inputs before the
# remaining inputs are templated and passed to the step function.
RESERVED_KEYWORDS = {"requires", "id"}
class StepExecutionError(Exception):
    """
    Signals that a deployment step could not be executed successfully.
    """
49def _strip_version(requirement: str) -> str: 1a
50 """
51 Strips the version from a requirement string.
53 Args:
54 requirement: A requirement string, e.g. "requests>=2.0.0"
56 Returns:
57 The package name, e.g. "requests"
59 Examples:
60 ```python
61 _strip_version("s3fs>=2.0.0<3.0.0")
62 # "s3fs"
63 ```
64 """
65 # split on any of the characters in the set [<>=!~]
66 # and return the first element which will be the package name
67 return re.split(r"[<>=!~]", requirement)[0].strip()
def _get_function_for_step(
    fully_qualified_name: str, requires: str | list[str] | None = None
):
    """
    Imports and returns the function that implements a step.

    Each entry of `requires` is first imported as a module (version stripped,
    dashes replaced by underscores), then the step function itself is
    imported. If either import fails and `requires` was given, the required
    packages are installed and the step function is imported once more.

    Args:
        fully_qualified_name: The importable name of the step function.
        requires: A requirement string, or a list of them, naming the
            packages the step depends on.

    Returns:
        The imported step function.

    Raises:
        ImportError: If the import fails and no `requires` was provided. Any
            error raised by the import after the install attempt propagates
            as well.
    """
    # Normalise `requires` to a list of requirement strings.
    if isinstance(requires, list):
        packages = requires
    elif requires:
        packages = [requires]
    else:
        packages = []

    try:
        for package in packages:
            import_module(_strip_version(package).replace("-", "_"))
        return import_object(fully_qualified_name)
    except ImportError:
        # Without `requires` there is nothing to install, so give up.
        if not requires:
            raise
        print(
            f"Unable to load step function: {fully_qualified_name}. Attempting"
            f" install of {requires}."
        )

    try:
        # Known packages are swapped for their mapped install target.
        install_packages(
            [
                KNOWN_EXTRAS_FOR_PACKAGES.get(package, package)
                for package in packages
                if package
            ],
            stream_output=True,
        )
    except subprocess.CalledProcessError:
        # A failed install is only logged; the import below is still tried.
        get_logger("deployments.steps.core").warning(
            "Unable to install required packages for %s", fully_qualified_name
        )

    return import_object(fully_qualified_name)
async def run_step(
    step: dict[str, Any], upstream_outputs: dict[str, Any] | None = None
) -> dict[str, Any]:
    """
    Runs a single step and returns the step's output.

    Steps are assumed to be in the format `{"importable.func.name": {"kwarg1": "value1", ...}}`.

    The `id` and `requires` keywords are reserved and are removed from the
    inputs before the step function is called. `requires` is forwarded to the
    function loader as the packages the step depends on; `id` is stripped but
    not otherwise used by this function.

    The remaining inputs are templated in this order: upstream outputs, block
    document references, variables, and finally environment variables.

    Args:
        step: A single-key mapping of the step function's importable name to
            its inputs.
        upstream_outputs: Values made available for templating the inputs.

    Returns:
        Whatever the step function returns.

    Raises:
        ValueError: If `step` has more than one top-level key.
    """
    fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
    upstream_outputs = upstream_outputs or {}

    if len(step) > 1:
        raise ValueError(
            f"Step has unexpected additional keys: {', '.join(list(step.keys())[1:])}"
        )

    # Pull the reserved keywords out so they never reach the step function.
    keywords = {}
    for keyword in RESERVED_KEYWORDS:
        if keyword in inputs:
            keywords[keyword] = inputs.pop(keyword)

    inputs = apply_values(inputs, upstream_outputs)
    inputs = await resolve_block_document_references(inputs)
    inputs = await resolve_variables(inputs)
    inputs = apply_values(inputs, os.environ)

    step_func = _get_function_for_step(fqn, requires=keywords.get("requires"))
    call = Call.new(step_func, **inputs)
    return await from_async.call_soon_in_new_thread(call).aresult()
async def run_steps(
    steps: list[dict[str, Any]],
    upstream_outputs: dict[str, Any] | None = None,
    print_function: Any = print,
    deployment: Any | None = None,
    flow_run: Any | None = None,
    logger: Any | None = None,
) -> dict[str, Any]:
    """
    Runs steps in order, making each step's output available to later steps.

    Falsy entries in `steps` are skipped. A dict returned by a step is merged
    into the accumulated outputs and, when the step declares an `id`, also
    stored under that id. A pull-step event is emitted after each step that
    returns a dict and after each step that fails.

    Args:
        steps: The steps to run, each in the format accepted by `run_step`.
        upstream_outputs: Initial outputs; copied, never mutated.
        print_function: Called with a progress message per step and with the
            message of each warning recorded while the step ran (first with
            `style="yellow"`, then without if that call fails).
        deployment: Forwarded to the event emitter.
        flow_run: Forwarded to the event emitter.
        logger: Forwarded to the event emitter.

    Returns:
        The accumulated outputs of all steps.

    Raises:
        StepExecutionError: If anything goes wrong while running a step; the
            original exception is chained as the cause.
    """
    outputs = deepcopy(upstream_outputs) if upstream_outputs else {}

    for step_index, step in enumerate(steps):
        if not step:
            continue

        fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
        step_name = fqn.rpartition(".")[2]
        print_function(f" > Running {step_name} step...")
        step_id = inputs.get("id")

        # SECURITY: the event payload is built from the raw inputs, before
        # run_step() templates them. It therefore carries template strings
        # such as "{{ prefect.blocks.secret.api-key }}" rather than resolved
        # block, variable or environment values, so secrets do not leak into
        # events.
        serialized_step = {
            "index": step_index,
            "qualified_name": fqn,
            "step_name": step_name,
            "id": step_id,
            # All inputs are kept, reserved keywords like 'requires' included.
            "inputs": inputs,
        }

        try:
            # Record warnings so that deprecation warnings get printed.
            with warnings.catch_warnings(record=True) as caught:
                for category in (PrefectDeprecationWarning, DeprecationWarning):
                    warnings.simplefilter("always", category=category)
                step_output = await run_step(step, outputs)

            # Print each distinct warning message once.
            seen = set()
            for entry in caught:
                message = str(entry.message)
                if message in seen:
                    continue
                try:
                    # Try rich styling first.
                    print_function(message, style="yellow")
                except Exception:
                    # Fall back to printing without styling.
                    print_function(message)
                seen.add(message)

            if isinstance(step_output, dict):
                # Store the output under the step id to prevent clobbering.
                if step_id:
                    outputs[step_id] = step_output
                outputs.update(step_output)

                # Emit the success event for this step.
                await _emit_pull_step_event(
                    serialized_step,
                    event_type="prefect.flow-run.pull-step.executed",
                    deployment=deployment,
                    flow_run=flow_run,
                    logger=logger,
                )
            elif PREFECT_DEBUG_MODE:
                # NOTE(review): a step returning a non-dict contributes no
                # outputs and emits no "executed" event -- confirm that the
                # missing event is intended.
                get_logger().warning(
                    "Step function %s returned unexpected type: %s",
                    fqn,
                    type(step_output),
                )
        except Exception as exc:
            # Emit the failure event for this step.
            await _emit_pull_step_event(
                serialized_step,
                event_type="prefect.flow-run.pull-step.failed",
                deployment=deployment,
                flow_run=flow_run,
                logger=logger,
            )
            raise StepExecutionError(f"Encountered error while running {fqn}") from exc

    return outputs
236def _get_step_fully_qualified_name_and_inputs(step: dict) -> tuple[str, dict]: 1a
237 step = deepcopy(step)
238 return step.popitem()
241async def _emit_pull_step_event( 1a
242 serialized_step: dict[str, Any],
243 *,
244 event_type: str,
245 deployment: Any | None = None,
246 flow_run: Any | None = None,
247 logger: Any | None = None,
248) -> None:
249 # Get flow_run_id from flow_run param or environment
250 flow_run_id = None
251 if flow_run:
252 flow_run_id = flow_run.id
253 else:
254 # Read directly from environment variable
255 flow_run_id_str = os.getenv("PREFECT__FLOW_RUN_ID")
256 if flow_run_id_str:
257 flow_run_id = UUID(flow_run_id_str)
259 if not flow_run_id:
260 return
262 # Build related resources
263 related: list[RelatedResource] = []
264 if deployment:
265 related.append(
266 RelatedResource(
267 {
268 "prefect.resource.id": f"prefect.deployment.{deployment.id}",
269 "prefect.resource.role": "deployment",
270 }
271 )
272 )
274 try:
275 # Use events client directly with checkpoint_every=1 to avoid buffering issues
276 async with get_events_client(checkpoint_every=1) as events_client:
277 await events_client.emit(
278 Event(
279 event=event_type,
280 resource=Resource(
281 {
282 "prefect.resource.id": f"prefect.flow-run.{flow_run_id}",
283 }
284 ),
285 related=related,
286 payload=serialized_step,
287 )
288 )
289 except Exception:
290 if logger:
291 logger.warning(
292 "Failed to emit pull-step event for flow run %s", flow_run_id
293 )
294 else:
295 get_logger(__name__).warning(
296 "Failed to emit pull-step event for flow run %s", flow_run_id
297 )