Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/dockerutils.py: 20%
262 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import hashlib 1a
2import os 1a
3import shutil 1a
4import subprocess 1a
5import sys 1a
6import warnings 1a
7from collections.abc import Generator, Iterable, Iterator 1a
8from contextlib import contextmanager 1a
9from pathlib import Path, PurePosixPath 1a
10from tempfile import TemporaryDirectory 1a
11from types import TracebackType 1a
12from typing import TYPE_CHECKING, Any, Optional, TextIO, Union, cast 1a
13from urllib.parse import urlsplit 1a
15from packaging.version import Version 1a
16from typing_extensions import Self 1a
18import prefect 1a
19import prefect.types._datetime 1a
20from prefect.utilities.importtools import lazy_import 1a
21from prefect.utilities.slugify import slugify 1a
23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true1a
24 import docker
25 import docker.errors
26 from docker import DockerClient
27 from docker.models.images import Image
# Label mapping that records the running Prefect version. Presumably meant for
# containers started by Prefect -- it is not referenced elsewhere in this module.
CONTAINER_LABELS = {"io.prefect.version": prefect.__version__}
def python_version_minor() -> str:
    """Return the running interpreter's version as "<major>.<minor>"."""
    major, minor = sys.version_info[:2]
    return f"{major}.{minor}"
def python_version_micro() -> str:
    """Return the running interpreter's version as "<major>.<minor>.<micro>"."""
    return ".".join(str(component) for component in sys.version_info[:3])
def get_prefect_image_name(
    prefect_version: Optional[str] = None,
    python_version: Optional[str] = None,
    flavor: Optional[str] = None,
) -> str:
    """
    Get the Prefect image name matching the current Prefect and Python versions.

    A version without a local segment (`Version.local is None`) is treated as a
    production build and maps to `prefecthq/prefect:<public version>-...`. Any
    other version maps to `prefecthq/prefect-dev:sha-<short sha>-...`.

    Args:
        prefect_version: An optional override for the Prefect version.
        python_version: An optional override for the Python version; must be at the
            minor level e.g. '3.9'.
        flavor: An optional alternative image flavor to build, like 'conda'

    Returns:
        The image name in the form `prefecthq/<image>:<tag>`.
    """
    parsed_version = Version(prefect_version or prefect.__version__)
    is_prod_build = parsed_version.local is None

    if is_prod_build:
        # Production tags never contain a SHA, so no git subprocess (or
        # revision lookup) is needed on this path.
        prefect_version = parsed_version.public
    else:
        try:
            # Try to get the short SHA from git because it can add additional characters to avoid ambiguity
            short_sha = (
                subprocess.check_output(["git", "rev-parse", "--short=7", "HEAD"])
                .decode("utf-8")
                .strip()
            )
        except Exception:
            # If git is not available, fallback to the first 7 characters of the full revision ID
            short_sha = prefect.__version_info__["full-revisionid"][:7]
        prefect_version = f"sha-{short_sha}"

    python_version = python_version or python_version_minor()

    tag = slugify(
        f"{prefect_version}-python{python_version}" + (f"-{flavor}" if flavor else ""),
        lowercase=False,
        max_length=128,
        # Docker allows these characters for tag names
        regex_pattern=r"[^a-zA-Z0-9_.-]+",
    )

    image = "prefect" if is_prod_build else "prefect-dev"
    return f"prefecthq/{image}:{tag}"
@contextmanager
def silence_docker_warnings() -> Generator[None, None, None]:
    """Suppress the distutils DeprecationWarnings triggered by docker-py.

    The filters are installed inside `warnings.catch_warnings()`, so the
    previous warning configuration is restored when the context exits.
    """
    # Silence warnings due to use of deprecated methods within dockerpy
    # See https://github.com/docker/docker-py/pull/2931
    ignored_messages = (
        "distutils Version classes are deprecated.*",
        "The distutils package is deprecated and slated for removal.*",
    )

    with warnings.catch_warnings():
        for pattern in ignored_messages:
            warnings.filterwarnings(
                "ignore",
                message=pattern,
                category=DeprecationWarning,
            )

        yield
