Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/dockerutils.py: 20%

262 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import hashlib 1a

2import os 1a

3import shutil 1a

4import subprocess 1a

5import sys 1a

6import warnings 1a

7from collections.abc import Generator, Iterable, Iterator 1a

8from contextlib import contextmanager 1a

9from pathlib import Path, PurePosixPath 1a

10from tempfile import TemporaryDirectory 1a

11from types import TracebackType 1a

12from typing import TYPE_CHECKING, Any, Optional, TextIO, Union, cast 1a

13from urllib.parse import urlsplit 1a

14 

15from packaging.version import Version 1a

16from typing_extensions import Self 1a

17 

18import prefect 1a

19import prefect.types._datetime 1a

20from prefect.utilities.importtools import lazy_import 1a

21from prefect.utilities.slugify import slugify 1a

22 

23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true1a

24 import docker 

25 import docker.errors 

26 from docker import DockerClient 

27 from docker.models.images import Image 

28 

# Version label keyed under the "io.prefect." namespace; presumably attached to
# containers Prefect creates (the consumers are not visible here — verify).
CONTAINER_LABELS: dict[str, str] = {"io.prefect.version": prefect.__version__}

32 

33 

def python_version_minor() -> str:
    """Return the running interpreter's version as ``"<major>.<minor>"``."""
    return ".".join(str(part) for part in sys.version_info[:2])

36 

37 

def python_version_micro() -> str:
    """Return the running interpreter's version as ``"<major>.<minor>.<micro>"``."""
    major, minor, micro = sys.version_info[:3]
    return f"{major}.{minor}.{micro}"

40 

41 

def _prefect_short_sha() -> str:
    """Return an abbreviated commit SHA used to tag development images.

    Asks ``git`` (run in the current working directory) first, because
    ``--short=7`` may return more than seven characters when that is needed to
    keep the SHA unambiguous. If git is unavailable or the command fails, falls
    back to the first seven characters of
    ``prefect.__version_info__["full-revisionid"]``.
    """
    try:
        output = subprocess.check_output(
            ["git", "rev-parse", "--short=7", "HEAD"],
            # Discard git's stderr so a failed lookup does not print to the terminal.
            stderr=subprocess.DEVNULL,
        )
        return output.decode("utf-8").strip()
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: git could not be executed; SubprocessError: non-zero exit;
        # ValueError: covers a UnicodeDecodeError on unexpected output.
        return prefect.__version_info__["full-revisionid"][:7]


def get_prefect_image_name(
    prefect_version: Optional[str] = None,
    python_version: Optional[str] = None,
    flavor: Optional[str] = None,
) -> str:
    """
    Get the Prefect image name matching the current Prefect and Python versions.

    Versions without a local segment map to
    ``prefecthq/prefect:<version>-python<python>[-<flavor>]``. Versions with a
    local segment map to
    ``prefecthq/prefect-dev:sha-<short sha>-python<python>[-<flavor>]``.

    Args:
        prefect_version: An optional override for the Prefect version.
        python_version: An optional override for the Python version; must be at the
            minor level e.g. '3.9'.
        flavor: An optional alternative image flavor to build, like 'conda'

    Returns:
        The fully-qualified image name.
    """
    parsed_version = Version(prefect_version or prefect.__version__)
    is_prod_build = parsed_version.local is None

    # Only development builds need a commit SHA; skip the git subprocess otherwise.
    prefect_version = (
        parsed_version.public if is_prod_build else f"sha-{_prefect_short_sha()}"
    )

    python_version = python_version or python_version_minor()

    tag = slugify(
        f"{prefect_version}-python{python_version}" + (f"-{flavor}" if flavor else ""),
        lowercase=False,
        max_length=128,
        # Docker allows these characters for tag names
        regex_pattern=r"[^a-zA-Z0-9_.-]+",
    )

    image = "prefect" if is_prod_build else "prefect-dev"
    return f"prefecthq/{image}:{tag}"

82 

83 

