Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/utility.py: 30%
47 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Utility project steps that are useful for managing a project's deployment lifecycle.
4Steps within this module can be used within a `build`, `push`, or `pull` deployment action.
6Example:
7 Use the `run_shell_script` setp to retrieve the short Git commit hash of the current
8 repository and use it as a Docker image tag:
9 ```yaml
10 build:
11 - prefect.deployments.steps.run_shell_script:
12 id: get-commit-hash
13 script: git rev-parse --short HEAD
14 stream_output: false
15 - prefect_docker.deployments.steps.build_docker_image:
16 requires: prefect-docker
17 image_name: my-image
18 image_tag: "{{ get-commit-hash.stdout }}"
19 dockerfile: auto
20 ```
21"""
23import io 1a
24import os 1a
25import shlex 1a
26import string 1a
27import subprocess 1a
28import sys 1a
29from typing import Any, Dict, Optional 1a
31from anyio import create_task_group 1a
32from anyio.streams.text import TextReceiveStream 1a
33from typing_extensions import TypedDict 1a
35from prefect.utilities.processutils import ( 1a
36 get_sys_executable,
37 open_process,
38 stream_text,
39)
42async def _stream_capture_process_output( 1a
43 process,
44 stdout_sink: io.StringIO,
45 stderr_sink: io.StringIO,
46 stream_output: bool = True,
47):
48 stdout_sinks = [stdout_sink, sys.stdout] if stream_output else [stdout_sink]
49 stderr_sinks = [stderr_sink, sys.stderr] if stream_output else [stderr_sink]
50 async with create_task_group() as tg:
51 tg.start_soon(
52 stream_text,
53 TextReceiveStream(process.stdout),
54 *stdout_sinks,
55 )
56 tg.start_soon(
57 stream_text,
58 TextReceiveStream(process.stderr),
59 *stderr_sinks,
60 )
63class RunShellScriptResult(TypedDict): 1a
64 """
65 The result of a `run_shell_script` step.
67 Attributes:
68 stdout: The captured standard output of the script.
69 stderr: The captured standard error of the script.
70 """
72 stdout: str 1a
73 stderr: str 1a
76async def run_shell_script( 1a
77 script: str,
78 directory: Optional[str] = None,
79 env: Optional[Dict[str, str]] = None,
80 stream_output: bool = True,
81 expand_env_vars: bool = False,
82) -> RunShellScriptResult:
83 """
84 Runs one or more shell commands in a subprocess. Returns the standard
85 output and standard error of the script.
87 Args:
88 script: The script to run
89 directory: The directory to run the script in. Defaults to the current
90 working directory.
91 env: A dictionary of environment variables to set for the script
92 stream_output: Whether to stream the output of the script to
93 stdout/stderr
94 expand_env_vars: Whether to expand environment variables in the script
95 before running it
97 Returns:
98 A dictionary with the keys `stdout` and `stderr` containing the output
99 of the script
101 Examples:
102 Retrieve the short Git commit hash of the current repository to use as
103 a Docker image tag:
104 ```yaml
105 build:
106 - prefect.deployments.steps.run_shell_script:
107 id: get-commit-hash
108 script: git rev-parse --short HEAD
109 stream_output: false
110 - prefect_docker.deployments.steps.build_docker_image:
111 requires: prefect-docker
112 image_name: my-image
113 image_tag: "{{ get-commit-hash.stdout }}"
114 dockerfile: auto
115 ```
117 Run a multi-line shell script:
118 ```yaml
119 build:
120 - prefect.deployments.steps.run_shell_script:
121 script: |
122 echo "Hello"
123 echo "World"
124 ```
126 Run a shell script with environment variables:
127 ```yaml
128 build:
129 - prefect.deployments.steps.run_shell_script:
130 script: echo "Hello $NAME"
131 env:
132 NAME: World
133 ```
135 Run a shell script with environment variables expanded
136 from the current environment:
137 ```yaml
138 pull:
139 - prefect.deployments.steps.run_shell_script:
140 script: |
141 echo "User: $USER"
142 echo "Home Directory: $HOME"
143 stream_output: true
144 expand_env_vars: true
145 ```
147 Run a shell script in a specific directory:
148 ```yaml
149 build:
150 - prefect.deployments.steps.run_shell_script:
151 script: echo "Hello"
152 directory: /path/to/directory
153 ```
155 Run a script stored in a file:
156 ```yaml
157 build:
158 - prefect.deployments.steps.run_shell_script:
159 script: "bash path/to/script.sh"
160 ```
161 """
162 current_env = os.environ.copy()
163 current_env.update(env or {})
165 commands = script.splitlines()
166 stdout_sink = io.StringIO()
167 stderr_sink = io.StringIO()
169 for command in commands:
170 if expand_env_vars:
171 # Expand environment variables in command and provided environment
172 command = string.Template(command).safe_substitute(current_env)
173 split_command = shlex.split(command, posix=sys.platform != "win32")
174 if not split_command:
175 continue
176 async with open_process(
177 split_command,
178 stdout=subprocess.PIPE,
179 stderr=subprocess.PIPE,
180 cwd=directory,
181 env=current_env,
182 ) as process:
183 await _stream_capture_process_output(
184 process,
185 stdout_sink=stdout_sink,
186 stderr_sink=stderr_sink,
187 stream_output=stream_output,
188 )
190 await process.wait()
192 if process.returncode != 0:
193 raise RuntimeError(
194 f"`run_shell_script` failed with error code {process.returncode}:"
195 f" {stderr_sink.getvalue()}"
196 )
198 return {
199 "stdout": stdout_sink.getvalue().strip(),
200 "stderr": stderr_sink.getvalue().strip(),
201 }
204async def pip_install_requirements( 1a
205 directory: Optional[str] = None,
206 requirements_file: str = "requirements.txt",
207 stream_output: bool = True,
208) -> dict[str, Any]:
209 """
210 Installs dependencies from a requirements.txt file.
212 Args:
213 requirements_file: The requirements.txt to use for installation.
214 directory: The directory the requirements.txt file is in. Defaults to
215 the current working directory.
216 stream_output: Whether to stream the output from pip install should be
217 streamed to the console
219 Returns:
220 A dictionary with the keys `stdout` and `stderr` containing the output
221 the `pip install` command
223 Raises:
224 subprocess.CalledProcessError: if the pip install command fails for any reason
226 Example:
227 ```yaml
228 pull:
229 - prefect.deployments.steps.git_clone:
230 id: clone-step
231 repository: https://github.com/org/repo.git
232 - prefect.deployments.steps.pip_install_requirements:
233 directory: {{ clone-step.directory }}
234 requirements_file: requirements.txt
235 stream_output: False
236 ```
237 """
238 stdout_sink = io.StringIO()
239 stderr_sink = io.StringIO()
241 async with open_process(
242 [get_sys_executable(), "-m", "pip", "install", "-r", requirements_file],
243 stdout=subprocess.PIPE,
244 stderr=subprocess.PIPE,
245 cwd=directory,
246 ) as process:
247 await _stream_capture_process_output(
248 process,
249 stdout_sink=stdout_sink,
250 stderr_sink=stderr_sink,
251 stream_output=stream_output,
252 )
253 await process.wait()
255 if process.returncode != 0:
256 raise RuntimeError(
257 f"pip_install_requirements failed with error code {process.returncode}:"
258 f" {stderr_sink.getvalue()}"
259 )
261 return {
262 "stdout": stdout_sink.getvalue().strip(),
263 "stderr": stderr_sink.getvalue().strip(),
264 }