Source code for olive.model

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import logging
import os
import shutil
import tempfile
from abc import ABC, abstractmethod
from copy import deepcopy
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union

import onnx
import torch
import yaml
from onnx import AttributeProto, GraphProto
from pydantic import validator

if TYPE_CHECKING:
    from azure.ai.ml import MLClient
from olive.common.config_utils import ConfigBase, serialize_to_json, validate_config
from olive.common.ort_inference import get_ort_inference_session
from olive.common.user_module_loader import UserModuleLoader
from olive.constants import Framework, ModelFileFormat
from olive.hardware import AcceleratorLookup, Device
from olive.hf_utils import (
    get_hf_model_config,
    huggingface_model_loader,
    load_huggingface_model_from_model_class,
    load_huggingface_model_from_task,
)
from olive.snpe import SNPEDevice, SNPEInferenceSession, SNPESessionOptions
from olive.snpe.tools.dev import get_dlc_metrics

REGISTRY = {}
logger = logging.getLogger(__name__)


class ModelStorageKind(str, Enum):
    LocalFile = "file"
    LocalFolder = "folder"
    AzureMLModel = "azureml"

    def __str__(self) -> str:
        return self.value


class OliveModel(ABC):
    """
    Abstraction for logical "Model", it contains model path and related metadata.
    Each technique accepts Model as input, return Model as output.
    """

    @classmethod
    def __init_subclass__(cls, **kwargs) -> None:
        """Register the model."""
        super().__init_subclass__(**kwargs)
        REGISTRY[cls.__name__.lower()] = cls

    def __init__(
        self,
        framework: Framework,
        model_file_format: ModelFileFormat,
        model_path: Optional[Union[Path, str]] = None,
        name: Optional[str] = None,
        version: Optional[int] = None,
        aml_storage_name: Optional[str] = None,
        model_storage_kind: ModelStorageKind = ModelStorageKind.LocalFile,
    ):
        if isinstance(model_storage_kind, str):
            model_storage_kind = ModelStorageKind(model_storage_kind)

        assert isinstance(model_storage_kind, ModelStorageKind)

        if model_storage_kind == ModelStorageKind.AzureMLModel:
            if not name:
                raise Exception("Please specify model 'name' for Azure ML model")
            if not version:
                raise Exception("Please specify model 'version' for Azure ML model")
            self.model_path = f"azureml:{name}:{version}"
        else:
            self.model_path = model_path
        self.version = version
        self.framework = framework
        self.model_file_format = model_file_format
        self.name = name
        self.aml_storage_name = aml_storage_name
        self.composite_parent = None
        self.model_storage_kind = model_storage_kind

    @abstractmethod
    def load_model(self, rank: int = None) -> object:
        """
        Load model from disk, return in-memory model object
        Derived class should implement its specific logic if needed.
        """
        raise NotImplementedError()

    @abstractmethod
    def prepare_session(
        self,
        inference_settings: Optional[Dict[str, Any]] = None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """
        Prepare inference session for Olive model, return in-memory inference session.
        Derived class should implement its specific logic if needed.
        """
        raise NotImplementedError()

    def download_from_azureml(self, ml_client: "MLClient", download_path: Union[Path, str]):
        if ml_client is None:
            raise Exception("Azure ML client is not initialized")
        if self.model_storage_kind != ModelStorageKind.AzureMLModel:
            raise Exception("Only Azure ML model can be downloaded from Azure ML workspace")
        if not self.aml_storage_name:
            logger.error(
                f"Error to download model {self.model_path} from Azure ML workspace: aml_storage_name is not specified"
            )
            raise Exception("Please specify model 'aml_storage_name' for Azure ML model")
        try:
            ml_client.models.download(name=self.name, version=self.version, download_path=download_path)
        except Exception as e:
            logger.error(f"Failed to downloafd model {self.name} from Azure ML workspace, error: {e}")
            raise e

        model_path = Path(download_path) / self.name / self.aml_storage_name
        return model_path

    def set_composite_parent(self, cp):
        self.composite_parent = cp

    def get_composite_parent(self):
        return self.composite_parent

    def to_json(self, check_object: bool = False):
        model_path = self.model_path
        if model_path and Path(model_path).exists():
            model_path = Path(model_path)
        config = {
            "type": self.__class__.__name__,
            "config": {
                "model_path": model_path,
                "name": self.name,
                "model_storage_kind": self.model_storage_kind.value,
                "version": self.version,
            },
        }
        return serialize_to_json(config, check_object)


class ModelConfig(ConfigBase):
    type: str
    config: dict

    @validator("type")
    def validate_type(cls, v):
        if v.lower() not in REGISTRY:
            raise ValueError(f"Unknown model type {v}")
        return v

    def create_model(self):
        return REGISTRY[self.type.lower()](**self.config)


class IOConfig(ConfigBase):
    # TODO remove input names, shapes and types, turn to use olive dataset conifg.
    input_names: List[str]
    input_shapes: List[List[int]] = None
    input_types: List[str] = None
    output_names: List[str]
    output_shapes: List[List[int]] = None
    output_types: List[str] = None
    dynamic_axes: Dict[str, Dict[int, str]] = None
    # ONNX exporter might mark dimension like 'Transposepresent_value_self_1_dim_2' in shape inference
    # even though we want the dimension to be a constant int.
    # We use a workaround here: first use dim_param like "1" to represent the dimension, and then
    # convert it to int in the onnx model.
    string_to_int_dim_params: List[str] = None

    @validator("input_shapes", "input_types")
    def check_input_shapes(cls, v, values):
        if not v:
            return v

        if "input_names" not in values:
            raise ValueError("Invalid input_names")
        if len(v) != len(values["input_names"]):
            raise ValueError("input_names and input_shapes must have the same length")
        return v

    @validator("output_shapes", "output_types")
    def check_output_shapes(cls, v, values):
        if not v:
            return v

        if "output_names" not in values:
            raise ValueError("Invalid output_names")
        if len(v) != len(values["output_names"]):
            raise ValueError("output_names and output_shapes must have the same length")
        return v

    @validator("dynamic_axes")
    def convert_dynamic_axes(cls, v):
        if not v:
            return v

        dynamic_axes = v
        for k, v in dynamic_axes.items():
            dynamic_axes[k] = {int(kk): vv for kk, vv in v.items()}
        return dynamic_axes

    @validator("string_to_int_dim_params")
    def check_string_to_int_dim_params(cls, v):
        if not v:
            return v

        for dim_param in v:
            try:
                int(dim_param)
            except ValueError:
                raise ValueError(f"Invalid string_to_int_dim_params: {dim_param}. Must be castable to int.")
        return v


class HFComponent(ConfigBase):
    name: str
    io_config: IOConfig = None
    dummy_inputs_func: Union[str, Callable] = None


class HFConfig(ConfigBase):
    model_name: str = None
    task: str = None
    # TODO: remove model_class and only use task
    model_class: str = None
    use_ort_implementation: bool = False
    components: List[HFComponent] = None
    dataset: Dict[str, Any] = None

    @validator("model_class", always=True)
    def task_or_model_class_required(cls, v, values):
        if values["model_name"]:
            if not v and not values.get("task", None):
                raise ValueError("Either task or model_class must be specified")
            return v


class ONNXModelBase(OliveModel):
    """
    Abstract class to manage ONNX models
    """

    def __init__(
        self,
        model_path: Optional[Union[Path, str]] = None,
        name: Optional[str] = None,
        version: Optional[int] = None,
        aml_storage_name: Optional[str] = None,
        model_storage_kind: Union[str, ModelStorageKind] = ModelStorageKind.LocalFile,
        inference_settings: Optional[dict] = None,
        use_ort_extensions: bool = False,
    ):
        super().__init__(
            framework=Framework.ONNX,
            model_file_format=ModelFileFormat.ONNX,
            model_path=model_path,
            name=name,
            version=version,
            aml_storage_name=aml_storage_name,
            model_storage_kind=model_storage_kind,
        )
        self.inference_settings = inference_settings
        self.use_ort_extensions = use_ort_extensions

    def _is_valid_ep(self, filepath: str, ep: str = None):
        # TODO: should be remove if future accelerators is implemented
        # It should be a bug for onnxruntime where the execution provider is not be fallback.
        import onnxruntime as ort

        try:
            sess_options = ort.SessionOptions()
            if self.use_ort_extensions:
                # register custom ops for onnxruntime-extensions
                from onnxruntime_extensions import get_library_path

                sess_options.register_custom_ops_library(get_library_path())

            ort.InferenceSession(filepath, sess_options, providers=[ep])
        except Exception as e:
            logger.warning(
                f"Error: {e}Olive will ignore this {ep}."
                + f"Please make sure the environment with {ep} has the required dependencies."
            )
            return False
        return True

    @abstractmethod
    def get_default_execution_providers(self, device: Device):
        """
        Returns a list of supported default execution providers
        """
        return ["CPUExecutionProvider"]


[docs]class ONNXModel(ONNXModelBase): def __init__( self, model_path: str = None, name: Optional[str] = None, version: Optional[int] = None, aml_storage_name: Optional[str] = None, model_storage_kind: Union[str, ModelStorageKind] = ModelStorageKind.LocalFile, inference_settings: Optional[dict] = None, use_ort_extensions: bool = False, hf_config: Union[Dict[str, Any], HFConfig] = None, ): super().__init__( model_path=model_path, name=name, version=version, aml_storage_name=aml_storage_name, model_storage_kind=model_storage_kind, inference_settings=inference_settings, use_ort_extensions=use_ort_extensions, ) self.io_config = None self.graph = None self.all_graphs: Optional[List[GraphProto]] = None # huggingface config self.hf_config = validate_config(hf_config, HFConfig) if hf_config else None @staticmethod def resolve_path(file_or_dir_path: str, model_filename: str = "model.onnx") -> str: """ The engine provides output paths to ONNX passes that do not contain .onnx extension (these paths are generally locations in the cache). This function will convert such paths to absolute file paths and also ensure the parent directories exist. If the input path is already an ONNX file it is simply returned. Examples: resolve_path("c:/foo/bar.onnx") -> c:/foo/bar.onnx resolve_path("c:/foo/bar") -> c:/foo/bar/model.onnx """ path = Path(file_or_dir_path) if path.suffix != ".onnx": path = path / model_filename parent_dir = path.parent if not parent_dir.exists(): parent_dir.mkdir(parents=True, exist_ok=True) return str(path) def load_model(self, rank: int = None) -> onnx.ModelProto: # HACK: ASSUME no external data return onnx.load(self.model_path) def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ): # user provided inference_settings > model's inference_settings > default settings inference_settings = inference_settings or self.inference_settings or {} # deep copy to avoid modifying the original settings inference_settings = deepcopy(inference_settings) # if user doesn't not provide ep list, use default value([ep]). Otherwise, use the user's ep list # user provided ep list > eps given by arguments > default eps execution_providers = inference_settings.get("execution_provider") or execution_providers if not execution_providers: execution_providers = self.get_default_execution_providers(device) elif isinstance(execution_providers, str): execution_providers = [execution_providers] else: # the execution_providers is a list pass inference_settings["execution_provider"] = execution_providers if (device == Device.GPU) and (rank is not None) and not inference_settings.get("provider_options"): inference_settings["provider_options"] = [ {"device_id": str(rank)} if ep == "CUDAExecutionProvider" else {} for ep in execution_providers ] return get_ort_inference_session(self.model_path, inference_settings, self.use_ort_extensions) def nodes(self): for graph in self.get_all_graphs(): for node in graph.node: yield node def get_graph(self): if self.graph is not None: return self.graph self.graph = self.load_model().graph return self.graph def get_all_graphs(self): if self.all_graphs is not None: return self.all_graphs self.all_graphs = [] graph_queue = [self.get_graph()] while graph_queue: graph = graph_queue.pop(0) self.all_graphs.append(graph) for node in graph.node: for attr in node.attribute: if attr.type == AttributeProto.AttributeType.GRAPH: assert isinstance(attr.g, GraphProto) graph_queue.append(attr.g) if attr.type == AttributeProto.AttributeType.GRAPHS: for g in attr.graphs: assert isinstance(g, GraphProto) graph_queue.append(g) return self.all_graphs def output_name_to_node(self): output_name_to_node = {} for node in self.nodes(): for output_name in node.output: if output_name: # could be empty when it is optional output_name_to_node[output_name] = node return output_name_to_node def get_initializer(self, name): for graph in self.get_all_graphs(): for tensor in graph.initializer: if tensor.name == name: return tensor return None def to_json(self, check_object: bool = False): config = super().to_json(check_object) config["config"].update( { "inference_settings": self.inference_settings, "use_ort_extensions": self.use_ort_extensions, "hf_config": self.hf_config, } ) return serialize_to_json(config, check_object) def get_default_execution_providers(self, device: Device): # return firstly available ep as ort default ep available_providers = AcceleratorLookup.get_execution_providers_for_device(device) for ep in available_providers: if self._is_valid_ep(self.model_path, ep): return [ep] return super().get_default_execution_providers(device) def get_io_config(self): """ Get input/output names, shapes, types of the onnx model without creating an ort session. This function loads the onnx model and parses the graph to get the io config. """ if self.io_config: return self.io_config try: from onnx.helper import tensor_dtype_to_np_dtype except ImportError: from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE def tensor_dtype_to_np_dtype(tensor_type): return TENSOR_TYPE_TO_NP_TYPE[tensor_type] # external data is not needed for io config parsing # the .onnx model already contains all of the graph information # this method works whether the external data is in the same directory or not model = onnx.load(self.model_path, load_external_data=False) io_config = { "input_names": [], "input_shapes": [], "input_types": [], "output_names": [], "output_shapes": [], "output_types": [], } for prefix, ios in [("input", model.graph.input), ("output", model.graph.output)]: for io in ios: # get name, type, shape name = io.name tensor_type = io.type.tensor_type if tensor_type.elem_type == 0: # sequence type # TODO: add support for different types # refer to https://github.com/lutzroeder/netron/blob/main/source/onnx.js#L1424 tensor_type = io.type.sequence_type.elem_type.tensor_type data_type = str(tensor_dtype_to_np_dtype(tensor_type.elem_type)) shape = [dim.dim_param if dim.dim_param else dim.dim_value for dim in tensor_type.shape.dim] # append to io_config io_config[f"{prefix}_names"].append(name) io_config[f"{prefix}_types"].append(data_type) io_config[f"{prefix}_shapes"].append(shape) # save io_config self.io_config = io_config return io_config
[docs]class PyTorchModel(OliveModel): def __init__( self, model_path: str = None, model_file_format: ModelFileFormat = ModelFileFormat.PYTORCH_ENTIRE_MODEL, name: Optional[str] = None, version: Optional[int] = None, aml_storage_name: Optional[str] = None, model_storage_kind: Union[str, ModelStorageKind] = ModelStorageKind.LocalFolder, model_loader: Union[str, Callable] = None, model_script: Union[str, Path] = None, script_dir: Union[str, Path] = None, io_config: Union[Dict[str, Any], IOConfig] = None, dummy_inputs_func: Union[str, Callable] = None, hf_config: Union[Dict[str, Any], HFConfig] = None, ): if not ( isinstance(model_loader, Callable) or (isinstance(model_loader, str) and model_script) or model_path or model_storage_kind == ModelStorageKind.AzureMLModel or hf_config ): raise ValueError( "model_path or model_storage_kind/AzureMLModel is required " "since model_loader is not callable or model_script is not provided" ) self.model_loader = model_loader self.model_script = model_script self.script_dir = script_dir self.model = None super().__init__( framework=Framework.PYTORCH, model_file_format=model_file_format, model_path=model_path, name=name, version=version, aml_storage_name=aml_storage_name, model_storage_kind=model_storage_kind, ) # io config for conversion to onnx self.io_config = validate_config(io_config, IOConfig) if io_config else None self.dummy_inputs_func = dummy_inputs_func self.dummy_inputs = None # huggingface config self.hf_config = validate_config(hf_config, HFConfig) if hf_config else None def load_model(self, rank: int = None) -> torch.nn.Module: if self.model is not None: return self.model if self.model_loader is not None: user_module_loader = UserModuleLoader(self.model_script, self.script_dir) model = user_module_loader.call_object(self.model_loader, self.model_path) elif self.hf_config and (self.hf_config.model_class or self.hf_config.task): input_model = self.model_path or self.hf_config.model_name if self.hf_config.task: model = load_huggingface_model_from_task(self.hf_config.task, input_model) else: model = load_huggingface_model_from_model_class( self.hf_config.model_class, input_model, self.hf_config.use_ort_implementation ) else: if self.model_file_format == ModelFileFormat.PYTORCH_ENTIRE_MODEL: model = torch.load(self.model_path) elif self.model_file_format == ModelFileFormat.PYTORCH_TORCH_SCRIPT: model = torch.jit.load(self.model_path) elif self.model_file_format == ModelFileFormat.PYTORCH_MLFLOW_MODEL: model = self.load_mlflow_model() elif self.model_file_format == ModelFileFormat.PYTORCH_STATE_DICT: raise ValueError("Please use customized model loader to load state dict model.") else: raise ValueError(f"Unsupported model file format: {self.model_file_format}") self.model = model return model def load_mlflow_model(self): tmp_dir = tempfile.TemporaryDirectory(prefix="mlflow_tmp") tmp_dir_path = Path(tmp_dir.name) shutil.copytree(os.path.join(self.model_path, "data/model"), tmp_dir_path, dirs_exist_ok=True) shutil.copytree(os.path.join(self.model_path, "data/config"), tmp_dir_path, dirs_exist_ok=True) shutil.copytree(os.path.join(self.model_path, "data/tokenizer"), tmp_dir_path, dirs_exist_ok=True) with open(os.path.join(self.model_path, "MLmodel"), "r") as fp: mlflow_data = yaml.safe_load(fp) hf_pretrained_class = mlflow_data["flavors"]["hftransformers"]["hf_pretrained_class"] model_loader = huggingface_model_loader(hf_pretrained_class) loaded_model = model_loader(tmp_dir_path) loaded_model.eval() tmp_dir.cleanup() return loaded_model def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ): return self.load_model().eval() # TODO: remove this method once we have olive datasets implemented. # The dataset should be able to provide the dummy inputs. def get_dummy_inputs(self): """ Return a dummy input for the model. """ if self.dummy_inputs is not None: return self.dummy_inputs assert self.dummy_inputs_func or ( self.io_config and self.io_config.input_shapes ), "dummy_inputs_func or io_config.input_shapes must be provided to get dummy input" if self.dummy_inputs_func is not None: user_module_loader = UserModuleLoader(self.model_script, self.script_dir) dummy_inputs = user_module_loader.call_object(self.dummy_inputs_func, self) else: str_to_type = { "float32": torch.float32, "float16": torch.float16, "int32": torch.int32, "int64": torch.int64, "int8": torch.int8, "bool": torch.bool, } input_types = self.io_config.input_types or ["float32"] * len(self.io_config.input_shapes) dummy_inputs = [] for shape, dtype in zip(self.io_config.input_shapes, input_types): dummy_inputs.append(torch.zeros(shape, dtype=str_to_type[dtype])) dummy_inputs = tuple(dummy_inputs) if len(dummy_inputs) > 1 else dummy_inputs[0] self.dummy_inputs = dummy_inputs return dummy_inputs def get_model_config(self): if self.hf_config is None: raise Exception("HF model_config is not available") return get_hf_model_config(self.hf_config.model_name) @property def components(self) -> List[str]: """ Names of the components of the model. """ if not self.hf_config or not self.hf_config.components: return None return [component.name for component in self.hf_config.components] def get_component(self, component_name: str) -> "PyTorchModel": """ Get a component of the model as a PyTorchModel. """ assert self.components, "hf_config.components must be provided to get component" assert component_name in self.components, f"component {component_name} not found in hf_config" model = self.load_model() model_component = getattr(model, component_name) # get the component from hf_config components_dict = {component.name: component for component in self.hf_config.components} hf_component = components_dict[component_name] def model_loader(_): return model_component return PyTorchModel( model_loader=model_loader, name=hf_component.name, io_config=hf_component.io_config, dummy_inputs_func=hf_component.dummy_inputs_func, model_script=self.model_script, script_dir=self.script_dir, ) def to_json(self, check_object: bool = False): config = super().to_json(check_object) config["config"].update( { "model_loader": self.model_loader, "model_script": Path(self.model_script) if self.model_script else None, "script_dir": Path(self.script_dir) if self.script_dir else None, "io_config": self.io_config, "dummy_inputs_func": self.dummy_inputs_func, "hf_config": self.hf_config, } ) return serialize_to_json(config, check_object)
class OptimumModel(OliveModel): def __init__(self, model_path: str, name: str, model_components: List[str]): super().__init__( name=name, model_storage_kind=ModelStorageKind.LocalFolder, model_path=model_path, model_file_format=ModelFileFormat.OPTIMUM, framework=Framework.PYTORCH, ) self.model_components = model_components def load_model(self, rank: int = None): raise NotImplementedError() def prepare_session(self, inference_settings: Dict[str, Any], device: Device, rank: int = None): raise NotImplementedError() def to_json(self, check_object: bool = False): config = { "type": self.__class__.__name__, "config": {"name": self.name, "model_path": self.model_path, "model_components": self.model_components}, } return serialize_to_json(config, check_object)
[docs]class SNPEModel(OliveModel): def __init__( self, input_names: List[str], input_shapes: List[List[int]], output_names: List[str], output_shapes: List[List[int]], model_path: str = None, model_storage_kind=ModelStorageKind.LocalFile, name: Optional[str] = None, version: Optional[int] = None, ): super().__init__( framework=Framework.SNPE, model_file_format=ModelFileFormat.SNPE_DLC, model_path=model_path, name=name, version=version, model_storage_kind=model_storage_kind, ) self.io_config = { "input_names": input_names, "input_shapes": input_shapes, "output_names": output_names, "output_shapes": output_shapes, } def load_model(self, rank: int = None): raise NotImplementedError() def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ) -> SNPEInferenceSession: session_options = SNPESessionOptions(**inference_settings) if inference_settings else None if device == Device.NPU: device = SNPEDevice.DSP session_options.device = device return SNPEInferenceSession(self.model_path, self.io_config, session_options) def to_json(self, check_object: bool = False): config = super().to_json(check_object) config["config"].update(self.io_config) return serialize_to_json(config, check_object) def get_dlc_metrics(self) -> dict: return get_dlc_metrics(self.model_path)
class TensorFlowModel(OliveModel): def __init__( self, model_path: str = None, model_file_format: ModelFileFormat = ModelFileFormat.TENSORFLOW_SAVED_MODEL, name: Optional[str] = None, model_storage_kind=ModelStorageKind.LocalFolder, ): super().__init__( model_path=model_path, framework=Framework.TENSORFLOW, model_file_format=model_file_format, name=name, model_storage_kind=model_storage_kind, ) def load_model(self, rank: int = None): raise NotImplementedError() def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ): raise NotImplementedError()
[docs]class OpenVINOModel(OliveModel): def __init__( self, model_path: str, name: str = None, model_storage_kind=ModelStorageKind.LocalFolder, version: Optional[int] = None, aml_storage_name: Optional[str] = None, ): super().__init__( model_path=model_path, framework=Framework.OPENVINO, model_file_format=ModelFileFormat.OPENVINO_IR, name=name, version=version, aml_storage_name=aml_storage_name, model_storage_kind=model_storage_kind, ) if len(list(Path(model_path).glob("*.xml"))) == 0 or len(list(Path(model_path).glob("*.bin"))) == 0: raise Exception(f"No OpenVINO model found in {model_path}") if len(list(Path(model_path).glob("*.xml"))) > 1 or len(list(Path(model_path).glob("*.bin"))) > 1: raise Exception(f"More than 1 OpenVINO models are found in {model_path}") for model_file in Path(model_path).glob("*.xml"): ov_model = Path(model_file) for weights_file in Path(model_path).glob("*.bin"): ov_weights = Path(weights_file) self.model_config = { "model_name": name if name else ov_model.stem, "model": str(ov_model.resolve()), "weights": str(ov_weights.resolve()), } def load_model(self, rank: int = None): try: from openvino.tools.pot import load_model except ImportError: raise ImportError("Please install olive-ai[openvino] to use OpenVINO model") return load_model(self.model_config) def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ): try: from openvino.runtime import Core except ImportError: raise ImportError("Please install olive-ai[openvino] to use OpenVINO model") ie = Core() model_pot = ie.read_model(model=self.model_config["model"]) if device == Device.INTEL_MYRIAD: device = "MYRIAD" compiled_model = ie.compile_model(model=model_pot, device_name=device.upper()) return compiled_model
[docs]class DistributedOnnxModel(ONNXModelBase): EXECUTION_PROVIDERS = { "cpu": ["CPUExecutionProvider"], "gpu": ["CUDAExecutionProvider", "CPUExecutionProvider"], } def __init__( self, model_filepaths: List[Union[Path, str]] = [], name: Optional[str] = None, version: Optional[int] = None, aml_storage_name: Optional[str] = None, inference_settings: Optional[dict] = None, use_ort_extensions: bool = False, ): super().__init__( model_path=None, name=name, version=version, aml_storage_name=aml_storage_name, model_storage_kind=ModelStorageKind.LocalFolder, inference_settings=inference_settings, use_ort_extensions=use_ort_extensions, ) self.model_filepaths = model_filepaths @property def ranks(self): return len(self.model_filepaths) def ranked_model_path(self, rank: int) -> Union[Path, str]: return self.model_filepaths[rank] def load_model(self, rank: int) -> ONNXModel: return ONNXModel(self.model_filepaths[rank], inference_settings=self.inference_settings) def prepare_session( self, inference_settings: Optional[Dict[str, Any]] = None, device: Device = Device.GPU, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = 0, ): raise RuntimeError("DistributedOnnxModel doesn't have a session of its own") def get_default_execution_providers(self, filepath: str, device: Device): # return firstly available ep as ort default ep available_providers = DistributedOnnxModel.get_execution_providers(device) for ep in available_providers: if self._is_valid_ep(filepath, ep): return [ep] return ["CUDAExecutionProvider", "CPUExecutionProvider"] @staticmethod def get_execution_providers(device: Device): import onnxruntime as ort eps_per_device = DistributedOnnxModel.EXECUTION_PROVIDERS.get(device) available_providers = ort.get_available_providers() return AcceleratorLookup.get_execution_providers(eps_per_device, available_providers) def to_json(self, check_object: bool = False): config = { "type": self.__class__.__name__, "config": { "model_filepaths": self.model_filepaths, "name": self.name, "version": self.version, "inference_settings": self.inference_settings, "use_ort_extensions": self.use_ort_extensions, }, } return serialize_to_json(config, check_object=check_object)
[docs]class CompositeOnnxModel(ONNXModelBase): """ CompositeOnnxModel represents multi component models. Whisper is an example composite model that has encoder and decoder components. CompositeOnnxModel is a collection of OnnxModels. """ def __init__( self, model_components: List[str], name: Optional[str] = None, version: Optional[int] = None, aml_storage_name: Optional[str] = None, hf_config: Union[Dict[str, Any], HFConfig] = None, ): super().__init__( model_path=None, name=name, version=version, aml_storage_name=aml_storage_name, model_storage_kind=ModelStorageKind.LocalFolder, ) if isinstance(model_components[0], dict): assert all( [m.get("type").lower() == "onnxmodel" for m in model_components] ), "All components must be ONNXModel" self.model_components = [ONNXModel(**m.get("config", {})) for m in model_components] else: assert all([isinstance(m, ONNXModel) for m in model_components]), "All components must be ONNXModel" self.model_components = model_components for m in self.model_components: m.set_composite_parent(self) self.hf_config = validate_config(hf_config, HFConfig) if hf_config else None def load_model(self, rank: int = None): raise NotImplementedError() def prepare_session( self, inference_settings: Dict[str, Any], device: Device, execution_providers: Union[str, List[str]] = None, rank: Optional[int] = None, ): raise NotImplementedError() def get_default_execution_providers(self, device: Device): raise NotImplementedError() def get_model_components(self): return self.model_components def get_model_component(self, idx): return self.model_components[idx] def get_model_config(self): if self.hf_config is None: raise Exception("HF model_config is not available") return get_hf_model_config(self.hf_config.model_name) def to_json(self, check_object: bool = False): json_dict = { "type": self.__class__.__name__, "config": {"name": self.name, "version": self.version, "hf_config": self.hf_config}, } json_dict["config"]["model_components"] = [] for m in self.model_components: json_dict["config"]["model_components"].append(m.to_json(check_object)) return serialize_to_json(json_dict, check_object)