@contextmanager
def silence_docker_warnings() -> Generator[None, None, None]:
    """Suppress the distutils DeprecationWarnings triggered by docker-py.

    The ignore filters are installed inside ``warnings.catch_warnings()``, so the
    caller's filter configuration is restored once the block exits. Because it is
    built with ``contextmanager``, it also works as a decorator.
    """
    # Raised through docker-py's use of deprecated distutils APIs;
    # see https://github.com/docker/docker-py/pull/2931
    ignored_patterns = (
        "distutils Version classes are deprecated.*",
        "The distutils package is deprecated and slated for removal.*",
    )
    with warnings.catch_warnings():
        for pattern in ignored_patterns:
            warnings.filterwarnings(
                "ignore", message=pattern, category=DeprecationWarning
            )
        yield

102 

103 

# docker-py emits deprecation warnings while being imported. Rather than letting
# them surface across modules and test suites, the docker import is consolidated
# here (under ``silence_docker_warnings``) and exposed from this module.
# NOTE(review): ``lazy_import`` may defer the real import until first attribute
# access, in which case the filter above may not be active then — confirm.
if not TYPE_CHECKING:
    with silence_docker_warnings():
        docker = lazy_import("docker")

110 

111 

@contextmanager
def docker_client() -> Generator["DockerClient", None, None]:
    """Yield a Docker client configured from the environment, closing it afterwards.

    The client comes from ``docker.DockerClient.from_env()``, which is called
    with docker-py deprecation warnings silenced. A
    ``docker.errors.DockerException`` raised while creating the client *or*
    inside the caller's ``with`` body is re-raised as a ``RuntimeError`` chained
    to the original. The client is closed whenever one was created.
    """
    active_client: Optional["DockerClient"] = None
    try:
        with silence_docker_warnings():
            active_client = docker.DockerClient.from_env()
        yield active_client
    except docker.errors.DockerException as docker_error:
        raise RuntimeError(
            "This error is often thrown because Docker is not running. "
            "Please ensure Docker is running."
        ) from docker_error
    finally:
        if active_client is not None:
            active_client.close()  # type: ignore # typing stub is not complete

128 

129 

class BuildError(Exception):
    """Signals that a Docker image build did not succeed."""

132 

133 

# Labels stamped onto every image built with Prefect (passed to the build as
# ``labels``), recording the Prefect version that produced it.
IMAGE_LABELS: dict[str, str] = {"io.prefect.version": prefect.__version__}

138 

139 

@silence_docker_warnings()
def build_image(
    context: Path,
    dockerfile: str = "Dockerfile",
    tag: Optional[str] = None,
    pull: bool = False,
    platform: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
    **kwargs: Any,
) -> str:
    """Builds a Docker image, returning the image ID

    Args:
        context: the root directory for the Docker build context
        dockerfile: the path to the Dockerfile, relative to the context
        tag: the tag to give this image
        pull: True to pull the base image during the build
        platform: an optional target platform, passed through to the build
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the build output as it is reported by Docker
        **kwargs: extra keyword arguments forwarded to ``client.api.build``;
            ``decode`` and ``labels`` are ignored because this function sets them

    Returns:
        The image ID

    Raises:
        ValueError: if ``context`` is empty or does not exist
        BuildError: if Docker reports an error, or the build finishes without
            reporting an image ID
    """
    if not context:
        raise ValueError("context required to build an image")

    if not Path(context).exists():
        raise ValueError(f"Context path {context} does not exist")

    # ``decode`` and ``labels`` are always supplied explicitly below; drop caller
    # copies so they cannot collide as duplicate keyword arguments.
    build_kwargs = {
        key: value for key, value in kwargs.items() if key not in ("decode", "labels")
    }

    image_id: Optional[str] = None
    with docker_client() as client:
        events = client.api.build(
            path=str(context),
            tag=tag,
            dockerfile=dockerfile,
            pull=pull,
            decode=True,
            labels=IMAGE_LABELS,
            platform=platform,
            **build_kwargs,
        )

        try:
            for event in events:
                if "stream" in event:
                    if stream_progress_to:
                        stream_progress_to.write(event["stream"])
                        stream_progress_to.flush()
                elif "aux" in event:
                    # Tolerate aux payloads without an "ID" key instead of raising
                    # KeyError; the last ID reported wins.
                    aux = event["aux"]
                    if isinstance(aux, dict) and "ID" in aux:
                        image_id = aux["ID"]
                elif "error" in event:
                    raise BuildError(event["error"])
                elif "message" in event:
                    raise BuildError(event["message"])
        except docker.errors.APIError as e:
            raise BuildError(e.explanation) from e

    # An explicit check (not ``assert``) so it still fires under ``python -O``.
    if not image_id:
        raise BuildError("The Docker daemon did not return an image ID")
    return image_id

