Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/storage.py: 18%
379 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import shutil 1a
4import subprocess 1a
5from copy import deepcopy 1a
6from pathlib import Path 1a
7from typing import ( 1a
8 Any,
9 Optional,
10 Protocol,
11 TypedDict,
12 Union,
13 runtime_checkable,
14)
15from urllib.parse import urlparse, urlsplit, urlunparse 1a
16from uuid import uuid4 1a
18import fsspec # pyright: ignore[reportMissingTypeStubs] 1a
19from anyio import run_process 1a
20from pydantic import SecretStr 1a
22from prefect._internal.concurrency.api import create_call, from_async 1a
23from prefect.blocks.core import Block, BlockNotSavedError 1a
24from prefect.blocks.system import Secret 1a
25from prefect.filesystems import ReadableDeploymentStorage, WritableDeploymentStorage 1a
26from prefect.logging.loggers import get_logger 1a
27from prefect.utilities.collections import visit_collection 1a
@runtime_checkable
class RunnerStorage(Protocol):
    """
    Structural interface for objects a runner uses to fetch remotely stored
    flow code into a local directory.

    Any object providing these members satisfies the protocol at runtime
    (it is `runtime_checkable`); no subclassing is required.
    """

    def set_base_path(self, path: Path) -> None:
        """
        Set the local base directory under which pulled contents are placed.
        """

    @property
    def pull_interval(self) -> Optional[int]:
        """
        Seconds between successive pulls from remote storage. `None` means the
        contents are synced only once.
        """

    @property
    def destination(self) -> Path:
        """
        Local path that remote contents are pulled into.
        """

    async def pull_code(self) -> None:
        """
        Copy the contents of remote storage onto the local filesystem.
        """

    def to_pull_step(self) -> dict[str, Any] | list[dict[str, Any]]:
        """
        Describe this storage as a deployment pull step (or list of steps).
        """

    def __eq__(self, __value: Any) -> bool:
        """
        Compare two runner storage objects for equality.
        """
class GitCredentials(TypedDict, total=False):
    """
    Plain-dictionary credentials accepted by `GitRepository`.

    Both keys are optional (`total=False`).
    """

    # Username placed before the token (as `username:token`) in the repository URL.
    username: str
    # Token used for authentication; may be given directly or as a `Secret` block.
    access_token: str | Secret[str]
@runtime_checkable
class _GitCredentialsFormatter(Protocol):
    """
    Structural protocol for credential blocks that know how to embed themselves
    into a git URL.

    A block that implements `format_git_credentials` owns its provider-specific
    authentication format. Core Prefect then only has to call the method and
    needs no knowledge of GitLab, GitHub, BitBucket, or other providers'
    conventions.
    """

    def format_git_credentials(self, url: str) -> str:
        """
        Build the complete repository URL with credentials embedded.

        Args:
            url: The repository URL, e.g. "https://gitlab.com/org/repo.git".

        Returns:
            The same URL with credentials inserted in the provider's expected
            format, for example:
            - GitLab PAT: "https://oauth2:my-token@gitlab.com/org/repo.git"
            - GitLab Deploy Token: "https://username:token@gitlab.com/org/repo.git"
            - GitHub: "https://my-token@github.com/org/repo.git"
            - BitBucket Cloud: "https://x-token-auth:my-token@bitbucket.org/org/repo.git"
            - BitBucket Server: "https://username:token@bitbucketserver.com/scm/project/repo.git"
        """
