Coverage for /usr/local/lib/python3.12/site-packages/prefect/filesystems.py: 27%
300 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import abc 1a
4import urllib.parse 1a
5from pathlib import Path 1a
6from shutil import copytree 1a
7from typing import Any, Callable, Dict, Optional 1a
9import anyio 1a
10import fsspec 1a
11from pydantic import BaseModel, Field, SecretStr, field_validator 1a
13from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
14from prefect._internal.schemas.validators import ( 1a
15 stringify_path,
16 validate_basepath,
17)
18from prefect.blocks.core import Block 1a
19from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible 1a
20from prefect.utilities.filesystem import filter_files 1a
22from ._internal.compatibility.migration import getattr_migration 1a
class ReadableFileSystem(Block, abc.ABC):
    # Abstract block interface: implementers must be able to read bytes
    # from a path.
    _block_schema_capabilities = ["read-path"]

    @abc.abstractmethod
    async def read_path(self, path: str) -> bytes:
        """Return the bytes stored at `path`. Must be overridden."""
class WritableFileSystem(Block, abc.ABC):
    # Abstract block interface: implementers must be able to both read
    # and write bytes at a path.
    _block_schema_capabilities = ["read-path", "write-path"]

    @abc.abstractmethod
    async def read_path(self, path: str) -> bytes:
        """Return the bytes stored at `path`. Must be overridden."""

    @abc.abstractmethod
    async def write_path(self, path: str, content: bytes) -> None:
        """Store `content` at `path`. Must be overridden."""
