Coverage for /usr/local/lib/python3.12/site-packages/prefect/filesystems.py: 27%

300 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import urllib.parse 1a

5from pathlib import Path 1a

6from shutil import copytree 1a

7from typing import Any, Callable, Dict, Optional 1a

8 

9import anyio 1a

10import fsspec 1a

11from pydantic import BaseModel, Field, SecretStr, field_validator 1a

12 

13from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

14from prefect._internal.schemas.validators import ( 1a

15 stringify_path, 

16 validate_basepath, 

17) 

18from prefect.blocks.core import Block 1a

19from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible 1a

20from prefect.utilities.filesystem import filter_files 1a

21 

22from ._internal.compatibility.migration import getattr_migration 1a

23 

24 

class ReadableFileSystem(Block, abc.ABC):
    # Abstract interface for blocks that can read the contents stored at a path.
    # The capability list is what this interface declares for its block schema.
    _block_schema_capabilities = ["read-path"]

    @abc.abstractmethod
    async def read_path(self, path: str) -> bytes:
        """Return the contents stored at `path` as bytes."""

31 

32 

class WritableFileSystem(Block, abc.ABC):
    # Abstract interface for blocks that can both read and write the contents
    # stored at a path. The capability list is what this interface declares for
    # its block schema.
    _block_schema_capabilities = ["read-path", "write-path"]

    @abc.abstractmethod
    async def read_path(self, path: str) -> bytes:
        """Return the contents stored at `path` as bytes."""

    @abc.abstractmethod
    async def write_path(self, path: str, content: bytes) -> None:
        """Store `content` at `path`."""

43 

44 

class ReadableDeploymentStorage(Block, abc.ABC):
    # Abstract interface for deployment storage that can retrieve a directory.
    # The capability list is what this interface declares for its block schema.
    _block_schema_capabilities = ["get-directory"]

    @abc.abstractmethod
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """Retrieve the directory at `from_path` into `local_path`."""

53 

54 