class GitRepository:
    """
    Pulls the contents of a git repository to the local filesystem.

    Parameters:
        url: The URL of the git repository to pull from
        credentials: Credentials to use when pulling from the repository.
            - A dictionary: if a username is provided, an access token or
              password must also be provided.
            - A credentials block: blocks that implement `format_git_credentials`
              embed their credentials into the URL themselves.
            Other credentials are only embedded into `https` URLs.
        name: The name of the repository. If not provided, the name is inferred
            from the repository URL, suffixed with the branch if one is given.
        branch: The branch to pull from. If not provided, no branch is passed to
            git, so git's default (the remote's default branch) is used. Cannot
            be combined with `commit_sha`.
        commit_sha: A specific commit to check out. Cannot be combined with
            `branch`.
        include_submodules: Whether to clone and pull git submodules as well.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.
        directories: The directories to pull from the Git repository (uses git
            sparse-checkout)

    Examples:
        Pull the contents of a private git repository to the local filesystem:

        ```python
        from prefect.runner.storage import GitRepository

        storage = GitRepository(
            url="https://github.com/org/repo.git",
            credentials={"username": "oauth2", "access_token": "my-access-token"},
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        credentials: Union[GitCredentials, Block, dict[str, Any], None] = None,
        name: str | None = None,
        branch: str | None = None,
        commit_sha: str | None = None,
        include_submodules: bool = False,
        pull_interval: int | None = 60,
        directories: list[str] | None = None,
    ):
        if credentials is None:
            credentials = {}

        if (
            isinstance(credentials, dict)
            and credentials.get("username")
            and not (credentials.get("access_token") or credentials.get("password"))
        ):
            raise ValueError(
                "If a username is provided, an access token or password must also be"
                " provided."
            )

        if branch and commit_sha:
            raise ValueError(
                "Cannot provide both a branch and a commit SHA. Please provide only one."
            )

        self._url = url
        self._branch = branch
        self._commit_sha = commit_sha
        self._credentials = credentials
        self._include_submodules = include_submodules
        repo_name = urlparse(url).path.split("/")[-1].replace(".git", "")
        safe_branch = branch.replace("/", "-") if branch else None
        default_name = f"{repo_name}-{safe_branch}" if safe_branch else repo_name
        self._name = name or default_name
        self._logger = get_logger(f"runner.storage.git-repository.{self._name}")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval
        self._directories = directories

    @property
    def destination(self) -> Path:
        return self._storage_base_path / self._name

    def set_base_path(self, path: Path) -> None:
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        return self._pull_interval

    def _resolved_credentials(self) -> dict[str, Any]:
        """
        Return the configured credentials as a fresh dict with top-level
        `Secret` / `SecretStr` values replaced by their plain values.
        """
        credentials: dict[str, Any] = (
            self._credentials.model_dump()
            if isinstance(self._credentials, Block)
            else deepcopy(self._credentials)
        )
        # Only existing keys are reassigned, so mutating during iteration is safe.
        for key, value in credentials.items():
            if isinstance(value, Secret):
                credentials[key] = value.get()
            elif isinstance(value, SecretStr):
                credentials[key] = value.get_secret_value()
        return credentials

    def _embed_credentials(self, url: str) -> str:
        """
        Return `url` with this repository's credentials embedded in its netloc.

        Blocks implementing `_GitCredentialsFormatter` format the URL
        themselves. All other credentials are only embedded into `https` URLs;
        any other URL is returned unchanged, without formatting credentials.
        """
        if isinstance(self._credentials, _GitCredentialsFormatter):
            return self._credentials.format_git_credentials(url)

        components = urlparse(url)
        if components.scheme != "https":
            return url

        block = self._credentials if isinstance(self._credentials, Block) else None
        credential_string = _format_token_from_credentials(
            components.netloc, self._resolved_credentials(), block
        )
        return urlunparse(
            components._replace(netloc=f"{credential_string}@{components.netloc}")
        )

    @property
    def _repository_url_with_credentials(self) -> str:
        """Get the repository URL with credentials embedded."""
        if not self._credentials:
            return self._url
        return self._embed_credentials(self._url)

    @property
    def _git_config(self) -> list[str]:
        """
        Build `-c key=value` git arguments to use when running git commands.

        Must be placed after `git` and before the subcommand.
        """
        config: dict[str, str] = {}

        # Submodules can be private. The url in .gitmodules will not include the
        # credentials, so rewrite the host's base URL to one that carries them.
        if self._include_submodules and self._credentials:
            base_url_without_auth = urlunparse(urlparse(self._url)._replace(path=""))
            # Skip the rewrite when there is no scheme/host to anchor it on (e.g.
            # scp-style SSH URLs parse to an empty base) or when no credentials
            # were embedded (the rule would be a no-op).
            if base_url_without_auth:
                base_url_with_auth = self._embed_credentials(base_url_without_auth)
                if base_url_with_auth != base_url_without_auth:
                    config[f"url.{base_url_with_auth}.insteadOf"] = (
                        base_url_without_auth
                    )

        # One `-c` flag per entry; git accepts a single key=value per flag.
        return [arg for key, value in config.items() for arg in ("-c", f"{key}={value}")]

    async def is_sparsely_checked_out(self) -> bool:
        """
        Check if existing repo is sparsely checked out
        """
        try:
            result = await run_process(
                ["git", "config", "--get", "core.sparseCheckout"], cwd=self.destination
            )
            return result.stdout.decode().strip().lower() == "true"
        except Exception:
            return False

    async def is_shallow_clone(self) -> bool:
        """
        Check if the repository is a shallow clone
        """
        try:
            result = await run_process(
                ["git", "rev-parse", "--is-shallow-repository"],
                cwd=self.destination,
            )
            return result.stdout.decode().strip().lower() == "true"
        except Exception:
            return False

    async def is_current_commit(self) -> bool:
        """
        Check whether the checked-out commit (HEAD) is the configured commit SHA.

        `git rev-parse <sha>` only resolves the given name and says nothing
        about what is checked out, so HEAD is resolved instead. An abbreviated
        configured SHA matches when HEAD's full SHA starts with it.

        Returns:
            True if HEAD is the configured commit. False otherwise, or if the
            git command fails.

        Raises:
            ValueError: If no commit SHA was configured.
        """
        if not self._commit_sha:
            raise ValueError("No commit SHA provided")
        try:
            result = await run_process(
                ["git", "rev-parse", "HEAD"],
                cwd=self.destination,
            )
            head_sha = result.stdout.decode().strip().lower()
        except Exception:
            return False
        return bool(head_sha) and head_sha.startswith(self._commit_sha.lower())

    async def pull_code(self) -> None:
        """
        Pulls the contents of the configured repository to the local filesystem.

        If no clone exists at the destination, the repository is cloned.

        If a clone already exists, it is updated in place:
        - With a commit SHA: fetch, then check out that commit.
        - Otherwise: `git pull` the branch.
        If the update command fails, the destination is deleted and cloned
        again.

        Raises:
            ValueError: If the destination holds a clone of a different repository.
        """
        self._logger.debug(
            "Pulling contents from repository '%s' to '%s'...",
            self._name,
            self.destination,
        )

        git_dir = self.destination / ".git"
        if not git_dir.exists():
            await self._clone_repo()
            return

        # Check if the existing repository matches the configured repository.
        # The stored remote URL carries whatever credentials were embedded at
        # clone time, so compare both sides with auth stripped.
        result = await run_process(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=str(self.destination),
        )
        existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())
        configured_repo_url = _strip_auth_from_url(self._url)
        if existing_repo_url != configured_repo_url:
            raise ValueError(
                f"The existing repository at {str(self.destination)} "
                f"does not match the configured repository {configured_repo_url}"
            )

        # Sparsely checkout the repository if directories are specified and the
        # repo is not in sparse-checkout mode already
        if self._directories and not await self.is_sparsely_checked_out():
            await run_process(
                ["git", "sparse-checkout", "set", *self._directories],
                cwd=self.destination,
            )

        self._logger.debug("Pulling latest changes from origin/%s", self._branch)
        # The git configuration must be given after `git` and before the command
        cmd = ["git", *self._git_config]

        if self._commit_sha:
            # If the commit is already checked out, skip the update
            if await self.is_current_commit():
                return

            # Fetch the commit, unshallowing the repository if necessary
            if await self.is_shallow_clone():
                cmd += ["fetch", "origin", "--unshallow"]
            else:
                cmd += ["fetch", "origin", self._commit_sha]
            await self._update_or_reclone(cmd, "fetch", "fetched")

            await run_process(
                ["git", "checkout", self._commit_sha],
                cwd=self.destination,
            )
            self._logger.debug("Successfully checked out commit %s", self._commit_sha)
            return

        # Otherwise, pull the latest changes from the branch
        cmd += ["pull", "origin"]
        if self._branch:
            cmd.append(self._branch)
        if self._include_submodules:
            cmd.append("--recurse-submodules")
        cmd += ["--depth", "1"]
        await self._update_or_reclone(cmd, "pull", "pulled")

    async def _update_or_reclone(self, cmd: list[str], verb: str, past: str) -> None:
        """
        Run a git update command in the destination. If it fails, delete the
        destination and clone the repository from scratch.

        Args:
            cmd: The full git command to run.
            verb: Verb used in the failure log message (e.g. "fetch").
            past: Past tense used in the success log message (e.g. "fetched").
        """
        try:
            await run_process(cmd, cwd=self.destination)
        except subprocess.CalledProcessError as exc:
            # Log only the exit code: `str(exc)` includes `cmd`, which can carry
            # credentials through the `-c url.<...>.insteadOf` git config.
            self._logger.error(
                "Failed to %s latest changes with exit code %s", verb, exc.returncode
            )
        else:
            self._logger.debug("Successfully %s latest changes", past)
            return

        # Re-clone outside the `except` block so the failed command is not
        # attached as implicit context to any error raised while cloning.
        shutil.rmtree(self.destination)
        await self._clone_repo()

    async def _clone_repo(self):
        """
        Clones the repository into the local destination.

        Raises:
            RuntimeError: If `git clone` fails. When credentials may be present,
                the underlying error is not chained, to avoid leaking them.
        """
        self._logger.debug("Cloning repository %s", _strip_auth_from_url(self._url))

        repository_url = self._repository_url_with_credentials
        # The git configuration must be given after `git` and before the command
        cmd = ["git", *self._git_config, "clone", repository_url]

        if self._include_submodules:
            cmd += ["--recurse-submodules"]

        # This will only checkout the top-level directory
        if self._directories:
            cmd += ["--sparse"]

        if self._commit_sha:
            # Clone without blobs or a checkout; the commit is fetched and
            # checked out below.
            cmd += ["--filter=blob:none", "--no-checkout"]
        else:
            if self._branch:
                cmd += ["--branch", self._branch]
            # Limit git history
            cmd += ["--depth", "1"]

        # Set path to clone to
        cmd += [str(self.destination)]

        try:
            await run_process(cmd)
        except subprocess.CalledProcessError as exc:
            # Hide the command used to avoid leaking the access token
            parsed_url = urlparse(self._url)
            exc_chain = (
                None
                if self._credentials or parsed_url.password or parsed_url.username
                else exc
            )
            raise RuntimeError(
                f"Failed to clone repository {_strip_auth_from_url(self._url)!r} with exit code"
                f" {exc.returncode}."
            ) from exc_chain

        if self._commit_sha:
            # Fetch the commit
            await run_process(
                ["git", "fetch", "origin", self._commit_sha],
                cwd=self.destination,
            )
            # Checkout the specific commit
            await run_process(
                ["git", "checkout", self._commit_sha],
                cwd=self.destination,
            )
            self._logger.debug("Successfully checked out commit %s", self._commit_sha)

        # Once the repository is cloned in sparse-checkout mode, grow the working
        # directory to include the requested directories
        if self._directories:
            self._logger.debug("Will add %s", self._directories)
            await run_process(
                ["git", "sparse-checkout", "set", *self._directories],
                cwd=self.destination,
            )

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, GitRepository):
            return (
                self._url == __value._url
                and self._branch == __value._branch
                and self._name == __value._name
            )
        return False

    def __repr__(self) -> str:
        # Strip auth so credentials embedded in the URL never reach logs/reprs.
        return (
            f"GitRepository(name={self._name!r},"
            f" repository={_strip_auth_from_url(self._url)!r},"
            f" branch={self._branch!r})"
        )

    def to_pull_step(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.

        Raises:
            ValueError: If dict credentials contain a plain (non-`Secret`)
                access token, which would otherwise be persisted in clear text.
        """
        pull_step: dict[str, Any] = {
            "prefect.deployments.steps.git_clone": {
                "repository": self._url,
                "branch": self._branch,
            }
        }
        if self._include_submodules:
            pull_step["prefect.deployments.steps.git_clone"]["include_submodules"] = (
                self._include_submodules
            )
        if self._commit_sha:
            pull_step["prefect.deployments.steps.git_clone"]["commit_sha"] = (
                self._commit_sha
            )
        if isinstance(self._credentials, Block):
            pull_step["prefect.deployments.steps.git_clone"]["credentials"] = (
                f"{{{{ {self._credentials.get_block_placeholder()} }}}}"
            )
        elif isinstance(self._credentials, dict):  # pyright: ignore[reportUnnecessaryIsInstance]
            if isinstance(
                access_token := self._credentials.get("access_token"), Secret
            ):
                pull_step["prefect.deployments.steps.git_clone"]["credentials"] = {
                    **self._credentials,
                    "access_token": (
                        f"{{{{ {access_token.get_block_placeholder()} }}}}"
                    ),
                }
            elif self._credentials.get("access_token") is not None:
                raise ValueError(
                    "Please save your access token as a Secret block before converting"
                    " this storage object to a pull step."
                )

        return pull_step
