# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import copy
import json
import logging
import shutil
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import docker
from docker.errors import BuildError, ContainerError
import olive.systems.docker.utils as docker_utils
from olive.common.config_utils import ParamCategory, validate_config
from olive.evaluator.metric_result import MetricResult
from olive.hardware import Device
from olive.model import ModelConfig
from olive.systems.common import AcceleratorConfig, LocalDockerConfig, SystemType
from olive.systems.olive_system import OliveSystem
from olive.systems.system_config import DockerTargetUserConfig
if TYPE_CHECKING:
from olive.evaluator.metric import Metric
from olive.evaluator.olive_evaluator import OliveEvaluatorConfig
from olive.hardware.accelerator import AcceleratorSpec
from olive.passes import Pass
logger = logging.getLogger(__name__)
[docs]
class DockerSystem(OliveSystem):
    """Olive system that runs passes and model evaluations inside a local Docker container.

    On construction the configured image is looked up through the Docker client and,
    when it is not found, built from either the user supplied build context or the
    ``Dockerfile`` located next to this module.
    """

    system_type = SystemType.Docker

    # File name of the Dockerfile shipped next to this module; used when the config
    # does not provide both a build context and a dockerfile of its own.
    BASE_DOCKERFILE = "Dockerfile"

    def __init__(
        self,
        local_docker_config: Union[Dict[str, Any], LocalDockerConfig],
        accelerators: Optional[List[AcceleratorConfig]] = None,
        is_dev: bool = False,
        hf_token: Optional[bool] = None,
        requirements_file: Optional[Union[Path, str]] = None,
        **kwargs,  # used to hold the rest of the arguments not used by dockersystem.
    ):
        """Create the system and make sure the Docker image is available.

        Args:
            local_docker_config: Docker settings, either as a raw dict or as a
                ``LocalDockerConfig``; validated with ``validate_config``.
            accelerators: Forwarded to ``OliveSystem.__init__``.
            is_dev: When true, an extra "dev" mount is added to every container run.
            hf_token: Forwarded to ``OliveSystem.__init__``; when truthy the
                Hugging Face token is exported into the container as ``HF_TOKEN``.
            requirements_file: Optional file copied into the build context as
                ``requirements.txt`` when the image has to be built.
            **kwargs: Extra arguments, only forwarded to ``DockerTargetUserConfig``.

        Raises:
            ValueError: If ``local_docker_config`` is ``None`` or if none of
                ``build_context_path``, ``dockerfile`` and ``requirements_file`` is set.
            docker.errors.BuildError: If building the image fails.
        """
        super().__init__(accelerators=accelerators, hf_token=hf_token)

        logger.info("Initializing Docker System...")
        self.is_dev = is_dev
        self.docker_client = docker.from_env()
        if local_docker_config is None:
            raise ValueError("local_docker_config cannot be None.")

        local_docker_config = validate_config(local_docker_config, LocalDockerConfig)
        if not local_docker_config.build_context_path and not local_docker_config.dockerfile and not requirements_file:
            raise ValueError("build_context_path, dockerfile and requirements_file cannot be None at the same time.")
        # NOTE: locals() is forwarded wholesale, so no additional local variable may be
        # introduced above this line without also changing what the config receives.
        self.config = DockerTargetUserConfig(**locals(), **kwargs)

        self.run_params = local_docker_config.run_params
        try:
            self.image = self.docker_client.images.get(local_docker_config.image_name)
            logger.info("Image %s found", local_docker_config.image_name)
        except docker.errors.ImageNotFound:
            self.image = self._build_image(local_docker_config, requirements_file)

    def _build_image(self, local_docker_config: LocalDockerConfig, requirements_file: Optional[Union[Path, str]]):
        """Build and tag the image described by ``local_docker_config`` and return it.

        The build context is assembled in a temporary directory so the user's own
        directory is never modified.
        """
        with tempfile.TemporaryDirectory() as build_context_path:
            if local_docker_config.build_context_path and local_docker_config.dockerfile:
                dockerfile = local_docker_config.dockerfile
                dockerfile_path = Path(local_docker_config.build_context_path) / dockerfile
                shutil.copytree(local_docker_config.build_context_path, build_context_path, dirs_exist_ok=True)
            else:
                dockerfile = self.BASE_DOCKERFILE
                dockerfile_path = Path(__file__).resolve().parent / self.BASE_DOCKERFILE
                shutil.copy2(dockerfile_path, build_context_path)

            requirements_dest = Path(build_context_path) / "requirements.txt"
            if requirements_file:
                shutil.copyfile(requirements_file, requirements_dest)
            elif not requirements_dest.exists():
                # An empty requirements.txt is created so the build context always contains one.
                requirements_dest.touch()

            logger.info(
                "Building image from Dockerfile %s with buildargs %s ",
                dockerfile_path,
                local_docker_config.build_args,
            )
            try:
                image, build_logs = self.docker_client.images.build(
                    path=build_context_path,
                    dockerfile=dockerfile,
                    tag=local_docker_config.image_name,
                    buildargs=local_docker_config.build_args,
                )
            except BuildError as e:
                logger.exception("Image build failed with error.")
                _print_docker_logs(e.build_log, logging.ERROR)
                raise
            logger.info("Image %s build successfully.", local_docker_config.image_name)
            _print_docker_logs(build_logs, logging.DEBUG)
            return image

    def run_pass(
        self,
        the_pass: "Pass",
        model_config: "ModelConfig",
        output_model_path: str,
        point: Optional[Dict[str, Any]] = None,
    ) -> Optional["ModelConfig"]:
        """Run the pass on the model at a specific point in the search space.

        Returns the output model config, or ``None`` when the container did not
        produce a result file.
        """
        with tempfile.TemporaryDirectory() as tempdir:
            return self._run_pass_container(Path(tempdir), the_pass, model_config, output_model_path, point)

    def _run_pass_container(
        self,
        workdir: Path,
        the_pass: "Pass",
        model_config: "ModelConfig",
        output_model_path: str,
        point: Optional[Dict[str, Any]] = None,
    ) -> Optional["ModelConfig"]:
        """Run ``the_pass`` in a container, using ``workdir`` for config and output files.

        The produced model files are copied to ``output_model_path`` and the resource
        paths of the returned config are rewritten from container paths to local paths.

        Returns:
            The output ``ModelConfig``, or ``None`` if the result json is missing.
        """
        point = point or {}
        config = the_pass.config_at_search_point(point)
        pass_config = the_pass.to_json(check_object=True)
        pass_config["config"].update(the_pass.serialize_config(config, check_object=True))

        volumes_list = []
        runner_output_path = "runner_output"
        runner_output_name = "runner_res.json"
        container_root_path = Path("/olive-ws/")

        # mount pass_runner script
        docker_runner_path, pass_runner_file_mount_str = docker_utils.create_runner_script_mount(container_root_path)
        volumes_list.append(pass_runner_file_mount_str)

        # mount dev stuff
        if self.is_dev:
            _, dev_mount_str = docker_utils.create_dev_mount(workdir, container_root_path)
            volumes_list.append(dev_mount_str)

        # mount model
        docker_model_files, model_mount_str_list, mount_model_to_local = docker_utils.create_model_mount(
            model_config=model_config, container_root_path=container_root_path
        )
        volumes_list.extend(model_mount_str_list)

        # data_dir or data_config
        docker_data_paths, data_mount_str_list = self._create_data_mounts_for_pass(container_root_path, the_pass)
        volumes_list.extend(data_mount_str_list)

        # mount config file
        data = self._create_runner_config(model_config, pass_config, docker_model_files, docker_data_paths)
        logger.debug("Runner config is %s", data)
        docker_config_file, config_file_mount_str = docker_utils.create_config_file(
            workdir=workdir,
            config=data,
            container_root_path=container_root_path,
        )
        volumes_list.append(config_file_mount_str)

        # output mount
        output_local_path, docker_output_path, output_mount_str = docker_utils.create_output_mount(
            workdir=workdir,
            docker_output_path=runner_output_path,
            container_root_path=container_root_path,
        )
        volumes_list.append(output_mount_str)

        logger.debug("The volumes list is %s", volumes_list)

        runner_command = docker_utils.create_runner_command(
            runner_script_path=docker_runner_path,
            config_path=docker_config_file,
            output_path=docker_output_path,
            output_name=runner_output_name,
        )

        model_output_json_file = self._run_container(
            runner_command, volumes_list, output_local_path, runner_output_name, the_pass.accelerator_spec
        )
        if not model_output_json_file.is_file():
            logger.error("Model output file %s not found.", model_output_json_file)
            return None

        with model_output_json_file.open() as f:
            model_output = json.load(f)
        output_model = ModelConfig.parse_obj(model_output)

        logger.debug("Copying model from %s to %s", output_local_path, output_model_path)
        shutil.copytree(output_local_path, output_model_path, dirs_exist_ok=True)
        logger.debug("mount_model_to_local: %s", mount_model_to_local)

        for resource_name, resource_str in output_model.get_resource_strings().items():
            if not resource_str:
                continue

            logger.debug("Resource %s path: %s", resource_name, resource_str)
            original_resource_path = mount_model_to_local.get(resource_str)
            if original_resource_path:
                # If the output model path is something like /olive-ws/model.onnx
                # we need replace with the original model path
                output_model.config[resource_name] = original_resource_path
                logger.info("Original resource path for %s is: %s", resource_str, original_resource_path)
                continue

            # output_local_path should be something like: /tmp/tmpd1sjw9xa/runner_output
            # If there are any output models, they will be saved in that path
            # and the output_model.config["model_path"] would like /olive-ws/runner_output/model.onnx
            # the model path should starts with /olive-ws/runner_output
            assert resource_str.startswith(docker_output_path), (
                f"Resource path {resource_str} is not under the container output path {docker_output_path}"
            )
            # Only the leading container prefix is swapped; a later repetition of the
            # same text inside the path must stay untouched.
            output_model.config[resource_name] = resource_str.replace(docker_output_path, output_model_path, 1)

        logger.debug("Model path is: %s", output_model.config["model_path"])
        return output_model

    def evaluate_model(
        self, model_config: "ModelConfig", evaluator_config: "OliveEvaluatorConfig", accelerator: "AcceleratorSpec"
    ) -> Optional[MetricResult]:
        """Evaluate the model inside a container.

        Returns the parsed ``MetricResult``, or ``None`` when the container did not
        write a result file.
        """
        container_root_path = Path("/olive-ws/")
        # The result file lives in the temporary directory, so it has to be read
        # before the directory is cleaned up.
        with tempfile.TemporaryDirectory() as tempdir:
            metric_json = self._run_eval_container(
                tempdir, model_config, evaluator_config, accelerator, container_root_path
            )
            if not metric_json.is_file():
                logger.error("Metric result file %s not found.", metric_json)
                return None

            with metric_json.open() as f:
                metrics_res = json.load(f)
            return MetricResult.parse_obj(metrics_res)

    def _run_eval_container(
        self,
        workdir,
        model_config: "ModelConfig",
        evaluator_config: "OliveEvaluatorConfig",
        accelerator: "AcceleratorSpec",
        container_root_path: Path,
    ):
        """Assemble mounts and command for an evaluation run and execute it.

        Returns the local path at which the evaluation result json is expected.
        """
        eval_output_path = "eval_output"
        eval_output_name = "eval_res.json"

        volumes_list = []

        # mount eval script
        eval_file_mount_path, eval_file_mount_str = docker_utils.create_eval_script_mount(container_root_path)
        volumes_list.append(eval_file_mount_str)

        # mount dev stuff
        if self.is_dev:
            _, dev_mount_str = docker_utils.create_dev_mount(workdir, container_root_path)
            volumes_list.append(dev_mount_str)

        # mount model
        model_mounts, model_mount_str_list, _ = docker_utils.create_model_mount(
            model_config=model_config, container_root_path=container_root_path
        )
        volumes_list += model_mount_str_list

        # Work on a copy so the caller's evaluator config is left untouched.
        metrics_copy = copy.deepcopy(evaluator_config.metrics)

        # mount metrics related external files
        volumes_list.extend(
            # the metrics_copy is modified when creating the volumes list
            docker_utils.create_metric_volumes_list(
                metrics=metrics_copy,
                container_root_path=container_root_path,
            )
        )

        # mount config file
        data = self._create_eval_config(model_config, metrics_copy, model_mounts)
        config_mount_path, config_file_mount_str = docker_utils.create_config_file(
            workdir=workdir,
            config=data,
            container_root_path=container_root_path,
        )
        volumes_list.append(config_file_mount_str)

        output_local_path, output_mount_path, output_mount_str = docker_utils.create_output_mount(
            workdir=workdir,
            docker_output_path=eval_output_path,
            container_root_path=container_root_path,
        )
        volumes_list.append(output_mount_str)

        logger.debug("The volumes list is %s", volumes_list)

        eval_command = docker_utils.create_evaluate_command(
            eval_script_path=eval_file_mount_path,
            config_path=config_mount_path,
            output_path=output_mount_path,
            output_name=eval_output_name,
            accelerator=accelerator,
        )
        return self._run_container(eval_command, volumes_list, output_local_path, eval_output_name, accelerator)

    @staticmethod
    def _create_eval_config(model_config: "ModelConfig", metrics: List["Metric"], model_mounts: Dict[str, str]):
        """Return the evaluation config dict with model paths replaced by their container mounts."""
        model_json = model_config.to_json(check_object=True)
        model_json["config"].update(model_mounts)
        return {"metrics": [metric.to_json(check_object=True) for metric in metrics], "model": model_json}

    @staticmethod
    def _create_runner_config(
        model_config: "ModelConfig",
        pass_config: Dict[str, Any],
        model_mounts: Dict[str, str],
        data_mounts: Dict[str, str],
    ):
        """Return the pass runner config dict with model and data paths replaced by container mounts.

        ``pass_config`` is deep-copied first, so the caller's dict is not modified.
        """
        model_json = model_config.to_json(check_object=True)
        model_json["config"].update(model_mounts)

        pass_config_copy = copy.deepcopy(pass_config)
        pass_config_copy["config"].update(data_mounts)
        return {"model": model_json, "pass": pass_config_copy}

    @staticmethod
    def _normalize_environment(environment) -> Dict[str, Any]:
        """Return the container environment as a dict.

        Docker accepts the environment either as a mapping or as a list of
        ``"KEY=value"`` strings. List entries are split on the first ``=`` only, so
        values that themselves contain ``=`` are preserved; an entry without ``=``
        is kept with a ``None`` value. ``None`` or an empty value yields ``{}``.
        """
        if not environment:
            return {}
        if isinstance(environment, dict):
            return environment

        normalized = {}
        for entry in environment:
            key, separator, value = str(entry).partition("=")
            normalized[key] = value if separator else None
        return normalized

    def _run_container(
        self,
        command,
        volumes_list: List[str],
        output_local_path,
        output_name,
        accelerator: "AcceleratorSpec",
    ):
        """Run ``command`` in a detached container of ``self.image`` and wait for it to finish.

        Args:
            command: Command executed in the container.
            volumes_list: Volume mount strings passed to the Docker client.
            output_local_path: Local directory that is mounted as the container output directory.
            output_name: File name of the result file inside the output directory.
            accelerator: A GPU device request is added when its type is ``Device.GPU``.

        Returns:
            ``Path(output_local_path) / output_name``; the caller checks whether it exists.

        Raises:
            ContainerError: If the container exits with a non-zero status. The collected
                container logs are included in the error.
        """
        run_command = docker_utils.create_run_command(run_params=self.run_params)

        # Normalise first, then apply defaults: a list-style environment must receive
        # the defaults as well.
        environment = self._normalize_environment(run_command.pop("environment", None))
        default_envs = {"PYTHONPYCACHEPREFIX": "/tmp"}
        for name, default in default_envs.items():
            if not environment.get(name):
                environment[name] = default
        if self.hf_token:
            environment["HF_TOKEN"] = get_huggingface_token()

        environment["OLIVE_LOG_LEVEL"] = logging.getLevelName(logger.getEffectiveLevel())

        logger.debug("Running container with command: %s", command)
        if accelerator.accelerator_type == Device.GPU:
            run_command["device_requests"] = [docker.types.DeviceRequest(capabilities=[["gpu"]])]
        container = self.docker_client.containers.run(
            image=self.image,
            command=command,
            volumes=volumes_list,
            detach=True,
            environment=environment,
            **run_command,
        )

        docker_logs = []
        for line in container.logs(stream=True):
            # containers.logs can accept stdout/stderr as arguments, but it doesn't work
            # as we cannot ensure that all the logs will be printed in the correct channel(out/err)
            # so, we collect all the logs and print them in the end if there is an error.
            # Undecodable bytes are replaced so unusual container output cannot abort the run.
            log = line.decode(errors="replace").strip()
            logger.debug(log)
            docker_logs.append(log)

        exit_code = container.wait()["StatusCode"]
        container.remove()
        if exit_code != 0:
            error_msg = "\n".join(docker_logs)
            raise ContainerError(
                container, exit_code, command, self.image, f"Docker container evaluation failed with: {error_msg}"
            )
        logger.debug("Docker container run completed successfully")
        return Path(output_local_path) / output_name

    def _create_data_mounts_for_pass(self, container_root_path: Path, the_pass: "Pass"):
        """Create mounts for every non-empty ``ParamCategory.DATA`` path parameter of the pass.

        Returns:
            A tuple ``(mounts, mount_strs)``: ``mounts`` maps the parameter name to its
            path inside the container, ``mount_strs`` holds the matching
            ``"<local value>:<container path>"`` strings.
        """
        mounts = {}
        mount_strs = []
        for param, _, category in the_pass.path_params:
            param_val = the_pass.config.get(param)
            if category != ParamCategory.DATA or not param_val:
                continue
            mount = str(container_root_path / param)
            mounts[param] = mount
            mount_strs.append(f"{param_val}:{mount}")
        return mounts, mount_strs

    def remove(self):
        """Force-remove the Docker image (addressed by its first tag) used by this system."""
        image_tag = self.image.tags[0]
        self.docker_client.images.remove(image_tag, force=True)
        logger.info("Image %s removed successfully.", image_tag)
