Source code for olive.evaluator.olive_evaluator

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import collections
import logging
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from functools import partial
from numbers import Number
from typing import Any, ClassVar, Dict, List, NamedTuple, Tuple, Type, Union

import numpy as np
import torch
from pydantic import validator
from torch.utils.data import Dataset

import olive.data.template as data_config_template
from olive.cache import get_local_path_from_root
from olive.common.config_utils import ConfigBase
from olive.common.user_module_loader import UserModuleLoader
from olive.common.utils import tensor_data_to_device
from olive.constants import Framework
from olive.evaluator.metric import (
    LatencySubType,
    Metric,
    MetricResult,
    MetricType,
    SubMetricResult,
    flatten_metric_result,
    get_latency_config_from_metric,
    joint_metric_key,
)
from olive.evaluator.metric_backend import MetricBackend
from olive.exception import OliveEvaluationError
from olive.hardware import Device
from olive.model import DistributedOnnxModel, OliveModel, ONNXModel, OpenVINOModel, PyTorchModel, SNPEModel
from olive.model.model_config import is_io_config_static
from olive.snpe.data_loader import SNPECommonDataLoader, SNPEDataLoader

logger = logging.getLogger(__name__)

# pylint: disable=useless-parent-delegation


class OliveModelOutput(NamedTuple):
    preds: Any
    logits: Any