# Importing docker-py fires deprecation warnings. Rather than letting them pop
# up in every module and test suite that needs Docker, the import is done once
# here (lazily, with those warnings silenced) and exposed as this module's
# `docker` attribute. Type checkers use the real import from the TYPE_CHECKING
# section instead.
if not TYPE_CHECKING:
    with silence_docker_warnings():
        docker = lazy_import("docker")
@contextmanager
def docker_client() -> Generator["DockerClient", None, None]:
    """Yield a Docker client built from the environment and close it on exit.

    Raises:
        RuntimeError: if a `docker.errors.DockerException` is raised while
            creating the client or while the caller's `with` body is running.
    """
    connection = None
    try:
        with silence_docker_warnings():
            connection = docker.DockerClient.from_env()

        yield connection
    except docker.errors.DockerException as docker_error:
        raise RuntimeError(
            "This error is often thrown because Docker is not running. Please ensure Docker is running."
        ) from docker_error
    finally:
        # from_env() may have failed, in which case there is nothing to close
        if connection is not None:
            connection.close()  # type: ignore # typing stub is not complete
class BuildError(Exception):
    """Signals that building a Docker image did not succeed"""
# Every image built by Prefect is stamped with these labels
IMAGE_LABELS = {"io.prefect.version": prefect.__version__}
@silence_docker_warnings()
def build_image(
    context: Path,
    dockerfile: str = "Dockerfile",
    tag: Optional[str] = None,
    pull: bool = False,
    platform: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
    **kwargs: Any,
) -> str:
    """Builds a Docker image, returning the image ID

    Args:
        context: the root directory for the Docker build context
        dockerfile: the path to the Dockerfile, relative to the context
        tag: the tag to give this image
        pull: True to pull the base image during the build
        platform: forwarded unchanged as the `platform` argument of the build call
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the build output as it is reported by Docker
        **kwargs: extra keyword arguments forwarded to the build call; `decode`
            and `labels` are dropped because this function sets them itself

    Returns:
        The image ID

    Raises:
        ValueError: if `context` is empty or does not exist
        BuildError: if Docker reports an error while building
    """

    if not context:
        raise ValueError("context required to build an image")

    if not Path(context).exists():
        raise ValueError(f"Context path {context} does not exist")

    # These two options are controlled below, so caller-supplied values are discarded
    kwargs = {
        option: value
        for option, value in kwargs.items()
        if option not in ("decode", "labels")
    }

    image_id = None
    with docker_client() as client:
        build_events = client.api.build(
            path=str(context),
            tag=tag,
            dockerfile=dockerfile,
            pull=pull,
            decode=True,
            labels=IMAGE_LABELS,
            platform=platform,
            **kwargs,
        )

        try:
            for build_event in build_events:
                if "stream" in build_event:
                    # Progress output is only relayed when a stream was provided
                    if stream_progress_to:
                        stream_progress_to.write(build_event["stream"])
                        stream_progress_to.flush()
                elif "aux" in build_event:
                    image_id = build_event["aux"]["ID"]
                elif "error" in build_event:
                    raise BuildError(build_event["error"])
                elif "message" in build_event:
                    raise BuildError(build_event["message"])
        except docker.errors.APIError as api_error:
            raise BuildError(api_error.explanation) from api_error

    assert image_id, "The Docker daemon did not return an image ID"
    return image_id
