Source code for olive.model

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import logging
import os
import shutil
import tempfile
from abc import ABC, abstractmethod
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union

import onnx
import torch
import yaml
from onnx import AttributeProto, GraphProto
from pydantic import validator

import olive.data.template as data_config_template
from olive.common.config_utils import ConfigBase, serialize_to_json, validate_config
from olive.common.ort_inference import get_ort_inference_session
from olive.common.user_module_loader import UserModuleLoader
from olive.constants import Framework, ModelFileFormat
from olive.hardware import AcceleratorLookup, Device
from olive.model.hf_utils import (
    HFConfig,
    get_hf_model_config,
    get_hf_model_dummy_input,
    huggingface_model_loader,
    load_huggingface_model_from_model_class,
    load_huggingface_model_from_task,
)
from olive.model.model_config import IOConfig
from olive.resource_path import (
    OLIVE_RESOURCE_ANNOTATIONS,
    ResourcePath,
    ResourcePathConfig,
    ResourceType,
    create_resource_path,
)
from olive.snpe import SNPEDevice, SNPEInferenceSession, SNPESessionOptions
from olive.snpe.tools.dev import get_dlc_metrics

# Maps the lower-cased class name of every OliveModel subclass to the class itself.
# It is filled in by OliveModel.__init_subclass__ and read by ModelConfig.
REGISTRY = {}
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class OliveModel(ABC):
    """
    Abstraction for logical "Model", it contains model path and related metadata.
    Each technique accepts Model as input, return Model as output.

    Every subclass is added to the module-level REGISTRY under its lower-cased class name.
    """

    @classmethod
    def __init_subclass__(cls, **kwargs) -> None:
        """Register the model."""
        super().__init_subclass__(**kwargs)
        REGISTRY[cls.__name__.lower()] = cls

    def __init__(
        self,
        framework: Framework,
        model_file_format: ModelFileFormat,
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
    ):
        """
        :param framework: framework of the model.
        :param model_file_format: file format of the model.
        :param model_path: resource describing where the model lives. A falsy value means "no model path".
        """
        self.framework = framework
        self.model_file_format = model_file_format
        self.model_resource_path = create_resource_path(model_path) if model_path else None
        # set by download_model() / set_local_model_path()
        self.local_model_path = None
        self.composite_parent = None
        self.io_config = None

    @property
    def model_path(self) -> Optional[str]:
        """
        Return local model path.

        :return: None if the model has no resource path, otherwise the path of the local resource.
        :raises ValueError: if the resource is not local and no local copy has been registered yet.
        """
        # return None if model_path is None
        if self.model_resource_path is None:
            return None

        # return local path if model_path is local or string name
        if self.model_resource_path.is_local_resource() or self.model_resource_path.is_string_name():
            return self.model_resource_path.get_path()

        # check if model_path is downloaded
        if self.local_model_path is None:
            raise ValueError(
                "Model is not local and not downloaded yet. Please call download_model() or set_local_model_path()"
                " first."
            )
        return self.local_model_path.get_path()

    def download_model(self, download_path: Union[Path, str], overwrite: bool = False) -> Optional[str]:
        """
        Download model to local path.

        :param download_path: local directory to download model to.
        :param overwrite: whether to overwrite existing model. If not overwrite, will raise exception if model exists.
        :return: local model path, or None if the model has no resource path.
        """
        # return None if model_path is None
        if self.model_resource_path is None:
            logger.debug("Model path is None, skip downloading.")
            return None

        # return local path if model_path is local or string name
        if self.model_resource_path.is_local_resource() or self.model_resource_path.is_string_name():
            logger.debug("Model path is local or string name, skip downloading.")
            return self.model_resource_path.get_path()

        # download model
        logger.debug("Downloading model to %s", download_path)
        self.local_model_path = create_resource_path(
            self.model_resource_path.save_to_dir(download_path, overwrite=overwrite)
        )
        return self.local_model_path.get_path()

    def set_local_model_path(self, local_model_path: Union[Path, str, ResourcePath, ResourcePathConfig]):
        """
        Set local model path.

        :param local_model_path: local model path.
        """
        # validate first so that a rejected path never replaces the current local_model_path
        resource_path = create_resource_path(local_model_path)
        assert (
            resource_path.is_local_resource() or resource_path.is_string_name()
        ), "local_model_path must be local resource path or string name."
        self.local_model_path = resource_path

    @abstractmethod
    def load_model(self, rank: int = None) -> object:
        """
        Load model from disk, return in-memory model object
        Derived class should implement its specific logic if needed.
        """
        raise NotImplementedError()

    @abstractmethod
    def prepare_session(
        self,
        inference_settings: Optional[Dict[str, Any]] = None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """
        Prepare inference session for Olive model, return in-memory inference session.
        Derived class should implement its specific logic if needed.
        """
        raise NotImplementedError()

    def set_composite_parent(self, cp):
        """Remember `cp` as the composite model this model belongs to."""
        self.composite_parent = cp

    def get_composite_parent(self):
        """Return the composite parent set by set_composite_parent, or None."""
        return self.composite_parent

    def get_io_config(self) -> Optional[Dict[str, Any]]:
        """Return the stored io_config; it is None unless a subclass has set it."""
        return self.io_config

    def to_json(self, check_object: bool = False):
        """Serialize the class name and the model resource path with serialize_to_json."""
        config = {
            "type": self.__class__.__name__,
            "config": {"model_path": self.model_resource_path.to_json() if self.model_resource_path else None},
        }
        return serialize_to_json(config, check_object)


class ModelConfig(ConfigBase):
    """
    Serializable description of a model: the registered class name plus its constructor arguments.
    """

    # name of a class registered in REGISTRY (matched case-insensitively)
    type: str
    # keyword arguments handed to the model class constructor
    config: dict

    @validator("type")
    def validate_type(cls, v):
        """Accept `v` unchanged when its lower-cased form is a REGISTRY key, otherwise raise ValueError."""
        if v.lower() in REGISTRY:
            return v
        raise ValueError(f"Unknown model type {v}")

    def create_model(self):
        """Instantiate the registered model class with `config` as keyword arguments."""
        model_cls = REGISTRY[self.type.lower()]
        return model_cls(**self.config)


class ONNXModelBase(OliveModel):
    """
    Abstract class to manage ONNX models
    """

    def __init__(
        self,
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
        inference_settings: Optional[dict] = None,
        use_ort_extensions: bool = False,
    ):
        """
        :param model_path: resource describing where the model lives.
        :param inference_settings: settings stored on the model for later session creation.
        :param use_ort_extensions: whether the onnxruntime-extensions custom ops library must be registered.
        """
        super().__init__(framework=Framework.ONNX, model_file_format=ModelFileFormat.ONNX, model_path=model_path)
        self.inference_settings = inference_settings
        self.use_ort_extensions = use_ort_extensions

    def _is_valid_ep(self, filepath: str, ep: str = None):
        """
        Check whether an onnxruntime InferenceSession can be created for `filepath` with `ep` as the only provider.

        :return: True if the session is created, False (after logging a warning) if anything raises.
        """
        # TODO: should be removed once future accelerators are implemented
        # Presumably works around onnxruntime not falling back to another execution provider.
        import onnxruntime as ort

        try:
            sess_options = ort.SessionOptions()
            if self.use_ort_extensions:
                # register custom ops for onnxruntime-extensions
                from onnxruntime_extensions import get_library_path

                sess_options.register_custom_ops_library(get_library_path())

            ort.InferenceSession(filepath, sess_options, providers=[ep])
        except Exception as e:
            # deliberate best-effort: any failure just disqualifies this ep
            logger.warning(
                f"Error: {e} Olive will ignore this {ep}."
                f" Please make sure the environment with {ep} has the required dependencies."
            )
            return False
        return True

    @abstractmethod
    def get_default_execution_providers(self, device: Device):
        """
        Returns a list of supported default execution providers
        """
        return ["CPUExecutionProvider"]


class ONNXModel(ONNXModelBase):
    """
    A single ONNX model stored either as a file or as a folder containing an .onnx file.
    """

    def __init__(
        self,
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
        onnx_file_name: Optional[str] = None,
        inference_settings: Optional[dict] = None,
        use_ort_extensions: bool = False,
        hf_config: Union[Dict[str, Any], HFConfig] = None,
    ):
        """
        :param model_path: resource describing where the model lives (file or folder).
        :param onnx_file_name: name of the .onnx file when model_path is a folder.
        :param inference_settings: settings used by prepare_session when the caller passes none.
        :param use_ort_extensions: whether the onnxruntime-extensions custom ops library must be registered.
        :param hf_config: huggingface config, validated against HFConfig when provided.
        """
        super().__init__(
            model_path=model_path,
            inference_settings=inference_settings,
            use_ort_extensions=use_ort_extensions,
        )
        self.onnx_file_name = onnx_file_name
        self.io_config = None
        # lazily filled caches, see get_graph() and get_all_graphs()
        self.graph = None
        self.all_graphs: Optional[List[GraphProto]] = None

        # huggingface config
        self.hf_config = validate_config(hf_config, HFConfig) if hf_config else None

        # if model_path is local folder, check for onnx file name
        if self.model_resource_path and self.model_resource_path.type == ResourceType.LocalFolder:
            self.get_onnx_file_path(self.model_resource_path.get_path(), self.onnx_file_name)

    @staticmethod
    def get_onnx_file_path(model_path: str, onnx_file_name: Optional[str] = None) -> str:
        """
        Get the path to the ONNX model file. If model_path is a file, it is returned as is. If model_path is a
        directory, the onnx_file_name is appended to it and the resulting path is returned. If onnx_file_name is not
        specified, it is inferred if there is only one .onnx file in the directory, else an error is raised.

        :raises ValueError: if the directory contains no .onnx file or more than one and no name was given.
        """
        assert Path(model_path).exists(), f"Model path {model_path} does not exist"

        # if model_path is a file, return it as is
        if Path(model_path).is_file():
            return model_path

        # if model_path is a directory, append onnx_file_name to it
        if onnx_file_name:
            onnx_file_path = Path(model_path) / onnx_file_name
            assert onnx_file_path.exists(), f"ONNX model file {onnx_file_path} does not exist"
            return str(onnx_file_path)

        # try to infer onnx_file_name
        logger.warning(
            "model_path is a directory but onnx_file_name is not specified. Trying to infer it. It is recommended to"
            " specify onnx_file_name explicitly."
        )
        onnx_file_names = list(Path(model_path).glob("*.onnx"))
        if len(onnx_file_names) == 1:
            return str(onnx_file_names[0])
        elif len(onnx_file_names) > 1:
            raise ValueError(
                f"Multiple .onnx model files found in the model folder {model_path}. Please specify one using the"
                " onnx_file_name argument."
            )
        else:
            raise ValueError(f"No .onnx file found in the model folder {model_path}.")

    @property
    def model_path(self) -> str:
        """Return the path of the .onnx file itself, or None when the model has no path."""
        model_path = super().model_path
        model_path = self.get_onnx_file_path(model_path, self.onnx_file_name) if model_path else None
        return model_path

    @staticmethod
    def resolve_path(file_or_dir_path: str, model_filename: str = "model.onnx") -> str:
        """
        The engine provides output paths to ONNX passes that do not contain .onnx extension (these paths are
        generally locations in the cache). This function appends model_filename to such paths and ensures the
        parent directories exist. If the input path is already an ONNX file it is simply returned.

        Examples:
            resolve_path("c:/foo/bar.onnx") -> c:/foo/bar.onnx
            resolve_path("c:/foo/bar") -> c:/foo/bar/model.onnx
        """
        path = Path(file_or_dir_path)
        if path.suffix != ".onnx":
            path = path / model_filename
            parent_dir = path.parent
            if not parent_dir.exists():
                parent_dir.mkdir(parents=True, exist_ok=True)
        return str(path)

    def load_model(self, rank: int = None) -> onnx.ModelProto:
        """Load the model with onnx.load; `rank` is not used."""
        return onnx.load(self.model_path)

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """
        Create an inference session through get_ort_inference_session.

        :param inference_settings: settings for the session; falls back to the model's own settings, then to {}.
        :param device: device used to pick default execution providers and CUDA provider options.
        :param execution_providers: execution provider(s) to use if the settings do not name any.
        :param rank: when set on GPU and no provider options are given, used as the CUDA device id.
        """
        # user provided inference_settings > model's inference_settings > default settings
        inference_settings = inference_settings or self.inference_settings or {}
        # deep copy to avoid modifying the original settings
        inference_settings = deepcopy(inference_settings)

        # if user does not provide ep list, use default value([ep]). Otherwise, use the user's ep list
        # user provided ep list > eps given by arguments > default eps
        execution_providers = inference_settings.get("execution_provider") or execution_providers
        if not execution_providers:
            execution_providers = self.get_default_execution_providers(device)
        elif isinstance(execution_providers, str):
            execution_providers = [execution_providers]
        else:
            # the execution_providers is a list
            pass
        inference_settings["execution_provider"] = execution_providers

        if (device == Device.GPU) and (rank is not None) and not inference_settings.get("provider_options"):
            # one options dict per provider; only CUDA gets a device id
            inference_settings["provider_options"] = [
                {"device_id": str(rank)} if ep == "CUDAExecutionProvider" else {} for ep in execution_providers
            ]

        return get_ort_inference_session(self.model_path, inference_settings, self.use_ort_extensions)

    def nodes(self):
        """Yield every node of every graph returned by get_all_graphs()."""
        for graph in self.get_all_graphs():
            for node in graph.node:
                yield node

    def get_graph(self):
        """Return the main graph of the model, loading the model on first use."""
        if self.graph is not None:
            return self.graph
        self.graph = self.load_model().graph
        return self.graph

    def get_all_graphs(self):
        """
        Return the main graph followed by all graphs nested in node attributes, in breadth-first order.

        The result is cached on the instance only after the whole traversal succeeded, so a failure while
        loading or walking the model does not leave a partial (or empty) list behind.
        """
        if self.all_graphs is not None:
            return self.all_graphs

        # `collected` doubles as the FIFO queue: `cursor` points at the next graph to expand
        collected = [self.get_graph()]
        cursor = 0
        while cursor < len(collected):
            graph = collected[cursor]
            cursor += 1
            for node in graph.node:
                for attr in node.attribute:
                    if attr.type == AttributeProto.AttributeType.GRAPH:
                        assert isinstance(attr.g, GraphProto)
                        collected.append(attr.g)
                    if attr.type == AttributeProto.AttributeType.GRAPHS:
                        for g in attr.graphs:
                            assert isinstance(g, GraphProto)
                            collected.append(g)

        self.all_graphs = collected
        return self.all_graphs

    def output_name_to_node(self):
        """Return a dict mapping each non-empty node output name to the node producing it."""
        output_name_to_node = {}
        for node in self.nodes():
            for output_name in node.output:
                if output_name:  # could be empty when it is optional
                    output_name_to_node[output_name] = node
        return output_name_to_node

    def get_initializer(self, name):
        """Return the first initializer tensor called `name` in any graph, or None."""
        for graph in self.get_all_graphs():
            for tensor in graph.initializer:
                if tensor.name == name:
                    return tensor
        return None

    def to_json(self, check_object: bool = False):
        """Extend the base serialization with the ONNX specific constructor arguments."""
        config = super().to_json(check_object)
        config["config"].update(
            {
                "onnx_file_name": self.onnx_file_name,
                "inference_settings": self.inference_settings,
                "use_ort_extensions": self.use_ort_extensions,
                "hf_config": self.hf_config,
            }
        )
        return serialize_to_json(config, check_object)

    def get_default_execution_providers(self, device: Device):
        """Return the first execution provider for `device` that can load this model, else the base default."""
        # return firstly available ep as ort default ep
        available_providers = AcceleratorLookup.get_execution_providers_for_device(device)
        for ep in available_providers:
            if self._is_valid_ep(self.model_path, ep):
                return [ep]
        return super().get_default_execution_providers(device)

    def get_io_config(self):
        """
        Get input/output names, shapes, types of the onnx model without creating an ort session. This function loads
        the onnx model and parses the graph to get the io config.
        """
        if self.io_config:
            return self.io_config

        try:
            from onnx.helper import tensor_dtype_to_np_dtype
        except ImportError:
            # fall back to the mapping table when the helper cannot be imported
            from onnx.mapping import TENSOR_TYPE_TO_NP_TYPE

            def tensor_dtype_to_np_dtype(tensor_type):
                return TENSOR_TYPE_TO_NP_TYPE[tensor_type]

        # external data is not needed for io config parsing
        # the .onnx model already contains all of the graph information
        # this method works whether the external data is in the same directory or not
        model = onnx.load(self.model_path, load_external_data=False)

        io_config = {
            "input_names": [],
            "input_shapes": [],
            "input_types": [],
            "output_names": [],
            "output_shapes": [],
            "output_types": [],
        }
        for prefix, ios in [("input", model.graph.input), ("output", model.graph.output)]:
            for io in ios:
                # get name, type, shape
                name = io.name
                tensor_type = io.type.tensor_type
                if tensor_type.elem_type == 0:
                    # sequence type
                    # TODO: add support for different types
                    # refer to https://github.com/lutzroeder/netron/blob/main/source/onnx.js#L1424
                    tensor_type = io.type.sequence_type.elem_type.tensor_type
                data_type = str(tensor_dtype_to_np_dtype(tensor_type.elem_type))
                # symbolic dimensions keep their name, fixed ones their value
                shape = [dim.dim_param if dim.dim_param else dim.dim_value for dim in tensor_type.shape.dim]

                # append to io_config
                io_config[f"{prefix}_names"].append(name)
                io_config[f"{prefix}_types"].append(data_type)
                io_config[f"{prefix}_shapes"].append(shape)

        # save io_config
        self.io_config = io_config
        return self.io_config
class PyTorchModel(OliveModel):
    """
    A PyTorch model loaded from a file, a user supplied loader or a huggingface config.
    """

    def __init__(
        self,
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
        model_file_format: ModelFileFormat = ModelFileFormat.PYTORCH_ENTIRE_MODEL,
        model_loader: Union[str, Callable] = None,
        model_script: Union[str, Path] = None,
        script_dir: Union[str, Path] = None,
        io_config: Union[Dict[str, Any], IOConfig] = None,
        dummy_inputs_func: Union[str, Callable] = None,
        hf_config: Union[Dict[str, Any], HFConfig] = None,
    ):
        """
        :param model_path: resource describing where the model lives.
        :param model_file_format: file format of the model.
        :param model_loader: callable, or name of a function in model_script, used to load the model.
        :param model_script: user script passed to UserModuleLoader.
        :param script_dir: script directory passed to UserModuleLoader.
        :param io_config: io config for conversion to onnx, validated against IOConfig when provided.
        :param dummy_inputs_func: callable, or name of a function in model_script, producing dummy inputs.
        :param hf_config: huggingface config, validated against HFConfig when provided.
        :raises ValueError: if none of model_path, hf_config or a usable model_loader is given.
        """
        if not (
            isinstance(model_loader, Callable)
            or (isinstance(model_loader, str) and model_script)
            or model_path
            or hf_config
        ):
            raise ValueError(
                "model_path is required since model_loader is not callable or model_script is not provided"
            )

        self.model_loader = model_loader
        self.model_script = model_script
        self.script_dir = script_dir
        # cache for the loaded model, see load_model()
        self.model = None
        super().__init__(framework=Framework.PYTORCH, model_file_format=model_file_format, model_path=model_path)

        # io config for conversion to onnx
        self.io_config = validate_config(io_config, IOConfig).dict() if io_config else None
        self.dummy_inputs_func = dummy_inputs_func

        self.dummy_inputs = None

        # huggingface config
        self.hf_config = validate_config(hf_config, HFConfig) if hf_config else None

    def load_model(self, rank: int = None) -> torch.nn.Module:
        """
        Load the model and cache it on the instance; `rank` is not used.

        Priority: model_loader > hf_config (task or model_class) > model_file_format.

        :raises ValueError: for state dict models without a loader and for unsupported file formats.
        """
        if self.model is not None:
            return self.model

        if self.model_loader is not None:
            user_module_loader = UserModuleLoader(self.model_script, self.script_dir)
            model = user_module_loader.call_object(self.model_loader, self.model_path)
        elif self.hf_config and (self.hf_config.model_class or self.hf_config.task):
            input_model = self.model_path or self.hf_config.model_name
            if self.hf_config.task:
                model = load_huggingface_model_from_task(self.hf_config.task, input_model)
            else:
                model = load_huggingface_model_from_model_class(self.hf_config.model_class, input_model)
        else:
            if self.model_file_format == ModelFileFormat.PYTORCH_ENTIRE_MODEL:
                model = torch.load(self.model_path)
            elif self.model_file_format == ModelFileFormat.PYTORCH_TORCH_SCRIPT:
                model = torch.jit.load(self.model_path)
            elif self.model_file_format == ModelFileFormat.PYTORCH_MLFLOW_MODEL:
                model = self.load_mlflow_model()
            elif self.model_file_format == ModelFileFormat.PYTORCH_STATE_DICT:
                raise ValueError("Please use customized model loader to load state dict model.")
            else:
                raise ValueError(f"Unsupported model file format: {self.model_file_format}")

        self.model = model

        return model

    def load_mlflow_model(self):
        """
        Load a model stored in MLflow layout.

        The data/model, data/config and data/tokenizer folders are merged into one temporary directory, and the
        class named by the `hf_pretrained_class` entry of the MLmodel file is used to load from it. The temporary
        directory is removed even when copying or loading fails.
        """
        with tempfile.TemporaryDirectory(prefix="mlflow_tmp") as tmp_dir:
            tmp_dir_path = Path(tmp_dir)

            shutil.copytree(os.path.join(self.model_path, "data/model"), tmp_dir_path, dirs_exist_ok=True)
            shutil.copytree(os.path.join(self.model_path, "data/config"), tmp_dir_path, dirs_exist_ok=True)
            shutil.copytree(os.path.join(self.model_path, "data/tokenizer"), tmp_dir_path, dirs_exist_ok=True)

            with open(os.path.join(self.model_path, "MLmodel"), "r") as fp:
                mlflow_data = yaml.safe_load(fp)
            hf_pretrained_class = mlflow_data["flavors"]["hftransformers"]["hf_pretrained_class"]

            model_loader = huggingface_model_loader(hf_pretrained_class)
            loaded_model = model_loader(tmp_dir_path)
            loaded_model.eval()

        return loaded_model

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """Return the loaded model switched to eval mode; none of the arguments are used."""
        return self.load_model().eval()

    def get_dummy_inputs(self):
        """
        Return a dummy input for the model.

        :raises ValueError: if no source of dummy inputs is available.
        """
        if self.dummy_inputs is not None:
            return self.dummy_inputs

        # Priority: dummy_inputs_func > io_config.input_shapes > hf_config.dataset > onnx_config
        # NOTE(review): the result is not stored in self.dummy_inputs here - confirm whether that is intended
        dummy_inputs = None

        if self.dummy_inputs_func is not None:
            logger.debug("Using dummy_inputs_func to get dummy inputs")
            user_module_loader = UserModuleLoader(self.model_script, self.script_dir)
            dummy_inputs = user_module_loader.call_object(self.dummy_inputs_func, self)
        elif self.io_config and self.io_config["input_shapes"]:
            logger.debug("Using io_config.input_shapes to get dummy inputs")
            dummy_inputs, _ = (
                # input_types is optional
                data_config_template.dummy_data_config_template(
                    input_shapes=self.io_config["input_shapes"],
                    input_types=self.io_config.get("input_types"),
                )
                .to_data_container()
                .get_first_batch(data_root_path=None)
            )
        elif self.hf_config and self.hf_config.model_name and self.hf_config.task:
            if self.hf_config.dataset:
                logger.debug("Using hf_config.dataset to get dummy inputs")
                dummy_inputs, _ = (
                    data_config_template.huggingface_data_config_template(
                        self.hf_config.model_name,
                        self.hf_config.task,
                        **self.hf_config.dataset,
                    )
                    .to_data_container()
                    .get_first_batch(data_root_path=None)
                )
            elif not self.hf_config.components:
                logger.debug("Using hf onnx_config to get dummy inputs")
                dummy_inputs = get_hf_model_dummy_input(
                    self.hf_config.model_name, self.hf_config.task, self.hf_config.feature
                )

        if dummy_inputs is None:
            raise ValueError(
                "Unable to get dummy inputs. Please provide dummy_inputs_func, io_config.input_shapes,"
                " hf_config.dataset, or hf_config."
            )

        return dummy_inputs

    def get_model_config(self):
        """
        Return the huggingface model config for hf_config.model_name.

        :raises ValueError: if the model has no hf_config.
        """
        if self.hf_config is None:
            raise ValueError("HF model_config is not available")
        return get_hf_model_config(self.hf_config.model_name)

    @property
    def components(self) -> List[str]:
        """
        Names of the components of the model, or None when hf_config declares no components.
        """
        if not self.hf_config or not self.hf_config.components:
            return None

        return [component.name for component in self.hf_config.components]

    def get_component(self, component_name: str) -> "PyTorchModel":
        """
        Get a component of the model as a PyTorchModel.
        """
        assert self.components, "hf_config.components must be provided to get component"
        assert component_name in self.components, f"component {component_name} not found in hf_config"

        # get the component from hf_config
        components_dict = {component.name: component for component in self.hf_config.components}
        hf_component = components_dict[component_name]

        user_module_loader = UserModuleLoader(self.model_script, self.script_dir)
        model_component = user_module_loader.call_object(hf_component.component_func, self.hf_config.model_name)

        io_config = hf_component.io_config
        if isinstance(io_config, str):
            # a string io_config names a function in the user script that builds the actual config
            user_module_loader = UserModuleLoader(self.model_script, self.script_dir)
            io_config = user_module_loader.call_object(hf_component.io_config, self.hf_config.model_name)
        io_config = validate_config(io_config, IOConfig)

        def model_loader(_):
            # the component is already in memory, so the path argument is ignored
            return model_component

        return PyTorchModel(
            model_loader=model_loader,
            io_config=io_config,
            dummy_inputs_func=hf_component.dummy_inputs_func,
            model_script=self.model_script,
            script_dir=self.script_dir,
        )

    def to_json(self, check_object: bool = False):
        """Extend the base serialization with the PyTorch specific constructor arguments."""
        config = super().to_json(check_object)
        config["config"].update(
            {
                "model_file_format": self.model_file_format,
                "model_loader": self.model_loader,
                "model_script": Path(self.model_script) if self.model_script else None,
                "script_dir": Path(self.script_dir) if self.script_dir else None,
                "io_config": self.io_config,
                "dummy_inputs_func": self.dummy_inputs_func,
                "hf_config": self.hf_config,
            }
        )
        return serialize_to_json(config, check_object)
class OptimumModel(PyTorchModel):
    """
    PyTorchModel whose file format is fixed to ModelFileFormat.OPTIMUM and which records its component names.
    """

    def __init__(self, model_components: List[str], **kwargs):
        """
        :param model_components: component names stored on the model and written by to_json.
        :param kwargs: forwarded unchanged to PyTorchModel.
        """
        super().__init__(model_file_format=ModelFileFormat.OPTIMUM, **kwargs)
        self.model_components = model_components

    def to_json(self, check_object: bool = False):
        """Serialize like PyTorchModel and add the `model_components` entry."""
        serialized = super().to_json(check_object)
        serialized["config"].update({"model_components": self.model_components})
        return serialize_to_json(serialized, check_object)
class SNPEModel(OliveModel):
    """
    SNPE DLC model together with the names and shapes of its inputs and outputs.
    """

    def __init__(
        self,
        input_names: List[str],
        input_shapes: List[List[int]],
        output_names: List[str],
        output_shapes: List[List[int]],
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
    ):
        """Store the four io lists as `io_config` after initializing the base model."""
        super().__init__(framework=Framework.SNPE, model_file_format=ModelFileFormat.SNPE_DLC, model_path=model_path)
        self.io_config = dict(
            input_names=input_names,
            input_shapes=input_shapes,
            output_names=output_names,
            output_shapes=output_shapes,
        )

    def load_model(self, rank: int = None):
        """Not supported for this model type."""
        raise NotImplementedError()

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ) -> SNPEInferenceSession:
        """
        Build an SNPEInferenceSession for this model.

        `inference_settings` (or {} when falsy) becomes the SNPESessionOptions; Device.NPU is translated to
        SNPEDevice.DSP, any other device is passed through. `execution_providers` and `rank` are not used.
        """
        options = SNPESessionOptions(**(inference_settings or {}))
        options.device = SNPEDevice.DSP if device == Device.NPU else device
        return SNPEInferenceSession(self.model_path, self.io_config, options)

    def to_json(self, check_object: bool = False):
        """Serialize like the base class and merge the io_config entries into `config`."""
        serialized = super().to_json(check_object)
        serialized["config"].update(self.io_config)
        return serialize_to_json(serialized, check_object)

    def get_dlc_metrics(self) -> dict:
        """Return the result of the module-level get_dlc_metrics helper for this model's path."""
        dlc_path = self.model_path
        return get_dlc_metrics(dlc_path)