[docs]class OliveEvaluator(ABC): registry: ClassVar[Dict[str, Type["OliveEvaluator"]]] = {} @classmethod def __init_subclass__(cls, framework: Framework, **kwargs) -> None: super().__init_subclass__(**kwargs) cls.framework = framework cls.registry[str(framework).lower()] = cls @abstractmethod def __init__(self): pass def get_inference_settings(self, metric: Metric) -> Dict[str, Any]: # user.config.inference_settings > model.inference_settings > default inference_settings # when user.config.inference_settings is None, the model.inference_settings # will be used in model.prepare_session(..) return ( metric.user_config.inference_settings.get(self.framework.lower()) if metric.user_config.inference_settings else None ) @abstractmethod def _inference( self, model: OliveModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: raise NotImplementedError @abstractmethod def _evaluate_accuracy( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: raise NotImplementedError @abstractmethod def _evaluate_latency( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: raise NotImplementedError def _evaluate_custom( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, eval_func, post_func=None, device: Device = Device.CPU, execution_providers=None, ) -> MetricResult: raw_res = None if metric.user_config.evaluate_func: raw_res = eval_func( model, get_local_path_from_root(data_root, metric.user_config.data_dir), metric.user_config.batch_size, device, execution_providers, ) else: inference_output, targets = self._inference( model, metric, dataloader, post_func, device, execution_providers ) raw_res = eval_func(inference_output, targets) metric_res = {} for sub_type in metric.sub_types: if isinstance(raw_res, Number): assert len(metric.sub_types) == 1, "Only one sub type is allowed for single value custom metric" metric_res[sub_type.name] = SubMetricResult( value=raw_res, priority=sub_type.priority, higher_is_better=sub_type.higher_is_better ) elif isinstance(raw_res, dict): assert sub_type.name in raw_res, f"Custom metric {sub_type.name} is not in the result" metric_res[sub_type.name] = SubMetricResult( value=raw_res[sub_type.name], priority=sub_type.priority, higher_is_better=sub_type.higher_is_better, ) return MetricResult.parse_obj(metric_res) def evaluate( self, model: OliveModel, data_root: str, metrics: List[Metric], device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: metrics_res = {} for original_metric in metrics: # use model io_config if user does not specify input_names and input_shapes # only do this if data_config or dataloader is not provided # priority: dataloader_func > data_config > user_config.input_names/input_shapes > model io_config metric = OliveEvaluator.generate_metric_user_config_with_model_io(original_metric, model) dataloader, eval_func, post_func = OliveEvaluator.get_user_config(model.framework, data_root, metric) if metric.type == MetricType.ACCURACY: metrics_res[metric.name] = self._evaluate_accuracy( model, data_root, metric, dataloader, post_func, device, execution_providers ) elif metric.type == MetricType.LATENCY: metrics_res[metric.name] = self._evaluate_latency( model, data_root, metric, dataloader, post_func, device, execution_providers ) elif metric.type == MetricType.CUSTOM: metrics_res[metric.name] = self._evaluate_custom( model, data_root, metric, dataloader, eval_func, post_func, device, execution_providers ) else: raise TypeError(f"{metric.type} is not a supported metric type") return flatten_metric_result(metrics_res) @staticmethod def generate_metric_user_config_with_model_io(metric: Metric, model: OliveModel): # if the io_config is not specified in the metrics, use the one in the model # should not change the original metric object which is created from config jsons # otherwise, if affects hashing + caching of the olive restoring. metric = deepcopy(metric) if metric.data_config: return metric io_config = model.get_io_config() if not io_config: return metric if not is_io_config_static(io_config): logger.debug( "Model input shapes are not static. Cannot use inferred input shapes for creating dummy data. This will" " cause an error when creating dummy data for tuning." ) if io_config and not metric.user_config.input_names and not metric.user_config.input_shapes: metric.user_config.input_names = io_config["input_names"] metric.user_config.input_shapes = io_config["input_shapes"] # input_types is optional which can be None. If None, it will be replaced with float32 in DummyDataset metric.user_config.input_types = io_config.get("input_types") return metric @staticmethod def _get_func_kwargs(metric: Metric, func_name: str): """Get the function kwargs from the metric config.""" if metric.user_config.func_kwargs: return metric.user_config.func_kwargs.get(func_name, {}) return {} @classmethod def get_user_config(cls, framework: Framework, data_root: str, metric: Metric): assert metric.user_config, "user_config is not specified in the metric config" user_module = UserModuleLoader(metric.user_config.user_script, metric.user_config.script_dir) # load the post processing function post_processing_func = getattr(metric.user_config, "post_processing_func", None) post_func = user_module.load_object(post_processing_func) post_func_kwargs = cls._get_func_kwargs(metric, "post_processing_func") if post_func_kwargs: # apply the kwargs to the post processing function post_func = partial(post_func, **post_func_kwargs) # load the dataloader function and create the dataloader dataloader_func = getattr(metric.user_config, "dataloader_func", None) if dataloader_func: data_dir = get_local_path_from_root(data_root, metric.user_config.data_dir) dataloader = user_module.call_object( dataloader_func, data_dir, metric.user_config.batch_size, model_framework=framework, **cls._get_func_kwargs(metric, "dataloader_func"), ) else: dataloader = None # load the evaluate function # priority: evaluate_func > metric_func eval_func = None if metric.type == MetricType.CUSTOM: evaluate_func = getattr(metric.user_config, "evaluate_func", None) kwargs = cls._get_func_kwargs(metric, "evaluate_func") if not evaluate_func: evaluate_func = getattr(metric.user_config, "metric_func", None) kwargs = cls._get_func_kwargs(metric, "metric_func") if not evaluate_func: raise ValueError("evaluate_func or metric_func is not specified in the metric config") eval_func = user_module.load_object(evaluate_func) if kwargs: eval_func = partial(eval_func, **kwargs) # get dataloader and/or post processing function from data_config if not specified in the metric config if (not dataloader or not post_func) and metric.data_config: dc = metric.data_config.to_data_container() # TODO(trajep): remove user_scripts dataloader: we should respect user scripts # dataloder to meet back compatibility for time being. dataloader = dataloader or dc.create_dataloader(data_root) post_func = post_func or dc.config.post_process # get dataloader and/or post processing function from model io_config if not specified in the metric config # or data config if metric.user_config.input_names and metric.user_config.input_shapes and not dataloader and not eval_func: dataloader = ( data_config_template.dummy_data_config_template( input_names=metric.user_config.input_names, input_shapes=metric.user_config.input_shapes, input_types=metric.user_config.input_types, ) .to_data_container() .create_dataloader(data_root) ) return dataloader, eval_func, post_func @staticmethod def compute_accuracy(metric: Metric, model_outputs: Union[Tuple, NamedTuple], targets: Any) -> MetricResult: """Compute accuracy metrics.""" evaluate_backend_cls = MetricBackend.registry[metric.backend] return evaluate_backend_cls().measure(model_outputs, targets, metric) @staticmethod def compute_latency(metric: Metric, latencies: Any) -> MetricResult: """Compute latency metrics.""" latency_metrics = { LatencySubType.AVG: round(sum(latencies) / len(latencies) * 1000, 5), LatencySubType.MAX: round(max(latencies) * 1000, 5), LatencySubType.MIN: round(min(latencies) * 1000, 5), LatencySubType.P50: round(np.percentile(latencies, 50) * 1000, 5), LatencySubType.P75: round(np.percentile(latencies, 75) * 1000, 5), LatencySubType.P90: round(np.percentile(latencies, 90) * 1000, 5), LatencySubType.P95: round(np.percentile(latencies, 95) * 1000, 5), LatencySubType.P99: round(np.percentile(latencies, 99) * 1000, 5), LatencySubType.P999: round(np.percentile(latencies, 99.9) * 1000, 5), } metric_res = {} for sub_type in metric.sub_types: metric_res[sub_type.name] = SubMetricResult( value=latency_metrics[sub_type.name], priority=sub_type.priority, higher_is_better=sub_type.higher_is_better, ) return MetricResult.parse_obj(metric_res)
class OnnxEvaluator(OliveEvaluator, framework=Framework.ONNX): def __init__(self): super().__init__() @staticmethod def format_input(input_data, io_config): """Format input data to ONNX input format.""" input_names = io_config["input_names"] name_to_type = dict(zip(io_config["input_names"], io_config["input_types"])) if isinstance(input_data, list): input_data = dict(zip(input_names, input_data)) elif not isinstance(input_data, dict): input_data = dict(zip(input_names, [input_data])) return { k: np.ascontiguousarray( input_data[k].cpu().numpy() if isinstance(input_data[k], torch.Tensor) else input_data[k], dtype=name_to_type[k], ) for k in input_data if k in input_names } @staticmethod def prepare_io_bindings( session, input_data, device, device_id: int = 0, shared_kv_buffer: bool = False, kv_cache_ortvalues: dict = None ): """Convert input from numpy array to OrtValue. session: ONNXRuntime session input_data: dict of input data, value is numpy array device: olive device device_id: 0 by default. TODO(trajep): support user to specified device id shared_kv_buffer: whether to share the key/value buffer across multiple runs, it is False by default, and only used when we observe kv cache and fp16 is used. TODO(trajep): how shared_kv_buffer works with generation task """ from onnxruntime import OrtValue use_fp16 = any(v.dtype == np.float16 for v in input_data.values()) io_bind_op = session.io_binding() io_bind_device = "cuda" if device == "gpu" else "cpu" if shared_kv_buffer: kv_cache_ortvalues = kv_cache_ortvalues or {} for k, v in input_data.items(): # "cache": from microsoft llama model" https://github.com/microsoft/Llama-2-Onnx#before-you-start # "past_key_values": from huggingface llama2 https://huggingface.co/meta-llama/Llama-2-13b-hf if shared_kv_buffer and use_fp16 and ("cache" in k or "past_key_values" in k): if k not in kv_cache_ortvalues: kv_cache_ortvalues[k] = OrtValue.ortvalue_from_numpy(v, io_bind_device, device_id) else: kv_cache_ortvalues[k].update_inplace(v) ort_v = kv_cache_ortvalues[k] else: ort_v = OrtValue.ortvalue_from_numpy(v, io_bind_device, device_id) io_bind_op.bind_ortvalue_input(k, ort_v) for item in session.get_outputs(): name = item.name # "out": from microsoft llama model" https://github.com/microsoft/Llama-2-Onnx#before-you-start # "present": from huggingface llama2 https://huggingface.co/meta-llama/Llama-2-13b-hf if shared_kv_buffer and use_fp16 and ("out" in name or "present" in name): # Bind present KV cache outputs to past KV cache inputs in order to use shared buffer input_name = name.replace("out", "cache").replace("present", "past_key_values") io_bind_op.bind_ortvalue_output(name, kv_cache_ortvalues[input_name]) else: io_bind_op.bind_output(name, io_bind_device) return io_bind_op def _inference( self, model: ONNXModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session( inference_settings=self.get_inference_settings(metric), device=device, execution_providers=execution_providers, ) OnnxEvaluator.disable_ort_fallback(session, execution_providers) io_config = model.get_io_config() preds = [] targets = [] logits = [] logits_dict = collections.defaultdict(list) output_names = io_config["output_names"] is_single_tensor_output = len(output_names) == 1 for input_data, labels in dataloader: input_dict = OnnxEvaluator.format_input(input_data, io_config) res = session.run(input_feed=input_dict, output_names=None) if is_single_tensor_output: result = torch.Tensor(res[0]) else: # convert to dict of torch tensor result = {name: torch.Tensor(res[i]) for i, name in enumerate(output_names)} outputs = post_func(result) if post_func else result # keep as numpy or torch arrays preds.append(outputs.cpu()) targets.append(labels.cpu()) if is_single_tensor_output: logits.append(result.cpu()) else: for k in output_names: logits_dict[k].append(result[k].cpu()) preds = torch.cat(preds, dim=0) targets = torch.cat(targets, dim=0) if is_single_tensor_output: logits = torch.cat(logits, dim=0) else: logits = {k: torch.cat(logits[k], dim=0) for k in output_names} return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_onnx_accuracy( self, model: ONNXModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_onnx_latency( self, model: OliveModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) session = model.prepare_session( inference_settings=self.get_inference_settings(metric), device=device, execution_providers=execution_providers, ) OnnxEvaluator.disable_ort_fallback(session, execution_providers) io_config = model.get_io_config() input_data, _ = next(iter(dataloader)) input_dict = OnnxEvaluator.format_input(input_data, io_config) # no deepcopy for kv_cache_ortvalues, will update the value inplace and keep it shared across runs kv_cache_ortvalues = {} if metric.user_config.shared_kv_buffer else None if metric.user_config.io_bind: io_bind_op = OnnxEvaluator.prepare_io_bindings( session, input_dict, device, shared_kv_buffer=metric.user_config.shared_kv_buffer, kv_cache_ortvalues=kv_cache_ortvalues, ) for _ in range(warmup_num): if metric.user_config.io_bind: session.run_with_iobinding(io_bind_op) else: session.run(input_feed=input_dict, output_names=None) latencies = [] for _ in range(repeat_test_num): if metric.user_config.io_bind: t = time.perf_counter() session.run_with_iobinding(io_bind_op) latencies.append(time.perf_counter() - t) else: t = time.perf_counter() session.run(input_feed=input_dict, output_names=None) latencies.append(time.perf_counter() - t) time.sleep(sleep_num) return OliveEvaluator.compute_latency(metric, latencies) @staticmethod def _evaluate_distributed_accuracy_worker(config) -> Tuple[List[Any], List[Any]]: model_path = config["model_path"] data_root = config["data_root"] local_rank = config["local_rank"] world_size = config["world_size"] inference_settings = config["inference_settings"] execution_providers = config["providers"] metric = Metric.from_json(config["metric"]) import os os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank) os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size) from mpi4py import MPI local_rank = MPI.COMM_WORLD.Get_rank() inference_settings["execution_provider"] = execution_providers inference_settings["provider_options"] = [ {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {} for provider in execution_providers ] model = ONNXModel(model_path, inference_settings=inference_settings) dataloader, _, post_func = OnnxEvaluator.get_user_config(model.framework, data_root, metric) session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank)) io_config = model.get_io_config() preds = [] targets = [] logits = [] output_names = io_config["output_names"] for _, (input_data, labels) in enumerate(dataloader): input_dict = OnnxEvaluator.format_input(input_data, io_config) MPI.COMM_WORLD.barrier() # Synchronize before starting each run output = session.run(input_feed=input_dict, output_names=None) output = torch.Tensor(output[0]) if len(output_names) == 1 else torch.Tensor(output) post_output = post_func(output) if post_func else output preds.extend(post_output.tolist()) targets.extend(labels.data.tolist()) logits.extend(output.tolist()) model_output = OliveModelOutput(preds=preds, logits=logits) return model_output, targets def _evaluate_distributed_accuracy( self, model: DistributedOnnxModel, data_root: str, metric: Metric, device: Device, execution_providers: Union[str, List[str]], ) -> MetricResult: from mpi4py.futures import MPIPoolExecutor config = { "model_path": None, "local_rank": None, "world_size": model.num_ranks, "inference_settings": self.get_inference_settings(metric), "metric": metric.to_json(), } args = [] for rank in range(model.num_ranks): cfg = deepcopy(config) cfg["local_rank"] = rank cfg["model_path"] = model.ranked_model_path(rank) cfg["data_root"] = data_root cfg["device"] = device cfg["providers"] = execution_providers args.append(cfg) with MPIPoolExecutor(max_workers=model.num_ranks) as executor: results = executor.map(OnnxEvaluator._evaluate_distributed_accuracy_worker, args) executor.shutdown() preds = [x for p, _, _ in results for x in p] targets = [x for _, t, _ in results for x in t] logits = [x for _, _, logit in results for x in logit] model_output = OliveModelOutput(preds, logits) return OliveEvaluator.compute_accuracy(metric, model_output, targets) @staticmethod def _evaluate_distributed_latency_worker(data_root, config) -> List[float]: model_path = config["model_path"] data_root = config["data_root"] local_rank = config["local_rank"] world_size = config["world_size"] inference_settings = config["inference_settings"] execution_providers = config["providers"] metric = Metric.from_json(config["metric"]) import os os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank) os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size) from mpi4py import MPI local_rank = MPI.COMM_WORLD.Get_rank() warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) inference_settings["execution_provider"] = execution_providers inference_settings["provider_options"] = [ {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {} for provider in execution_providers ] model = ONNXModel(model_path, inference_settings=inference_settings) dataloader, _, _ = OnnxEvaluator.get_user_config(model.framework, data_root, metric) session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank)) io_config = model.get_io_config() input_feed, _ = next(iter(dataloader)) input_feed = OnnxEvaluator.format_input(input_feed, io_config) kv_cache_ortvalues = {} if metric.user_config.shared_kv_buffer else None if metric.user_config.io_bind: io_bind_op = OnnxEvaluator.prepare_io_bindings( session, input_feed, Device.GPU, shared_kv_buffer=metric.user_config.shared_kv_buffer, kv_cache_ortvalues=kv_cache_ortvalues, ) latencies = [] for i in range(warmup_num + repeat_test_num): MPI.COMM_WORLD.barrier() # Synchronize before starting each run start_time = time.perf_counter() if metric.user_config.io_bind: session.run_with_iobinding(io_bind_op) else: session.run(input_feed=input_feed, output_names=None) if i > warmup_num: latencies.append(time.perf_counter() - start_time) time.sleep(sleep_num) return latencies def _evaluate_distributed_latency( self, model: DistributedOnnxModel, data_root: str, metric: Metric, device, execution_providers: Union[str, List[str]], ) -> MetricResult: from mpi4py.futures import MPIPoolExecutor config = { "model_path": None, "local_rank": None, "world_size": model.num_ranks, "inference_settings": self.get_inference_settings(metric), "metric": metric.to_json(), } args = [] for rank in range(model.num_ranks): cfg = deepcopy(config) cfg["local_rank"] = rank cfg["model_path"] = model.ranked_model_path(rank) cfg["data_root"] = data_root cfg["device"] = device cfg["providers"] = execution_providers args.append(cfg) with MPIPoolExecutor(max_workers=model.num_ranks) as executor: results = executor.map(OnnxEvaluator._evaluate_distributed_latency_worker, args) executor.shutdown() latencies = [x for r in results for x in r] return OliveEvaluator.compute_latency(metric, latencies) def _evaluate_accuracy( self, model: ONNXModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: if isinstance(model, ONNXModel): return self._evaluate_onnx_accuracy(model, metric, dataloader, post_func, device, execution_providers) elif isinstance(model, DistributedOnnxModel): if device != Device.GPU: raise ValueError("Distributed inferencing is supported only on GPU") return self._evaluate_distributed_accuracy(model, data_root, metric, device, execution_providers) else: raise TypeError(f"Cannot evaluate accuracy for model of type: {type(model)}") def _evaluate_latency( self, model: OliveModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: if isinstance(model, ONNXModel): return self._evaluate_onnx_latency(model, metric, dataloader, post_func, device, execution_providers) elif isinstance(model, DistributedOnnxModel): if device != Device.GPU: raise ValueError("Distributed inferencing is supported only on GPU") return self._evaluate_distributed_latency(model, data_root, metric, device, execution_providers) else: raise TypeError(f"Cannot evaluate latency for model of type: {type(model)}") @staticmethod def disable_ort_fallback(session, execution_providers): # pylint: disable=protected-access if execution_providers: assert isinstance(execution_providers, (str, list)) execution_providers = [execution_providers] if isinstance(execution_providers, str) else execution_providers session_providers = session.get_providers() for ep in execution_providers: if ep not in session_providers: raise OliveEvaluationError( f"The onnxruntime fallback happens. {ep} is not in the session providers {session_providers}." f" session._enable_fallback = {session._enable_fallback}" ) session.disable_fallback() class PyTorchEvaluator(OliveEvaluator, framework=Framework.PYTORCH): def __init__(self): super().__init__() @staticmethod def _device_string_to_torch_device(device: Device): return torch.device("cuda") if device == Device.GPU else torch.device(device) @torch.no_grad() def _inference( self, model: PyTorchModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] device = PyTorchEvaluator._device_string_to_torch_device(device) if device: session.to(device) for input_data_i, labels in dataloader: input_data = tensor_data_to_device(input_data_i, device) result = session(**input_data) if isinstance(input_data, dict) else session(input_data) outputs = post_func(result) if post_func else result # keep the outputs and results as torch tensor on cpu # it is expensive to convert to list and then convert back to torch tensor preds.append(outputs.cpu()) targets.append(labels.cpu()) logits.append( result.logits.cpu() if not isinstance(result, torch.Tensor) and getattr(result, "logits", None) is not None else result.cpu() ) # concatenate along the batch dimension preds = torch.cat(preds, dim=0) targets = torch.cat(targets, dim=0) logits = torch.cat(logits, dim=0) # move model to cpu if device: session.to("cpu") # only move to cpu cannot release gpu memory, call cuda.empty_cache() to release gpu memory if torch.cuda.is_available(): torch.cuda.empty_cache() return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: PyTorchModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) @torch.no_grad() def _evaluate_latency( self, model: PyTorchModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: # pylint: disable=expression-not-assigned warmup_num, repeat_test_num, _ = get_latency_config_from_metric(metric) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) input_data, _ = next(iter(dataloader)) device = PyTorchEvaluator._device_string_to_torch_device(device) is_cuda = device == Device.GPU if device: session.to(device) input_data = tensor_data_to_device(input_data, device) input_is_dict = isinstance(input_data, dict) # warm up for _ in range(warmup_num): session(**input_data) if input_is_dict else session(input_data) latencies = [] if not is_cuda: for _ in range(repeat_test_num): t = time.perf_counter() # TODO(jambayk): do we care about the efficiency of if/else here? # probably won't add much overhead compared to the inference time # also we are doing the same for all models session(**input_data) if input_is_dict else session(input_data) latencies.append(time.perf_counter() - t) else: # synchronize before starting the test torch.cuda.synchronize() # cuda events for measuring latency starter = torch.cuda.Event(enable_timing=True) ender = torch.cuda.Event(enable_timing=True) for _ in range(repeat_test_num): starter.record() session(**input_data) if input_is_dict else session(input_data) ender.record() # synchronize after forward pass torch.cuda.synchronize() # add time in seconds, originally in milliseconds latencies.append(starter.elapsed_time(ender) * 1e-3) # move model to cpu if device: session.to("cpu") # only move to cpu cannot release gpu memory, call cuda.empty_cache() to release gpu memory if torch.cuda.is_available(): torch.cuda.empty_cache() return OliveEvaluator.compute_latency(metric, latencies) class SNPEEvaluator(OliveEvaluator, framework=Framework.SNPE): def __init__(self): super().__init__() def _inference( self, model: SNPEModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: dataloader = self._prepare_dataloader(dataloader, model) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] for data_dir, input_list, labels in dataloader: result = session(input_list, data_dir) if post_func: outputs = post_func(result) else: raise ValueError("Post processing function is required for SNPE model") preds.extend(outputs.tolist()) targets.extend(labels.tolist()) # TODO(trajep): verify if we need to return logits logits.extend(result.tolist()) return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: SNPEModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_latency( self, model: SNPEModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: dataloader = self._prepare_dataloader(dataloader, model) warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric) session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) data_dir, input_data, _ = next(iter(dataloader)) total_runs = warmup_num + repeat_test_num results = session(input_data, data_dir, runs=total_runs, sleep=sleep_num) latencies = results["latencies"]["total_inference_time"][warmup_num:] return OliveEvaluator.compute_latency(metric, latencies) def _prepare_dataloader(self, dataloader: Dataset, model: SNPEModel) -> SNPEDataLoader: if isinstance(dataloader, SNPEDataLoader): return dataloader return SNPECommonDataLoader(dataloader, model.io_config) class OpenVINOEvaluator(OliveEvaluator, framework=Framework.OPENVINO): def __init__(self): super().__init__() def _inference( self, model: OpenVINOModel, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> Tuple[OliveModelOutput, Any]: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) preds = [] targets = [] logits = [] for input_data, labels in dataloader: result = session.infer_new_request({0: input_data}) outputs = post_func(result) if post_func else result if not isinstance(labels, list): labels = [labels] # noqa: PLW2901 preds.extend(outputs) targets.extend(labels) logits.extend(result) return OliveModelOutput(preds=preds, logits=logits), targets def _evaluate_accuracy( self, model: OpenVINOModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers) return OliveEvaluator.compute_accuracy(metric, inference_output, targets) def _evaluate_latency( self, model: OpenVINOModel, data_root: str, metric: Metric, dataloader: Dataset, post_func=None, device: Device = Device.CPU, execution_providers: Union[str, List[str]] = None, ) -> MetricResult: session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device) latencies = [] for input_data, _ in dataloader: t = time.perf_counter() session(input_data) latencies.append(time.perf_counter() - t) return OliveEvaluator.compute_latency(metric, latencies) class OliveEvaluatorFactory: @staticmethod def create_evaluator_for_model(model: OliveModel) -> OliveEvaluator: evaluator_cls = OliveEvaluator.registry[str(model.framework).lower()] return evaluator_cls()
[docs]class OliveEvaluatorConfig(ConfigBase): metrics: List[Metric] = [] # noqa: RUF012 @validator("metrics") def validate_metrics(cls, v): metric_len = len(v) metric_names = {metric.name for metric in v} assert len(metric_names) == metric_len, "Metric names must be unique" sub_type_names = set() sub_type_with_rank = set() rank_set = set() for metric in v: for sub_type in metric.sub_types: unique_metric_name = joint_metric_key(metric.name, sub_type.name) sub_type_names.add(unique_metric_name) if sub_type.priority != -1: sub_type_with_rank.add(unique_metric_name) rank_set.add(sub_type.priority) if not rank_set and len(sub_type_names) == 1: logger.debug( "No priority is specified, but only one sub type " " metric is specified. Use rank 1 for single for this metric." ) v[0].sub_types[0].priority = 1 elif not rank_set and len(sub_type_names) > 1: raise ValueError("Priority must be specified for multiple sub type metrics") expected_rank_set = set(range(1, len(sub_type_with_rank) + 1)) # Check if all ranks are present if rank_set != expected_rank_set: raise ValueError(f"Priorities must be unique and in the range 1 to {metric_len}") return v