Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/utility.py: 30%

47 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Utility project steps that are useful for managing a project's deployment lifecycle. 

3 

4Steps within this module can be used within a `build`, `push`, or `pull` deployment action. 

5 

6Example: 

7 Use the `run_shell_script` step to retrieve the short Git commit hash of the current 

8 repository and use it as a Docker image tag: 

9 ```yaml 

10 build: 

11 - prefect.deployments.steps.run_shell_script: 

12 id: get-commit-hash 

13 script: git rev-parse --short HEAD 

14 stream_output: false 

15 - prefect_docker.deployments.steps.build_docker_image: 

16 requires: prefect-docker 

17 image_name: my-image 

18 image_tag: "{{ get-commit-hash.stdout }}" 

19 dockerfile: auto 

20 ``` 

21""" 

22 

23import io 1a

24import os 1a

25import shlex 1a

26import string 1a

27import subprocess 1a

28import sys 1a

29from typing import Any, Dict, Optional 1a

30 

31from anyio import create_task_group 1a

32from anyio.streams.text import TextReceiveStream 1a

33from typing_extensions import TypedDict 1a

34 

35from prefect.utilities.processutils import ( 1a

36 get_sys_executable, 

37 open_process, 

38 stream_text, 

39) 

40 

41 

async def _stream_capture_process_output(
    process,
    stdout_sink: io.StringIO,
    stderr_sink: io.StringIO,
    stream_output: bool = True,
):
    """
    Forward a process's stdout and stderr to the given capture buffers.

    Two `stream_text` tasks are started in one task group, one per pipe, and
    this coroutine returns once the task group exits.

    Args:
        process: Process object exposing `stdout` and `stderr` attributes;
            each is wrapped in a `TextReceiveStream` before being read.
        stdout_sink: Buffer passed to `stream_text` as a sink for stdout.
        stderr_sink: Buffer passed to `stream_text` as a sink for stderr.
        stream_output: When true, `sys.stdout` / `sys.stderr` are passed as
            additional sinks alongside the capture buffers.
    """
    out_targets = [stdout_sink]
    err_targets = [stderr_sink]
    if stream_output:
        # Mirror to the console in addition to the capture buffers.
        out_targets.append(sys.stdout)
        err_targets.append(sys.stderr)

    async with create_task_group() as tg:
        for pipe, targets in (
            (process.stdout, out_targets),
            (process.stderr, err_targets),
        ):
            tg.start_soon(stream_text, TextReceiveStream(pipe), *targets)

61 

62 

class RunShellScriptResult(TypedDict):
    """
    Typed dictionary returned by the `run_shell_script` step.

    Attributes:
        stdout: Text captured from the script's standard output.
        stderr: Text captured from the script's standard error.
    """

    stdout: str
    stderr: str

74 

75 

async def run_shell_script(
    script: str,
    directory: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    stream_output: bool = True,
    expand_env_vars: bool = False,
) -> RunShellScriptResult:
    """
    Run each line of a script as a separate subprocess and collect its output.

    The script is split into lines; every non-empty line is tokenized with
    `shlex.split` and started via `open_process`, one after another. Output
    from all commands accumulates in shared stdout/stderr buffers.

    Args:
        script: The script to run; one command per line.
        directory: The directory to run the commands in (passed as `cwd`).
            Defaults to the current working directory.
        env: Extra environment variables, layered on top of a copy of the
            current process environment.
        stream_output: Whether to also stream the commands' output to
            stdout/stderr while capturing it.
        expand_env_vars: Whether to substitute `$NAME` / `${NAME}`
            placeholders in each line from the merged environment before
            running it. Uses `string.Template.safe_substitute`, so unknown
            placeholders are left as-is.

    Returns:
        A dictionary with the keys `stdout` and `stderr` containing the
        accumulated, whitespace-stripped output of all commands.

    Raises:
        RuntimeError: If any command exits with a non-zero return code. The
            message contains the stderr captured up to that point, and the
            remaining lines are not executed.

    Examples:
        Retrieve the short Git commit hash of the current repository to use as
            a Docker image tag:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                id: get-commit-hash
                script: git rev-parse --short HEAD
                stream_output: false
            - prefect_docker.deployments.steps.build_docker_image:
                requires: prefect-docker
                image_name: my-image
                image_tag: "{{ get-commit-hash.stdout }}"
                dockerfile: auto
        ```

        Run a multi-line shell script:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: |
                    echo "Hello"
                    echo "World"
        ```

        Run a shell script with environment variables:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: echo "Hello $NAME"
                env:
                    NAME: World
        ```

        Run a shell script with environment variables expanded
            from the current environment:
        ```yaml
        pull:
            - prefect.deployments.steps.run_shell_script:
                script: |
                    echo "User: $USER"
                    echo "Home Directory: $HOME"
                stream_output: true
                expand_env_vars: true
        ```

        Run a shell script in a specific directory:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: echo "Hello"
                directory: /path/to/directory
        ```

        Run a script stored in a file:
        ```yaml
        build:
            - prefect.deployments.steps.run_shell_script:
                script: "bash path/to/script.sh"
        ```
    """
    # Caller-supplied variables override the inherited environment.
    merged_env = {**os.environ, **(env or {})}
    # shlex needs non-POSIX tokenization rules on Windows.
    use_posix = sys.platform != "win32"

    captured_out = io.StringIO()
    captured_err = io.StringIO()

    for line in script.splitlines():
        if expand_env_vars:
            # Expand environment variables in command and provided environment
            line = string.Template(line).safe_substitute(merged_env)

        argv = shlex.split(line, posix=use_posix)
        if not argv:
            # Blank (or whitespace-only) line: nothing to execute.
            continue

        async with open_process(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=directory,
            env=merged_env,
        ) as process:
            await _stream_capture_process_output(
                process,
                stdout_sink=captured_out,
                stderr_sink=captured_err,
                stream_output=stream_output,
            )

            await process.wait()

            exit_code = process.returncode
            if exit_code != 0:
                raise RuntimeError(
                    f"`run_shell_script` failed with error code {exit_code}:"
                    f" {captured_err.getvalue()}"
                )

    return {
        "stdout": captured_out.getvalue().strip(),
        "stderr": captured_err.getvalue().strip(),
    }

202 

203 

async def pip_install_requirements(
    directory: Optional[str] = None,
    requirements_file: str = "requirements.txt",
    stream_output: bool = True,
) -> dict[str, Any]:
    """
    Install dependencies from a requirements file using `pip`.

    Runs `<python> -m pip install -r <requirements_file>` in a subprocess,
    where `<python>` is whatever `get_sys_executable()` returns.

    Args:
        directory: The directory to run `pip` in (passed as `cwd`), i.e. where
            a relative `requirements_file` is looked up. Defaults to the
            current working directory.
        requirements_file: The requirements file to install from.
        stream_output: Whether the output of `pip install` should also be
            streamed to the console while it is captured.

    Returns:
        A dictionary with the keys `stdout` and `stderr` containing the
        whitespace-stripped output of the `pip install` command.

    Raises:
        RuntimeError: If the `pip install` command exits with a non-zero
            return code; the message includes the captured stderr.

    Example:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                id: clone-step
                repository: https://github.com/org/repo.git
            - prefect.deployments.steps.pip_install_requirements:
                directory: "{{ clone-step.directory }}"
                requirements_file: requirements.txt
                stream_output: False
        ```
    """
    captured_out = io.StringIO()
    captured_err = io.StringIO()

    pip_command = [
        get_sys_executable(),
        "-m",
        "pip",
        "install",
        "-r",
        requirements_file,
    ]

    async with open_process(
        pip_command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        cwd=directory,
    ) as process:
        await _stream_capture_process_output(
            process,
            stdout_sink=captured_out,
            stderr_sink=captured_err,
            stream_output=stream_output,
        )
        await process.wait()

        exit_code = process.returncode
        if exit_code != 0:
            raise RuntimeError(
                f"pip_install_requirements failed with error code {exit_code}:"
                f" {captured_err.getvalue()}"
            )

    return {
        "stdout": captured_out.getvalue().strip(),
        "stderr": captured_err.getvalue().strip(),
    }