class TensorFlowModel(OliveModel):
    """
    TensorFlow model; it only carries path and format, loading and sessions are not implemented.
    """

    def __init__(
        self,
        model_path: OLIVE_RESOURCE_ANNOTATIONS = None,
        model_file_format: ModelFileFormat = ModelFileFormat.TENSORFLOW_SAVED_MODEL,
    ):
        """Initialize the base model with the TensorFlow framework."""
        super().__init__(
            framework=Framework.TENSORFLOW,
            model_file_format=model_file_format,
            model_path=model_path,
        )

    def load_model(self, rank: int = None):
        """Not supported for this model type."""
        raise NotImplementedError()

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """Not supported for this model type."""
        raise NotImplementedError()
class OpenVINOModel(OliveModel):
    """
    OpenVINO IR model stored as a directory holding exactly one .xml and one .bin file.
    """

    def __init__(self, model_path: OLIVE_RESOURCE_ANNOTATIONS):
        """
        :param model_path: resource describing the directory with the model files.
        """
        super().__init__(
            model_path=model_path, framework=Framework.OPENVINO, model_file_format=ModelFileFormat.OPENVINO_IR
        )
        # check if the model files (xml, bin) are in the same directory
        if self.model_resource_path.is_local_resource():
            _ = self.model_config

    @property
    def model_config(self) -> Dict[str, str]:
        """
        Get the model configuration for OpenVINO model.

        :return: dict with `model_name` (stem of the .xml file) and the resolved `model` and `weights` paths.
        :raises FileNotFoundError: if the directory lacks a .xml or a .bin file.
        :raises FileExistsError: if the directory holds more than one .xml or .bin file.
        """
        model_path = self.model_path
        assert Path(model_path).is_dir(), f"OpenVINO model path {model_path} is not a directory"

        # list each kind of file once instead of re-scanning the directory for every check
        xml_files = list(Path(model_path).glob("*.xml"))
        bin_files = list(Path(model_path).glob("*.bin"))

        if not xml_files or not bin_files:
            raise FileNotFoundError(f"No OpenVINO model found in {model_path}")
        if len(xml_files) > 1 or len(bin_files) > 1:
            raise FileExistsError(f"More than 1 OpenVINO models are found in {model_path}")

        ov_model = xml_files[0]
        ov_weights = bin_files[0]
        return {
            "model_name": ov_model.stem,
            "model": str(ov_model.resolve()),
            "weights": str(ov_weights.resolve()),
        }

    def load_model(self, rank: int = None):
        """
        Load the model with openvino.tools.pot.load_model; `rank` is not used.

        :raises ImportError: if openvino is not installed.
        """
        try:
            from openvino.tools.pot import load_model
        except ImportError as e:
            raise ImportError("Please install olive-ai[openvino] to use OpenVINO model") from e
        return load_model(self.model_config)

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """
        Read and compile the model with openvino.runtime.Core and return the compiled model.

        Only `device` is used: Device.INTEL_MYRIAD is mapped to "MYRIAD", any other value is upper-cased and
        used as the device name.

        :raises ImportError: if openvino is not installed.
        """
        try:
            from openvino.runtime import Core
        except ImportError as e:
            raise ImportError("Please install olive-ai[openvino] to use OpenVINO model") from e
        ie = Core()
        model_pot = ie.read_model(model=self.model_config["model"])
        if device == Device.INTEL_MYRIAD:
            device = "MYRIAD"
        compiled_model = ie.compile_model(model=model_pot, device_name=device.upper())
        return compiled_model
