Source code for flytekit.image_spec.image_spec

import base64
import copy
import hashlib
import os
import pathlib
import typing
from abc import abstractmethod
from dataclasses import asdict, dataclass
from functools import lru_cache
from importlib import metadata
from typing import Dict, List, Optional, Tuple, Union

import click
import requests
from packaging.version import Version

from flytekit.exceptions.user import FlyteAssertion

# Docker Hub registry host. ImageSpec.exist compares the registry host against this to decide whether to
# query the Docker Hub HTTP API when the local docker engine check fails.
DOCKER_HUB = "docker.io"
# Name of the environment variable that ImageSpec.is_container compares against ImageSpec.image_name().
_F_IMG_ID = "_F_IMG_ID"
# Name of the environment variable read in ImageSpec.__post_init__ to enable force-pushing (rebuilding an
# already existing) image.
FLYTE_FORCE_PUSH_IMAGE_SPEC = "FLYTE_FORCE_PUSH_IMAGE_SPEC"


@dataclass
class ImageSpec:
    """
    This class is used to specify the docker image that will be used to run the task.

    Args:
        name: name of the image.
        python_version: python version of the image. Use default python in the base image if None.
        builder: Type of plugin to build the image. If None, the registered builder with the highest
            priority is used.
        source_root: source root of the image.
        env: environment variables of the image.
        registry: registry of the image.
        packages: list of python packages to install.
        conda_packages: list of conda packages to install.
        conda_channels: list of conda channels.
        requirements: path to the requirements.txt file.
        apt_packages: list of apt packages to install.
        cuda: version of cuda to install.
        cudnn: version of cudnn to install.
        base_image: base image of the image. May itself be an ImageSpec.
        platform: Specify the target platforms for the build output (for example, windows/amd64 or
            linux/amd64,darwin/arm64).
        pip_index: Specify the custom pip index url
        pip_extra_index_url: Specify one or more pip index urls as a list
        registry_config: Specify the path to a JSON registry config file
        commands: Command to run during the building process
        tag_format: Custom string format for image tag. The ImageSpec hash passed in as `spec_hash`.
            For example, to add a "dev" suffix to the image tag, set `tag_format="{spec_hash}-dev"`
    """

    name: str = "flytekit"
    python_version: Optional[str] = None  # Use default python in the base image if None.
    builder: Optional[str] = None
    source_root: Optional[str] = None
    env: Optional[typing.Dict[str, str]] = None
    registry: Optional[str] = None
    packages: Optional[List[str]] = None
    conda_packages: Optional[List[str]] = None
    conda_channels: Optional[List[str]] = None
    requirements: Optional[str] = None
    apt_packages: Optional[List[str]] = None
    cuda: Optional[str] = None
    cudnn: Optional[str] = None
    base_image: Optional[Union[str, "ImageSpec"]] = None
    platform: str = "linux/amd64"
    pip_index: Optional[str] = None
    pip_extra_index_url: Optional[List[str]] = None
    registry_config: Optional[str] = None
    commands: Optional[List[str]] = None
    tag_format: Optional[str] = None

    def __post_init__(self):
        """Normalize the image name/registry to lowercase and read the force-push env flag."""
        self.name = self.name.lower()
        # `_is_force_push` is a plain attribute, not a dataclass field, so asdict() (and therefore the
        # image hash) never includes it. False by default.
        self._is_force_push = self._env_flag_enabled(os.environ.get(FLYTE_FORCE_PUSH_IMAGE_SPEC))
        if self.registry:
            self.registry = self.registry.lower()

    @staticmethod
    def _env_flag_enabled(value: Optional[str]) -> bool:
        """
        Interpret an environment-variable value as a boolean flag.

        None, empty/whitespace strings and the usual "off" spellings ("0", "false", "no", "off",
        case-insensitive) are False; any other value is True. Previously any non-empty string,
        including "false", enabled the flag.
        """
        if value is None:
            return False
        return value.strip().lower() not in ("", "0", "false", "no", "off")

    def image_name(self) -> str:
        """
        Full image name with tag.

        If a builder reported a different fully-qualified name for this image (recorded in
        ``ImageBuildEngine._IMAGE_NAME_TO_REAL_NAME``), that name is returned instead.
        """
        image_name = self._image_name()
        try:
            return ImageBuildEngine._IMAGE_NAME_TO_REAL_NAME[image_name]
        except KeyError:
            return image_name

    def _image_tag(self) -> str:
        """Image tag: the spec hash, formatted through ``tag_format`` when one is set."""
        tag = calculate_hash_from_image_spec(self)
        if self.tag_format:
            tag = self.tag_format.format(spec_hash=tag)
        return tag

    def _image_name(self) -> str:
        """Construct full image name with tag."""
        container_image = f"{self.name}:{self._image_tag()}"
        if self.registry:
            container_image = f"{self.registry}/{container_image}"
        return container_image

    def is_container(self) -> bool:
        """
        Return whether the current process should be treated as running in this image.

        When an execution state with a mode other than local workflow execution is active, compare the
        ``_F_IMG_ID`` environment variable with :meth:`image_name`; otherwise return True.
        """
        from flytekit.core.context_manager import ExecutionState, FlyteContextManager

        state = FlyteContextManager.current_context().execution_state
        if state and state.mode and state.mode != ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION:
            return os.environ.get(_F_IMG_ID) == self.image_name()
        return True

    @lru_cache
    def exist(self) -> bool:
        """
        Check if the image exists in the registry.

        Uses the local docker engine first. If that fails with an unexpected error (for example, the
        engine is not running) and the registry is Docker Hub, the Docker Hub HTTP API is queried.
        When existence cannot be determined, a message is printed and True is returned. Results are
        memoized per (equal) ImageSpec by ``lru_cache``.
        """
        import docker
        from docker.errors import APIError, ImageNotFound

        try:
            client = docker.from_env()
            if self.registry:
                client.images.get_registry_data(self.image_name())
            else:
                client.images.get(self.image_name())
            return True
        except ImageNotFound:
            # ImageNotFound subclasses APIError, so it must be handled before the APIError clause.
            return False
        except APIError as e:
            # `response` can be None for errors that did not come from an HTTP reply.
            if e.response is not None and e.response.status_code == 404:
                return False
            return True
        except Exception as e:
            # if docker engine is not running locally
            tag = self._image_tag()
            container_registry = DOCKER_HUB
            if self.registry and "/" in self.registry:
                container_registry = self.registry.split("/")[0]
            if container_registry == DOCKER_HUB:
                # The Hub API expects the bare namespace (e.g. "user"), not "docker.io/user".
                # NOTE(review): with no registry this still queries the "None" namespace, as before.
                namespace = self.registry
                if namespace and namespace.startswith(f"{DOCKER_HUB}/"):
                    namespace = namespace[len(DOCKER_HUB) + 1 :]
                url = f"https://hub.docker.com/v2/repositories/{namespace}/{self.name}/tags/{tag}"
                response = requests.get(url)
                if response.status_code == 200:
                    return True

                if response.status_code == 404 and "not found" in str(response.content):
                    return False

            click.secho(f"Failed to check if the image exists with error : {e}", fg="red")
            click.secho("Flytekit assumes that the image already exists.", fg="blue")
            return True

    def __hash__(self):
        # Hash by field values so equal specs share cache entries in the lru_cache-decorated functions.
        return hash(asdict(self).__str__())

    def _with_extended_list(self, field_name: str, values: Union[str, List[str]]) -> "ImageSpec":
        """
        Return a deep copy of this spec with ``values`` added to the list field ``field_name``.

        Lists and tuples are extended item by item; any other value is appended as a single item.
        The original spec is not modified.
        """
        new_image_spec = copy.deepcopy(self)
        items = getattr(new_image_spec, field_name)
        if items is None:
            items = []
            setattr(new_image_spec, field_name, items)
        if isinstance(values, (list, tuple)):
            items.extend(values)
        else:
            items.append(values)
        return new_image_spec

    def with_commands(self, commands: Union[str, List[str]]) -> "ImageSpec":
        """
        Builder that returns a new image spec with an additional list of commands that will be executed
        during the building process.
        """
        return self._with_extended_list("commands", commands)

    def with_packages(self, packages: Union[str, List[str]]) -> "ImageSpec":
        """
        Builder that returns a new image spec with additional python packages that will be installed
        during the building process.
        """
        return self._with_extended_list("packages", packages)

    def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec":
        """
        Builder that returns a new image spec with additional apt packages that will be installed
        during the building process.
        """
        return self._with_extended_list("apt_packages", apt_packages)

    def force_push(self) -> "ImageSpec":
        """
        Builder that returns a new image spec with force push enabled.
        """
        new_image_spec = copy.deepcopy(self)
        new_image_spec._is_force_push = True
        return new_image_spec
