Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/pull.py: 32%
57 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Core set of steps for specifying a Prefect project pull step.
3"""
5import os 1a
6from pathlib import Path 1a
7from typing import TYPE_CHECKING, Any, Optional 1a
9from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
10from prefect._internal.retries import retry_async_fn 1a
11from prefect.logging.loggers import get_logger 1a
12from prefect.runner.storage import BlockStorageAdapter, GitRepository, RemoteStorage 1a
13from prefect.utilities.asyncutils import run_coro_as_sync 1a
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a
16 import logging
18deployment_logger: "logging.Logger" = get_logger("deployment") 1a
20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true1a
21 from prefect.blocks.core import Block
24def set_working_directory(directory: str) -> dict[str, str]: 1a
25 """
26 Sets the working directory; works with both absolute and relative paths.
28 Args:
29 directory (str): the directory to set as the working directory
31 Returns:
32 dict: a dictionary containing a `directory` key of the
33 absolute path of the directory that was set
34 """
35 os.chdir(directory)
36 return dict(directory=os.getcwd())
39@retry_async_fn( 1a
40 max_attempts=3,
41 base_delay=1,
42 max_delay=10,
43 retry_on_exceptions=(RuntimeError,),
44 operation_name="git_clone",
45)
46async def _pull_git_repository_with_retries(repo: GitRepository): 1a
47 await repo.pull_code()
50async def agit_clone( 1a
51 repository: str,
52 branch: Optional[str] = None,
53 commit_sha: Optional[str] = None,
54 include_submodules: bool = False,
55 access_token: Optional[str] = None,
56 credentials: Optional["Block"] = None,
57 directories: Optional[list[str]] = None,
58) -> dict[str, str]:
59 """
60 Asynchronously clones a git repository into the current working directory.
62 Args:
63 repository: the URL of the repository to clone
64 branch: the branch to clone; if not provided, the default branch will be used
65 commit_sha: the commit SHA to clone; if not provided, the default branch will be used
66 include_submodules (bool): whether to include git submodules when cloning the repository
67 access_token: an access token to use for cloning the repository; if not provided
68 the repository will be cloned using the default git credentials
69 credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the
70 credentials to use for cloning the repository.
72 Returns:
73 dict: a dictionary containing a `directory` key of the new directory that was created
75 Raises:
76 subprocess.CalledProcessError: if the git clone command fails for any reason
77 """
78 if access_token and credentials:
79 raise ValueError(
80 "Please provide either an access token or credentials but not both."
81 )
83 _credentials = {"access_token": access_token} if access_token else credentials
85 storage = GitRepository(
86 url=repository,
87 credentials=_credentials,
88 branch=branch,
89 commit_sha=commit_sha,
90 include_submodules=include_submodules,
91 directories=directories,
92 )
94 await _pull_git_repository_with_retries(storage)
96 return dict(directory=str(storage.destination.relative_to(Path.cwd())))
99@async_dispatch(agit_clone) 1a
100def git_clone( 1a
101 repository: str,
102 branch: Optional[str] = None,
103 commit_sha: Optional[str] = None,
104 include_submodules: bool = False,
105 access_token: Optional[str] = None,
106 credentials: Optional["Block"] = None,
107 directories: Optional[list[str]] = None,
108) -> dict[str, str]:
109 """
110 Clones a git repository into the current working directory.
112 Args:
113 repository: the URL of the repository to clone
114 branch: the branch to clone; if not provided, the default branch will be used
115 commit_sha: the commit SHA to clone; if not provided, the default branch will be used
116 include_submodules (bool): whether to include git submodules when cloning the repository
117 access_token: an access token to use for cloning the repository; if not provided
118 the repository will be cloned using the default git credentials
119 credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be used to specify the
120 credentials to use for cloning the repository.
121 directories: Specify directories you want to be included (uses git sparse-checkout)
123 Returns:
124 dict: a dictionary containing a `directory` key of the new directory that was created
126 Raises:
127 subprocess.CalledProcessError: if the git clone command fails for any reason
129 Examples:
130 Clone a public repository:
131 ```yaml
132 pull:
133 - prefect.deployments.steps.git_clone:
134 repository: https://github.com/PrefectHQ/prefect.git
135 ```
137 Clone a branch of a public repository:
138 ```yaml
139 pull:
140 - prefect.deployments.steps.git_clone:
141 repository: https://github.com/PrefectHQ/prefect.git
142 branch: my-branch
143 ```
145 Clone a private repository using a GitHubCredentials block:
146 ```yaml
147 pull:
148 - prefect.deployments.steps.git_clone:
149 repository: https://github.com/org/repo.git
150 credentials: "{{ prefect.blocks.github-credentials.my-github-credentials-block }}"
151 ```
153 Clone a private repository using an access token:
154 ```yaml
155 pull:
156 - prefect.deployments.steps.git_clone:
157 repository: https://github.com/org/repo.git
158 access_token: "{{ prefect.blocks.secret.github-access-token }}" # Requires creation of a Secret block
159 ```
160 Note that you will need to [create a Secret block](https://docs.prefect.io/v3/concepts/blocks/#pre-registered-blocks) to store the
161 value of your git credentials. You can also store a username/password combo or token prefix (e.g. `x-token-auth`)
162 in your secret block. Refer to your git providers documentation for the correct authentication schema.
164 Clone a repository with submodules:
165 ```yaml
166 pull:
167 - prefect.deployments.steps.git_clone:
168 repository: https://github.com/org/repo.git
169 include_submodules: true
170 ```
172 Clone a repository with an SSH key (note that the SSH key must be added to the worker
173 before executing flows):
174 ```yaml
175 pull:
176 - prefect.deployments.steps.git_clone:
177 repository: git@github.com:org/repo.git
178 ```
180 Clone a repository using sparse-checkout (allows specific folders of the repository to be checked out)
181 ```yaml
182 pull:
183 - prefect.deployments.steps.git_clone:
184 repository: https://github.com/org/repo.git
185 directories: ["dir_1", "dir_2", "prefect"]
186 ```
187 """
188 if access_token and credentials:
189 raise ValueError(
190 "Please provide either an access token or credentials but not both."
191 )
193 _credentials = {"access_token": access_token} if access_token else credentials
195 storage = GitRepository(
196 url=repository,
197 credentials=_credentials,
198 branch=branch,
199 commit_sha=commit_sha,
200 include_submodules=include_submodules,
201 directories=directories,
202 )
204 run_coro_as_sync(_pull_git_repository_with_retries(storage))
206 return dict(directory=str(storage.destination.relative_to(Path.cwd())))
209async def pull_from_remote_storage(url: str, **settings: Any) -> dict[str, Any]: 1a
210 """
211 Pulls code from a remote storage location into the current working directory.
213 Works with protocols supported by `fsspec`.
215 Args:
216 url (str): the URL of the remote storage location. Should be a valid `fsspec` URL.
217 Some protocols may require an additional `fsspec` dependency to be installed.
218 Refer to the [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
219 for more details.
220 **settings (Any): any additional settings to pass the `fsspec` filesystem class.
222 Returns:
223 dict: a dictionary containing a `directory` key of the new directory that was created
225 Examples:
226 Pull code from a remote storage location:
227 ```yaml
228 pull:
229 - prefect.deployments.steps.pull_from_remote_storage:
230 url: s3://my-bucket/my-folder
231 ```
233 Pull code from a remote storage location with additional settings:
234 ```yaml
235 pull:
236 - prefect.deployments.steps.pull_from_remote_storage:
237 url: s3://my-bucket/my-folder
238 key: {{ prefect.blocks.secret.my-aws-access-key }}}
239 secret: {{ prefect.blocks.secret.my-aws-secret-key }}}
240 ```
241 """
242 storage = RemoteStorage(url, **settings)
244 await storage.pull_code()
246 directory = str(storage.destination.relative_to(Path.cwd()))
247 deployment_logger.info(f"Pulled code from {url!r} into {directory!r}")
248 return {"directory": directory}
251async def pull_with_block( 1a
252 block_document_name: str, block_type_slug: str
253) -> dict[str, Any]:
254 """
255 Pulls code using a block.
257 Args:
258 block_document_name: The name of the block document to use
259 block_type_slug: The slug of the type of block to use
260 """
261 from prefect.blocks.core import Block
263 full_slug = f"{block_type_slug}/{block_document_name}"
264 try:
265 block = await Block.aload(full_slug)
266 except Exception:
267 deployment_logger.exception("Unable to load block '%s'", full_slug)
268 raise
270 try:
271 storage = BlockStorageAdapter(block)
272 except Exception:
273 deployment_logger.exception(
274 "Unable to create storage adapter for block '%s'", full_slug
275 )
276 raise
278 await storage.pull_code()
280 directory = str(storage.destination.relative_to(Path.cwd()))
281 deployment_logger.info(
282 "Pulled code using block '%s' into '%s'", full_slug, directory
283 )
284 return {"directory": directory}