Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/steps/pull.py: 32%

57 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Core set of steps for specifying a Prefect project pull step. 

3""" 

4 

5import os 1a

6from pathlib import Path 1a

7from typing import TYPE_CHECKING, Any, Optional 1a

8 

9from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

10from prefect._internal.retries import retry_async_fn 1a

11from prefect.logging.loggers import get_logger 1a

12from prefect.runner.storage import BlockStorageAdapter, GitRepository, RemoteStorage 1a

13from prefect.utilities.asyncutils import run_coro_as_sync 1a

14 

if TYPE_CHECKING:
    # Only static type checkers import these; at runtime both names appear
    # solely inside quoted annotations, so nothing here is executed.
    import logging

    from prefect.blocks.core import Block

# Logger obtained from prefect's `get_logger` under the name "deployment";
# the pull steps in this module report progress and failures through it.
deployment_logger: "logging.Logger" = get_logger("deployment")

22 

23 

def set_working_directory(directory: str) -> dict[str, str]:
    """
    Switch the process's current working directory to `directory`.

    Both absolute and relative paths are accepted; a relative path is
    interpreted against the working directory in effect at call time.

    Args:
        directory (str): path of the directory to make the working directory

    Returns:
        dict: a single-entry mapping whose `directory` key holds the new
            working directory exactly as reported by `os.getcwd()` (an
            absolute path)
    """
    os.chdir(directory)
    new_cwd = os.getcwd()
    return {"directory": new_cwd}

37 

38 

@retry_async_fn(
    operation_name="git_clone",
    retry_on_exceptions=(RuntimeError,),
    max_attempts=3,
    base_delay=1,
    max_delay=10,
)
async def _pull_git_repository_with_retries(repo: GitRepository) -> None:
    """
    Await `repo.pull_code()` under prefect's async retry policy.

    The decorator is configured to retry on `RuntimeError`, for at most 3
    attempts, with `base_delay=1` and `max_delay=10` (presumably seconds —
    confirm against `retry_async_fn`). Exceptions outside
    `retry_on_exceptions` are presumably propagated without retrying.
    """
    await repo.pull_code()

48 

49 

async def agit_clone(
    repository: str,
    branch: Optional[str] = None,
    commit_sha: Optional[str] = None,
    include_submodules: bool = False,
    access_token: Optional[str] = None,
    credentials: Optional["Block"] = None,
    directories: Optional[list[str]] = None,
) -> dict[str, str]:
    """
    Asynchronously clones a git repository into the current working directory.

    This is the async counterpart of `git_clone`: it builds the same
    `GitRepository` and returns the same result, but awaits the pull instead
    of running it synchronously.

    Args:
        repository: the URL of the repository to clone
        branch: the branch to clone; if not provided, the default branch will be used
        commit_sha: the commit SHA to clone; if not provided, the default branch will be used
        include_submodules (bool): whether to include git submodules when cloning the repository
        access_token: an access token to use for cloning the repository; if not provided
            the repository will be cloned using the default git credentials
        credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials block can be
            used to specify the credentials to use for cloning the repository.
        directories: Specify directories you want to be included (uses git sparse-checkout)

    Returns:
        dict: a dictionary containing a `directory` key with the path of the cloned
            directory, expressed relative to the current working directory

    Raises:
        ValueError: if both `access_token` and `credentials` are provided
        RuntimeError: NOTE(review): this docstring previously listed
            `subprocess.CalledProcessError`. `_pull_git_repository_with_retries` only
            retries `RuntimeError` (up to 3 attempts), which suggests clone failures
            surface as `RuntimeError` once retries are exhausted. Confirm against
            `GitRepository.pull_code`.
    """
    if access_token and credentials:
        raise ValueError(
            "Please provide either an access token or credentials but not both."
        )

    # A bare access token is wrapped in the dict shape GitRepository accepts;
    # otherwise the credentials block (or None) is passed through unchanged.
    _credentials = {"access_token": access_token} if access_token else credentials

    storage = GitRepository(
        url=repository,
        credentials=_credentials,
        branch=branch,
        commit_sha=commit_sha,
        include_submodules=include_submodules,
        directories=directories,
    )

    await _pull_git_repository_with_retries(storage)

    return dict(directory=str(storage.destination.relative_to(Path.cwd())))

97 

98 

@async_dispatch(agit_clone)
def git_clone(
    repository: str,
    branch: Optional[str] = None,
    commit_sha: Optional[str] = None,
    include_submodules: bool = False,
    access_token: Optional[str] = None,
    credentials: Optional["Block"] = None,
    directories: Optional[list[str]] = None,
) -> dict[str, str]:
    """
    Clones a git repository into the current working directory.

    `@async_dispatch(agit_clone)` pairs this synchronous implementation with
    its async counterpart `agit_clone` (presumably chosen when called from an
    async context — see `async_dispatch`). Both build the same
    `GitRepository` and return the same result.

    Args:
        repository: the URL of the repository to clone
        branch: the branch to clone; if omitted, the default branch is used
        commit_sha: the commit SHA to clone; if omitted, the default branch is used
        include_submodules (bool): whether to also clone git submodules
        access_token: an access token to authenticate the clone with; if omitted,
            the default git credentials are used
        credentials: a GitHubCredentials, GitLabCredentials, or BitBucketCredentials
            block holding the credentials to clone with
        directories: directories to include in the checkout (uses git sparse-checkout)

    Returns:
        dict: a dictionary whose `directory` key is the path of the cloned
            directory, relative to the current working directory

    Raises:
        ValueError: if both `access_token` and `credentials` are provided
        RuntimeError: NOTE(review): earlier docs listed `subprocess.CalledProcessError`,
            but the pull is retried only on `RuntimeError` (up to 3 attempts), which
            suggests that is what a failed clone raises. Confirm against
            `GitRepository.pull_code`.

    Examples:
        Clone a public repository:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/PrefectHQ/prefect.git
        ```

        Clone a branch of a public repository:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/PrefectHQ/prefect.git
                branch: my-branch
        ```

        Clone a private repository using a GitHubCredentials block:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                credentials: "{{ prefect.blocks.github-credentials.my-github-credentials-block }}"
        ```

        Clone a private repository using an access token:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                access_token: "{{ prefect.blocks.secret.github-access-token }}" # Requires creation of a Secret block
        ```
        You will need to [create a Secret block](https://docs.prefect.io/v3/concepts/blocks/#pre-registered-blocks)
        to store the value of your git credentials. The Secret block can also hold a
        username/password combination or a token prefix (e.g. `x-token-auth`). Refer to
        your git provider's documentation for the correct authentication scheme.

        Clone a repository with submodules:
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                include_submodules: true
        ```

        Clone a repository with an SSH key (the SSH key must be added to the worker
        before executing flows):
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: git@github.com:org/repo.git
        ```

        Clone a repository using sparse-checkout (only the listed folders are checked out):
        ```yaml
        pull:
            - prefect.deployments.steps.git_clone:
                repository: https://github.com/org/repo.git
                directories: ["dir_1", "dir_2", "prefect"]
        ```
    """
    if access_token and credentials:
        raise ValueError(
            "Please provide either an access token or credentials but not both."
        )

    # Wrap a bare token in the mapping GitRepository accepts; otherwise hand
    # over the credentials block (or None) unchanged.
    if access_token:
        resolved_credentials = {"access_token": access_token}
    else:
        resolved_credentials = credentials

    repo = GitRepository(
        url=repository,
        credentials=resolved_credentials,
        branch=branch,
        commit_sha=commit_sha,
        include_submodules=include_submodules,
        directories=directories,
    )

    # Drive the retrying async pull to completion from synchronous code.
    run_coro_as_sync(_pull_git_repository_with_retries(repo))

    cloned_path = repo.destination.relative_to(Path.cwd())
    return {"directory": str(cloned_path)}

207 

208 

async def pull_from_remote_storage(url: str, **settings: Any) -> dict[str, Any]:
    """
    Pulls code from a remote storage location into the current working directory.

    Works with protocols supported by `fsspec`.

    Args:
        url (str): the URL of the remote storage location. Should be a valid `fsspec` URL.
            Some protocols may require an additional `fsspec` dependency to be installed.
            Refer to the [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for more details.
        **settings (Any): any additional settings to pass to the `fsspec` filesystem class.

    Returns:
        dict: a dictionary containing a `directory` key with the path of the new
            directory, relative to the current working directory

    Examples:
        Pull code from a remote storage location:
        ```yaml
        pull:
            - prefect.deployments.steps.pull_from_remote_storage:
                url: s3://my-bucket/my-folder
        ```

        Pull code from a remote storage location with additional settings:
        ```yaml
        pull:
            - prefect.deployments.steps.pull_from_remote_storage:
                url: s3://my-bucket/my-folder
                key: "{{ prefect.blocks.secret.my-aws-access-key }}"
                secret: "{{ prefect.blocks.secret.my-aws-secret-key }}"
        ```
        Template values are quoted because an unquoted YAML value that starts
        with `{` is parsed as a flow mapping rather than a string.
    """
    storage = RemoteStorage(url, **settings)

    await storage.pull_code()

    directory = str(storage.destination.relative_to(Path.cwd()))
    # Lazy %-style arguments (as used by pull_with_block); "%r" renders the
    # same text as the former f"{url!r}" / f"{directory!r}".
    deployment_logger.info("Pulled code from %r into %r", url, directory)
    return {"directory": directory}

249 

250 

async def pull_with_block(
    block_document_name: str, block_type_slug: str
) -> dict[str, Any]:
    """
    Pull code into the current working directory through a storage block.

    The block is loaded by its "<block_type_slug>/<block_document_name>" slug,
    wrapped in a `BlockStorageAdapter`, and its code is pulled.

    Args:
        block_document_name: the name of the block document to use
        block_type_slug: the slug of the block's type

    Returns:
        dict: a dictionary whose `directory` key is the pulled directory,
            relative to the current working directory

    Raises:
        Exception: any error raised while loading the block or building its
            storage adapter is logged via `deployment_logger` and re-raised
            unchanged
    """
    # Deferred import, as in the original implementation.
    from prefect.blocks.core import Block

    slug = f"{block_type_slug}/{block_document_name}"

    try:
        loaded_block = await Block.aload(slug)
    except Exception:
        deployment_logger.exception("Unable to load block '%s'", slug)
        raise

    try:
        adapter = BlockStorageAdapter(loaded_block)
    except Exception:
        deployment_logger.exception(
            "Unable to create storage adapter for block '%s'", slug
        )
        raise

    await adapter.pull_code()

    relative_dir = adapter.destination.relative_to(Path.cwd())
    directory = str(relative_dir)
    deployment_logger.info(
        "Pulled code using block '%s' into '%s'", slug, directory
    )
    return {"directory": directory}