def _print_docker_logs(logs, level=logging.DEBUG):
    """Log all docker log entries as a single newline-joined message at ``level``.

    For an entry that contains ``"stream"`` the value stored under that key is
    used; any other entry is logged through its ``str()`` form. Surrounding
    whitespace is stripped from every entry.
    """
    lines = [str(entry["stream"] if "stream" in entry else entry).strip() for entry in logs]
    logger.log(level, "\n".join(lines))
def get_huggingface_token():
    """Get huggingface token from environment variable or token file.

    Lookup order:

    1. the ``HF_TOKEN`` environment variable (ignored when empty),
    2. the legacy token file ``~/.huggingface/token``,
    3. the file named by the ``HF_TOKEN_PATH`` environment variable,
    4. ``$HF_HOME/token``, or when ``HF_HOME`` is unset
       ``$XDG_CACHE_HOME/huggingface/token`` falling back to
       ``~/.cache/huggingface/token``.

    The legacy file stays first so existing setups resolve to the same token as
    before; the other locations are the ones documented by ``huggingface_hub`` for
    tokens stored by ``huggingface-cli login``.

    Returns:
        The token with surrounding whitespace stripped, or ``None`` (after logging
        an error) when no token could be found.
    """
    import os

    token = os.getenv("HF_TOKEN")
    if token:
        return token

    home = Path.home()
    candidates = [home / ".huggingface" / "token"]

    explicit_token_path = os.getenv("HF_TOKEN_PATH")
    if explicit_token_path:
        candidates.append(Path(explicit_token_path).expanduser())

    hf_home = os.getenv("HF_HOME")
    if hf_home:
        candidates.append(Path(hf_home).expanduser() / "token")
    else:
        xdg_cache_home = os.getenv("XDG_CACHE_HOME")
        cache_root = Path(xdg_cache_home).expanduser() if xdg_cache_home else home / ".cache"
        candidates.append(cache_root / "huggingface" / "token")

    for token_path in candidates:
        if token_path.is_file():
            with token_path.open() as f:
                return f.read().strip()

    logger.error(
        "Huggingface token is required at this step. Could not find huggingface token at any of: %s. "
        "Please login to huggingface first using `huggingface-cli login` "
        "or set the HF_TOKEN environment variable.",
        ", ".join(str(path) for path in candidates),
    )
    return None