Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_actions.py: 14%

86 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import os 1a

4from getpass import GetPassWarning 1a

5from pathlib import Path 1a

6from typing import TYPE_CHECKING, Any, Dict, List, Optional 1a

7 

8import prefect.cli.root as root 1a

9from prefect.blocks.system import Secret 1a

10from prefect.cli._prompts import ( 1a

11 confirm, 

12 prompt, 

13 prompt_select_blob_storage_credentials, 

14 prompt_select_remote_flow_storage, 

15) 

16from prefect.utilities._git import get_git_branch, get_git_remote_origin_url 1a

17from prefect.utilities.slugify import slugify 1a

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a

20 from rich.console import Console 

21 

22 

async def _generate_git_clone_pull_step(
    console: "Console",
    deploy_config: dict[str, Any],
    remote_url: str,
) -> list[dict[str, Any]]:
    """Interactively assemble a ``git_clone`` pull step.

    The user is asked to confirm (or supply) the repository URL and the branch,
    and whether the repository is private. For a private repository an access
    token is stored in a ``Secret`` block named after the deployment and flow,
    and the step references that block through a template string.

    Args:
        console: Console passed through to every prompt.
        deploy_config: Deployment configuration; ``"name"`` and ``"flow_name"``
            are read when the repository is private.
        remote_url: Candidate repository URL; a falsy value makes the function
            ask for one.

    Returns:
        A one-element list holding the ``prefect.deployments.steps.git_clone``
        step.
    """
    url_request = "Please enter the URL to pull your flow code from"

    branch = get_git_branch() or "main"

    # A missing URL is always asked for; a known URL is only replaced when the
    # user rejects it. Short-circuiting skips the confirmation for a falsy URL.
    if not remote_url or not confirm(
        f"Is [green]{remote_url}[/] the correct URL to pull your flow code from?",
        default=True,
        console=console,
    ):
        remote_url = prompt(url_request, console=console)

    branch_is_correct = confirm(
        f"Is [green]{branch}[/] the correct branch to pull your flow code from?",
        default=True,
        console=console,
    )
    if not branch_is_correct:
        branch = prompt(
            "Please enter the branch to pull your flow code from",
            default="main",
            console=console,
        )

    token_secret_block_name = None
    if confirm("Is this a private repository?", console=console):
        deployment_slug = slugify(deploy_config["name"])
        flow_slug = slugify(deploy_config["flow_name"])
        token_secret_block_name = (
            f"deployment-{deployment_slug}-{flow_slug}-repo-token"
        )

        token_request = (
            "Please enter a token that can be used to access your private"
            " repository. This token will be saved as a secret via the Prefect API"
        )
        needs_new_token = False

        try:
            await Secret.aload(token_secret_block_name)
            keep_existing_token = confirm(
                (
                    "We found an existing token saved for this deployment. Would"
                    " you like use the existing token?"
                ),
                default=True,
                console=console,
            )
            if not keep_existing_token:
                token_request = (
                    "Please enter a token that can be used to access your private"
                    " repository (this will overwrite the existing token saved via"
                    " the Prefect API)."
                )
                needs_new_token = True
        except ValueError:
            # Loading the block failed with ValueError: ask for a fresh token.
            needs_new_token = True

        if needs_new_token:
            try:
                repo_token = prompt(token_request, console=console, password=True)
            except GetPassWarning:
                # Handling for when password masking is not supported
                repo_token = prompt(token_request, console=console)

            token_block = Secret(value=repo_token)
            await token_block.save(name=token_secret_block_name, overwrite=True)

    clone_options = {"repository": remote_url, "branch": branch}
    if token_secret_block_name:
        clone_options["access_token"] = (
            "{{ prefect.blocks.secret." + token_secret_block_name + " }}"
        )

    return [{"prefect.deployments.steps.git_clone": clone_options}]

114 

115 

116async def _generate_pull_step_for_build_docker_image( 1a

117 console: "Console", deploy_config: Dict[str, Any], auto: bool = True 

118) -> list[dict[str, Any]]: 

119 pull_step: dict[str, Any] = {} 

120 # prompt imported from concrete module 

121 

122 dir_name = os.path.basename(os.getcwd()) 

123 if auto: 

124 pull_step["directory"] = f"/opt/prefect/{dir_name}" 

125 else: 

126 pull_step["directory"] = prompt( 

127 "What is the path to your flow code in your Dockerfile?", 

128 default=f"/opt/prefect/{dir_name}", 

129 console=console, 

130 ) 

131 

132 return [{"prefect.deployments.steps.set_working_directory": pull_step}] 

133 

134 

135async def _check_for_build_docker_image_step( 1a

136 build_action: List[Dict], 

137) -> Optional[Dict[str, Any]]: 

138 if not build_action: 

139 return None 

140 

141 build_docker_image_steps = [ 

142 "prefect_docker.deployments.steps.build_docker_image", 

143 ] 

144 for build_docker_image_step in build_docker_image_steps: 

145 for action in build_action: 

146 if action.get(build_docker_image_step): 

147 return action.get(build_docker_image_step) 

148 

149 return None 

150 

151 