class WritableDeploymentStorage(Block, abc.ABC):
    # Abstract interface for deployment storage that can both retrieve and upload
    # directories. The capability list is what this interface declares for its
    # block schema.
    _block_schema_capabilities = ["get-directory", "put-directory"]

    @abc.abstractmethod
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """Retrieve the directory at `from_path` into `local_path`."""

    @abc.abstractmethod
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Upload the directory at `local_path` to `to_path`.

        `ignore_file` optionally names a file of patterns for paths to skip; the
        concrete implementations in this module treat it as gitignore-style.
        """

72 

73 

class LocalFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a local file system.

    Example:
        Load stored local file system config:
        ```python
        from prefect.filesystems import LocalFileSystem

        local_file_system_block = LocalFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Local File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/ad39089fa66d273b943394a68f003f7a19aa850e-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    basepath: Optional[str] = Field(
        default=None, description="Default local path for this block to write to."
    )

    @field_validator("basepath", mode="before")
    def cast_pathlib(cls, value: str | Path | None) -> str | None:
        # Accept `Path` objects (and other path-likes) by storing them as strings.
        if value is None:
            return value
        return stringify_path(value)

    def _resolve_path(self, path: Optional[str], validate: bool = False) -> Path:
        """
        Resolve `path` against the block's base path.

        Args:
            path: A path relative to the base path, an absolute path, or `None`
                (which returns the base path itself). `~` is expanded.
            validate: When True, reject paths that fall outside the base path.

        Returns:
            The resolved path.

        Raises:
            ValueError: If `validate` is True and the path is outside the base path.
        """
        import os

        # Only resolve the base path at runtime, default to the current directory
        basepath = (
            Path(self.basepath).expanduser().resolve()
            if self.basepath
            else Path(".").resolve()
        )

        if path is None:
            return basepath

        resolved_path = Path(path).expanduser()

        if not resolved_path.is_absolute():
            # Collapse `..` segments lexically. Without this, a path such as
            # `basepath / "../outside"` still lists `basepath` among its `parents`
            # and would pass the containment check below. Symlinks are not
            # followed for relative paths.
            resolved_path = Path(os.path.normpath(basepath / resolved_path))
        else:
            resolved_path = resolved_path.resolve()

        if validate:
            if basepath not in resolved_path.parents and (basepath != resolved_path):
                raise ValueError(
                    f"Provided path {resolved_path} is outside of the base path {basepath}."
                )
        return resolved_path

    def _get_directory_paths(
        self, from_path: Optional[str], local_path: Optional[str]
    ) -> tuple[Path, Path]:
        """
        Compute the (source, destination) pair for a `get_directory` copy.

        `from_path` defaults to the block's basepath (or the current directory when
        no basepath is set); `local_path` defaults to the current directory. Both
        results are absolute.
        """
        if from_path:
            source = self._resolve_path(from_path)
        else:
            source = Path(self.basepath or ".").expanduser().resolve()
        destination = Path(local_path or ".").resolve()
        return source, destination

    def _build_ignore_func(
        self, local_path: str | Path, ignore_file: str | Path
    ) -> Callable[[str, list[str]], list[str]]:
        """
        Build a `shutil.copytree` ignore callable from a patterns file.

        The lines of `ignore_file` are passed to `filter_files` rooted at
        `local_path`; any directory entry whose path (relative to `local_path`) is
        not in the returned set is ignored by the copy.
        """
        with open(ignore_file) as f:
            ignore_patterns = f.readlines()
        included_files = filter_files(root=local_path, ignore_patterns=ignore_patterns)

        def ignore_func(directory: str, files: list[str]) -> list[str]:
            relative_path = Path(directory).relative_to(local_path)
            return [f for f in files if str(relative_path / f) not in included_files]

        return ignore_func

    async def aget_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        """
        source, destination = self._get_directory_paths(from_path, local_path)

        if source == destination:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        # .prefectignore exists in the original location, not the current location which
        # is most likely temporary
        ignore_file = source / ".prefectignore"
        if ignore_file.exists():
            ignore_func = await self._get_ignore_func(
                local_path=source, ignore_file=ignore_file
            )
        else:
            ignore_func = None

        copytree(source, destination, dirs_exist_ok=True, ignore=ignore_func)

    @async_dispatch(aget_directory)
    def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        """
        source, destination = self._get_directory_paths(from_path, local_path)

        if source == destination:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        # .prefectignore exists in the original location, not the current location which
        # is most likely temporary
        ignore_file = source / ".prefectignore"
        ignore_func = (
            self._build_ignore_func(source, ignore_file) if ignore_file.exists() else None
        )

        copytree(source, destination, dirs_exist_ok=True, ignore=ignore_func)

    async def _get_ignore_func(self, local_path: str, ignore_file: str):
        """Async entry point for `_build_ignore_func`, used by the async methods."""
        return self._build_ignore_func(local_path, ignore_file)

    async def aput_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.
        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        destination_path = self._resolve_path(to_path, validate=True)

        if not local_path:
            local_path = Path(".").absolute()

        if ignore_file:
            ignore_func = await self._get_ignore_func(
                local_path=local_path, ignore_file=ignore_file
            )
        else:
            ignore_func = None

        # `local_path` may be a relative `str`; compare resolved paths so copying a
        # directory onto itself is reliably skipped.
        if Path(local_path).resolve() == destination_path.resolve():
            return

        copytree(
            src=local_path,
            dst=destination_path,
            ignore=ignore_func,
            dirs_exist_ok=True,
        )

    @async_dispatch(aput_directory)
    def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.
        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        destination_path = self._resolve_path(to_path, validate=True)

        if not local_path:
            # Same default as `aput_directory`.
            local_path = Path(".").absolute()

        ignore_func = (
            self._build_ignore_func(local_path, ignore_file) if ignore_file else None
        )

        # `local_path` may be a relative `str`; compare resolved paths so copying a
        # directory onto itself is reliably skipped.
        if Path(local_path).resolve() == destination_path.resolve():
            return

        copytree(
            src=local_path,
            dst=destination_path,
            ignore=ignore_func,
            dirs_exist_ok=True,
        )

    def _existing_file(self, path: str) -> Path:
        """
        Resolve `path` and check that it names an existing regular file.

        Raises:
            ValueError: If the path does not exist or is not a file.
        """
        resolved = self._resolve_path(path)
        if not resolved.exists():
            raise ValueError(f"Path {resolved} does not exist.")
        if not resolved.is_file():
            raise ValueError(f"Path {resolved} is not a file.")
        return resolved

    def _writable_file(self, path: str) -> Path:
        """
        Resolve `path`, create its parent directories, and check it is writable.

        Raises:
            ValueError: If something other than a file already exists at the path.
        """
        resolved = self._resolve_path(path)
        # Construct the path if it does not exist
        resolved.parent.mkdir(exist_ok=True, parents=True)
        if resolved.exists() and not resolved.is_file():
            raise ValueError(f"Path {resolved} already exists and is not a file.")
        return resolved

    async def aread_path(self, path: str) -> bytes:
        """Return the bytes of the file at `path` (resolved against the basepath)."""
        file_path = self._existing_file(path)
        async with await anyio.open_file(str(file_path), mode="rb") as f:
            return await f.read()

    @async_dispatch(aread_path)
    def read_path(self, path: str) -> bytes:
        """Return the bytes of the file at `path` (resolved against the basepath)."""
        file_path = self._existing_file(path)
        with open(str(file_path), mode="rb") as f:
            return f.read()

    async def awrite_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` and return the resolved path as a string."""
        file_path = self._writable_file(path)
        async with await anyio.open_file(file_path, mode="wb") as f:
            await f.write(content)
        # Leave path stringify to the OS
        return str(file_path)

    @async_dispatch(awrite_path)
    def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` and return the resolved path as a string."""
        file_path = self._writable_file(path)
        with open(file_path, mode="wb") as f:
            f.write(content)
        # Leave path stringify to the OS
        return str(file_path)

364 

365 

class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a remote file system.

    Supports any remote file system supported by `fsspec`. The file system is specified
    using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.

    Example:
        Load stored remote file system config:
        ```python
        from prefect.filesystems import RemoteFileSystem

        remote_file_system_block = RemoteFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Remote File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/e86b41bc0f9c99ba9489abeee83433b43d5c9365-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    basepath: str = Field(
        default=...,
        description="Default path for this block to write to.",
        examples=["s3://my-bucket/my-folder/"],
    )
    settings: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional settings to pass through to fsspec.",
    )

    # Cache for the configured fsspec file system used for access
    _filesystem: fsspec.AbstractFileSystem = None

    @field_validator("basepath")
    def check_basepath(cls, value: str) -> str:
        return validate_basepath(value)

    def _resolve_path(self, path: str) -> str:
        """
        Turn `path` into a full URL under `self.basepath`.

        A path without a scheme or network location is joined onto the base path.
        A full URL must use the base scheme and network location and lie inside
        the base path; only its portion below the base path is kept, so the base
        path is not repeated in the result.

        Raises:
            ValueError: If the scheme differs from the base path's, or the URL is
                outside the base path.
        """
        base_scheme, base_netloc, base_urlpath, _, _ = urllib.parse.urlsplit(
            self.basepath
        )
        scheme, netloc, urlpath, _, _ = urllib.parse.urlsplit(path)

        # Confirm that absolute paths are valid
        if scheme and scheme != base_scheme:
            raise ValueError(
                f"Path {path!r} with scheme {scheme!r} must use the same scheme as"
                f" the base path {base_scheme!r}."
            )

        if scheme or netloc:
            # Compare whole path segments so "/folder-2" is not considered to be
            # inside "/folder".
            base_prefix = base_urlpath.rstrip("/")
            inside_base = urlpath == base_prefix or urlpath.startswith(
                base_prefix + "/"
            )
            if netloc != base_netloc or not inside_base:
                raise ValueError(
                    f"Path {path!r} is outside of the base path {self.basepath!r}."
                )
            # Keep only the part below the base path; it is re-joined onto
            # `self.basepath` below.
            urlpath = urlpath[len(base_prefix) :]

        return f"{self.basepath.rstrip('/')}/{urlpath.lstrip('/')}"

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        # validate that from_path has a trailing slash for proper fsspec behavior across versions
        if not from_path.endswith("/"):
            from_path += "/"

        return self.filesystem.get(from_path, local_path, recursive=True)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
        overwrite: bool = True,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.

        Returns:
            The number of files uploaded (directories are not counted).
        """
        if to_path is None:
            to_path = str(self.basepath)
        else:
            to_path = self._resolve_path(to_path)

        if local_path is None:
            local_path = "."

        # `None` means "no ignore file: upload everything". An ignore file that
        # excludes every entry yields an empty collection, which must skip every
        # entry rather than disable filtering.
        included_files = None
        if ignore_file:
            with open(ignore_file) as f:
                ignore_patterns = f.readlines()

            included_files = filter_files(
                local_path, ignore_patterns, include_dirs=True
            )

        separator = "" if to_path.endswith("/") else "/"
        counter = 0
        for entry in Path(local_path).rglob("*"):
            relative_path = entry.relative_to(local_path)
            if included_files is not None and str(relative_path) not in included_files:
                continue

            if entry.is_dir():
                continue

            fpath = to_path + separator + relative_path.as_posix()
            source = entry.as_posix()
            if overwrite:
                self.filesystem.put_file(source, fpath, overwrite=True)
            else:
                self.filesystem.put_file(source, fpath)

            counter += 1

        return counter

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Return the bytes stored at `path` (resolved against the basepath)."""
        path = self._resolve_path(path)

        with self.filesystem.open(path, "rb") as file:
            content = await run_sync_in_worker_thread(file.read)

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` (resolved against the basepath) and return it."""
        path = self._resolve_path(path)
        dirpath = path[: path.rindex("/")]

        if self.basepath.startswith("smb://"):
            # For SMB, only the path component of the parent URL is passed to
            # `makedirs`.
            parsed = urllib.parse.urlparse(dirpath)
            dirpath = parsed.path

        self.filesystem.makedirs(dirpath, exist_ok=True)

        with self.filesystem.open(path, "wb") as file:
            await run_sync_in_worker_thread(file.write, content)
        return path

    @property
    def filesystem(self) -> fsspec.AbstractFileSystem:
        """
        The fsspec file system for the base path's scheme, created on first use.

        Raises:
            RuntimeError: If the file system cannot be created because a required
                module is not installed.
        """
        if not self._filesystem:
            scheme, _, _, _, _ = urllib.parse.urlsplit(self.basepath)

            try:
                self._filesystem = fsspec.filesystem(scheme, **self.settings)
            except ImportError as exc:
                # The path is a remote file system that uses a lib that is not installed
                raise RuntimeError(
                    f"File system created with scheme {scheme!r} from base path "
                    f"{self.basepath!r} could not be created. "
                    "You are likely missing a Python module required to use the given "
                    "storage protocol."
                ) from exc

        return self._filesystem

545 

546 

class SMB(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a SMB share.

    Example:
        Load stored SMB config:

        ```python
        from prefect.filesystems import SMB
        smb_block = SMB.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "SMB"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/3f624663f7beb97d011d011bffd51ecf6c499efc-195x195.png"
    _documentation_url = (
        "https://docs.prefect.io/latest/develop/results#specifying-a-default-filesystem"
    )

    share_path: str = Field(
        default=...,
        description="SMB target (requires <SHARE>, followed by <PATH>).",
        examples=["/SHARE/dir/subdir"],
    )
    smb_username: Optional[SecretStr] = Field(
        default=None,
        title="SMB Username",
        description="Username with access to the target SMB SHARE.",
    )
    smb_password: Optional[SecretStr] = Field(
        default=None, title="SMB Password", description="Password for SMB access."
    )
    smb_host: str = Field(
        default=..., title="SMB server/hostname", description="SMB server/hostname."
    )
    smb_port: Optional[int] = Field(
        default=None, title="SMB port", description="SMB port (default: 445)."
    )

    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """The share as an `smb://<host>/<share_path>` URL, without doubled slashes."""
        return f"smb://{self.smb_host.rstrip('/')}/{self.share_path.lstrip('/')}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        A `RemoteFileSystem` for this share, configured from the connection fields.

        A new instance is built (and stored on `_remote_file_system`) on every
        access; only the fields that are set are passed through as settings.
        """
        settings: Dict[str, Any] = {}
        if self.smb_username:
            settings["username"] = self.smb_username.get_secret_value()
        if self.smb_password:
            settings["password"] = self.smb_password.get_secret_value()
        if self.smb_host:
            settings["host"] = self.smb_host
        if self.smb_port:
            settings["port"] = self.smb_port
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath,
            settings=settings,
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.
        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.
        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        # Delegates with `overwrite=False`, so `put_file` is called without an
        # explicit overwrite flag.
        return await self.filesystem.put_directory(
            local_path=local_path,
            to_path=to_path,
            ignore_file=ignore_file,
            overwrite=False,
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Return the bytes stored at `path` on the share."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` on the share and return the full path."""
        return await self.filesystem.write_path(path=path, content=content)

646 

647 

class NullFileSystem(BaseModel):
    """
    A file system that does not store any data.
    """

    async def read_path(self, path: str) -> None:
        """Ignore `path`; there is never anything stored, so return `None`."""
        return None

    async def write_path(self, path: str, content: bytes) -> None:
        """Discard `content`; nothing is persisted."""
        return None

    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """No-op: there is no directory to retrieve."""
        return None

    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """No-op: uploaded directories are discarded."""
        return None

671 

672 

# Module-level `__getattr__` (PEP 562): lookups of names not defined in this module
# are delegated to the callable returned by `getattr_migration`, presumably to
# redirect or report names that were moved or removed (see
# `prefect._internal.compatibility.migration` to confirm).
__getattr__: Callable[[str], Any] = getattr_migration(__name__)