class RemoteStorage:
    """
    Pulls the contents of a remote storage location to the local filesystem.

    Parameters:
        url: The URL of the remote storage location to pull from. Supports
            `fsspec` URLs. Some protocols may require an additional `fsspec`
            dependency to be installed. Refer to the
            [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for more details.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.
        **settings: Any additional settings to pass the `fsspec` filesystem class.
            Block values (e.g. `Secret` blocks) are resolved to their values when
            the filesystem is built and rendered as placeholders in pull steps.

    Examples:
        Pull the contents of a remote storage location to the local filesystem:

        ```python
        from prefect.runner.storage import RemoteStorage

        storage = RemoteStorage(url="s3://my-bucket/my-folder")

        await storage.pull_code()
        ```

        Pull the contents of a remote storage location to the local filesystem
        with additional settings:

        ```python
        from prefect.runner.storage import RemoteStorage
        from prefect.blocks.system import Secret

        storage = RemoteStorage(
            url="s3://my-bucket/my-folder",
            # Use Secret blocks to keep credentials out of your code
            key=Secret.load("my-aws-access-key"),
            secret=Secret.load("my-aws-secret-key"),
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        pull_interval: Optional[int] = 60,
        **settings: Any,
    ):
        self._url = url
        self._settings = settings
        self._logger = get_logger("runner.storage.remote-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @staticmethod
    def _get_required_package_for_scheme(scheme: str) -> Optional[str]:
        """
        Guess the top-level package that provides the `fsspec` implementation
        for `scheme`, or return None if it cannot be determined.
        """
        # Prefer whatever implementation fsspec has already registered.
        registered = fsspec.registry.get(scheme)
        if registered:
            return registered.__module__.split(".")[0]
        # Fall back to well-known packages for common cloud schemes.
        well_known = {
            "s3": "s3fs",
            "gs": "gcsfs",
            "gcs": "gcsfs",
            "abfs": "adlfs",
            "az": "adlfs",
        }
        return well_known.get(scheme)

    @property
    def _filesystem(self) -> fsspec.AbstractFileSystem:
        """
        Build an `fsspec` filesystem for the URL's scheme, with any Block values
        in the settings replaced by their underlying values.
        """
        protocol = urlsplit(self._url).scheme

        def _resolve_block(value: Any) -> Any:
            if not isinstance(value, Block):
                return value
            getter = getattr(value, "get", None)
            if getter:
                return getter()
            if hasattr(value, "value"):
                return getattr(value, "value")
            return value.model_dump()

        resolved_settings = visit_collection(
            self._settings, _resolve_block, return_data=True
        )
        return fsspec.filesystem(protocol, **resolved_settings)  # pyright: ignore[reportUnknownMemberType] missing type stubs

    def set_base_path(self, path: Path) -> None:
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """
        Seconds between pulls from remote storage; None means a one-time sync.
        """
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """
        Local directory that remote contents are copied into: the base path
        joined with the URL's netloc and path.
        """
        return self._storage_base_path / self._remote_path

    @property
    def _remote_path(self) -> Path:
        """
        The URL's netloc and path (leading slash removed) as a relative path.
        Used both as the source for the remote copy and as the destination's
        path under the base directory.
        """
        parts = urlsplit(self._url)
        return Path(parts.netloc, parts.path.lstrip("/"))

    async def pull_code(self) -> None:
        """
        Recursively copy the remote location into `destination`.

        Raises:
            RuntimeError: If building the filesystem or copying fails.
        """
        local_dir = self.destination
        self._logger.debug(
            "Pulling contents from remote storage '%s' to '%s'...",
            self._url,
            local_dir,
        )

        if not local_dir.exists():
            local_dir.mkdir(parents=True, exist_ok=True)

        # Trailing slash: copy the directory's contents rather than the directory.
        source = f"{self._remote_path}/"

        try:
            await from_async.wait_for_call_in_new_thread(
                create_call(
                    self._filesystem.get,  # pyright: ignore[reportUnknownArgumentType, reportUnknownMemberType] missing type stubs
                    source,
                    str(local_dir),
                    recursive=True,
                )
            )
        except Exception as exc:
            raise RuntimeError(
                f"Failed to pull contents from remote storage {self._url!r} to"
                f" {local_dir!r}"
            ) from exc

    def to_pull_step(self) -> dict[str, Any]:
        """
        Describe this storage as a `pull_from_remote_storage` deployment step.
        Block settings become `{{ ... }}` placeholders and a `requires` entry is
        added when the needed package can be determined.
        """

        def _to_placeholder(value: Any) -> Any:
            if isinstance(value, Block):
                return f"{{{{ {value.get_block_placeholder()} }}}}"
            return value

        step_config: dict[str, Any] = {
            "url": self._url,
            **visit_collection(self._settings, _to_placeholder, return_data=True),
        }
        package = self._get_required_package_for_scheme(urlparse(self._url).scheme)
        if package:
            step_config["requires"] = package
        return {"prefect.deployments.steps.pull_from_remote_storage": step_config}

    def __eq__(self, __value: Any) -> bool:
        """
        Two remote storages are equal when their URLs and settings match.
        """
        if not isinstance(__value, RemoteStorage):
            return False
        return self._url == __value._url and self._settings == __value._settings

    def __repr__(self) -> str:
        return f"RemoteStorage(url={self._url!r})"
class BlockStorageAdapter:
    """
    A storage adapter for a storage block object to allow it to be used as a
    runner storage object.

    Parameters:
        block: A storage block instance that provides a `get_directory` method.
        pull_interval: The interval in seconds at which to pull contents from
            the block to local storage. If None, a one-time sync is performed.

    Raises:
        TypeError: If `block` is not a `Block` instance.
        ValueError: If `block` has no `get_directory` method.
    """

    def __init__(
        self,
        block: Union[ReadableDeploymentStorage, WritableDeploymentStorage],
        pull_interval: Optional[int] = 60,
    ):
        if not isinstance(block, Block):  # pyright: ignore[reportUnnecessaryIsInstance]
            raise TypeError(
                f"Expected a block object. Received a {type(block).__name__!r} object."
            )
        if not hasattr(block, "get_directory"):
            raise ValueError("Provided block must have a `get_directory` method.")

        self._block = block
        self._pull_interval = pull_interval
        self._storage_base_path = Path.cwd()

        # Saved blocks get a stable, readable directory name. An unsaved block's
        # document name is None (see `to_pull_step`), so fall back to a random
        # suffix rather than letting every unsaved block of the same type share
        # a "<slug>-None" directory.
        block_document_name = getattr(block, "_block_document_name", None)
        self._name = f"{block.get_block_type_slug()}-{block_document_name or uuid4()}"

    def set_base_path(self, path: Path) -> None:
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        return self._pull_interval

    @property
    def destination(self) -> Path:
        return self._storage_base_path / self._name

    async def pull_code(self) -> None:
        """Download the block's directory contents into `destination`."""
        if not self.destination.exists():
            self.destination.mkdir(parents=True, exist_ok=True)
        await self._block.get_directory(local_path=str(self.destination))

    def to_pull_step(self) -> dict[str, Any]:
        """
        Return the block's own pull step if it defines `get_pull_step`, else a
        `pull_with_block` step referencing the saved block.

        Raises:
            BlockNotSavedError: If the block has not been saved.
        """
        # Give blocks the chance to implement their own pull step
        if hasattr(self._block, "get_pull_step"):
            return getattr(self._block, "get_pull_step")()
        else:
            if getattr(self._block, "_block_document_name", None) is None:
                raise BlockNotSavedError(
                    "Block must be saved with `.save()` before it can be converted to a"
                    " pull step."
                )
            return {
                "prefect.deployments.steps.pull_with_block": {
                    "block_type_slug": self._block.get_block_type_slug(),
                    "block_document_name": getattr(self._block, "_block_document_name"),
                }
            }

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, BlockStorageAdapter):
            return self._block == __value._block
        return False
