Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/storage.py: 18%

379 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import shutil 1a

4import subprocess 1a

5from copy import deepcopy 1a

6from pathlib import Path 1a

7from typing import ( 1a

8 Any, 

9 Optional, 

10 Protocol, 

11 TypedDict, 

12 Union, 

13 runtime_checkable, 

14) 

15from urllib.parse import urlparse, urlsplit, urlunparse 1a

16from uuid import uuid4 1a

17 

18import fsspec # pyright: ignore[reportMissingTypeStubs] 1a

19from anyio import run_process 1a

20from pydantic import SecretStr 1a

21 

22from prefect._internal.concurrency.api import create_call, from_async 1a

23from prefect.blocks.core import Block, BlockNotSavedError 1a

24from prefect.blocks.system import Secret 1a

25from prefect.filesystems import ReadableDeploymentStorage, WritableDeploymentStorage 1a

26from prefect.logging.loggers import get_logger 1a

27from prefect.utilities.collections import visit_collection 1a

28 

29 

@runtime_checkable
class RunnerStorage(Protocol):
    """
    Structural interface for objects a runner uses to fetch flow code that
    lives somewhere other than the runner's working directory.

    Because this is a runtime-checkable `Protocol`, any object that provides
    the members below satisfies it; no inheritance is needed.
    """

    def set_base_path(self, path: Path) -> None:
        """
        Choose the local directory under which pulled contents are placed.
        """
        ...

    @property
    def pull_interval(self) -> Optional[int]:
        """
        Seconds between successive pulls from the remote location, or None
        to request a single one-time sync.
        """
        ...

    @property
    def destination(self) -> Path:
        """
        Local path that remote contents are written to.
        """
        ...

    async def pull_code(self) -> None:
        """
        Copy the remote contents into the local filesystem.
        """
        ...

    def to_pull_step(self) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Describe this storage as one (or several) deployment pull steps.
        """
        ...

    def __eq__(self, __value: Any) -> bool:
        """
        Compare two runner storage objects for equality.
        """
        ...

77 

78 

class GitCredentials(TypedDict, total=False):
    """
    Plain-dictionary credentials accepted by `GitRepository`.

    Every key is optional (`total=False`).
    """

    # User name placed in front of the token ("username:token") when the
    # credentials are embedded into an HTTPS clone URL.
    username: str
    # Token used for authentication, given either as a raw string or as a
    # `Secret` block holding the string.
    access_token: str | Secret[str]

82 

83 

84@runtime_checkable 1a

85class _GitCredentialsFormatter(Protocol): 1a

86 """ 

87 Protocol for credential blocks that can format themselves for git URLs. 

88 

89 Implementing this protocol allows credential blocks to own their auth 

90 formatting logic instead of having it centralized in core Prefect. 

91 

92 This enables proper separation of concerns where each git provider 

93 (GitLab, GitHub, BitBucket) can handle their own authentication format 

94 requirements without core needing provider-specific knowledge. 

95 """ 

96 

97 def format_git_credentials(self, url: str) -> str: 1a

98 """ 

99 Format and return the full git URL with credentials embedded. 

100 

101 Args: 

102 url: The repository URL (e.g., "https://gitlab.com/org/repo.git"). 

103 

104 Returns: 

105 Complete URL with credentials embedded in the appropriate format for the provider. 

106 

107 Examples: 

108 - GitLab PAT: "https://oauth2:my-token@gitlab.com/org/repo.git" 

109 - GitLab Deploy Token: "https://username:token@gitlab.com/org/repo.git" 

110 - GitHub: "https://my-token@github.com/org/repo.git" 

111 - BitBucket Cloud: "https://x-token-auth:my-token@bitbucket.org/org/repo.git" 

112 - BitBucket Server: "https://username:token@bitbucketserver.com/scm/project/repo.git" 