203 

204 

205class ImageBuilder: 1a

206 """An interface for preparing Docker build contexts and building images""" 

207 

208 base_directory: Path 1a

209 context: Optional[Path] 1a

210 platform: Optional[str] 1a

211 dockerfile_lines: list[str] 1a

212 temporary_directory: Optional[TemporaryDirectory[str]] 1a

213 

214 def __init__( 1a

215 self, 

216 base_image: str, 

217 base_directory: Optional[Path] = None, 

218 platform: Optional[str] = None, 

219 context: Optional[Path] = None, 

220 ): 

221 """Create an ImageBuilder 

222 

223 Args: 

224 base_image: the base image to use 

225 base_directory: the starting point on your host for relative file locations, 

226 defaulting to the current directory 

227 context: use this path as the build context (if not provided, will create a 

228 temporary directory for the context) 

229 

230 Returns: 

231 The image ID 

232 """ 

233 self.base_directory = base_directory or context or Path().absolute() 

234 self.temporary_directory = None 

235 self.context = context 

236 self.platform = platform 

237 self.dockerfile_lines = [] 

238 

239 if self.context: 

240 dockerfile_path: Path = self.context / "Dockerfile" 

241 if dockerfile_path.exists(): 

242 raise ValueError(f"There is already a Dockerfile at {context}") 

243 

244 self.add_line(f"FROM {base_image}") 

245 

246 def __enter__(self) -> Self: 1a

247 if self.context and not self.temporary_directory: 

248 return self 

249 

250 self.temporary_directory = TemporaryDirectory() 

251 self.context = Path(self.temporary_directory.__enter__()) 

252 return self 

253 

254 def __exit__( 1a

255 self, exc: type[BaseException], value: BaseException, traceback: TracebackType 

256 ) -> None: 

257 if not self.temporary_directory: 

258 return 

259 

260 self.temporary_directory.__exit__(exc, value, traceback) 

261 self.temporary_directory = None 

262 self.context = None 

263 

264 def add_line(self, line: str) -> None: 1a

265 """Add a line to this image's Dockerfile""" 

266 self.add_lines([line]) 

267 

268 def add_lines(self, lines: Iterable[str]) -> None: 1a

269 """Add lines to this image's Dockerfile""" 

270 self.dockerfile_lines.extend(lines) 

271 

272 def copy( 1a

273 self, source: Union[str, Path], destination: Union[str, PurePosixPath] 

274 ) -> None: 

275 """Copy a file to this image""" 

276 if not self.context: 

277 raise Exception("No context available") 

278 

279 if not isinstance(destination, PurePosixPath): 

280 destination = PurePosixPath(destination) 

281 

282 if not isinstance(source, Path): 

283 source = Path(source) 

284 

285 if source.is_absolute(): 

286 source = source.resolve().relative_to(self.base_directory) 

287 

288 if self.temporary_directory: 

289 os.makedirs(self.context / source.parent, exist_ok=True) 

290 

291 if source.is_dir(): 

292 shutil.copytree(self.base_directory / source, self.context / source) 

293 else: 

294 shutil.copy2(self.base_directory / source, self.context / source) 

295 

296 self.add_line(f"COPY {source} {destination}") 

297 

