Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_actions.py: 14%
86 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import os 1a
4from getpass import GetPassWarning 1a
5from pathlib import Path 1a
6from typing import TYPE_CHECKING, Any, Dict, List, Optional 1a
8import prefect.cli.root as root 1a
9from prefect.blocks.system import Secret 1a
10from prefect.cli._prompts import ( 1a
11 confirm,
12 prompt,
13 prompt_select_blob_storage_credentials,
14 prompt_select_remote_flow_storage,
15)
16from prefect.utilities._git import get_git_branch, get_git_remote_origin_url 1a
17from prefect.utilities.slugify import slugify 1a
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a
20 from rich.console import Console
async def _generate_git_clone_pull_step(
    console: "Console",
    deploy_config: dict[str, Any],
    remote_url: str,
) -> list[dict[str, Any]]:
    """Interactively build the ``git_clone`` pull step for a deployment.

    The user is asked to confirm (or replace) the repository URL and the
    branch, and, for private repositories, to supply an access token that is
    saved as a ``Secret`` block and referenced from the step by template.

    Args:
        console: Console handed to every prompt/confirm call.
        deploy_config: Deployment configuration; its ``name`` and
            ``flow_name`` entries are read only when the repository is
            private, to derive the secret block name.
        remote_url: Candidate repository URL. When falsy the user is asked to
            type one instead of being asked to confirm it.

    Returns:
        A one-element list holding the ``prefect.deployments.steps.git_clone``
        step with ``repository``, ``branch`` and, for private repositories,
        ``access_token``.
    """
    branch = get_git_branch() or "main"

    # A missing URL skips the confirmation and goes straight to the prompt;
    # a rejected URL ends up at the same prompt.
    if not remote_url or not confirm(
        f"Is [green]{remote_url}[/] the correct URL to pull your flow code from?",
        default=True,
        console=console,
    ):
        remote_url = prompt(
            "Please enter the URL to pull your flow code from", console=console
        )

    branch_confirmed = confirm(
        f"Is [green]{branch}[/] the correct branch to pull your flow code from?",
        default=True,
        console=console,
    )
    if not branch_confirmed:
        branch = prompt(
            "Please enter the branch to pull your flow code from",
            default="main",
            console=console,
        )

    token_secret_block_name = None
    if confirm("Is this a private repository?", console=console):
        name_slug = slugify(deploy_config["name"])
        flow_slug = slugify(deploy_config["flow_name"])
        token_secret_block_name = f"deployment-{name_slug}-{flow_slug}-repo-token"

        token_prompt = (
            "Please enter a token that can be used to access your private"
            " repository. This token will be saved as a secret via the Prefect API"
        )

        # A new token is requested unless a saved one loads successfully and
        # the user chooses to keep it.
        needs_new_token = True
        try:
            await Secret.aload(token_secret_block_name)
            if confirm(
                (
                    "We found an existing token saved for this deployment. Would"
                    " you like use the existing token?"
                ),
                default=True,
                console=console,
            ):
                needs_new_token = False
            else:
                token_prompt = (
                    "Please enter a token that can be used to access your private"
                    " repository (this will overwrite the existing token saved via"
                    " the Prefect API)."
                )
        except ValueError:
            # A ValueError in the block above is taken to mean that no usable
            # saved token exists, so a fresh one is requested.
            needs_new_token = True

        if needs_new_token:
            try:
                repo_token = prompt(token_prompt, console=console, password=True)
            except GetPassWarning:
                # Handling for when password masking is not supported
                repo_token = prompt(token_prompt, console=console)
            secret_block = Secret(value=repo_token)
            await secret_block.save(name=token_secret_block_name, overwrite=True)

    clone_settings = {
        "repository": remote_url,
        "branch": branch,
    }
    if token_secret_block_name:
        # Reference the saved secret by template rather than embedding the token.
        clone_settings["access_token"] = (
            "{{ prefect.blocks.secret." + token_secret_block_name + " }}"
        )

    return [{"prefect.deployments.steps.git_clone": clone_settings}]
async def _generate_pull_step_for_build_docker_image(
    console: "Console", deploy_config: Dict[str, Any], auto: bool = True
) -> list[dict[str, Any]]:
    """Build the ``set_working_directory`` pull step for a Docker image build.

    Args:
        console: Console used for the prompt when ``auto`` is false.
        deploy_config: Deployment configuration (not read by this function).
        auto: When true, use ``/opt/prefect/<current directory name>``
            without asking; otherwise prompt for the path with that value as
            the default.

    Returns:
        A one-element list holding the
        ``prefect.deployments.steps.set_working_directory`` step.
    """
    default_directory = f"/opt/prefect/{os.path.basename(os.getcwd())}"

    if auto:
        directory = default_directory
    else:
        directory = prompt(
            "What is the path to your flow code in your Dockerfile?",
            default=default_directory,
            console=console,
        )

    return [
        {"prefect.deployments.steps.set_working_directory": {"directory": directory}}
    ]
async def _check_for_build_docker_image_step(
    build_action: List[Dict],
) -> Optional[Dict[str, Any]]:
    """Find the configuration of a Docker image build step, if one exists.

    Args:
        build_action: The build steps, each a mapping from a step name to its
            configuration. May be empty or ``None``.

    Returns:
        The first truthy configuration stored under a recognised
        build-docker-image step name, or ``None`` when there is none.
    """
    if not build_action:
        return None

    recognised_steps = ("prefect_docker.deployments.steps.build_docker_image",)

    # Step names are tried in order, and for each name the actions are
    # scanned in order; an empty or missing configuration does not match.
    candidates = (
        action.get(step_name)
        for step_name in recognised_steps
        for action in build_action
    )
    return next((config for config in candidates if config), None)