205class ImageBuilder: 1a
206 """An interface for preparing Docker build contexts and building images"""
208 base_directory: Path 1a
209 context: Optional[Path] 1a
210 platform: Optional[str] 1a
211 dockerfile_lines: list[str] 1a
212 temporary_directory: Optional[TemporaryDirectory[str]] 1a
214 def __init__( 1a
215 self,
216 base_image: str,
217 base_directory: Optional[Path] = None,
218 platform: Optional[str] = None,
219 context: Optional[Path] = None,
220 ):
221 """Create an ImageBuilder
223 Args:
224 base_image: the base image to use
225 base_directory: the starting point on your host for relative file locations,
226 defaulting to the current directory
227 context: use this path as the build context (if not provided, will create a
228 temporary directory for the context)
230 Returns:
231 The image ID
232 """
233 self.base_directory = base_directory or context or Path().absolute()
234 self.temporary_directory = None
235 self.context = context
236 self.platform = platform
237 self.dockerfile_lines = []
239 if self.context:
240 dockerfile_path: Path = self.context / "Dockerfile"
241 if dockerfile_path.exists():
242 raise ValueError(f"There is already a Dockerfile at {context}")
244 self.add_line(f"FROM {base_image}")
246 def __enter__(self) -> Self: 1a
247 if self.context and not self.temporary_directory:
248 return self
250 self.temporary_directory = TemporaryDirectory()
251 self.context = Path(self.temporary_directory.__enter__())
252 return self
254 def __exit__( 1a
255 self, exc: type[BaseException], value: BaseException, traceback: TracebackType
256 ) -> None:
257 if not self.temporary_directory:
258 return
260 self.temporary_directory.__exit__(exc, value, traceback)
261 self.temporary_directory = None
262 self.context = None
264 def add_line(self, line: str) -> None: 1a
265 """Add a line to this image's Dockerfile"""
266 self.add_lines([line])
268 def add_lines(self, lines: Iterable[str]) -> None: 1a
269 """Add lines to this image's Dockerfile"""
270 self.dockerfile_lines.extend(lines)
272 def copy( 1a
273 self, source: Union[str, Path], destination: Union[str, PurePosixPath]
274 ) -> None:
275 """Copy a file to this image"""
276 if not self.context:
277 raise Exception("No context available")
279 if not isinstance(destination, PurePosixPath):
280 destination = PurePosixPath(destination)
282 if not isinstance(source, Path):
283 source = Path(source)
285 if source.is_absolute():
286 source = source.resolve().relative_to(self.base_directory)
288 if self.temporary_directory:
289 os.makedirs(self.context / source.parent, exist_ok=True)
291 if source.is_dir():
292 shutil.copytree(self.base_directory / source, self.context / source)
293 else:
294 shutil.copy2(self.base_directory / source, self.context / source)
296 self.add_line(f"COPY {source} {destination}")
298 def write_text(self, text: str, destination: Union[str, PurePosixPath]) -> None: 1a
299 if not self.context:
300 raise Exception("No context available")
302 if not isinstance(destination, PurePosixPath):
303 destination = PurePosixPath(destination)
305 source_hash = hashlib.sha256(text.encode()).hexdigest()
306 (self.context / f".{source_hash}").write_text(text)
307 self.add_line(f"COPY .{source_hash} {destination}")
309 def build( 1a
310 self, pull: bool = False, stream_progress_to: Optional[TextIO] = None
311 ) -> str:
312 """Build the Docker image from the current state of the ImageBuilder
314 Args:
315 pull: True to pull the base image during the build
316 stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO)
317 that will collect the build output as it is reported by Docker
319 Returns:
320 The image ID
321 """
322 assert self.context is not None
323 dockerfile_path: Path = self.context / "Dockerfile"
325 with dockerfile_path.open("w") as dockerfile:
326 dockerfile.writelines(line + "\n" for line in self.dockerfile_lines)
328 try:
329 return build_image(
330 self.context,
331 platform=self.platform,
332 pull=pull,
333 stream_progress_to=stream_progress_to,
334 )
335 finally:
336 os.unlink(dockerfile_path)
338 def assert_has_line(self, line: str) -> None: 1a
339 """Asserts that the given line is in the Dockerfile"""
340 all_lines = "\n".join(
341 [f" {i + 1:>3}: {line}" for i, line in enumerate(self.dockerfile_lines)]
342 )
343 message = (
344 f"Expected {line!r} not found in Dockerfile. Dockerfile:\n{all_lines}"
345 )
346 assert line in self.dockerfile_lines, message
348 def assert_line_absent(self, line: str) -> None: 1a
349 """Asserts that the given line is absent from the Dockerfile"""
350 if line not in self.dockerfile_lines:
351 return
353 i = self.dockerfile_lines.index(line)
355 surrounding_lines = "\n".join(
356 [
357 f" {i + 1:>3}: {line}"
358 for i, line in enumerate(self.dockerfile_lines[i - 2 : i + 2])
359 ]
360 )
361 message = (
362 f"Unexpected {line!r} found in Dockerfile at line {i + 1}. "
363 f"Surrounding lines:\n{surrounding_lines}"
364 )
366 assert line not in self.dockerfile_lines, message
368 def assert_line_before(self, first: str, second: str) -> None: 1a
369 """Asserts that the first line appears before the second line"""
370 self.assert_has_line(first)
371 self.assert_has_line(second)
373 first_index = self.dockerfile_lines.index(first)
374 second_index = self.dockerfile_lines.index(second)
376 surrounding_lines = "\n".join(
377 [
378 f" {i + 1:>3}: {line}"
379 for i, line in enumerate(
380 self.dockerfile_lines[second_index - 2 : first_index + 2]
381 )
382 ]
383 )
385 message = (
386 f"Expected {first!r} to appear before {second!r} in the Dockerfile, but "
387 f"{first!r} was at line {first_index + 1} and {second!r} as at line "
388 f"{second_index + 1}. Surrounding lines:\n{surrounding_lines}"
389 )
391 assert first_index < second_index, message
393 def assert_line_after(self, second: str, first: str) -> None: 1a
394 """Asserts that the second line appears after the first line"""
395 self.assert_line_before(first, second)
397 def assert_has_file(self, source: Path, container_path: PurePosixPath) -> None: 1a
398 """Asserts that the given file or directory will be copied into the container
399 at the given path"""
400 if source.is_absolute():
401 source = source.relative_to(self.base_directory)
403 self.assert_has_line(f"COPY {source} {container_path}")
class PushError(Exception):
    """Signals that pushing a Docker image did not succeed"""