class LocalStorage:
    """
    Sets the working directory in the local filesystem.

    Code is assumed to already be on the local filesystem, so `pull_code` does
    nothing and `destination` is always the resolved `path`, regardless of any
    base path set via `set_base_path`.

    Parameters:
        path: Local file path to set the working directory for the flow. It is
            resolved to an absolute path when the storage object is created.
        pull_interval: The interval in seconds reported by `pull_interval`.
            Defaults to None (one-time sync).

    Examples:
        Sets the working directory for the local path to the flow:

        ```python
        from prefect.runner.storage import LocalStorage

        storage = LocalStorage(
            path="/path/to/local/flow_directory",
        )
        ```
    """

    def __init__(
        self,
        path: str | Path,
        pull_interval: Optional[int] = None,
    ):
        self._path = Path(path).resolve()
        self._logger = get_logger("runner.storage.local-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @property
    def destination(self) -> Path:
        return self._path

    def set_base_path(self, path: Path) -> None:
        # Recorded for interface compatibility; `destination` does not use it.
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        return self._pull_interval

    async def pull_code(self) -> None:
        # Local storage assumes the code already exists on the local filesystem
        # and does not need to be pulled from a remote location
        pass

    def to_pull_step(self) -> dict[str, Any]:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """
        step = {
            "prefect.deployments.steps.set_working_directory": {
                "directory": str(self.destination)
            }
        }
        return step

    def __eq__(self, __value: Any) -> bool:
        if isinstance(__value, LocalStorage):
            return self._path == __value._path
        return False

    def __repr__(self) -> str:
        return f"LocalStorage(path={self._path!r})"
def create_storage_from_source(
    source: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
    """
    Creates a storage object from a URL or local path.

    The source is classified in this order:
    - `git` scheme, or a URL path ending in `.git` (including scp-style SSH
      URLs such as `git@github.com:org/repo.git`) -> `GitRepository`
    - `file://` or `local://` scheme -> `LocalStorage`, with the scheme prefix
      removed
    - any other scheme listed by `fsspec.available_protocols()` -> `RemoteStorage`
    - anything else -> `LocalStorage`, treating the whole source as a path

    Args:
        source: The URL or local path to create a storage object from. Supports
            git and `fsspec` URLs as well as local paths.
        pull_interval: The interval at which to pull contents from remote storage to
            local storage

    Returns:
        RunnerStorage: A runner storage compatible object
    """
    logger = get_logger("runner.storage")
    parsed_source = urlparse(source)
    if parsed_source.scheme == "git" or parsed_source.path.endswith(".git"):
        return GitRepository(url=source, pull_interval=pull_interval)
    elif parsed_source.scheme in ("file", "local"):
        source_path = source.split("://", 1)[-1]
        return LocalStorage(path=source_path, pull_interval=pull_interval)
    elif parsed_source.scheme in fsspec.available_protocols():
        return RemoteStorage(url=source, pull_interval=pull_interval)
    else:
        logger.debug("No valid fsspec protocol found for URL, assuming local storage.")
        return LocalStorage(path=source, pull_interval=pull_interval)
def _format_token_from_credentials(
    netloc: str,
    credentials: dict[str, Any] | GitCredentials,
    block: Block | None = None,
) -> str:
    """
    Build the userinfo portion (`token` or `username:token`) for a git URL.

    A block implementing the `_GitCredentialsFormatter` protocol is delegated
    to, and its formatted `https://<netloc>` URL is returned as-is. Otherwise the
    token is taken from the first truthy of `access_token`, `token`, and
    `password`; a `Secret` token is unwrapped.

    Args:
        netloc: The network location (hostname) of the git repository
        credentials: Dictionary containing credential information
        block: Optional Block object that may implement the formatter protocol

    Raises:
        ValueError: If no usable token can be found.
    """
    if block is not None and isinstance(block, _GitCredentialsFormatter):
        # The scheme only provides context for the block's formatting.
        return block.format_git_credentials(f"https://{netloc}")

    creds = credentials or {}
    username = creds.get("username")
    secret: str | Secret[str] | None = (
        creds.get("access_token") or creds.get("token") or creds.get("password")
    )

    if isinstance(secret, Secret):
        secret = secret.get()

    if not secret:
        raise ValueError(
            "Please provide a `token` or `password` in your Credentials block to clone"
            " a repo."
        )

    # Plain token for dict credentials without a username.
    return f"{username}:{secret}" if username else secret
def _strip_auth_from_url(url: str) -> str:
    """
    Return `url` with any `user[:password]@` userinfo removed from its netloc.

    The host is kept exactly as written: case, IPv6 brackets, and port are
    preserved. The scheme, path, params, query, and fragment are untouched.
    URLs without a netloc are returned unchanged; this includes scp-style SSH
    URLs such as `git@github.com:org/repo.git`, which `urlparse` treats as a
    bare path. Unlike reassembling the netloc from `hostname`/`port`, this
    never raises on a malformed port.
    """
    parsed = urlparse(url)
    # Userinfo ends at the *last* "@" in the netloc; passwords may contain "@".
    host_and_port = parsed.netloc.rpartition("@")[2]
    return urlunparse(parsed._replace(netloc=host_and_port))
944 )