298 def write_text(self, text: str, destination: Union[str, PurePosixPath]) -> None: 1a

299 if not self.context: 

300 raise Exception("No context available") 

301 

302 if not isinstance(destination, PurePosixPath): 

303 destination = PurePosixPath(destination) 

304 

305 source_hash = hashlib.sha256(text.encode()).hexdigest() 

306 (self.context / f".{source_hash}").write_text(text) 

307 self.add_line(f"COPY .{source_hash} {destination}") 

308 

309 def build( 1a

310 self, pull: bool = False, stream_progress_to: Optional[TextIO] = None 

311 ) -> str: 

312 """Build the Docker image from the current state of the ImageBuilder 

313 

314 Args: 

315 pull: True to pull the base image during the build 

316 stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) 

317 that will collect the build output as it is reported by Docker 

318 

319 Returns: 

320 The image ID 

321 """ 

322 assert self.context is not None 

323 dockerfile_path: Path = self.context / "Dockerfile" 

324 

325 with dockerfile_path.open("w") as dockerfile: 

326 dockerfile.writelines(line + "\n" for line in self.dockerfile_lines) 

327 

328 try: 

329 return build_image( 

330 self.context, 

331 platform=self.platform, 

332 pull=pull, 

333 stream_progress_to=stream_progress_to, 

334 ) 

335 finally: 

336 os.unlink(dockerfile_path) 

337 

338 def assert_has_line(self, line: str) -> None: 1a

339 """Asserts that the given line is in the Dockerfile""" 

340 all_lines = "\n".join( 

341 [f" {i + 1:>3}: {line}" for i, line in enumerate(self.dockerfile_lines)] 

342 ) 

343 message = ( 

344 f"Expected {line!r} not found in Dockerfile. Dockerfile:\n{all_lines}" 

345 ) 

346 assert line in self.dockerfile_lines, message 

347 

348 def assert_line_absent(self, line: str) -> None: 1a

349 """Asserts that the given line is absent from the Dockerfile""" 

350 if line not in self.dockerfile_lines: 

351 return 

352 

353 i = self.dockerfile_lines.index(line) 

354 

355 surrounding_lines = "\n".join( 

356 [ 

357 f" {i + 1:>3}: {line}" 

358 for i, line in enumerate(self.dockerfile_lines[i - 2 : i + 2]) 

359 ] 

360 ) 

361 message = ( 

362 f"Unexpected {line!r} found in Dockerfile at line {i + 1}. " 

363 f"Surrounding lines:\n{surrounding_lines}" 

364 ) 

365 

366 assert line not in self.dockerfile_lines, message 

367 

368 def assert_line_before(self, first: str, second: str) -> None: 1a

369 """Asserts that the first line appears before the second line""" 

370 self.assert_has_line(first) 

371 self.assert_has_line(second) 

372 

373 first_index = self.dockerfile_lines.index(first) 

374 second_index = self.dockerfile_lines.index(second) 

375 

376 surrounding_lines = "\n".join( 

377 [ 

378 f" {i + 1:>3}: {line}" 

379 for i, line in enumerate( 

380 self.dockerfile_lines[second_index - 2 : first_index + 2] 

381 ) 

382 ] 

383 ) 

384 

385 message = ( 

386 f"Expected {first!r} to appear before {second!r} in the Dockerfile, but " 

387 f"{first!r} was at line {first_index + 1} and {second!r} as at line " 

388 f"{second_index + 1}. Surrounding lines:\n{surrounding_lines}" 

389 ) 

390 

391 assert first_index < second_index, message 

392 

393 def assert_line_after(self, second: str, first: str) -> None: 1a

394 """Asserts that the second line appears after the first line""" 

395 self.assert_line_before(first, second) 

396 

397 def assert_has_file(self, source: Path, container_path: PurePosixPath) -> None: 1a

398 """Asserts that the given file or directory will be copied into the container 

399 at the given path""" 

400 if source.is_absolute(): 

401 source = source.relative_to(self.base_directory) 