class DistributedOnnxModel(ONNXModelBase):
    """
    A set of ONNX model files, one per rank. It has no session of its own; load_model(rank) returns the
    ONNXModel for a single rank.
    """

    # candidate execution providers per device name
    EXECUTION_PROVIDERS = {
        "cpu": ["CPUExecutionProvider"],
        "gpu": ["CUDAExecutionProvider", "CPUExecutionProvider"],
    }

    def __init__(
        self,
        model_filepaths: Optional[List[Union[Path, str]]] = None,
        inference_settings: Optional[dict] = None,
        use_ort_extensions: bool = False,
    ):
        """
        :param model_filepaths: model file of each rank, indexed by rank. Defaults to an empty list.
        :param inference_settings: settings passed on to the per-rank ONNXModel.
        :param use_ort_extensions: whether the onnxruntime-extensions custom ops library must be registered.
        """
        super().__init__(
            model_path=None,
            inference_settings=inference_settings,
            use_ort_extensions=use_ort_extensions,
        )
        # a fresh list per instance; a shared `[]` default would leak mutations between instances
        self.model_filepaths = model_filepaths if model_filepaths is not None else []

    @property
    def ranks(self):
        """Number of ranks, i.e. the number of model files."""
        return len(self.model_filepaths)

    def ranked_model_path(self, rank: int) -> Union[Path, str]:
        """Return the model file of `rank`."""
        return self.model_filepaths[rank]

    def load_model(self, rank: int) -> ONNXModel:
        """Return an ONNXModel for the model file of `rank`, carrying this model's inference settings."""
        return ONNXModel(self.model_filepaths[rank], inference_settings=self.inference_settings)

    def prepare_session(
        self,
        inference_settings: Optional[Dict[str, Any]] = None,
        device: Device = Device.GPU,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = 0,
    ):
        """Always raises RuntimeError: sessions exist only for the per-rank models."""
        raise RuntimeError("DistributedOnnxModel doesn't have a session of its own")

    def get_default_execution_providers(self, filepath: str, device: Device):
        """
        Return the first available execution provider for `device` that can load `filepath`,
        else the CUDA + CPU pair.
        """
        # return firstly available ep as ort default ep
        available_providers = DistributedOnnxModel.get_execution_providers(device)
        for ep in available_providers:
            if self._is_valid_ep(filepath, ep):
                return [ep]

        return ["CUDAExecutionProvider", "CPUExecutionProvider"]

    @staticmethod
    def get_execution_providers(device: Device):
        """Combine EXECUTION_PROVIDERS[device] with onnxruntime's available providers via AcceleratorLookup."""
        import onnxruntime as ort

        eps_per_device = DistributedOnnxModel.EXECUTION_PROVIDERS.get(device)
        available_providers = ort.get_available_providers()
        return AcceleratorLookup.get_execution_providers(eps_per_device, available_providers)

    def to_json(self, check_object: bool = False):
        """Serialize the class name and the constructor arguments with serialize_to_json."""
        config = {
            "type": self.__class__.__name__,
            "config": {
                "model_filepaths": self.model_filepaths,
                "inference_settings": self.inference_settings,
                "use_ort_extensions": self.use_ort_extensions,
            },
        }
        return serialize_to_json(config, check_object=check_object)