async def _generate_actions_for_remote_flow_storage(
    console: "Console",
    deploy_config: dict[str, Any],
    actions: dict[str, list[dict[str, Any]]],
) -> dict[str, list[dict[str, Any]]]:
    """Fill in push/pull actions for the remote flow storage the user selects.

    The user first chooses a storage provider. For ``"git"`` only the
    ``pull`` action is set, to a git clone step. For the blob storage
    providers listed below, both ``push`` and ``pull`` are set to the
    matching collection steps. Any other selection leaves ``actions``
    untouched.

    Args:
        console: Console handed to every prompt.
        deploy_config: Deployment configuration, forwarded to the git clone
            step generator.
        actions: Mapping of action name to its list of steps. It is mutated
            in place and also returned.

    Returns:
        The same ``actions`` mapping that was passed in.
    """
    storage_provider_to_collection: dict[str, str] = {
        "s3": "prefect_aws",
        "gcs": "prefect_gcp",
        "azure_blob_storage": "prefect_azure",
    }
    selected_storage_provider = await prompt_select_remote_flow_storage(console=console)

    if selected_storage_provider == "git":
        actions["pull"] = await _generate_git_clone_pull_step(
            console=console,
            deploy_config=deploy_config,
            remote_url=get_git_remote_origin_url(),
        )

    elif selected_storage_provider in storage_provider_to_collection:
        collection = storage_provider_to_collection[selected_storage_provider]

        # Use the caller's console here too, like every other prompt in this
        # module, so all questions are rendered through the same console.
        bucket = prompt("Bucket name", console=console)
        folder = prompt("Folder name", console=console)

        # Prompt user to select or create credentials for the chosen provider
        credentials_block = await prompt_select_blob_storage_credentials(
            console=console, storage_provider=selected_storage_provider
        )
        # Any falsy result is normalised to None.
        credentials = credentials_block or None

        # The azure steps take a "container" field; the others take "bucket".
        location_field = (
            "container"
            if selected_storage_provider == "azure_blob_storage"
            else "bucket"
        )
        step_fields = {
            location_field: bucket,
            "folder": folder,
            "credentials": credentials,
        }

        steps_namespace = f"{collection}.deployments.steps"
        # Each step receives its own copy of the fields so that a later
        # mutation of one step cannot silently change the other.
        actions["push"] = [
            {
                f"{steps_namespace}.push_to_{selected_storage_provider}": dict(
                    step_fields
                )
            }
        ]
        actions["pull"] = [
            {
                f"{steps_namespace}.pull_from_{selected_storage_provider}": dict(
                    step_fields
                )
            }
        ]

    return actions
async def _generate_default_pull_action(
    console: "Console",
    deploy_config: dict[str, Any],
    actions: dict[str, list[dict[str, Any]]],
) -> Optional[list[dict[str, Any]]]:
    """Produce the pull steps to use when none were configured.

    Without a Docker image build step, the pull action sets the working
    directory to the current directory and tells the user where workers will
    look for the flow. With one, the working directory inside the image is
    used: derived automatically when the step's ``dockerfile`` is ``"auto"``,
    otherwise asked for in interactive sessions.

    Args:
        console: Console used for the informational message and forwarded to
            the Docker pull step generator.
        deploy_config: Deployment configuration; ``build`` is read when
            present and truthy, and ``entrypoint`` is read when there is no
            Docker build step.
        actions: Mapping of action name to steps; ``actions["build"]`` is the
            fallback when ``deploy_config`` has no truthy ``build`` entry.

    Returns:
        A one-element list with a ``set_working_directory`` step, or ``None``
        when a custom Dockerfile is used in a non-interactive session.

    Raises:
        KeyError: If ``deploy_config`` has no truthy ``build`` and ``actions``
            has no ``"build"`` key, or if ``entrypoint`` is needed but missing.
        ValueError: If the entrypoint contains no ``:`` separator.
    """
    from prefect.cli._utilities import exit_with_error

    build_docker_image_step = await _check_for_build_docker_image_step(
        deploy_config.get("build") or actions["build"]
    )

    if not build_docker_image_step:
        # Split on the LAST colon only, so a path that itself contains a
        # colon (for example a Windows drive letter) still unpacks into
        # exactly two parts.
        entrypoint_path, _ = deploy_config["entrypoint"].rsplit(":", 1)
        console.print(
            "Your Prefect workers will attempt to load your flow from:"
            f" [green]{(Path.cwd() / Path(entrypoint_path)).absolute().resolve()}[/]. To"
            " see more options for managing your flow's code, run:\n\n\t[blue]$"
            " prefect init[/]\n"
        )
        return [
            {
                "prefect.deployments.steps.set_working_directory": {
                    "directory": str(Path.cwd().absolute().resolve())
                }
            }
        ]

    if build_docker_image_step.get("dockerfile") == "auto":
        return await _generate_pull_step_for_build_docker_image(
            console, deploy_config
        )

    if root.is_interactive():
        if not confirm(
            "Does your Dockerfile have a line that copies the current working"
            " directory into your image?"
        ):
            exit_with_error(
                "Your flow code must be copied into your Docker image to run"
                " your deployment.\nTo do so, you can copy this line into your"
                " Dockerfile: [yellow]COPY . /opt/prefect/[/yellow]"
            )
        return await _generate_pull_step_for_build_docker_image(
            console, deploy_config, auto=False
        )

    # NOTE(review): a custom Dockerfile in a non-interactive session yields no
    # pull step. This was previously an implicit fall-through; it is kept as
    # is and made explicit. Confirm that callers handle None.
    return None