113 """ 

114 ... 

115 

116 

class GitRepository:
    """
    Pulls the contents of a git repository to the local filesystem.

    Parameters:
        url: The URL of the git repository to pull from
        credentials: A dictionary of credentials, or a credentials block, to use
            when pulling from the repository. If a username is provided, an
            access token or password must also be provided.
        name: The name of the repository. If not provided, the name will be
            inferred from the repository URL (suffixed with the branch name when
            a branch is given).
        branch: The branch to pull from. If not provided, the remote's default
            branch is used. Cannot be combined with `commit_sha`.
        commit_sha: A specific commit to check out. Cannot be combined with
            `branch`.
        include_submodules: Whether to also clone and update git submodules.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.
        directories: The directories to pull from the Git repository (uses git sparse-checkout)

    Examples:
        Pull the contents of a private git repository to the local filesystem:

        ```python
        from prefect.runner.storage import GitRepository

        storage = GitRepository(
            url="https://github.com/org/repo.git",
            credentials={"username": "oauth2", "access_token": "my-access-token"},
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        credentials: Union[GitCredentials, Block, dict[str, Any], None] = None,
        name: str | None = None,
        branch: str | None = None,
        commit_sha: str | None = None,
        include_submodules: bool = False,
        pull_interval: int | None = 60,
        directories: list[str] | None = None,
    ):
        if credentials is None:
            credentials = {}

        if (
            isinstance(credentials, dict)
            and credentials.get("username")
            and not (credentials.get("access_token") or credentials.get("password"))
        ):
            raise ValueError(
                "If a username is provided, an access token or password must also be"
                " provided."
            )

        if branch and commit_sha:
            raise ValueError(
                "Cannot provide both a branch and a commit SHA. Please provide only one."
            )

        self._url = url
        self._branch = branch
        self._commit_sha = commit_sha
        self._credentials = credentials
        self._include_submodules = include_submodules
        # `removesuffix` (not `replace`) keeps names such as "my.github.io.git"
        # intact; stripping a trailing "/" keeps the name from being empty, which
        # would otherwise make the destination the base path itself.
        repo_name = urlparse(url).path.rstrip("/").split("/")[-1].removesuffix(".git")
        safe_branch = branch.replace("/", "-") if branch else None
        default_name = f"{repo_name}-{safe_branch}" if safe_branch else repo_name
        self._name = name or default_name
        self._logger = get_logger(f"runner.storage.git-repository.{self._name}")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval
        self._directories = directories

    @property
    def destination(self) -> Path:
        """The local directory the repository is cloned into."""
        return self._storage_base_path / self._name

    def set_base_path(self, path: Path) -> None:
        """Set the directory under which the repository directory is created."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """Seconds between pulls, or None for a one-time sync."""
        return self._pull_interval

    def _resolved_credentials(self) -> dict[str, Any]:
        """Return the credentials as a fresh dict with Secret/SecretStr values revealed."""
        credentials = (
            self._credentials.model_dump()
            if isinstance(self._credentials, Block)
            else deepcopy(self._credentials)
        )
        for key, value in credentials.items():
            if isinstance(value, Secret):
                credentials[key] = value.get()
            elif isinstance(value, SecretStr):
                credentials[key] = value.get_secret_value()
        return credentials

    def _embed_credentials(self, url: str) -> str:
        """
        Return `url` with the configured credentials embedded.

        Blocks implementing `_GitCredentialsFormatter` format the URL
        themselves. Otherwise credentials are only inserted into HTTPS URLs;
        other URLs are returned unchanged, without evaluating the credentials.
        """
        if not self._credentials:
            return url

        if isinstance(self._credentials, _GitCredentialsFormatter):
            return self._credentials.format_git_credentials(url)

        components = urlparse(url)
        if components.scheme != "https":
            return url

        block = self._credentials if isinstance(self._credentials, Block) else None
        credential_string = _format_token_from_credentials(
            components.netloc, self._resolved_credentials(), block
        )
        return urlunparse(
            components._replace(netloc=f"{credential_string}@{components.netloc}")
        )

    @property
    def _repository_url_with_credentials(self) -> str:
        """Get the repository URL with credentials embedded."""
        return self._embed_credentials(self._url)

    @property
    def _git_config(self) -> list[str]:
        """
        Build `-c key=value` arguments to place between `git` and the subcommand.

        When submodules are included and credentials exist, a
        `url.<authenticated base>.insteadOf=<base>` rewrite is added. Submodule
        URLs in `.gitmodules` carry no credentials, so this lets private
        submodules on the same host authenticate too.
        """
        config: dict[str, str] = {}

        if self._include_submodules and self._credentials:
            parsed = urlparse(self._url)
            base_url_without_auth = urlunparse(parsed._replace(path=""))
            base_url_with_auth = self._embed_credentials(base_url_without_auth)
            # Skip the rewrite when there is no host to key on (an empty
            # `insteadOf` prefix could match arbitrary URLs) or when embedding
            # changed nothing (e.g. non-HTTPS URLs with plain credentials).
            if parsed.netloc and base_url_with_auth != base_url_without_auth:
                config[f"url.{base_url_with_auth}.insteadOf"] = base_url_without_auth

        args: list[str] = []
        for key, value in config.items():
            args += ["-c", f"{key}={value}"]
        return args

    async def is_sparsely_checked_out(self) -> bool:
        """
        Check if existing repo is sparsely checked out
        """
        try:
            result = await run_process(
                ["git", "config", "--get", "core.sparseCheckout"], cwd=self.destination
            )
            return result.stdout.decode().strip().lower() == "true"
        except Exception:
            return False

    async def is_shallow_clone(self) -> bool:
        """
        Check if the repository is a shallow clone
        """
        try:
            result = await run_process(
                ["git", "rev-parse", "--is-shallow-repository"],
                cwd=self.destination,
            )
            return result.stdout.decode().strip().lower() == "true"
        except Exception:
            return False

    async def is_current_commit(self) -> bool:
        """
        Check whether `HEAD` of the local checkout is the configured commit.

        `git rev-parse <sha>` on its own only echoes the object name, so it
        cannot tell whether that commit is checked out. Both `HEAD` and the
        target are resolved and compared instead; abbreviated SHAs work too.
        Returns False if either cannot be resolved (e.g. the commit has not
        been fetched yet).

        Raises:
            ValueError: If no commit SHA was configured.
        """
        if not self._commit_sha:
            raise ValueError("No commit SHA provided")
        try:
            head = await run_process(
                ["git", "rev-parse", "HEAD"],
                cwd=self.destination,
            )
            target = await run_process(
                [
                    "git",
                    "rev-parse",
                    "--verify",
                    "--quiet",
                    f"{self._commit_sha}^{{commit}}",
                ],
                cwd=self.destination,
            )
        except Exception:
            return False
        return head.stdout.decode().strip() == target.stdout.decode().strip()

    async def pull_code(self) -> None:
        """
        Pulls the contents of the configured repository to the local filesystem.

        Clones the repository when the destination has no `.git` directory.
        Otherwise it verifies that the checkout's `origin` matches the
        configured URL and then updates it. A failed update falls back to a
        fresh clone.

        Raises:
            ValueError: If the existing checkout points at a different repository.
        """
        self._logger.debug(
            "Pulling contents from repository '%s' to '%s'...",
            self._name,
            self.destination,
        )

        if not (self.destination / ".git").exists():
            await self._clone_repo()
            return

        await self._ensure_origin_matches()

        # Sparsely checkout the repository if directories are specified and the
        # repo is not in sparse-checkout mode already
        if self._directories and not await self.is_sparsely_checked_out():
            await run_process(
                ["git", "sparse-checkout", "set", *self._directories],
                cwd=self.destination,
            )

        self._logger.debug("Pulling latest changes from origin/%s", self._branch)
        if self._commit_sha:
            await self._update_to_commit(self._commit_sha)
        else:
            await self._pull_latest_changes()

    async def _ensure_origin_matches(self) -> None:
        """Raise ValueError if the existing checkout's origin is not the configured repository."""
        result = await run_process(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=str(self.destination),
        )
        existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())
        # The stored origin may carry credentials embedded at clone time, and the
        # configured URL may too, so compare both sides with auth stripped.
        configured_repo_url = _strip_auth_from_url(self._url)
        if existing_repo_url != configured_repo_url:
            raise ValueError(
                f"The existing repository at {str(self.destination)} "
                f"does not match the configured repository {configured_repo_url}"
            )

    async def _update_to_commit(self, commit_sha: str) -> None:
        """Fetch and check out `commit_sha`, re-cloning if the fetch fails."""
        # If the commit is already checked out, skip the fetch
        if await self.is_current_commit():
            return

        # Git configuration must come after `git` and before the subcommand.
        cmd = ["git", *self._git_config]
        if await self.is_shallow_clone():
            cmd += ["fetch", "origin", "--unshallow"]
        else:
            cmd += ["fetch", "origin", commit_sha]

        try:
            await run_process(cmd, cwd=self.destination)
            self._logger.debug("Successfully fetched latest changes")
        except subprocess.CalledProcessError as exc:
            # Log only the exit code: the exception text includes the command
            # line, which can contain credentials from `_git_config`.
            self._logger.error(
                "Failed to fetch latest changes with exit code %s", exc.returncode
            )
            shutil.rmtree(self.destination)
            await self._clone_repo()

        await run_process(
            ["git", "checkout", commit_sha],
            cwd=self.destination,
        )
        self._logger.debug("Successfully checked out commit %s", commit_sha)

    async def _pull_latest_changes(self) -> None:
        """Pull the configured (or default) branch, re-cloning if the pull fails."""
        # Git configuration must come after `git` and before the subcommand.
        cmd = ["git", *self._git_config, "pull", "origin"]
        if self._branch:
            cmd.append(self._branch)
        if self._include_submodules:
            cmd.append("--recurse-submodules")
        cmd += ["--depth", "1"]

        try:
            await run_process(cmd, cwd=self.destination)
            self._logger.debug("Successfully pulled latest changes")
        except subprocess.CalledProcessError as exc:
            # Log only the exit code: the exception text includes the command
            # line, which can contain credentials from `_git_config`.
            self._logger.error(
                "Failed to pull latest changes with exit code %s", exc.returncode
            )
            shutil.rmtree(self.destination)
            await self._clone_repo()

    async def _clone_repo(self):
        """
        Clones the repository into the local destination.

        Raises:
            RuntimeError: If `git clone` fails. The original exception is only
                chained when no credentials are involved, because the failing
                command line may contain an access token.
        """
        self._logger.debug("Cloning repository %s", _strip_auth_from_url(self._url))

        repository_url = self._repository_url_with_credentials
        # Git configuration must come after `git` and before the subcommand.
        cmd = ["git", *self._git_config, "clone", repository_url]

        if self._include_submodules:
            cmd.append("--recurse-submodules")

        # This will only checkout the top-level directory
        if self._directories:
            cmd.append("--sparse")

        if self._commit_sha:
            # The specific commit is fetched and checked out after cloning.
            cmd += ["--filter=blob:none", "--no-checkout"]
        else:
            if self._branch:
                cmd += ["--branch", self._branch]
            # Limit git history
            cmd += ["--depth", "1"]

        # Set path to clone to
        cmd.append(str(self.destination))

        try:
            await run_process(cmd)
        except subprocess.CalledProcessError as exc:
            # Hide the command used to avoid leaking the access token
            parsed_url = urlparse(self._url)
            exc_chain = (
                None
                if self._credentials or parsed_url.password or parsed_url.username
                else exc
            )
            raise RuntimeError(
                f"Failed to clone repository {_strip_auth_from_url(self._url)!r} with exit code"
                f" {exc.returncode}."
            ) from exc_chain

        if self._commit_sha:
            # Fetch the commit
            await run_process(
                ["git", "fetch", "origin", self._commit_sha],
                cwd=self.destination,
            )
            # Checkout the specific commit
            await run_process(
                ["git", "checkout", self._commit_sha],
                cwd=self.destination,
            )
            self._logger.debug("Successfully checked out commit %s", self._commit_sha)

        # Once the repository is cloned in sparse-checkout mode, grow the
        # working directory to the requested directories
        if self._directories:
            self._logger.debug("Will add %s", self._directories)
            await run_process(
                ["git", "sparse-checkout", "set", *self._directories],
                cwd=self.destination,
            )

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, GitRepository):
            return (
                self._url == __value._url
                and self._branch == __value._branch
                and self._name == __value._name
            )
        return False

    def __repr__(self) -> str:
        return (
            f"GitRepository(name={self._name!r}, repository={self._url!r},"
            f" branch={self._branch!r})"
        )

    def to_pull_step(self) -> dict[str, Any]:
        """
        Return a `git_clone` deployment pull step for this repository.

        Raises:
            ValueError: If plain-dict credentials hold a raw (non-`Secret`)
                access token, which must not be serialized into a pull step.
        """
        pull_step: dict[str, Any] = {
            "prefect.deployments.steps.git_clone": {
                "repository": self._url,
                "branch": self._branch,
            }
        }
        if self._include_submodules:
            pull_step["prefect.deployments.steps.git_clone"]["include_submodules"] = (
                self._include_submodules
            )
        if self._commit_sha:
            pull_step["prefect.deployments.steps.git_clone"]["commit_sha"] = (
                self._commit_sha
            )
        if isinstance(self._credentials, Block):
            pull_step["prefect.deployments.steps.git_clone"]["credentials"] = (
                f"{{{{ {self._credentials.get_block_placeholder()} }}}}"
            )
        elif isinstance(self._credentials, dict):  # pyright: ignore[reportUnnecessaryIsInstance]
            if isinstance(
                access_token := self._credentials.get("access_token"), Secret
            ):
                pull_step["prefect.deployments.steps.git_clone"]["credentials"] = {
                    **self._credentials,
                    "access_token": (
                        f"{{{{ {access_token.get_block_placeholder()} }}}}"
                    ),
                }
            elif self._credentials.get("access_token") is not None:
                raise ValueError(
                    "Please save your access token as a Secret block before converting"
                    " this storage object to a pull step."
                )

        return pull_step

537 

538 

class RemoteStorage:
    """
    Syncs a remote, `fsspec`-addressable location into the local filesystem.

    Parameters:
        url: Location of the remote storage to pull from, written as an
            `fsspec` URL (for example `s3://bucket/folder`). Some protocols need
            an additional `fsspec` implementation package; see the
            [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for the known implementations.
        pull_interval: Seconds between successive pulls from remote storage.
            If None, a single one-time sync is performed.
        **settings: Extra keyword arguments forwarded to the `fsspec`
            filesystem class. Block values among them are replaced by their
            underlying values when the filesystem is built.

    Examples:
        Pull the contents of a remote storage location to the local filesystem:

        ```python
        from prefect.runner.storage import RemoteStorage

        storage = RemoteStorage(url="s3://my-bucket/my-folder")

        await storage.pull_code()
        ```

        Pull the contents of a remote storage location to the local filesystem
        with additional settings:

        ```python
        from prefect.runner.storage import RemoteStorage
        from prefect.blocks.system import Secret

        storage = RemoteStorage(
            url="s3://my-bucket/my-folder",
            # Use Secret blocks to keep credentials out of your code
            key=Secret.load("my-aws-access-key"),
            secret=Secret.load("my-aws-secret-key"),
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        pull_interval: Optional[int] = 60,
        **settings: Any,
    ):
        self._url = url
        self._pull_interval = pull_interval
        self._settings = settings
        self._storage_base_path = Path.cwd()
        self._logger = get_logger("runner.storage.remote-storage")

    @staticmethod
    def _get_required_package_for_scheme(scheme: str) -> Optional[str]:
        """
        Guess which installable package provides the filesystem for `scheme`.

        fsspec's registry of implementations is consulted first, and the top
        level of the implementation's module name is used. If the registry has
        no entry, a few common cloud schemes map to their usual packages.
        Anything else yields None.
        """
        implementation = fsspec.registry.get(scheme)
        if implementation:
            return implementation.__module__.split(".")[0]

        fallback_packages = {
            "s3": "s3fs",
            "gs": "gcsfs",
            "gcs": "gcsfs",
            "abfs": "adlfs",
            "az": "adlfs",
        }
        return fallback_packages.get(scheme)

    @property
    def _filesystem(self) -> fsspec.AbstractFileSystem:
        """Build a fresh `fsspec` filesystem for the URL's scheme with resolved settings."""
        scheme = urlsplit(self._url).scheme

        def _resolve_block(value: Any) -> Any:
            # Non-blocks pass through untouched.
            if not isinstance(value, Block):
                return value
            # Prefer a block's `get()` accessor, then a `value` attribute, and
            # finally fall back to the block's full field dump.
            getter = getattr(value, "get", None)
            if getter:
                return getter()
            if hasattr(value, "value"):
                return getattr(value, "value")
            return value.model_dump()

        resolved_settings = visit_collection(
            self._settings, _resolve_block, return_data=True
        )
        return fsspec.filesystem(scheme, **resolved_settings)  # pyright: ignore[reportUnknownMemberType] missing type stubs

    def set_base_path(self, path: Path) -> None:
        """Set the directory under which the remote path is mirrored."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """
        Seconds between pulls from remote storage, or None for a single
        one-time sync.
        """
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """
        Local directory that remote contents are copied into: the base path
        joined with the URL's host and path.
        """
        return self._storage_base_path / self._remote_path

    @property
    def _remote_path(self) -> Path:
        """
        The URL's netloc followed by its path (leading slashes removed),
        without the scheme.
        """
        parts = urlsplit(self._url)
        return Path(parts.netloc) / Path(parts.path.lstrip("/"))

    async def pull_code(self) -> None:
        """
        Copy everything under the remote path into `destination`.

        Raises:
            RuntimeError: If building the filesystem or copying fails; the
                underlying exception is chained.
        """
        local_dir = self.destination
        self._logger.debug(
            "Pulling contents from remote storage '%s' to '%s'...",
            self._url,
            local_dir,
        )

        if not local_dir.exists():
            local_dir.mkdir(parents=True, exist_ok=True)

        # NOTE(review): the trailing slash presumably asks fsspec to copy the
        # folder's contents rather than the folder itself — confirm.
        source = f"{self._remote_path}/"

        try:
            copy_call = create_call(
                self._filesystem.get,  # pyright: ignore[reportUnknownArgumentType, reportUnknownMemberType] missing type stubs
                source,
                str(local_dir),
                recursive=True,
            )
            # The fsspec copy is synchronous, so run it on a worker thread.
            await from_async.wait_for_call_in_new_thread(copy_call)
        except Exception as exc:
            raise RuntimeError(
                f"Failed to pull contents from remote storage {self._url!r} to"
                f" {local_dir!r}"
            ) from exc

    def to_pull_step(self) -> dict[str, Any]:
        """
        Describe this storage as a `pull_from_remote_storage` deployment step.

        Block settings become `{{ <block placeholder> }}` templates instead of
        raw values. A `requires` entry is added when an implementing package
        can be determined for the URL's scheme.
        """

        def _to_placeholder(value: Any) -> Any:
            if isinstance(value, Block):
                return f"{{{{ {value.get_block_placeholder()} }}}}"
            return value

        templated_settings = visit_collection(
            self._settings, _to_placeholder, return_data=True
        )
        package = self._get_required_package_for_scheme(urlparse(self._url).scheme)

        step_body = {"url": self._url, **templated_settings}
        if package:
            step_body["requires"] = package
        return {"prefect.deployments.steps.pull_from_remote_storage": step_body}

    def __eq__(self, __value: Any) -> bool:
        """
        Two remote storages are equal when their URLs and settings match.
        """
        if not isinstance(__value, RemoteStorage):
            return False
        return self._url == __value._url and self._settings == __value._settings

    def __repr__(self) -> str:
        return f"RemoteStorage(url={self._url!r})"

728 

729 

class BlockStorageAdapter:
    """
    A storage adapter for a storage block object to allow it to be used as a
    runner storage object.

    Parameters:
        block: A storage block that provides a `get_directory` method.
        pull_interval: The interval in seconds at which to pull contents from
            the block to local storage. If None, a one-time sync is performed.

    Raises:
        TypeError: If `block` is not a `Block` instance.
        ValueError: If `block` has no `get_directory` method.
    """

    def __init__(
        self,
        block: Union[ReadableDeploymentStorage, WritableDeploymentStorage],
        pull_interval: Optional[int] = 60,
    ):
        # Validate before storing anything on the instance.
        if not isinstance(block, Block):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Expected a block object. Received a {type(block).__name__!r} object."
            )
        if not hasattr(block, "get_directory"):
            raise ValueError("Provided block must have a `get_directory` method.")

        self._block = block
        self._pull_interval = pull_interval
        self._storage_base_path = Path.cwd()

        # Saved blocks get a stable, name-based destination. Unsaved blocks have
        # `_block_document_name` set to None (see `to_pull_step`), so fall back
        # to a random suffix rather than sharing a "<slug>-None" directory.
        document_name = getattr(block, "_block_document_name", None)
        self._name = f"{block.get_block_type_slug()}-{document_name or uuid4()}"

    def set_base_path(self, path: Path) -> None:
        """Set the directory under which the block's contents are placed."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """Seconds between pulls, or None for a one-time sync."""
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """The local directory the block's contents are written to."""
        return self._storage_base_path / self._name

    async def pull_code(self) -> None:
        """Create the destination if needed and download the block's directory into it."""
        if not self.destination.exists():
            self.destination.mkdir(parents=True, exist_ok=True)
        await self._block.get_directory(local_path=str(self.destination))

    def to_pull_step(self) -> dict[str, Any]:
        """
        Return a pull step for this block.

        Raises:
            BlockNotSavedError: If the block has no `get_pull_step` and has not
                been saved, since the generic step references it by name.
        """
        # Give blocks the chance to implement their own pull step
        if hasattr(self._block, "get_pull_step"):
            return getattr(self._block, "get_pull_step")()

        if getattr(self._block, "_block_document_name", None) is None:
            raise BlockNotSavedError(
                "Block must be saved with `.save()` before it can be converted to a"
                " pull step."
            )
        return {
            "prefect.deployments.steps.pull_with_block": {
                "block_type_slug": self._block.get_block_type_slug(),
                "block_document_name": getattr(self._block, "_block_document_name"),
            }
        }

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, BlockStorageAdapter):
            return self._block == __value._block
        return False

    def __repr__(self) -> str:
        return f"BlockStorageAdapter(name={self._name!r})"

790 

791 

class LocalStorage:
    """
    Sets the working directory in the local filesystem.

    Parameters:
        path: Local file path to set as the working directory for the flow.
            Relative paths are resolved against the current working directory
            when the object is created.
        pull_interval: The interval in seconds at which contents would be
            pulled. Local storage never pulls anything; this value is only
            reported through `pull_interval`. Defaults to None.

    Examples:
        Sets the working directory for the local path to the flow:

        ```python
        from prefect.runner.storage import LocalStorage

        storage = LocalStorage(
            path="/path/to/local/flow_directory",
        )
        ```
    """

    def __init__(
        self,
        path: str,
        pull_interval: Optional[int] = None,
    ):
        self._path = Path(path).resolve()
        self._logger = get_logger("runner.storage.local-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @property
    def destination(self) -> Path:
        """The resolved local path; it does not depend on the base path."""
        return self._path

    def set_base_path(self, path: Path) -> None:
        """
        Record the base path (required by the storage interface).

        `destination` is always the configured path, so this has no effect on
        where code is read from.
        """
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """The configured pull interval (None by default)."""
        return self._pull_interval

    async def pull_code(self) -> None:
        # Local storage assumes the code already exists on the local filesystem
        # and does not need to be pulled from a remote location
        pass

    def to_pull_step(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """
        step = {
            "prefect.deployments.steps.set_working_directory": {
                "directory": str(self.destination)
            }
        }
        return step

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, LocalStorage):
            return self._path == __value._path
        return False

    def __repr__(self) -> str:
        return f"LocalStorage(path={self._path!r})"

852 

853 

def create_storage_from_source(
    source: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
    """
    Creates a storage object from a URL or local path.

    The source is classified as follows:

    - a `git` scheme, or a path ending in `.git` (optionally followed by `/`),
      becomes a `GitRepository`;
    - a `file://` or `local://` URL becomes a `LocalStorage` on the part after
      `://`;
    - any other scheme known to `fsspec` becomes a `RemoteStorage`;
    - everything else is treated as a local path (`LocalStorage`).

    Args:
        source: The URL or path to create a storage object from. Supports git
            and `fsspec` URLs as well as local paths.
        pull_interval: The interval at which to pull contents from remote storage to
            local storage

    Returns:
        RunnerStorage: A runner storage compatible object
    """
    logger = get_logger("runner.storage")
    parsed_source = urlparse(source)
    if parsed_source.scheme == "git" or parsed_source.path.rstrip("/").endswith(".git"):
        return GitRepository(url=source, pull_interval=pull_interval)
    elif parsed_source.scheme in ("file", "local"):
        source_path = source.split("://", 1)[-1]
        return LocalStorage(path=source_path, pull_interval=pull_interval)
    elif parsed_source.scheme in fsspec.available_protocols():
        return RemoteStorage(url=source, pull_interval=pull_interval)
    else:
        logger.debug("No valid fsspec protocol found for URL, assuming local storage.")
        return LocalStorage(path=source, pull_interval=pull_interval)

881 

882 

def _format_token_from_credentials(
    netloc: str,
    credentials: dict[str, Any] | GitCredentials,
    block: Block | None = None,
) -> str:
    """
    Turn credentials into the authentication string placed before `@` in a git URL.

    A `block` implementing `_GitCredentialsFormatter` is asked to format a URL
    built from `netloc`, and its result is returned. Otherwise the token is
    taken from `access_token`, `token` or `password` (first truthy wins), a
    `Secret` token is unwrapped, and the token is prefixed with
    "username:" when a username is present.

    Args:
        netloc: The network location (hostname) of the git repository
        credentials: Dictionary containing credential information
        block: Optional Block object that may implement the formatter protocol

    Raises:
        ValueError: If none of the token keys holds a usable value.
    """
    if block is not None and isinstance(block, _GitCredentialsFormatter):
        # The formatter works on URLs, so rebuild one around the host; the
        # scheme is irrelevant for formatting purposes.
        return block.format_git_credentials(f"https://{netloc}")

    lookup = credentials if credentials else {}
    username = lookup.get("username")
    secret: str | Secret[str] | None = (
        lookup.get("access_token") or lookup.get("token") or lookup.get("password")
    )

    if isinstance(secret, Secret):
        secret = secret.get()

    if not secret:
        raise ValueError(
            "Please provide a `token` or `password` in your Credentials block to clone"
            " a repo."
        )

    # Plain-dict credentials without a username embed just the token.
    return f"{username}:{secret}" if username else secret

924 

925 

926def _strip_auth_from_url(url: str) -> str: 1a

927 parsed = urlparse(url) 

928 

929 # Construct a new netloc without the auth info 

930 netloc = parsed.hostname 

931 if parsed.port and netloc: 

932 netloc += f":{parsed.port}" 

933 

934 # Build the sanitized URL 

935 return urlunparse( 

936 ( 

937 parsed.scheme, 

938 netloc, 

939 parsed.path, 

940 parsed.params, 

941 parsed.query, 

942 parsed.fragment, 

943 ) 

944 )