class ReadableDeploymentStorage(Block, abc.ABC):
    # Abstract block interface: implementers must be able to fetch a
    # directory from storage.
    _block_schema_capabilities = ["get-directory"]

    @abc.abstractmethod
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """Copy the directory at `from_path` to `local_path`. Must be overridden."""
class WritableDeploymentStorage(Block, abc.ABC):
    # Abstract block interface: implementers must be able to both fetch a
    # directory from storage and push one to it.
    _block_schema_capabilities = ["get-directory", "put-directory"]

    @abc.abstractmethod
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """Copy the directory at `from_path` to `local_path`. Must be overridden."""

    @abc.abstractmethod
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """Copy the directory at `local_path` to `to_path`. Must be overridden."""
class LocalFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a local file system.

    Example:
        Load stored local file system config:
        ```python
        from prefect.filesystems import LocalFileSystem

        local_file_system_block = LocalFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Local File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/ad39089fa66d273b943394a68f003f7a19aa850e-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    basepath: Optional[str] = Field(
        default=None, description="Default local path for this block to write to."
    )

    @field_validator("basepath", mode="before")
    def cast_pathlib(cls, value: str | Path | None) -> str | None:
        """Pass `None` through unchanged; hand anything else to `stringify_path`."""
        if value is None:
            return value
        return stringify_path(value)

    def _resolve_path(self, path: Optional[str], validate: bool = False) -> Path:
        """
        Resolve `path` against the block's base path.

        Args:
            path: A relative or absolute path. Relative paths are joined onto
                the base path; `None` yields the base path itself.
            validate: When true, reject any path that lands outside of the
                base path.

        Returns:
            The path to access. Relative inputs are returned joined to the
            base path (not symlink-resolved); absolute inputs are resolved.

        Raises:
            ValueError: If `validate` is true and the path escapes the base path.
        """
        # Only resolve the base path at runtime, default to the current directory
        basepath = (
            Path(self.basepath).expanduser().resolve()
            if self.basepath
            else Path(".").resolve()
        )

        if path is None:
            return basepath

        resolved_path: Path = Path(path).expanduser()

        if not resolved_path.is_absolute():
            resolved_path = basepath / resolved_path
        else:
            resolved_path = resolved_path.resolve()

        if validate:
            # A relative input such as "../x" is only *joined* onto the base
            # path above, so the base path is still among its lexical parents.
            # Collapse `..` segments before the containment check so such
            # paths cannot slip outside of the base path.
            candidate = resolved_path.resolve()
            if basepath not in candidate.parents and (basepath != candidate):
                raise ValueError(
                    f"Provided path {resolved_path} is outside of the base path {basepath}."
                )
        return resolved_path

    @staticmethod
    def _build_ignore_func(root, ignore_file):
        """
        Build a `shutil.copytree` ignore callback from an ignore file.

        The patterns read from `ignore_file` are passed to `filter_files`
        together with `root`; the callback then ignores every entry whose
        path relative to `root` is not in the resulting collection.
        """
        with open(ignore_file) as f:
            ignore_patterns = f.readlines()
        included_files = filter_files(root=root, ignore_patterns=ignore_patterns)

        def ignore_func(directory, files):
            relative_path = Path(directory).relative_to(root)
            return [
                name for name in files if str(relative_path / name) not in included_files
            ]

        return ignore_func

    @staticmethod
    def _is_same_directory(first, second) -> bool:
        """Return whether two paths (str or Path) resolve to the same location."""
        return Path(first).resolve() == Path(second).resolve()

    def _get_directory_endpoints(
        self, from_path: Optional[str], local_path: Optional[str]
    ) -> tuple[Path, Path]:
        """Resolve the (source, destination) directories used by `get_directory`."""
        if not from_path:
            source = Path(self.basepath or ".").expanduser().resolve()
        else:
            source = self._resolve_path(from_path)

        if not local_path:
            destination = Path(".").resolve()
        else:
            destination = Path(local_path).resolve()

        return source, destination

    async def aget_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        """
        source, destination = self._get_directory_endpoints(from_path, local_path)

        if source == destination:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        # .prefectignore exists in the original location, not the current location which
        # is most likely temporary
        prefectignore = source / ".prefectignore"
        if prefectignore.exists():
            ignore_func = await self._get_ignore_func(
                local_path=source, ignore_file=prefectignore
            )
        else:
            ignore_func = None

        copytree(source, destination, dirs_exist_ok=True, ignore=ignore_func)

    @async_dispatch(aget_directory)
    def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        """
        source, destination = self._get_directory_endpoints(from_path, local_path)

        if source == destination:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        # .prefectignore exists in the original location, not the current location which
        # is most likely temporary
        prefectignore = source / ".prefectignore"
        if prefectignore.exists():
            ignore_func = self._build_ignore_func(source, prefectignore)
        else:
            ignore_func = None

        copytree(source, destination, dirs_exist_ok=True, ignore=ignore_func)

    async def _get_ignore_func(self, local_path: str, ignore_file: str):
        """Async-callable wrapper returning the ignore callback for `local_path`."""
        return self._build_ignore_func(local_path, ignore_file)

    async def aput_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.
        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        destination_path = self._resolve_path(to_path, validate=True)

        if not local_path:
            local_path = Path(".").absolute()

        if ignore_file:
            ignore_func = await self._get_ignore_func(
                local_path=local_path, ignore_file=ignore_file
            )
        else:
            ignore_func = None

        # Copying a directory onto itself makes shutil.copytree fail, so skip it.
        # Compare resolved paths so a `str` argument or a symlinked working
        # directory is still recognised as the same location.
        if self._is_same_directory(local_path, destination_path):
            return

        copytree(
            src=local_path,
            dst=destination_path,
            ignore=ignore_func,
            dirs_exist_ok=True,
        )

    @async_dispatch(aput_directory)
    def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.
        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        destination_path = self._resolve_path(to_path, validate=True)

        if not local_path:
            # Use an absolute path, matching `aput_directory`.
            local_path = Path(".").absolute()

        if ignore_file:
            ignore_func = self._build_ignore_func(local_path, ignore_file)
        else:
            ignore_func = None

        # Copying a directory onto itself makes shutil.copytree fail, so skip it.
        if self._is_same_directory(local_path, destination_path):
            return

        copytree(
            src=local_path,
            dst=destination_path,
            ignore=ignore_func,
            dirs_exist_ok=True,
        )

    def _existing_file(self, path: str) -> Path:
        """
        Resolve `path` and confirm that it points at an existing regular file.

        Raises:
            ValueError: If the resolved path does not exist or is not a file.
        """
        target: Path = self._resolve_path(path)

        # Check if the path exists
        if not target.exists():
            raise ValueError(f"Path {target} does not exist.")

        # Validate that its a file
        if not target.is_file():
            raise ValueError(f"Path {target} is not a file.")

        return target

    def _writable_file(self, path: str) -> Path:
        """
        Resolve `path`, create its parent directories and confirm it can be a file.

        Raises:
            ValueError: If the resolved path already exists and is not a file.
        """
        target: Path = self._resolve_path(path)

        # Construct the path if it does not exist
        target.parent.mkdir(exist_ok=True, parents=True)

        # Check if the file already exists
        if target.exists() and not target.is_file():
            raise ValueError(f"Path {target} already exists and is not a file.")

        return target

    async def aread_path(self, path: str) -> bytes:
        """Asynchronously read and return the bytes of the file at `path`."""
        target = self._existing_file(path)

        async with await anyio.open_file(str(target), mode="rb") as f:
            content = await f.read()

        return content

    @async_dispatch(aread_path)
    def read_path(self, path: str) -> bytes:
        """Read and return the bytes of the file at `path`."""
        target = self._existing_file(path)

        with open(str(target), mode="rb") as f:
            content = f.read()

        return content

    async def awrite_path(self, path: str, content: bytes) -> str:
        """Asynchronously write `content` to `path`; return the written path as a string."""
        target = self._writable_file(path)

        async with await anyio.open_file(target, mode="wb") as f:
            await f.write(content)
        # Leave path stringify to the OS
        return str(target)

    @async_dispatch(awrite_path)
    def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path`; return the written path as a string."""
        target = self._writable_file(path)

        with open(target, mode="wb") as f:
            f.write(content)
        # Leave path stringify to the OS
        return str(target)
class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a remote file system.

    Supports any remote file system supported by `fsspec`. The file system is specified
    using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.

    Example:
        Load stored remote file system config:
        ```python
        from prefect.filesystems import RemoteFileSystem

        remote_file_system_block = RemoteFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Remote File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/e86b41bc0f9c99ba9489abeee83433b43d5c9365-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    basepath: str = Field(
        default=...,
        description="Default path for this block to write to.",
        examples=["s3://my-bucket/my-folder/"],
    )
    settings: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional settings to pass through to fsspec.",
    )

    # Cache for the configured fsspec file system used for access
    _filesystem: fsspec.AbstractFileSystem = None

    @field_validator("basepath")
    def check_basepath(cls, value: str) -> str:
        """Delegate validation of the base path to `validate_basepath`."""
        return validate_basepath(value)

    def _resolve_path(self, path: str) -> str:
        """
        Join `path` onto the block's base path.

        Args:
            path: Either a path relative to the base path or a URL.

        Returns:
            The base path and the URL path component of `path`, joined with
            a single slash.

        Raises:
            ValueError: If `path` carries a scheme different from the base
                path's, or a network location / path prefix that does not
                match the base path.
        """
        base_scheme, base_netloc, base_urlpath, _, _ = urllib.parse.urlsplit(
            self.basepath
        )
        scheme, netloc, urlpath, _, _ = urllib.parse.urlsplit(path)

        # Confirm that absolute paths are valid
        if scheme and scheme != base_scheme:
            raise ValueError(
                f"Path {path!r} with scheme {scheme!r} must use the same scheme as"
                f" the base path {base_scheme!r}."
            )

        if netloc and ((netloc != base_netloc) or not urlpath.startswith(base_urlpath)):
            raise ValueError(
                f"Path {path!r} is outside of the base path {self.basepath!r}."
            )

        # NOTE(review): the full URL path of `path` is appended to the base path,
        # so an absolute URL that already contains the base folder ends up with
        # that folder repeated — confirm this is intended.
        return f"{self.basepath.rstrip('/')}/{urlpath.lstrip('/')}"

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        # validate that from_path has a trailing slash for proper fsspec behavior across versions
        if not from_path.endswith("/"):
            from_path += "/"

        return self.filesystem.get(from_path, local_path, recursive=True)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
        overwrite: bool = True,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.

        Args:
            local_path: Directory to upload; defaults to ".".
            to_path: Remote destination, resolved against the base path.
            ignore_file: Optional file of ignore patterns; only entries that
                `filter_files` reports as included are uploaded.
            overwrite: When true, `overwrite=True` is passed to `put_file`.

        Returns:
            The number of files uploaded (directories are not counted).
        """
        if to_path is None:
            to_path = str(self.basepath)
        else:
            to_path = self._resolve_path(to_path)

        if local_path is None:
            local_path = "."

        included_files = None
        if ignore_file:
            with open(ignore_file) as f:
                ignore_patterns = f.readlines()

            included_files = filter_files(
                local_path, ignore_patterns, include_dirs=True
            )

        remote_root = to_path if to_path.endswith("/") else to_path + "/"
        put_kwargs = {"overwrite": True} if overwrite else {}

        counter = 0
        for entry in Path(local_path).rglob("*"):
            relative_path = entry.relative_to(local_path)
            # Test against None rather than truthiness: an ignore file that
            # excludes everything yields an empty collection, which must
            # upload nothing rather than disabling the filter.
            if included_files is not None and str(relative_path) not in included_files:
                continue

            if entry.is_dir():
                continue

            self.filesystem.put_file(
                entry.as_posix(), remote_root + relative_path.as_posix(), **put_kwargs
            )
            counter += 1

        return counter

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read and return the bytes stored at `path` (resolved against the base path)."""
        path = self._resolve_path(path)

        with self.filesystem.open(path, "rb") as file:
            content = await run_sync_in_worker_thread(file.read)

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Write `content` to `path` (resolved against the base path).

        The parent directory is created first. Returns the resolved path.
        """
        path = self._resolve_path(path)
        dirpath = path[: path.rindex("/")]

        # For smb:// base paths only the URL path component is handed to makedirs.
        if self.basepath.startswith("smb://"):
            parsed = urllib.parse.urlparse(dirpath)
            dirpath = parsed.path

        self.filesystem.makedirs(dirpath, exist_ok=True)

        with self.filesystem.open(path, "wb") as file:
            await run_sync_in_worker_thread(file.write, content)
        return path

    @property
    def filesystem(self) -> fsspec.AbstractFileSystem:
        """
        The fsspec file system for the base path's scheme, created on first access.

        Raises:
            RuntimeError: If creating the file system fails with an `ImportError`.
        """
        if not self._filesystem:
            scheme, _, _, _, _ = urllib.parse.urlsplit(self.basepath)

            try:
                self._filesystem = fsspec.filesystem(scheme, **self.settings)
            except ImportError as exc:
                # The path is a remote file system that uses a lib that is not installed
                raise RuntimeError(
                    f"File system created with scheme {scheme!r} from base path "
                    f"{self.basepath!r} could not be created. "
                    "You are likely missing a Python module required to use the given "
                    "storage protocol."
                ) from exc

        return self._filesystem
