Source code for olive.evaluator.olive_evaluator

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import collections
import logging
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from numbers import Number
from typing import Any, ClassVar, Dict, List, NamedTuple, Tuple, Type, Union

import numpy as np
import torch
from pydantic import validator
from torch.utils.data import Dataset

import olive.data.template as data_config_template
from olive.cache import get_local_path_from_root
from olive.common.config_utils import ConfigBase
from olive.common.user_module_loader import UserModuleLoader
from olive.common.utils import tensor_data_to_device
from olive.constants import Framework
from olive.evaluator.metric import (
    LatencySubType,
    Metric,
    MetricResult,
    MetricType,
    SubMetricResult,
    flatten_metric_result,
    get_latency_config_from_metric,
    joint_metric_key,
)
from olive.evaluator.metric_backend import MetricBackend
from olive.exception import OliveEvaluationError
from olive.hardware import Device
from olive.model import DistributedOnnxModel, OliveModel, ONNXModel, OpenVINOModel, PyTorchModel, SNPEModel
from olive.model.model_config import is_io_config_static
from olive.snpe.data_loader import SNPECommonDataLoader, SNPEDataLoader

logger = logging.getLogger(__name__)


class OliveModelOutput(NamedTuple):
    preds: Any
    logits: Any


[docs]class OliveEvaluator(ABC): registry: ClassVar[Dict[str, Type["OliveEvaluator"]]] = {} @classmethod def __init_subclass__(cls, framework: Framework, **kwargs) -> None: super().__init_subclass__(**kwargs) cls.framework = framework cls.registry[str(framework).lower()] = cls @abstractmethod def __init__(self): pass def get_inference_settings(self, metric: Metric) -> Dict[str, Any]: # user.config.inference_settings > model.inference_settings > default inference_settings # when user.config.inference_settings is None, the model.inference_settings # will be used in model.prepare_session(..) return ( metric.user_config.inference_settings.get(self.framework.lower()) if metric.user_config.inference_settings else None ) @abstractmethod def _inference( self, model: OliveModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: raise NotImplementedError @abstractmethod def _evaluate_accuracy( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: raise NotImplementedError @abstractmethod def _evaluate_latency( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: raise NotImplementedError def _evaluate_custom( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, eval_func, post_func=None, device: Device = Device.CPU, execution_providers=None, ) -> MetricResult: raw_res = None if metric.user_config.evaluate_func: raw_res = eval_func( model, get_local_path_from_root(data_root, metric.user_config.data_dir), metric.user_config.batch_size, device, execution_providers, ) else: inference_output, targets = self._inference( model, metric, dataloader, post_func, device, execution_providers ) raw_res = eval_func(inference_output, targets) metric_res = {} for sub_type in metric.sub_types: if isinstance(raw_res, Number): assert len(metric.sub_types) == 1, "Only one sub type is allowed for single value custom metric" metric_res[sub_type.name] = SubMetricResult( value=raw_res, priority=sub_type.priority, higher_is_better=sub_type.higher_is_better ) elif isinstance(raw_res, dict): assert sub_type.name in raw_res, f"Custom metric {sub_type.name} is not in the result" metric_res[sub_type.name] = SubMetricResult( value=raw_res[sub_type.name], priority=sub_type.priority, higher_is_better=sub_type.higher_is_better, ) return MetricResult.parse_obj(metric_res) def evaluate( self, model: OliveModel, data_root: str, metrics: List[Metric], device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: metrics_res = {} for original_metric in metrics: # use model io_config if user does not specify input_names and input_shapes # only do this if data_config or dataloader is not provided # priority: dataloader_func > data_config > user_config.input_names/input_shapes > model io_config metric = OliveEvaluator.generate_metric_user_config_with_model_io(original_metric, model) dataloader, eval_func, post_func = OliveEvaluator.get_user_config(model.framework, data_root, metric) if metric.type == MetricType.ACCURACY: metrics_res[metric.name] = self._evaluate_accuracy( model, data_root, metric, dataloader, post_func, device, execution_providers ) elif metric.type == MetricType.LATENCY: metrics_res[metric.name] = self._evaluate_latency( model, data_root, metric, dataloader, post_func, device, execution_providers ) elif metric.type == MetricType.CUSTOM: metrics_res[metric.name] = self._evaluate_custom( model, data_root, metric, dataloader, eval_func, post_func, device, execution_providers ) else: raise TypeError(f"{metric.type} is not a supported metric type") return flatten_metric_result(metrics_res) @staticmethod def generate_metric_user_config_with_model_io(metric: Metric, model: OliveModel): # if the io_config is not specified in the metrics, use the one in the model # should not change the original metric object which is created from config jsons # otherwise, if affects hashing + caching of the olive restoring. metric = deepcopy(metric) if metric.data_config: return metric io_config = model.get_io_config() if not io_config: return metric if not is_io_config_static(io_config): logger.debug( "Model input shapes are not static. Cannot use inferred input shapes for creating dummy data. This will" " cause an error when creating dummy data for tuning." ) if io_config and not metric.user_config.input_names and not metric.user_config.input_shapes: metric.user_config.input_names = io_config["input_names"] metric.user_config.input_shapes = io_config["input_shapes"] # input_types is optional which can be None. If None, it will be replaced with float32 in DummyDataset metric.user_config.input_types = io_config.get("input_types") return metric @staticmethod def get_user_config(framework: Framework, data_root: str, metric: Metric): assert metric.user_config, "user_config is not specified in the metric config" user_module = UserModuleLoader(metric.user_config.user_script, metric.user_config.script_dir) post_processing_func = getattr(metric.user_config, "post_processing_func", None) post_func = user_module.load_object(post_processing_func) dataloader_func = getattr(metric.user_config, "dataloader_func", None) if dataloader_func: data_dir = get_local_path_from_root(data_root, metric.user_config.data_dir) dataloader = user_module.call_object( dataloader_func, data_dir, metric.user_config.batch_size, model_framework=framework, ) else: dataloader = None eval_func = None if metric.type == MetricType.CUSTOM: evaluate_func = getattr(metric.user_config, "evaluate_func", None) if not evaluate_func: evaluate_func = getattr(metric.user_config, "metric_func", None) if not evaluate_func: raise ValueError("evaluate_func or metric_func is not specified in the metric config") eval_func = user_module.load_object(evaluate_func) if (not dataloader or not post_func) and metric.data_config: dc = metric.data_config.to_data_container() # TODO(trajep): remove user_scripts dataloader: we should respect user scripts # dataloder to meet back compatibility for time being. dataloader = dataloader or dc.create_dataloader(data_root) post_func = post_func or dc.config.post_process if metric.user_config.input_names and metric.user_config.input_shapes and not dataloader and not eval_func: dataloader = ( data_config_template.dummy_data_config_template( input_names=metric.user_config.input_names, input_shapes=metric.user_config.input_shapes, input_types=metric.user_config.input_types, ) .to_data_container() .create_dataloader(data_root) ) return dataloader, eval_func, post_func @staticmethod def compute_accuracy(metric: Metric, model_outputs: Union[Tuple, NamedTuple], targets: Any) -> MetricResult: """Compute accuracy metrics.""" evaluate_backend_cls = MetricBackend.registry[metric.backend] return evaluate_backend_cls().measure(model_outputs, targets, metric) @staticmethod def compute_latency(metric: Metric, latencies: Any) -> MetricResult: """Compute latency metrics.""" latency_metrics = { LatencySubType.AVG: round(sum(latencies) / len(latencies) * 1000, 5), LatencySubType.MAX: round(max(latencies) * 1000, 5), LatencySubType.MIN: round(min(latencies) * 1000, 5), LatencySubType.P50: round(np.percentile(latencies, 50) * 1000, 5), LatencySubType.P75: round(np.percentile(latencies, 75) * 1000, 5), LatencySubType.P90: round(np.percentile(latencies, 90) * 1000, 5), LatencySubType.P95: round(np.percentile(latencies, 95) * 1000, 5), LatencySubType.P99: round(np.percentile(latencies, 99) * 1000, 5), LatencySubType.P999: round(np.percentile(latencies, 99.9) * 1000, 5), } metric_res = {} for sub_type in metric.sub_types: metric_res[sub_type.name] = SubMetricResult( value=latency_metrics[sub_type.name], priority=sub_type.priority, higher_is_better=sub_type.higher_is_better, ) return MetricResult.parse_obj(metric_res)
class OnnxEvaluator(OliveEvaluator, framework=Framework.ONNX): def __init__(self): super().__init__() @staticmethod def format_input(input_data, io_config): """Format input data to ONNX input format.""" input_names = io_config["input_names"] name_to_type = dict(zip(io_config["input_names"], io_config["input_types"])) if isinstance(input_data, list): input_data = dict(zip(input_names, input_data)) elif not isinstance(input_data, dict): input_data = dict(zip(input_names, [input_data])) return { k: np.ascontiguousarray( input_data[k].cpu().numpy() if isinstance(input_data[k], torch.Tensor) else input_data[k], dtype=name_to_type[k], ) for k in input_data if k in input_names } def _inference( self, model: ONNXModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session( inference_settings=self.get_inference_settings(metric), device=device, execution_providers=execution_providers, ) OnnxEvaluator.disable_ort_fallback(session, execution_providers) io_config = model.get_io_config() preds = [] targets = [] logits = [] logits_dict = collections.defaultdict(list) output_names = io_config["output_names"] is_single_tensor_output = len(output_names) == 1 for input_data, labels in dataloader: input_dict = OnnxEvaluator.format_input(input_data, io_config) res = session.run(input_feed=input_dict, output_names=None) if is_single_tensor_output: result = torch.Tensor(res[0]) else: # convert to dict of torch tensor result = {name: torch.Tensor(res[i]) for i, name in enumerate(output_names)} outputs = post_func(result) if post_func else result # keep as numpy or torch arrays preds.append(outputs.cpu()) targets.append(labels.cpu()) if is_single_tensor_output: logits.append(result.cpu()) else: for k in output_names: logits_dict[k].append(result[k].cpu()) preds = torch.cat(preds, dim=0) targets = torch.cat(targets, dim=0) if is_single_tensor_output: logits = torch.cat(logits, dim=0) else: logits = {k: torch.cat(logits[k], dim=0) for k in output_names} return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_onnx_accuracy( self, model: ONNXModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_onnx_latency( self, model: OliveModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) session = model.prepare_session( inference_settings=self.get_inference_settings(metric), device=device, execution_providers=execution_providers, ) OnnxEvaluator.disable_ort_fallback(session, execution_providers) io_config = model.get_io_config() input_data, _ = next(iter(dataloader)) input_dict = OnnxEvaluator.format_input(input_data, io_config) if metric.user_config.io_bind: io_bind_op = session.io_binding() io_bind_device = "cuda" if device == "gpu" else "cpu" for k, v in input_dict.items(): io_bind_op.bind_cpu_input(k, v) for item in session.get_outputs(): io_bind_op.bind_output(item.name, io_bind_device) for _ in range(warmup_num): if metric.user_config.io_bind: session.run_with_iobinding(io_bind_op) else: session.run(input_feed=input_dict, output_names=None) latencies = [] for _ in range(repeat_test_num): if metric.user_config.io_bind: t = time.perf_counter() session.run_with_iobinding(io_bind_op) latencies.append(time.perf_counter() - t) else: t = time.perf_counter() session.run(input_feed=input_dict, output_names=None) latencies.append(time.perf_counter() - t) time.sleep(sleep_num) return OliveEvaluator.compute_latency(metric, latencies) @staticmethod def _evaluate_distributed_accuracy_worker(config) -> Tuple[List[Any], List[Any]]: model_path = config["model_path"] data_root = config["data_root"] local_rank = config["local_rank"] world_size = config["world_size"] inference_settings = config["inference_settings"] execution_providers = config["providers"] metric = Metric.from_json(config["metric"]) import os os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank) os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size) from mpi4py import MPI local_rank = MPI.COMM_WORLD.Get_rank() inference_settings["execution_provider"] = execution_providers inference_settings["provider_options"] = [ {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {} for provider in execution_providers ] model = ONNXModel(model_path, inference_settings=inference_settings) dataloader, _, post_func = OnnxEvaluator.get_user_config(model.framework, data_root, metric) session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank)) io_config = model.get_io_config() preds = [] targets = [] logits = [] output_names = io_config["output_names"] for _, (input_data, labels) in enumerate(dataloader): input_dict = OnnxEvaluator.format_input(input_data, io_config) MPI.COMM_WORLD.barrier() # Synchronize before starting each run output = session.run(input_feed=input_dict, output_names=None) output = torch.Tensor(output[0]) if len(output_names) == 1 else torch.Tensor(output) post_output = post_func(output) if post_func else output preds.extend(post_output.tolist()) targets.extend(labels.data.tolist()) logits.extend(output.tolist()) model_output = OliveModelOutput(preds=preds, logits=logits) return model_output, targets def _evaluate_distributed_accuracy( self, model: DistributedOnnxModel, data_root: str, metric: Metric, device: Device, execution_providers: Union[str, List[str]], ) -> MetricResult: from copy import deepcopy from mpi4py.futures import MPIPoolExecutor config = { "model_path": None, "local_rank": None, "world_size": model.ranks, "inference_settings": self.get_inference_settings(metric), "metric": metric.to_json(), } args = [] for rank in range(model.ranks): cfg = deepcopy(config) cfg["local_rank"] = rank cfg["model_path"] = model.ranked_model_path(rank) cfg["data_root"] = data_root cfg["device"] = device cfg["providers"] = execution_providers args.append(cfg) with MPIPoolExecutor(max_workers=model.ranks) as executor: results = executor.map(OnnxEvaluator._evaluate_distributed_accuracy_worker, args) executor.shutdown() preds = [x for p, _, _ in results for x in p] targets = [x for _, t, _ in results for x in t] logits = [x for _, _, logit in results for x in logit] model_output = OliveModelOutput(preds, logits) return OliveEvaluator.compute_accuracy(metric, model_output, targets) @staticmethod def _evaluate_distributed_latency_worker(data_root, config) -> List[float]: model_path = config["model_path"] data_root = config["data_root"] local_rank = config["local_rank"] world_size = config["world_size"] inference_settings = config["inference_settings"] execution_providers = config["providers"] metric = Metric.from_json(config["metric"]) import os os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank) os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size) from mpi4py import MPI local_rank = MPI.COMM_WORLD.Get_rank() warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) inference_settings["execution_provider"] = execution_providers inference_settings["provider_options"] = [ {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {} for provider in execution_providers ] model = ONNXModel(model_path, inference_settings=inference_settings) dataloader, _, _ = OnnxEvaluator.get_user_config(model.framework, data_root, metric) session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank)) io_config = model.get_io_config() input_feed, _ = next(iter(dataloader)) input_feed = OnnxEvaluator.format_input(input_feed, io_config) if metric.user_config.io_bind: io_bind_op = session.io_binding() for k, v in input_feed.items(): io_bind_op.bind_cpu_input(k, v) for item in session.get_outputs(): io_bind_op.bind_output(item.name, "cuda") latencies = [] for i in range(warmup_num + repeat_test_num): MPI.COMM_WORLD.barrier() # Synchronize before starting each run start_time = time.perf_counter() if metric.user_config.io_bind: session.run_with_iobinding(io_bind_op) else: session.run(input_feed=input_feed, output_names=None) if i > warmup_num: latencies.append(time.perf_counter() - start_time) time.sleep(sleep_num) return latencies def _evaluate_distributed_latency( self, model: DistributedOnnxModel, data_root: str, metric: Metric, device, execution_providers: Union[str, List[str]], ) -> MetricResult: from copy import deepcopy from mpi4py.futures import MPIPoolExecutor config = { "model_path": None, "local_rank": None, "world_size": model.ranks, "inference_settings": self.get_inference_settings(metric), "metric": metric.to_json(), } args = [] for rank in range(model.ranks): cfg = deepcopy(config) cfg["local_rank"] = rank cfg["model_path"] = model.ranked_model_path(rank) cfg["data_root"] = data_root cfg["device"] = device cfg["providers"] = execution_providers args.append(cfg) with MPIPoolExecutor(max_workers=model.ranks) as executor: results = executor.map(OnnxEvaluator._evaluate_distributed_latency_worker, args) executor.shutdown() latencies = [x for r in results for x in r] return OliveEvaluator.compute_latency(metric, latencies) def _evaluate_accuracy( self, model: ONNXModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: if isinstance(model, ONNXModel): return self._evaluate_onnx_accuracy(model, metric, dataloader, post_func, device, execution_providers) elif isinstance(model, DistributedOnnxModel): if device != Device.GPU: raise ValueError("Distributed inferencing is supported only on GPU") return self._evaluate_distributed_accuracy(model, data_root, metric, device, execution_providers) else: raise TypeError(f"Cannot evaluate accuracy for model of type: {type(model)}") def _evaluate_latency( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: if isinstance(model, ONNXModel): return self._evaluate_onnx_latency(model, metric, dataloader, post_func, device, execution_providers) elif isinstance(model, DistributedOnnxModel): if device != Device.GPU: raise ValueError("Distributed inferencing is supported only on GPU") return self._evaluate_distributed_latency(model, data_root, metric, device, execution_providers) else: raise TypeError(f"Cannot evaluate latency for model of type: {type(model)}") @staticmethod def disable_ort_fallback(session, execution_providers): if execution_providers: assert isinstance(execution_providers, (str, list)) execution_providers = [execution_providers] if isinstance(execution_providers, str) else execution_providers session_providers = session.get_providers() for ep in execution_providers: if ep not in session_providers: raise OliveEvaluationError( f"The onnxruntime fallback happens. {ep} is not in the session providers {session_providers}." f" session._enable_fallback = {session._enable_fallback}" ) session.disable_fallback() class PyTorchEvaluator(OliveEvaluator, framework=Framework.PYTORCH): def __init__(self): super().__init__() @staticmethod def _device_string_to_torch_device(device: Device): return torch.device("cuda") if device == Device.GPU else torch.device(device) @torch.no_grad() def _inference( self, model: PyTorchModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] device = PyTorchEvaluator._device_string_to_torch_device(device) if device: session.to(device) for input_data_i, labels in dataloader: input_data = tensor_data_to_device(input_data_i, device) result = session(**input_data) if isinstance(input_data, dict) else session(input_data) outputs = post_func(result) if post_func else result # keep the outputs and results as torch tensor on cpu # it is expensive to convert to list and then convert back to torch tensor preds.append(outputs.cpu()) targets.append(labels.cpu()) logits.append( result.logits.cpu() if not isinstance(result, torch.Tensor) and getattr(result, "logits", None) is not None else result.cpu() ) # concatenate along the batch dimension preds = torch.cat(preds, dim=0) targets = torch.cat(targets, dim=0) logits = torch.cat(logits, dim=0) # move model to cpu # don't want model to be kept on gpu since model persists and takes up gpu memory if device: session.to("cpu") return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: PyTorchModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) @torch.no_grad() def _evaluate_latency( self, model: PyTorchModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: warmup_num, repeat_test_num, _ = get_latency_config_from_metric(metric) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) input_data, _ = next(iter(dataloader)) device = PyTorchEvaluator._device_string_to_torch_device(device) is_cuda = device == Device.GPU if device: session.to(device) input_data = tensor_data_to_device(input_data, device) input_is_dict = isinstance(input_data, dict) # warm up for _ in range(warmup_num): session(**input_data) if input_is_dict else session(input_data) latencies = [] if not is_cuda: for _ in range(repeat_test_num): t = time.perf_counter() # TODO(jambayk): do we care about the efficiency of if/else here? # probably won't add much overhead compared to the inference time # also we are doing the same for all models session(**input_data) if input_is_dict else session(input_data) latencies.append(time.perf_counter() - t) else: # synchronize before starting the test torch.cuda.synchronize() # cuda events for measuring latency starter = torch.cuda.Event(enable_timing=True) ender = torch.cuda.Event(enable_timing=True) for _ in range(repeat_test_num): starter.record() session(**input_data) if input_is_dict else session(input_data) ender.record() # synchronize after forward pass torch.cuda.synchronize() # add time in seconds, originally in milliseconds latencies.append(starter.elapsed_time(ender) * 1e-3) # move model to cpu if device: session.to("cpu") return OliveEvaluator.compute_latency(metric, latencies) class SNPEEvaluator(OliveEvaluator, framework=Framework.SNPE): def __init__(self): super().__init__() def _inference( self, model: SNPEModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: dataloader = self._prepare_dataloader(dataloader, model) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] for data_dir, input_list, labels in dataloader: result = session(input_list, data_dir) if post_func: outputs = post_func(result) else: raise ValueError("Post processing function is required for SNPE model") preds.extend(outputs.tolist()) targets.extend(labels.tolist()) # TODO(trajep): verify if we need to return logits logits.extend(result.tolist()) return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: SNPEModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_latency( self, model: SNPEModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: dataloader = self._prepare_dataloader(dataloader, model) warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) data_dir, input_data, _ = next(iter(dataloader)) total_runs = warmup_num + repeat_test_num results = session(input_data, data_dir, runs=total_runs, sleep=sleep_num) latencies = results["latencies"]["total_inference_time"][warmup_num:] return OliveEvaluator.compute_latency(metric, latencies) def _prepare_dataloader(self, dataloader: Dataset, model: SNPEModel) -> SNPEDataLoader: if isinstance(dataloader, SNPEDataLoader): return dataloader return SNPECommonDataLoader(dataloader, model.io_config) class OpenVINOEvaluator(OliveEvaluator, framework=Framework.OPENVINO): def __init__(self): super().__init__() def _inference( self, model: OpenVINOModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] for input_data, labels in dataloader: result = session.infer_new_request({0: input_data}) outputs = post_func(result) if post_func else result if not isinstance(labels, list): labels = [labels] # noqa: PLW2901 preds.extend(outputs) targets.extend(labels) logits.extend(result) return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: OpenVINOModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_latency( self, model: OpenVINOModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) latencies = [] for input_data, _ in dataloader: t = time.perf_counter() session(input_data) latencies.append(time.perf_counter() - t) return OliveEvaluator.compute_latency(metric, latencies) class OliveEvaluatorFactory: @staticmethod def create_evaluator_for_model(model: OliveModel) -> OliveEvaluator: evaluator_cls = OliveEvaluator.registry[str(model.framework).lower()] return evaluator_cls()
[docs]class OliveEvaluatorConfig(ConfigBase): metrics: List[Metric] = [] # noqa: RUF012 @validator("metrics") def validate_metrics(cls, v): metric_len = len(v) metric_names = {metric.name for metric in v} assert len(metric_names) == metric_len, "Metric names must be unique" sub_type_names = set() sub_type_with_rank = set() rank_set = set() for metric in v: for sub_type in metric.sub_types: sub_type_names.add(joint_metric_key(metric.name, sub_type.name)) if sub_type.priority != -1: sub_type_with_rank.add(sub_type.name) rank_set.add(sub_type.priority) if not rank_set and len(sub_type_names) == 1: logger.debug( "No priority is specified, but only one sub type " " metric is specified. Use rank 1 for single for this metric." ) v[0].sub_types[0].priority = 1 elif not rank_set and len(sub_type_names) > 1: raise ValueError("Priority must be specified for multiple sub type metrics") expected_rank_set = set(range(1, len(sub_type_with_rank) + 1)) # Check if all ranks are present if rank_set != expected_rank_set: raise ValueError(f"Priorities must be unique and in the range 1 to {metric_len}") return v