Source code for olive.systems.isolated_ort.isolated_ort_system

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import collections
import json
import logging
import shutil
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import torch
from torch.utils.data import Dataset

from olive.common.utils import run_subprocess
from olive.evaluator.metric import get_latency_config_from_metric
from olive.evaluator.olive_evaluator import OliveEvaluator, OliveModelOutput, OnnxEvaluatorMixin
from olive.hardware import Device
from olive.systems.common import AcceleratorConfig, SystemType
from olive.systems.olive_system import OliveSystem
from olive.systems.system_config import IsolatedORTTargetUserConfig
from olive.systems.utils import create_new_environ, run_available_providers_runner

if TYPE_CHECKING:
    from olive.evaluator.metric import Metric
    from olive.evaluator.metric_result import MetricResult
    from olive.hardware.accelerator import AcceleratorSpec
    from olive.model import ModelConfig, ONNXModelHandler
    from olive.passes.olive_pass import Pass

logger = logging.getLogger(__name__)


class IsolatedORTSystem(OliveSystem):
    system_type = SystemType.IsolatedORT

    def __init__(
        self,
        python_environment_path: Union[Path, str] = None,
        environment_variables: Dict[str, str] = None,
        prepend_to_path: List[str] = None,
        accelerators: List[AcceleratorConfig] = None,
        hf_token: bool = None,
    ):
        if python_environment_path is None:
            raise ValueError("python_environment_path is required for IsolatedORTSystem.")

        super().__init__(accelerators=accelerators, hf_token=hf_token)
        self.config = IsolatedORTTargetUserConfig(**locals())
        self.environ = create_new_environ(
            python_environment_path=python_environment_path,
            environment_variables=environment_variables,
            prepend_to_path=prepend_to_path,
        )

        # available eps. This will be populated the first time self.get_supported_execution_providers() is called.
        # used for caching the available eps
        self.available_eps = None

    def run_pass(
        self,
        the_pass: "Pass",
        model_config: "ModelConfig",
        data_root: str,
        output_model_path: str,
        point: Optional[Dict[str, Any]] = None,
    ) -> "ModelConfig":
        """Run the pass on the model at a specific point in the search space."""
        logger.warning("IsolatedORTSystem does not support running passes.")
        raise NotImplementedError

    def evaluate_model(
        self, model_config: "ModelConfig", data_root: str, metrics: List["Metric"], accelerator: "AcceleratorSpec"
    ) -> "MetricResult":
        """Evaluate the model."""
        # only onnx model handler is supported
        if not model_config.type.lower() == "onnxmodel":
            raise ValueError(f"IsolatedORTSystem only supports evaluation for ONNXModel, got {model_config.type}")

        device = accelerator.accelerator_type if accelerator else Device.CPU
        execution_providers = accelerator.execution_provider if accelerator else None

        model = model_config.create_model()
        evaluator = IsolatedORTEvaluator(self.environ)
        return evaluator.evaluate(model, data_root, metrics, device=device, execution_providers=execution_providers)

    def get_supported_execution_providers(self) -> List[str]:
        """Get the available execution providers."""
        if self.available_eps:
            return self.available_eps

        self.available_eps = run_available_providers_runner(self.environ)
        return self.available_eps

    def remove(self):
        raise NotImplementedError("ORT inference system does not support system removal")
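
# Illustrative sketch (not part of the original module): one plausible way to construct and query
# this system. The helper name and the environment path are hypothetical; the path should point at
# a Python environment that has onnxruntime installed.
def _example_isolated_ort_system_usage():
    system = IsolatedORTSystem(python_environment_path="/path/to/ort_env/bin")
    # The first call runs a small runner script inside the target environment;
    # the result is cached on the instance for subsequent calls.
    return system.get_supported_execution_providers()
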
class IsolatedORTEvaluator(OliveEvaluator, OnnxEvaluatorMixin, framework="ort_inference"):
    def __init__(self, environ: Dict[str, str]):
        super().__init__()

        assert environ, "environ should not be None"
        self.environ = environ
        self.inference_runner_path = Path(__file__).parent.resolve() / "inference_runner.py"
        self.executable = shutil.which("python", path=self.environ["PATH"])

    @classmethod
    def _get_common_config(
        cls, model: "ONNXModelHandler", metric: "Metric", device: Device, execution_providers: Union[str, List[str]]
    ) -> Dict:
        inference_settings = cls.get_inference_settings(metric, model)
        inference_settings = model.merge_inference_settings(inference_settings, execution_providers)
        return {
            "inference_settings": inference_settings,
            "use_ort_extensions": model.use_ort_extensions,
            "io_bind": cls.io_bind_enabled(metric, model.inference_settings),
            "device": str(device),
            "share_kv_buffer": metric.user_config.shared_kv_buffer,
            "use_fp16": any(v == "float16" for v in model.io_config["input_types"]),
        }

    def _run_inference(self, **kwargs):
        """Run inference using the inference runner.

        :param kwargs: arguments to be passed to the inference runner
        """
        command = [self.executable, str(self.inference_runner_path)]
        for key, value in kwargs.items():
            if not value:
                continue
            command.extend([f"--{key}", str(value)])

        run_subprocess(command, self.environ, check=True)

    def _inference(
        self,
        model: "ONNXModelHandler",
        metric: "Metric",
        dataloader: Dataset,
        post_func: Callable = None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        inference_config = self._get_common_config(model, metric, device, execution_providers)
        inference_config["mode"] = "inference"

        io_config = model.io_config

        preds = []
        targets = []
        logits = []
        logits_dict = collections.defaultdict(list)
        output_names = io_config["output_names"]
        is_single_tensor_output = len(output_names) == 1

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_dir_path = Path(temp_dir)

            # create input and output dir
            input_dir = temp_dir_path / "input"
            input_dir.mkdir(parents=True, exist_ok=True)
            output_dir = temp_dir_path / "output"
            output_dir.mkdir(parents=True, exist_ok=True)

            num_batches = 0
            for idx, (input_data, labels) in enumerate(dataloader):
                # save input data
                np.savez(input_dir / f"input_{idx}.npz", **self.format_input(input_data, io_config))
                # save labels
                targets.append(labels.cpu())
                num_batches += 1
            inference_config["num_batches"] = num_batches

            # save inference config
            config_path = temp_dir_path / "config.json"
            with config_path.open("w") as f:
                json.dump(inference_config, f)
            logger.debug("Inference config: %s", inference_config)

            # run inference
            self._run_inference(
                config_path=config_path,
                model_path=model.model_path,
                input_dir=input_dir,
                output_dir=output_dir,
                external_initializers_path=model.external_initializers_path,
                constant_inputs_path=model.constant_inputs_path,
            )

            # load and process output
            for idx in range(num_batches):
                result = np.load(output_dir / f"output_{idx}.npy")
                if is_single_tensor_output:
                    result = torch.Tensor(result[0])
                else:
                    result = {name: torch.Tensor(result[i]) for i, name in enumerate(output_names)}
                outputs = post_func(result) if post_func else result
                # keep as numpy or torch arrays
                preds.append(outputs.cpu())
                if is_single_tensor_output:
                    logits.append(result.cpu())
                else:
                    for k in output_names:
                        logits_dict[k].append(result[k].cpu())

        preds = torch.cat(preds, dim=0)
        targets = torch.cat(targets, dim=0)
        if is_single_tensor_output:
            logits = torch.cat(logits, dim=0)
        else:
            logits = {k: torch.cat(logits_dict[k], dim=0) for k in output_names}

        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_accuracy(
        self,
        model: "ONNXModelHandler",
        data_root: str,
        metric: "Metric",
        dataloader: Dataset,
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> "MetricResult":
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    def _evaluate_raw_latency(
        self,
        model: "ONNXModelHandler",
        data_root: str,
        metric: "Metric",
        dataloader: Dataset,
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        """For given repeat_test_num, return a list of latencies (ms)."""
        inference_config = self._get_common_config(model, metric, device, execution_providers)
        warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric)
        inference_config.update(
            {
                "mode": "latency",
                "warmup_num": warmup_num,
                "repeat_test_num": repeat_test_num,
                "sleep_num": sleep_num,
            }
        )

        io_config = model.io_config

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_dir_path = Path(temp_dir)

            # create input and output dir
            input_dir = temp_dir_path / "input"
            input_dir.mkdir(parents=True, exist_ok=True)
            output_dir = temp_dir_path / "output"
            output_dir.mkdir(parents=True, exist_ok=True)

            # save input data
            np.savez(input_dir / "input.npz", **self.format_input(next(iter(dataloader))[0], io_config))

            # save inference config
            config_path = temp_dir_path / "config.json"
            with config_path.open("w") as f:
                json.dump(inference_config, f)

            # run inference
            self._run_inference(
                config_path=config_path,
                model_path=model.model_path,
                input_dir=input_dir,
                output_dir=output_dir,
                external_initializers_path=model.external_initializers_path,
                constant_inputs_path=model.constant_inputs_path,
            )

            # load output
            return np.load(output_dir / "output.npy").tolist()
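

# Illustrative sketch (not part of the original module): driving the evaluator directly, mirroring
# IsolatedORTSystem.evaluate_model above. The helper name and the environment path are hypothetical;
# `model` is assumed to be an ONNXModelHandler and `metrics` a list of Metric objects.
def _example_isolated_ort_evaluation(model, data_root, metrics):
    environ = create_new_environ(python_environment_path="/path/to/ort_env/bin")
    evaluator = IsolatedORTEvaluator(environ)
    # Inputs are written to a temporary directory as npz files, inference_runner.py is executed in
    # the isolated environment, and its npy outputs are read back to compute the metrics.
    return evaluator.evaluate(
        model, data_root, metrics, device=Device.CPU, execution_providers="CPUExecutionProvider"
    )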