@silence_docker_warnings()
def push_image(
    image_id: str,
    registry_url: str,
    name: str,
    tag: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
) -> str:
    """Pushes a local image to a Docker registry, returning the registry-qualified tag
    for that image

    This assumes that the environment's Docker daemon is already authenticated to the
    given registry, and currently makes no attempt to authenticate.

    Args:
        image_id (str): a Docker image ID
        registry_url (str): the URL of a Docker registry; a bare `host[:port]`
            without a scheme is accepted as well
        name (str): the name of this image
        tag (str): the tag to give this image (defaults to a slugified UTC
            timestamp taken at push time)
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the push output as it is reported by Docker

    Returns:
        A registry-qualified tag, like my-registry.example.com/my-image:abcdefg

    Raises:
        PushError: if Docker reports an error while pushing
    """

    if not tag:
        tag = slugify(prefect.types._datetime.now("UTC").isoformat())

    registry = urlsplit(registry_url).netloc
    if not registry:
        # Without a scheme (e.g. "localhost:5000") urlsplit reports no network
        # location, which would produce the invalid repository "/<name>".
        # Re-parse the value as a network-path reference to recover the host.
        registry = urlsplit(f"//{registry_url}").netloc
    repository = f"{registry}/{name}"

    with docker_client() as client:
        image: "Image" = client.images.get(image_id)
        image.tag(repository, tag=tag)  # type: ignore # typing stub is not complete
        events = cast(
            Iterator[dict[str, Any]],
            client.api.push(repository, tag=tag, stream=True, decode=True),  # type: ignore # typing stub is not complete
        )
        try:
            for event in events:
                if "status" in event:
                    if not stream_progress_to:
                        continue
                    stream_progress_to.write(event["status"])
                    if "progress" in event:
                        stream_progress_to.write(" " + event["progress"])
                    stream_progress_to.write("\n")
                    stream_progress_to.flush()
                elif "error" in event:
                    raise PushError(event["error"])
        finally:
            # Drop the registry-qualified tag created above, whether or not the push succeeded
            client.api.remove_image(f"{repository}:{tag}", noprune=True)  # type: ignore # typing stub is not complete

    return f"{repository}:{tag}"
def to_run_command(command: list[str]) -> str:
    """
    Render a process-style argument list as one Dockerfile RUN instruction.

    The first element is emitted verbatim; every following argument is emitted
    as its `repr()`. An empty list yields an empty string.
    """
    if not command:
        return ""

    executable, *arguments = command

    # TODO: Consider performing text-wrapping to improve readability of the generated
    #       Dockerfile
    return " ".join([f"RUN {executable}", *map(repr, arguments)])