class ImageSpecBuilder:
    # NOTE(review): this class does not inherit from abc.ABC, so @abstractmethod is not enforced at
    # instantiation time; the base implementation raises NotImplementedError instead.
    @abstractmethod
    def build_image(self, image_spec: "ImageSpec") -> Optional[str]:
        """
        Build the docker image and push it to the registry.

        Args:
            image_spec: image spec of the task.

        Returns:
            fully_qualified_image_name: Fully qualified image name. If None, then `image_spec.image_name()` is used.
        """
        raise NotImplementedError("This method is not implemented in the base class.")


class ImageBuildEngine:
    """
    ImageBuildEngine contains a list of builders that can be used to build an ImageSpec.
    """

    # Maps builder type -> (builder instance, priority). A higher priority wins when a spec names no builder.
    _REGISTRY: typing.Dict[str, Tuple[ImageSpecBuilder, int]] = {}
    # _IMAGE_NAME_TO_REAL_NAME is used to keep track of the fully qualified image name
    # returned by the image builder. This allows ImageSpec to map from `image_spc.image_name()`
    # to the real qualified name.
    _IMAGE_NAME_TO_REAL_NAME: Dict[str, str] = {}

    @classmethod
    def register(cls, builder_type: str, image_spec_builder: ImageSpecBuilder, priority: int = 5):
        """Register (or replace) the builder for ``builder_type`` with the given priority."""
        cls._REGISTRY[builder_type] = (image_spec_builder, priority)

    @classmethod
    @lru_cache
    def build(cls, image_spec: "ImageSpec") -> None:
        """
        Build ``image_spec`` unless its image already exists (or force push is enabled).

        If ``base_image`` is itself an ImageSpec, it is built first and then replaced *in place* on
        ``image_spec`` by its image name. When ``image_spec.builder`` is None, the registered builder
        with the highest priority is used. Calls are memoized by ``lru_cache``. Returns None.
        """
        if isinstance(image_spec.base_image, ImageSpec):
            cls.build(image_spec.base_image)
            image_spec.base_image = image_spec.base_image.image_name()

        if image_spec.builder is None and cls._REGISTRY:
            builder = max(cls._REGISTRY, key=lambda name: cls._REGISTRY[name][1])
        else:
            builder = image_spec.builder

        img_name = image_spec.image_name()
        if image_spec.exist():
            if image_spec._is_force_push:
                click.secho(f"Image {img_name} found. but overwriting existing image.", fg="blue")
                cls._build_image(builder, image_spec, img_name)
            else:
                click.secho(f"Image {img_name} found. Skip building.", fg="blue")
        else:
            click.secho(f"Image {img_name} not found. building...", fg="blue")
            cls._build_image(builder, image_spec, img_name)

    @classmethod
    def _build_image(cls, builder, image_spec, img_name):
        """
        Run the registered ``builder`` on ``image_spec``.

        Records any fully-qualified name it returns under ``img_name``.

        Raises:
            Exception: if ``builder`` is not registered.
            FlyteAssertion: if the envd builder is used with envd < 0.3.39.
        """
        if builder not in cls._REGISTRY:
            raise Exception(f"Builder {builder} is not registered.")
        if builder == "envd":
            envd_version = metadata.version("envd")
            # flytekit v1.10.2+ copies the workflow code to the WorkDir specified in the Dockerfile. However, envd<0.3.39
            # overwrites the WorkDir when building the image, resulting in a permission issue when flytekit downloads the file.
            if Version(envd_version) < Version("0.3.39"):
                raise FlyteAssertion(
                    f"envd version {envd_version} is not compatible with flytekit>v1.10.2."
                    f" Please upgrade envd to v0.3.39+."
                )

        fully_qualified_image_name = cls._REGISTRY[builder][0].build_image(image_spec)
        if fully_qualified_image_name is not None:
            cls._IMAGE_NAME_TO_REAL_NAME[img_name] = fully_qualified_image_name