402 

403 self.assert_has_line(f"COPY {source} {container_path}") 

404 

405 

class PushError(Exception):
    """Signals that pushing a Docker image to a registry did not succeed."""

408 

409 

@silence_docker_warnings()
def push_image(
    image_id: str,
    registry_url: str,
    name: str,
    tag: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
) -> str:
    """Pushes a local image to a Docker registry, returning the registry-qualified tag
    for that image

    This assumes that the environment's Docker daemon is already authenticated to the
    given registry, and currently makes no attempt to authenticate.

    Args:
        image_id (str): a Docker image ID
        registry_url (str): the URL of a Docker registry; only its network location
            (host and optional port) is used, and a scheme is optional, so both
            ``https://registry.example.com`` and ``registry.example.com`` work
        name (str): the name of this image
        tag (str): the tag to give this image (defaults to a slugified UTC
            timestamp)
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the push output as it is reported by Docker

    Returns:
        A registry-qualified tag, like my-registry.example.com/my-image:abcdefg

    Raises:
        PushError: if Docker reports an error event while pushing
    """

    if not tag:
        tag = slugify(prefect.types._datetime.now("UTC").isoformat())

    # ``urlsplit`` only populates ``netloc`` when the URL has a scheme or leading
    # "//". Retry with "//" so bare values such as "registry.example.com" or
    # "localhost:5000" still yield a host instead of producing "/<name>".
    registry = urlsplit(registry_url).netloc or urlsplit(f"//{registry_url}").netloc
    repository = f"{registry}/{name}"
    qualified_tag = f"{repository}:{tag}"

    with docker_client() as client:
        image: "Image" = client.images.get(image_id)
        image.tag(repository, tag=tag)  # type: ignore # typing stub is not complete
        events = cast(
            Iterator[dict[str, Any]],
            client.api.push(repository, tag=tag, stream=True, decode=True),  # type: ignore # typing stub is not complete
        )
        try:
            for event in events:
                if "status" in event:
                    if not stream_progress_to:
                        continue
                    stream_progress_to.write(event["status"])
                    if "progress" in event:
                        stream_progress_to.write(" " + event["progress"])
                    stream_progress_to.write("\n")
                    stream_progress_to.flush()
                elif "error" in event:
                    raise PushError(event["error"])
        finally:
            # Remove the registry-qualified tag added above, whether or not the
            # push succeeded.
            client.api.remove_image(qualified_tag, noprune=True)  # type: ignore # typing stub is not complete

    return qualified_tag

466 

467 

def to_run_command(command: list[str]) -> str:
    """
    Render a process-style argument list as a single Dockerfile ``RUN`` instruction.

    The first element is emitted verbatim. Every following argument is quoted with
    :func:`repr`. That is Python string-literal quoting, which coincides with shell
    quoting only for simple arguments (not, for example, ones containing
    backslashes or newlines). An empty ``command`` yields an empty string.

    TODO: Consider text-wrapping the instruction to improve readability of the
    generated Dockerfile.
    """
    if not command:
        return ""

    executable, *arguments = command
    pieces = [f"RUN {executable}", *(repr(argument) for argument in arguments)]
    return " ".join(pieces)

490 

491 

def parse_image_tag(name: str) -> tuple[str, Optional[str]]:
    """
    Split a Docker image reference into its name and its tag or digest.

    Only the portion after the first "/" is searched for a tag or digest, so a
    registry port such as ``hostname.io:5050`` is never mistaken for a tag. A
    digest ("@") takes precedence over a tag (":").

    - Example 1: 'prefecthq/prefect:latest' -> ('prefecthq/prefect', 'latest')
    - Example 2: 'hostname.io:5050/folder/subfolder:latest' -> ('hostname.io:5050/folder/subfolder', 'latest')
    - Example 3: 'prefecthq/prefect@sha256:abc123' -> ('prefecthq/prefect', 'sha256:abc123')
    - Intended for references following Docker Image Specification v1.1.0, which
      image building tools typically enforce.

    Args:
        name (str): Name of Docker Image

    Return:
        tuple: image name (without tag/digest), image tag/digest or None
    """
    index_name, separator, image_path = name.partition("/")
    if not separator:
        # No "/": the whole reference is the image path (e.g. "simplename:latest").
        index_name, image_path = None, index_name

    tag: Optional[str] = None
    for delimiter in ("@", ":"):
        if delimiter in image_path:
            image_path, tag = image_path.split(delimiter)
            break

    image_name = image_path if index_name is None else f"{index_name}/{image_path}"
    return image_name, tag