async def _generate_actions_for_remote_flow_storage(
    console: "Console",
    deploy_config: dict[str, Any],
    actions: dict[str, list[dict[str, Any]]],
) -> dict[str, list[dict[str, Any]]]:
    """Fill in push/pull actions for the remote flow storage the user selects.

    The user first picks a storage provider. For ``"git"`` only a pull action
    (a git clone step) is generated. For a blob-storage provider the user is
    asked for a bucket, a folder and credentials, and matching push and pull
    steps from the provider's collection are generated. Any other selection
    leaves ``actions`` untouched.

    Args:
        console: Console passed through to every prompt.
        deploy_config: Deployment configuration, forwarded to the git clone
            step generator.
        actions: Mapping of action name (``"push"``, ``"pull"``, ...) to its
            list of steps. It is updated in place.

    Returns:
        The same ``actions`` mapping that was passed in.
    """
    storage_provider_to_collection: dict[str, str] = {
        "s3": "prefect_aws",
        "gcs": "prefect_gcp",
        "azure_blob_storage": "prefect_azure",
    }
    selected_storage_provider = await prompt_select_remote_flow_storage(console=console)

    if selected_storage_provider == "git":
        actions["pull"] = await _generate_git_clone_pull_step(
            console=console,
            deploy_config=deploy_config,
            remote_url=get_git_remote_origin_url(),
        )

    elif selected_storage_provider in storage_provider_to_collection:
        collection = storage_provider_to_collection[selected_storage_provider]

        # Route these prompts through the caller's console, like every other
        # prompt in this module.
        bucket = prompt("Bucket name", console=console)
        folder = prompt("Folder name", console=console)

        # Prompt user to select or create credentials for the chosen provider
        credentials_block = await prompt_select_blob_storage_credentials(
            console=console, storage_provider=selected_storage_provider
        )
        # This prompt returns a Jinja template string referencing a credentials
        # block; any falsy result is normalised to None.
        credentials = credentials_block or None

        # Azure names the target a "container"; the other providers a "bucket".
        location_key = (
            "container"
            if selected_storage_provider == "azure_blob_storage"
            else "bucket"
        )
        step_fields = {
            location_key: bucket,
            "folder": folder,
            "credentials": credentials,
        }

        # Each step gets its own copy of the fields so that later mutation of
        # one step cannot leak into the other through a shared dict.
        actions["push"] = [
            {
                f"{collection}.deployments.steps.push_to_{selected_storage_provider}": (
                    dict(step_fields)
                )
            }
        ]

        actions["pull"] = [
            {
                f"{collection}.deployments.steps.pull_from_{selected_storage_provider}": (
                    dict(step_fields)
                )
            }
        ]

    return actions

210 

211 

async def _generate_default_pull_action(
    console: "Console",
    deploy_config: dict[str, Any],
    actions: dict[str, list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Produce the default pull steps for a deployment.

    When a Docker image build step is configured, the pull step points at the
    flow code inside the image. Otherwise the pull step points at the current
    working directory and a notice about where workers will load the flow from
    is printed.

    Args:
        console: Console used for the notice and forwarded to the Docker pull
            step generator.
        deploy_config: Deployment configuration. ``"build"`` is consulted for
            build steps and ``"entrypoint"`` is read when no Docker build step
            exists.
        actions: Mapping of action name to its list of steps; ``"build"`` is
            used when ``deploy_config`` has no build steps of its own.

    Returns:
        A one-element list holding a ``set_working_directory`` step.

        NOTE(review): when a Docker build step with a non-``"auto"`` Dockerfile
        is configured and the session is not interactive, no branch returns a
        value, so the function falls through and yields ``None``. This
        pre-existing behaviour is kept; confirm whether callers rely on it.
    """
    from prefect.cli._utilities import exit_with_error

    # `.get` so that a missing "build" section means "no Docker build step"
    # instead of raising KeyError.
    build_docker_image_step = await _check_for_build_docker_image_step(
        deploy_config.get("build") or actions.get("build")
    )
    if build_docker_image_step:
        dockerfile = build_docker_image_step.get("dockerfile")
        if dockerfile == "auto":
            return await _generate_pull_step_for_build_docker_image(
                console, deploy_config
            )
        if root.is_interactive():
            if not confirm(
                "Does your Dockerfile have a line that copies the current working"
                " directory into your image?"
            ):
                # Presumably terminates the CLI; if it ever returned, the
                # prompt for the in-image path below would still run.
                exit_with_error(
                    "Your flow code must be copied into your Docker image to run"
                    " your deployment.\nTo do so, you can copy this line into your"
                    " Dockerfile: [yellow]COPY . /opt/prefect/[/yellow]"
                )
            return await _generate_pull_step_for_build_docker_image(
                console, deploy_config, auto=False
            )
    else:
        # Split on the LAST colon only: the path part may itself contain colons
        # (e.g. a Windows drive letter), which made a plain split(":") fail to
        # unpack with ValueError.
        entrypoint_path = deploy_config["entrypoint"].rsplit(":", 1)[0]
        console.print(
            "Your Prefect workers will attempt to load your flow from:"
            f" [green]{(Path.cwd() / Path(entrypoint_path)).absolute().resolve()}[/]. To"
            " see more options for managing your flow's code, run:\n\n\t[blue]$"
            " prefect init[/]\n"
        )
        return [
            {
                "prefect.deployments.steps.set_working_directory": {
                    "directory": str(Path.cwd().absolute().resolve())
                }
            }
        ]