class SMB(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a SMB share.

    Example:
        Load stored SMB config:

        ```python
        from prefect.filesystems import SMB
        smb_block = SMB.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "SMB"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/3f624663f7beb97d011d011bffd51ecf6c499efc-195x195.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    share_path: str = Field(
        default=...,
        description="SMB target (requires <SHARE>, followed by <PATH>).",
        examples=["/SHARE/dir/subdir"],
    )
    smb_username: Optional[SecretStr] = Field(
        default=None,
        title="SMB Username",
        description="Username with access to the target SMB SHARE.",
    )
    smb_password: Optional[SecretStr] = Field(
        default=None, title="SMB Password", description="Password for SMB access."
    )
    smb_host: str = Field(
        default=..., title="SMB server/hostname", description="SMB server/hostname."
    )
    smb_port: Optional[int] = Field(
        default=None, title="SMB port", description="SMB port (default: 445)."
    )

    # Most recently built RemoteFileSystem; rebuilt on every `filesystem` access.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """The `smb://<host>/<share_path>` URL for this block."""
        return f"smb://{self.smb_host.rstrip('/')}/{self.share_path.lstrip('/')}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` for this share from the block's current fields.

        Only fields with a truthy value are forwarded as settings.
        """
        settings = {}
        if self.smb_username:
            settings["username"] = self.smb_username.get_secret_value()
        if self.smb_password:
            settings["password"] = self.smb_password.get_secret_value()
        if self.smb_host:
            settings["host"] = self.smb_host
        if self.smb_port:
            settings["port"] = self.smb_port
        # Reuse the `basepath` property so the URL is built in one place only.
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath,
            settings=settings,
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.
        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.
        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        # `overwrite=False` is passed through to the underlying RemoteFileSystem.
        return await self.filesystem.put_directory(
            local_path=local_path,
            to_path=to_path,
            ignore_file=ignore_file,
            overwrite=False,
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read `path` through the underlying `RemoteFileSystem`."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the underlying `RemoteFileSystem`."""
        return await self.filesystem.write_path(path=path, content=content)
class NullFileSystem(BaseModel):
    """
    A file system that does not store any data.
    """

    async def read_path(self, path: str) -> None:
        """Do nothing and return `None`."""

    async def write_path(self, path: str, content: bytes) -> None:
        """Do nothing; `content` is discarded."""

    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """Do nothing; no directory is copied."""

    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """Do nothing; no directory is copied."""
# Module-level `__getattr__` hook built by `getattr_migration` for this module.
# NOTE(review): presumably resolves names that were moved or removed from this
# module — confirm against `prefect._internal.compatibility.migration`.
__getattr__: Callable[[str], Any] = getattr_migration(__name__)