538 

539 

def split_repository_path(repository_path: str) -> tuple[Optional[str], str]:
    """
    Splits a Docker repository path into its namespace and repository components.

    The path is split on at most the first two "/" characters. Everything before
    the last of those pieces is the namespace (registry and/or organization), and
    the final piece is the repository.

    Args:
        repository_path: The Docker repository path to split.

    Returns:
        Tuple[Optional[str], str]: ``(namespace, repository)``, where ``namespace``
        is None when the path contains no "/".
    """
    *namespace_parts, repository = repository_path.split("/", 2)
    namespace = "/".join(namespace_parts) if namespace_parts else None
    return namespace, repository

569 

570 

def format_outlier_version_name(version: str) -> str:
    """
    Normalize Docker version strings so `packaging.version.parse` accepts them.

    Every occurrence of the edition markers "-ce" and "-ee" is removed, e.g.
    "20.10.0-ce" -> "20.10.0" and "20.10.0-ee" -> "20.10.0". This is kept as a
    single place to extend if more unusual formats need handling.

    Args:
        version (str): raw docker version value

    Returns:
        str: value that can pass `packaging.version.parse` validation
    """
    for edition_marker in ("-ce", "-ee"):
        version = version.replace(edition_marker, "")
    return version

586 

587 

@contextmanager
def generate_default_dockerfile(
    context: Optional[Path] = None,
) -> Generator[Path, None, None]:
    """
    Generates a default Dockerfile used for deploying flows. The Dockerfile is
    written to ``<context>/Dockerfile``, its path is yielded, and the file is
    removed when the context manager exits.

    The image starts from :func:`get_prefect_image_name`, installs
    ``requirements.txt`` with ``uv pip`` when that file exists in ``context``, and
    copies ``context`` to ``/opt/prefect/<directory name>/`` as the working
    directory.

    Args:
        - context: The directory to write the Dockerfile into and to copy into the
            image. Defaults to the current working directory.

    Raises:
        RuntimeError: if a Dockerfile already exists in ``context``.
    """
    if not context:
        context = Path.cwd()

    # Normalize relative paths such as Path(".") (whose ``.name`` is empty) so the
    # in-image directory is never "/opt/prefect//". ``abspath`` collapses ".."
    # lexically and does not follow symlinks.
    dir_name = Path(os.path.abspath(context)).name

    lines: list[str] = [f"FROM {get_prefect_image_name()}"]

    if (context / "requirements.txt").exists():
        lines.append(f"COPY requirements.txt /opt/prefect/{dir_name}/requirements.txt")
        lines.append(f"RUN uv pip install -r /opt/prefect/{dir_name}/requirements.txt")

    lines.append(f"COPY . /opt/prefect/{dir_name}/")
    lines.append(f"WORKDIR /opt/prefect/{dir_name}/")

    dockerfile_path = context / "Dockerfile"
    # Checked before the ``try`` so a pre-existing Dockerfile is never deleted.
    if dockerfile_path.exists():
        raise RuntimeError(
            "Failed to generate Dockerfile. Dockerfile already exists in the"
            " current directory."
        )

    try:
        with dockerfile_path.open("w", encoding="utf-8") as f:
            f.writelines(line + "\n" for line in lines)
        yield dockerfile_path
    finally:
        # missing_ok: the file may be absent if writing failed or the caller
        # already removed it; don't mask the original error in that case.
        dockerfile_path.unlink(missing_ok=True)