@lru_cache
def calculate_hash_from_image_spec(image_spec: "ImageSpec"):
    """
    Calculate the hash from the image spec.

    The source root and the requirements file are replaced by digests of their contents, and
    ``registry_config`` is ignored. The result is an MD5 digest, urlsafe-base64 encoded without
    padding and with "-" replaced by "_".
    """
    # copy the image spec to avoid modifying the original image spec. otherwise, the hash will be different.
    spec = copy.deepcopy(image_spec)
    if isinstance(spec.base_image, ImageSpec):
        spec.base_image = spec.base_image.image_name()

    if spec.source_root:
        from flytekit.tools.fast_registration import compute_digest
        from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

        ignore = IgnoreGroup(spec.source_root, [GitIgnore, DockerIgnore, StandardIgnore])
        digest = compute_digest(spec.source_root, ignore.is_ignored)
        spec.source_root = digest

    if spec.requirements:
        spec.requirements = hashlib.sha1(pathlib.Path(spec.requirements).read_bytes()).hexdigest()
    # won't rebuild the image if we change the registry_config path
    spec.registry_config = None
    image_spec_bytes = asdict(spec).__str__().encode("utf-8")
    tag = base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest()).decode("ascii").rstrip("=")
    # replace "-" with "_" to make it a valid tag
    return tag.replace("-", "_")


def hash_directory(path):
    """
    Return the SHA-256 hash of the directory at the given path.

    The digest covers the contents of every file under ``path``, recursively. Directories and files
    are visited in sorted order, so the result does not depend on the filesystem's listing order.
    File names themselves are not hashed.

    Returns:
        The hex digest encoded as UTF-8 bytes.
    """
    hasher = hashlib.sha256()
    for root, dirs, files in os.walk(path):
        # Sorting `dirs` in place makes os.walk descend into subdirectories in a deterministic order.
        dirs.sort()
        for file in sorted(files):
            with open(os.path.join(root, file), "rb") as f:
                # Read file in small chunks to avoid loading large files into memory all at once
                for chunk in iter(lambda: f.read(4096), b""):
                    hasher.update(chunk)

    return bytes(hasher.hexdigest(), "utf-8")