class CompositeOnnxModel(ONNXModelBase):
    """
    CompositeOnnxModel represents multi component models. Whisper is an example composite
    model that has encoder and decoder components. CompositeOnnxModel is a collection of
    OnnxModels.
    """

    def __init__(
        self,
        model_components: List[Union[ONNXModel, Dict[str, Any]]],
        model_component_names: List[str],
        hf_config: Union[Dict[str, Any], HFConfig] = None,
    ):
        """
        :param model_components: ONNXModel instances, or dicts with "type" and "config" entries describing
            them. The first element decides which of the two forms is expected for all.
        :param model_component_names: one name per component, in the same order.
        :param hf_config: huggingface config, validated against HFConfig when provided.
        """
        super().__init__(model_path=None)

        given_as_objects = not isinstance(model_components[0], dict)
        if given_as_objects:
            assert all([isinstance(m, ONNXModel) for m in model_components]), "All components must be ONNXModel"
            self.model_components = model_components
        else:
            assert all(
                [m.get("type").lower() == "onnxmodel" for m in model_components]
            ), "All components must be ONNXModel"
            # build each component from the keyword arguments stored under "config"
            self.model_components = [ONNXModel(**m.get("config", {})) for m in model_components]

        assert len(self.model_components) == len(model_component_names), "Number of components and names must match"
        self.model_component_names = model_component_names

        # let every component know which composite it belongs to
        for component in self.model_components:
            component.set_composite_parent(self)

        if hf_config:
            self.hf_config = validate_config(hf_config, HFConfig)
        else:
            self.hf_config = None

    def load_model(self, rank: int = None):
        """Not supported for the composite itself."""
        raise NotImplementedError()

    def prepare_session(
        self,
        inference_settings: Dict[str, Any],
        device: Device,
        execution_providers: Union[str, List[str]] = None,
        rank: Optional[int] = None,
    ):
        """Not supported for the composite itself."""
        raise NotImplementedError()

    def get_default_execution_providers(self, device: Device):
        """Not supported for the composite itself."""
        raise NotImplementedError()

    def get_model_components(self):
        """Return the list of component models."""
        return self.model_components

    def get_model_component(self, idx):
        """Return the component model at position `idx`."""
        return self.model_components[idx]

    def get_model_component_names(self):
        """Return the list of component names."""
        return self.model_component_names

    def get_model_component_name(self, idx):
        """Return the component name at position `idx`."""
        return self.model_component_names[idx]

    def get_model_config(self):
        """
        Return the huggingface model config for hf_config.model_name.

        Raises ValueError if the model has no hf_config.
        """
        if self.hf_config is None:
            raise ValueError("HF model_config is not available")
        return get_hf_model_config(self.hf_config.model_name)

    def to_json(self, check_object: bool = False):
        """Serialize the class name, hf_config, component names and each component's own to_json output."""
        config = {
            "hf_config": self.hf_config,
            "model_component_names": self.model_component_names,
            "model_components": [component.to_json(check_object) for component in self.model_components],
        }
        return serialize_to_json({"type": self.__class__.__name__, "config": config}, check_object)