def parse_image_tag(name: str) -> tuple[str, Optional[str]]:
    """
    Split a Docker image string into its image name and its tag or digest.

    - Example 1: 'prefecthq/prefect:latest' -> ('prefecthq/prefect', 'latest')
    - Example 2: 'hostname.io:5050/folder/subfolder:latest' -> ('hostname.io:5050/folder/subfolder', 'latest')
    - Example 3: 'prefecthq/prefect@sha256:abc123' -> ('prefecthq/prefect', 'sha256:abc123')
    - Example 4: 'simplename' -> ('simplename', None)

    Args:
        name (str): Name of Docker Image

    Return:
        tuple: image name (without tag/digest), image tag/digest or None
    """
    # Everything before the first "/" (the index: a registry host or namespace)
    # is set aside so that a port such as "hostname.io:5050" is never mistaken
    # for a tag. Names without any "/" are treated entirely as the path.
    index_name, slash, image_path = name.partition("/")
    if not slash:
        image_path = name

    tag: Optional[str] = None
    # A digest marker is looked for first; a tag separator only if there is none
    for marker in ("@", ":"):
        if marker in image_path:
            image_path, tag = image_path.split(marker)
            break

    image_name = f"{index_name}/{image_path}" if slash else image_path
    return image_name, tag
def split_repository_path(repository_path: str) -> tuple[Optional[str], str]:
    """
    Splits a Docker repository path into its namespace and repository components.

    The path is split on at most its first two "/" separators: the last piece is
    the repository and whatever precedes it is the namespace.

    Args:
        repository_path: The Docker repository path to split.

    Returns:
        Tuple[Optional[str], str]: A tuple containing the namespace and repository components.
            - namespace (Optional[str]): The leading part of the path (registry and/or
              organization). None if the path contains no "/".
            - repository (str): The remainder of the path.
    """
    *namespace_parts, repository = repository_path.split("/", 2)

    # No leading pieces means the path had no "/" and thus no namespace
    namespace = "/".join(namespace_parts) if namespace_parts else None
    return namespace, repository
def format_outlier_version_name(version: str) -> str:
    """
    Strip edition labels from a docker version string so that it can pass
    `packaging.version.parse` validation.

    - Example outlier versions that throw a parsing exception:
      - "20.10.0-ce" (variant of community edition label)
      - "20.10.0-ee" (variant of enterprise edition label)

    Args:
        version (str): raw docker version value

    Returns:
        str: the version with every "-ce" and "-ee" occurrence removed
    """
    # Extend this tuple if further edition labels need stripping
    edition_labels = ("-ce", "-ee")

    cleaned = version
    for label in edition_labels:
        cleaned = cleaned.replace(label, "")
    return cleaned
@contextmanager
def generate_default_dockerfile(context: Optional[Path] = None):
    """
    Write a default Dockerfile for deploying flows and yield its path.

    The file is created at `<context>/Dockerfile` and deleted again when the
    context manager exits. If the context contains a `requirements.txt`, the
    Dockerfile installs it before copying the rest of the directory.

    Args:
        - context: The directory to generate the Dockerfile in. Defaults to
            the current working directory.

    Raises:
        RuntimeError: if a Dockerfile already exists in the context.
    """
    build_root = context if context else Path.cwd()

    instructions: list[str] = [f"FROM {get_prefect_image_name()}"]
    image_dir = f"/opt/prefect/{build_root.name}"

    if (build_root / "requirements.txt").exists():
        instructions += [
            f"COPY requirements.txt {image_dir}/requirements.txt",
            f"RUN uv pip install -r {image_dir}/requirements.txt",
        ]

    instructions += [
        f"COPY . {image_dir}/",
        f"WORKDIR {image_dir}/",
    ]

    dockerfile_path = build_root / "Dockerfile"
    # Never overwrite (and later delete) a Dockerfile the user already has
    if dockerfile_path.exists():
        raise RuntimeError(
            "Failed to generate Dockerfile. Dockerfile already exists in the"
            " current directory."
        )

    with dockerfile_path.open("w") as handle:
        handle.writelines(f"{instruction}\n" for instruction in instructions)

    try:
        yield dockerfile_path
    finally:
        dockerfile_path.unlink()