Source code for olive.evaluator.olive_evaluator

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import collections
import logging
import time
from abc import ABC, abstractmethod
from copy import deepcopy
from functools import partial
from numbers import Number
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, NamedTuple, Tuple, Type, Union

import numpy as np
import torch

import olive.data.template as data_config_template
from olive.cache import get_local_path_from_root
from olive.common.config_utils import ConfigBase
from olive.common.ort_inference import OrtInferenceSession, prepare_io_bindings
from olive.common.pydantic_v1 import validator
from olive.common.user_module_loader import UserModuleLoader
from olive.common.utils import tensor_data_to_device
from olive.constants import Framework
from olive.evaluator.metric import LatencySubType, Metric, MetricType, ThroughputSubType, get_latency_config_from_metric
from olive.evaluator.metric_backend import MetricBackend
from olive.evaluator.metric_result import MetricResult, SubMetricResult, flatten_metric_result, joint_metric_key
from olive.hardware import Device
from olive.model import DistributedOnnxModelHandler, ONNXModelHandler
from olive.model.config.io_config import is_io_config_static
from olive.model.utils.onnx_utils import dump_tuning_result
from olive.platform_sdk.qualcomm.utils.data_loader import FileListCommonDataLoader, FileListDataLoader

if TYPE_CHECKING:
    from torch.utils.data import DataLoader

    from olive.model import (
        OliveModelHandler,
        OpenVINOModelHandler,
        PyTorchModelHandler,
        QNNModelHandler,
        SNPEModelHandler,
    )

logger = logging.getLogger(__name__)

# pylint: disable=useless-parent-delegation


class OliveModelOutput(NamedTuple):
    preds: Any
    logits: Any


class OliveEvaluator(ABC):
    registry: ClassVar[Dict[str, Type["OliveEvaluator"]]] = {}

    @classmethod
    def __init_subclass__(cls, framework: Framework, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        cls.framework = framework
        cls.registry[str(framework).lower()] = cls

    @classmethod
    def io_bind_enabled(cls, metric: Metric, inference_settings: Dict) -> bool:
        if metric.user_config.io_bind:
            return True

        if inference_settings and inference_settings.get("io_bind"):
            return True

        return False

    @abstractmethod
    def _inference(
        self,
        model: "OliveModelHandler",
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        raise NotImplementedError

    @abstractmethod
    def _evaluate_accuracy(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        raise NotImplementedError

    @abstractmethod
    def _evaluate_raw_latency(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        """For the given repeat_test_num, return a list of raw latencies in seconds."""
        raise NotImplementedError

    def _evaluate_latency(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        latencies = self._evaluate_raw_latency(
            model, data_root, metric, dataloader, post_func, device, execution_providers
        )
        return OliveEvaluator.compute_latency(metric, latencies)

    def _evaluate_throughput(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        latencies = self._evaluate_raw_latency(
            model, data_root, metric, dataloader, post_func, device, execution_providers
        )
        return OliveEvaluator.compute_throughput(metric, latencies)

    def _evaluate_custom(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        eval_func,
        post_func=None,
        device: Device = Device.CPU,
        execution_providers=None,
    ) -> MetricResult:
        raw_res = None
        if metric.user_config.evaluate_func:
            raw_res = eval_func(
                model,
                get_local_path_from_root(data_root, metric.user_config.data_dir),
                metric.user_config.batch_size,
                device,
                execution_providers,
            )
        else:
            inference_output, targets = self._inference(
                model, metric, dataloader, post_func, device, execution_providers
            )
            raw_res = eval_func(inference_output, targets)

        metric_res = {}
        for sub_type in metric.sub_types:
            if isinstance(raw_res, Number):
                assert len(metric.sub_types) == 1, "Only one sub type is allowed for single value custom metric"
                metric_res[sub_type.name] = SubMetricResult(
                    value=raw_res, priority=sub_type.priority, higher_is_better=sub_type.higher_is_better
                )
            elif isinstance(raw_res, dict):
                assert sub_type.name in raw_res, f"Custom metric {sub_type.name} is not in the result"
                metric_res[sub_type.name] = SubMetricResult(
                    value=raw_res[sub_type.name],
                    priority=sub_type.priority,
                    higher_is_better=sub_type.higher_is_better,
                )
        return MetricResult.parse_obj(metric_res)

    def evaluate(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metrics: List[Metric],
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        metrics_res = {}
        for original_metric in metrics:
            # use the model io_config if the user does not specify input_names and input_shapes
            # only do this if data_config or dataloader is not provided
            # priority: dataloader_func > data_config > user_config.input_names/input_shapes > model io_config
            metric = OliveEvaluator.generate_metric_user_config_with_model_io(original_metric, model)
            dataloader, eval_func, post_func = OliveEvaluator.get_user_config(model.framework, data_root, metric)
            if metric.type == MetricType.ACCURACY:
                metrics_res[metric.name] = self._evaluate_accuracy(
                    model, data_root, metric, dataloader, post_func, device, execution_providers
                )
            elif metric.type == MetricType.LATENCY:
                metrics_res[metric.name] = self._evaluate_latency(
                    model, data_root, metric, dataloader, post_func, device, execution_providers
                )
            elif metric.type == MetricType.THROUGHPUT:
                metrics_res[metric.name] = self._evaluate_throughput(
                    model, data_root, metric, dataloader, post_func, device, execution_providers
                )
            elif metric.type == MetricType.CUSTOM:
                metrics_res[metric.name] = self._evaluate_custom(
                    model, data_root, metric, dataloader, eval_func, post_func, device, execution_providers
                )
            else:
                raise TypeError(f"{metric.type} is not a supported metric type")
        return flatten_metric_result(metrics_res)

    @staticmethod
    def generate_metric_user_config_with_model_io(metric: Metric, model: "OliveModelHandler"):
        # if the io_config is not specified in the metric, use the one from the model.
        # do not change the original metric object, which is created from the config jsons;
        # otherwise it affects hashing and caching when Olive restores state.
        metric = deepcopy(metric)
        if metric.data_config:
            return metric

        io_config = model.io_config
        if not io_config:
            return metric

        if not is_io_config_static(io_config):
            # since Olive will not save the pytorch model's io_config to the olive onnx model,
            # we cannot generate dummy data for the onnx model if it has dynamic input shapes
            # TODO(trajep): try to get static input shapes from the onnx model.
            # If so, we can use the dataloader for latency measurement.
            logger.debug(
                "Model input shapes are not static. Cannot use inferred input shapes for creating dummy data. This"
                " will cause an error when creating dummy data for tuning."
            )
        if io_config and not metric.user_config.input_names and not metric.user_config.input_shapes:
            metric.user_config.input_names = io_config["input_names"]
            # input_shapes is optional for hf models
            metric.user_config.input_shapes = io_config.get("input_shapes")
            # input_types is optional and can be None.
            # If None, it will be replaced with float32 in DummyDataset
            metric.user_config.input_types = io_config.get("input_types")

        return metric

    @staticmethod
    def _get_func_kwargs(metric: Metric, func_name: str):
        """Get the function kwargs from the metric config."""
        if metric.user_config.func_kwargs:
            return metric.user_config.func_kwargs.get(func_name, {})
        return {}

    @classmethod
    def get_user_config(cls, framework: Framework, data_root: str, metric: Metric):
        assert metric.user_config, "user_config is not specified in the metric config"
        user_module = UserModuleLoader(metric.user_config.user_script, metric.user_config.script_dir)

        # load the post processing function
        post_processing_func = getattr(metric.user_config, "post_processing_func", None)
        post_func = user_module.load_object(post_processing_func)
        post_func_kwargs = cls._get_func_kwargs(metric, "post_processing_func")
        if post_func_kwargs:
            # apply the kwargs to the post processing function
            post_func = partial(post_func, **post_func_kwargs)

        # load the dataloader function and create the dataloader
        dataloader_func = getattr(metric.user_config, "dataloader_func", None)
        if dataloader_func:
            data_dir = get_local_path_from_root(data_root, metric.user_config.data_dir)
            dataloader = user_module.call_object(
                dataloader_func,
                data_dir,
                metric.user_config.batch_size,
                model_framework=framework,
                **cls._get_func_kwargs(metric, "dataloader_func"),
            )
        else:
            dataloader = None

        # load the evaluate function
        # priority: evaluate_func > metric_func
        eval_func = None
        if metric.type == MetricType.CUSTOM:
            evaluate_func = getattr(metric.user_config, "evaluate_func", None)
            kwargs = cls._get_func_kwargs(metric, "evaluate_func")
            if not evaluate_func:
                evaluate_func = getattr(metric.user_config, "metric_func", None)
                kwargs = cls._get_func_kwargs(metric, "metric_func")

            if not evaluate_func:
                raise ValueError("evaluate_func or metric_func is not specified in the metric config")

            eval_func = user_module.load_object(evaluate_func)
            if kwargs:
                eval_func = partial(eval_func, **kwargs)

        # get dataloader and/or post processing function from data_config if not specified in the metric config
        if (not dataloader or not post_func) and metric.data_config:
            dc = metric.data_config.to_data_container()

            # TODO(trajep): remove the user-script dataloader; we respect the user script's
            # dataloader for backward compatibility for the time being.
            dataloader = dataloader or dc.create_dataloader(data_root)
            post_func = post_func or dc.config.post_process

        # get dataloader and/or post processing function from model io_config if not specified in the metric config
        # or data config
        if metric.user_config.input_names and metric.user_config.input_shapes and not dataloader and not eval_func:
            dataloader = (
                data_config_template.dummy_data_config_template(
                    input_names=metric.user_config.input_names,
                    input_shapes=metric.user_config.input_shapes,
                    input_types=metric.user_config.input_types,
                )
                .to_data_container()
                .create_dataloader(data_root)
            )

        return dataloader, eval_func, post_func

    @staticmethod
    def compute_accuracy(metric: Metric, model_outputs: Union[Tuple, NamedTuple], targets: Any) -> MetricResult:
        """Compute accuracy metrics."""
        evaluate_backend_cls = MetricBackend.registry[metric.backend]
        return evaluate_backend_cls().measure(model_outputs, targets, metric)

    @staticmethod
    def latency_helper(latencies) -> Dict:
        return {
            LatencySubType.AVG: round(sum(latencies) / len(latencies) * 1000, 5),
            LatencySubType.MAX: round(max(latencies) * 1000, 5),
            LatencySubType.MIN: round(min(latencies) * 1000, 5),
            LatencySubType.P50: round(np.percentile(latencies, 50) * 1000, 5),
            LatencySubType.P75: round(np.percentile(latencies, 75) * 1000, 5),
            LatencySubType.P90: round(np.percentile(latencies, 90) * 1000, 5),
            LatencySubType.P95: round(np.percentile(latencies, 95) * 1000, 5),
            LatencySubType.P99: round(np.percentile(latencies, 99) * 1000, 5),
            LatencySubType.P999: round(np.percentile(latencies, 99.9) * 1000, 5),
        }

    @staticmethod
    def compute_latency(metric: Metric, latencies: Any) -> MetricResult:
        """Compute latency metrics."""
        latency_metrics = OliveEvaluator.latency_helper(latencies)
        metric_res = {}
        for sub_type in metric.sub_types:
            metric_res[sub_type.name] = SubMetricResult(
                value=latency_metrics[sub_type.name],
                priority=sub_type.priority,
                higher_is_better=sub_type.higher_is_better,
            )
        return MetricResult.parse_obj(metric_res)

    @staticmethod
    def compute_throughput(metric: Metric, latencies: Any) -> MetricResult:
        """Compute throughput metrics."""
        latency_metrics = OliveEvaluator.latency_helper(latencies)
        metric_res = {}
        batch_size = metric.user_config.batch_size
        for sub_type in metric.sub_types:
            if sub_type.name == ThroughputSubType.MIN:
                latency_sub_type_name = LatencySubType.MAX
            elif sub_type.name == ThroughputSubType.MAX:
                latency_sub_type_name = LatencySubType.MIN
            else:
                latency_sub_type_name = LatencySubType(sub_type.name)
            metric_res[sub_type.name] = SubMetricResult(
                # latency is in ms; multiply by 1000 to convert to samples per second
                value=round(batch_size / latency_metrics[latency_sub_type_name] * 1000, 5),
                priority=sub_type.priority,
                higher_is_better=sub_type.higher_is_better,
            )
        return MetricResult.parse_obj(metric_res)
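
# Worked example (illustrative, not part of the module): latency_helper converts the raw
# per-run latencies (seconds) into milliseconds, and compute_throughput divides the batch
# size by that millisecond value and multiplies by 1000 to get samples per second.
# With batch_size=4 and raw latencies of 0.020 s and 0.030 s:
#
#     avg latency    = (0.020 + 0.030) / 2 * 1000 = 25.0 ms
#     avg throughput = 4 / 25.0 * 1000 = 160.0 samples/s
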

class OnnxEvaluatorMixin:
    @staticmethod
    def format_input(input_data, io_config):
        """Format input data to ONNX input format."""
        input_names = io_config["input_names"]
        name_to_type = dict(zip(io_config["input_names"], io_config["input_types"]))

        if isinstance(input_data, list):
            input_data = dict(zip(input_names, input_data))
        elif not isinstance(input_data, dict):
            input_data = dict(zip(input_names, [input_data]))

        return {
            k: np.ascontiguousarray(
                input_data[k].cpu().numpy() if isinstance(input_data[k], torch.Tensor) else input_data[k],
                dtype=name_to_type[k],
            )
            for k in input_data
            if k in input_names
        }

    @staticmethod
    def get_inference_settings(metric: Metric, model: ONNXModelHandler) -> Dict[str, Any]:
        # user.config.inference_settings > model.inference_settings > default inference_settings
        # when user.config.inference_settings is None, the model.inference_settings
        # will be used in model.prepare_session(..)
        inference_settings = {}
        model_inference_settings = model.inference_settings
        if model_inference_settings:
            inference_settings.update(model_inference_settings)

        metric_inference_settings = metric.get_inference_settings(Framework.ONNX.lower())
        if metric_inference_settings:
            inference_settings.update(metric_inference_settings)

        return inference_settings


class OnnxEvaluator(OliveEvaluator, OnnxEvaluatorMixin, framework=Framework.ONNX):
    @staticmethod
    def get_session_wrapper(
        model: ONNXModelHandler,
        metric: Metric,
        dataloader: "DataLoader",
        device: Device,
        execution_providers: List[str],
    ) -> Tuple[OrtInferenceSession, Dict[str, Any]]:
        """Get the session wrapper for the model."""
        # user.config.inference_settings > model.inference_settings > default inference_settings
        inference_settings = OnnxEvaluator.get_inference_settings(metric, model)
        session = model.prepare_session(
            inference_settings=inference_settings,
            device=device,
            execution_providers=execution_providers,
        )

        # prepare for io binding
        io_config = model.io_config
        io_bind = OnnxEvaluator.io_bind_enabled(metric, model.inference_settings)
        shared_kv_buffer = metric.user_config.shared_kv_buffer
        use_fp16 = any(v == "float16" for v in io_config["input_types"])
        input_feed = None
        if io_bind and shared_kv_buffer and use_fp16:
            input_feed = OnnxEvaluator.format_input(next(iter(dataloader))[0], io_config)

        # load constant inputs if any
        constant_inputs = None
        if model.constant_inputs_path:
            constant_inputs = OnnxEvaluator.format_input(dict(np.load(model.constant_inputs_path)), io_config)

        # create session wrapper
        session_wrapper = OrtInferenceSession(
            session,
            io_bind=io_bind,
            device=device,
            shared_kv_buffer=shared_kv_buffer,
            use_fp16=use_fp16,
            input_feed=input_feed,
            constant_inputs=constant_inputs,
        )

        return session_wrapper, inference_settings

    def _inference(
        self,
        model: ONNXModelHandler,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        session, inference_settings = OnnxEvaluator.get_session_wrapper(
            model, metric, dataloader, device, execution_providers
        )
        io_config = model.io_config

        preds = []
        targets = []
        logits = []
        logits_dict = collections.defaultdict(list)
        output_names = io_config["output_names"]
        is_single_tensor_output = len(output_names) == 1
        for input_data, labels in dataloader:
            input_feed = OnnxEvaluator.format_input(input_data, io_config)
            result = session.run(input_feed)
            if is_single_tensor_output:
                result = torch.Tensor(result[0])
            else:
                # convert to a dict of torch tensors
                result = {name: torch.Tensor(result[i]) for i, name in enumerate(output_names)}
            outputs = post_func(result) if post_func else result
            # keep as numpy or torch arrays
            preds.append(outputs.cpu())
            targets.append(labels.cpu())
            if is_single_tensor_output:
                logits.append(result.cpu())
            else:
                for k in output_names:
                    logits_dict[k].append(result[k].cpu())

        preds = torch.cat(preds, dim=0)
        targets = torch.cat(targets, dim=0)
        if is_single_tensor_output:
            logits = torch.cat(logits, dim=0)
        else:
            # multi-output logits are collected in logits_dict above
            logits = {k: torch.cat(logits_dict[k], dim=0) for k in output_names}

        tuning_result_file = inference_settings.get("tuning_result_file")
        if tuning_result_file:
            dump_tuning_result(session.session, tuning_result_file)
        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_onnx_accuracy(
        self,
        model: ONNXModelHandler,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    def _evaluate_onnx_latency(
        self,
        model: ONNXModelHandler,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric)
        session, inference_settings = OnnxEvaluator.get_session_wrapper(
            model, metric, dataloader, device, execution_providers
        )
        io_config = model.io_config

        input_data, _ = next(iter(dataloader))
        input_feed = OnnxEvaluator.format_input(input_data, io_config)

        latencies = session.time_run(
            input_feed,
            num_runs=repeat_test_num,
            num_warmup=warmup_num,
            sleep_time=sleep_num,
        )

        tuning_result_file = inference_settings.get("tuning_result_file")
        if tuning_result_file:
            dump_tuning_result(session.session, tuning_result_file)
        return latencies

    @staticmethod
    def _evaluate_distributed_accuracy_worker(config) -> Tuple[List[Any], List[Any]]:
        model_path = config["model_path"]
        data_root = config["data_root"]
        local_rank = config["local_rank"]
        world_size = config["world_size"]
        inference_settings = config["inference_settings"]
        execution_providers = config["providers"]
        metric = Metric.from_json(config["metric"])

        import os

        os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank)
        os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size)

        from mpi4py import MPI

        local_rank = MPI.COMM_WORLD.Get_rank()

        inference_settings["execution_provider"] = execution_providers
        inference_settings["provider_options"] = [
            {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {}
            for provider in execution_providers
        ]

        model = ONNXModelHandler(model_path, inference_settings=inference_settings)
        dataloader, _, post_func = OnnxEvaluator.get_user_config(model.framework, data_root, metric)

        session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank))
        io_config = model.io_config

        preds = []
        targets = []
        logits = []
        output_names = io_config["output_names"]
        for _, (input_data, labels) in enumerate(dataloader):
            input_dict = OnnxEvaluator.format_input(input_data, io_config)
            MPI.COMM_WORLD.barrier()  # synchronize before starting each run
            output = session.run(input_feed=input_dict, output_names=None)
            output = torch.Tensor(output[0]) if len(output_names) == 1 else torch.Tensor(output)
            post_output = post_func(output) if post_func else output
            preds.extend(post_output.tolist())
            targets.extend(labels.data.tolist())
            logits.extend(output.tolist())

        model_output = OliveModelOutput(preds=preds, logits=logits)
        return model_output, targets

    def _evaluate_distributed_accuracy(
        self,
        model: DistributedOnnxModelHandler,
        data_root: str,
        metric: Metric,
        device: Device,
        execution_providers: Union[str, List[str]],
    ) -> MetricResult:
        from mpi4py.futures import MPIPoolExecutor

        config = {
            "model_path": None,
            "local_rank": None,
            "world_size": model.num_ranks,
            "inference_settings": metric.get_inference_settings(self.framework.lower()),
            "metric": metric.to_json(),
        }

        args = []
        for rank in range(model.num_ranks):
            cfg = deepcopy(config)
            cfg["local_rank"] = rank
            cfg["model_path"] = model.ranked_model_path(rank)
            cfg["data_root"] = data_root
            cfg["device"] = device
            cfg["providers"] = execution_providers
            args.append(cfg)

        with MPIPoolExecutor(max_workers=model.num_ranks) as executor:
            # materialize the map generator so it can be iterated multiple times below
            results = list(executor.map(OnnxEvaluator._evaluate_distributed_accuracy_worker, args))
            executor.shutdown()

        # each worker returns an (OliveModelOutput, targets) pair
        preds = [x for output, _ in results for x in output.preds]
        targets = [x for _, t in results for x in t]
        logits = [x for output, _ in results for x in output.logits]
        model_output = OliveModelOutput(preds, logits)
        return OliveEvaluator.compute_accuracy(metric, model_output, targets)

    @staticmethod
    def _evaluate_distributed_latency_worker(config) -> List[float]:
        model_path = config["model_path"]
        data_root = config["data_root"]
        local_rank = config["local_rank"]
        world_size = config["world_size"]
        inference_settings = config["inference_settings"]
        execution_providers = config["providers"]
        metric = Metric.from_json(config["metric"])

        import os

        os.environ["OMPI_COMM_WORLD_RANK"] = str(local_rank)
        os.environ["OMPI_COMM_WORLD_SIZE"] = str(world_size)

        from mpi4py import MPI

        local_rank = MPI.COMM_WORLD.Get_rank()
        warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric)
        inference_settings["execution_provider"] = execution_providers
        inference_settings["provider_options"] = [
            {"device_id": str(local_rank)} if provider == "CUDAExecutionProvider" else {}
            for provider in execution_providers
        ]

        model = ONNXModelHandler(model_path, inference_settings=inference_settings)
        dataloader, _, _ = OnnxEvaluator.get_user_config(model.framework, data_root, metric)
        session = model.prepare_session(inference_settings=inference_settings, device=Device.GPU, rank=int(local_rank))
        io_config = model.io_config

        input_feed, _ = next(iter(dataloader))
        input_feed = OnnxEvaluator.format_input(input_feed, io_config)
        kv_cache_ortvalues = {} if metric.user_config.shared_kv_buffer else None

        io_bind = OnnxEvaluator.io_bind_enabled(metric, model.inference_settings)
        if io_bind:
            io_bind_op = prepare_io_bindings(
                session,
                input_feed,
                Device.GPU,
                shared_kv_buffer=metric.user_config.shared_kv_buffer,
                kv_cache_ortvalues=kv_cache_ortvalues,
            )
        latencies = []
        for i in range(warmup_num + repeat_test_num):
            MPI.COMM_WORLD.barrier()  # synchronize before starting each run
            start_time = time.perf_counter()
            if io_bind:
                session.run_with_iobinding(io_bind_op)
            else:
                session.run(input_feed=input_feed, output_names=None)
            # only collect timings for the runs after warmup
            if i >= warmup_num:
                latencies.append(time.perf_counter() - start_time)
            time.sleep(sleep_num)

        return latencies

    def _evaluate_distributed_latency(
        self,
        model: DistributedOnnxModelHandler,
        data_root: str,
        metric: Metric,
        device,
        execution_providers: Union[str, List[str]],
    ) -> List[float]:
        from mpi4py.futures import MPIPoolExecutor

        config = {
            "model_path": None,
            "local_rank": None,
            "world_size": model.num_ranks,
            "inference_settings": metric.get_inference_settings(self.framework.lower()),
            "metric": metric.to_json(),
        }

        args = []
        for rank in range(model.num_ranks):
            cfg = deepcopy(config)
            cfg["local_rank"] = rank
            cfg["model_path"] = model.ranked_model_path(rank)
            cfg["data_root"] = data_root
            cfg["device"] = device
            cfg["providers"] = execution_providers
            args.append(cfg)

        with MPIPoolExecutor(max_workers=model.num_ranks) as executor:
            results = executor.map(OnnxEvaluator._evaluate_distributed_latency_worker, args)
            executor.shutdown()

        return [x for r in results for x in r]

    def _evaluate_accuracy(
        self,
        model: ONNXModelHandler,
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        if isinstance(model, ONNXModelHandler):
            return self._evaluate_onnx_accuracy(model, metric, dataloader, post_func, device, execution_providers)
        elif isinstance(model, DistributedOnnxModelHandler):
            if device != Device.GPU:
                raise ValueError("Distributed inferencing is supported only on GPU")
            return self._evaluate_distributed_accuracy(model, data_root, metric, device, execution_providers)
        else:
            raise TypeError(f"Cannot evaluate accuracy for model of type: {type(model)}")

    def _evaluate_raw_latency(
        self,
        model: "OliveModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        if isinstance(model, ONNXModelHandler):
            return self._evaluate_onnx_latency(model, metric, dataloader, post_func, device, execution_providers)
        elif isinstance(model, DistributedOnnxModelHandler):
            if device != Device.GPU:
                raise ValueError("Distributed inferencing is supported only on GPU")
            return self._evaluate_distributed_latency(model, data_root, metric, device, execution_providers)
        else:
            raise TypeError(f"Cannot evaluate latency for model of type: {type(model)}")


class PyTorchEvaluator(OliveEvaluator, framework=Framework.PYTORCH):
    @staticmethod
    def _device_string_to_torch_device(device: Device):
        return torch.device("cuda") if device == Device.GPU else torch.device(device)

    @torch.no_grad()
    def _inference(
        self,
        model: "PyTorchModelHandler",
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        session = model.prepare_session()
        preds = []
        targets = []
        logits = []
        device = PyTorchEvaluator._device_string_to_torch_device(device)
        if device:
            session.to(device)
        for input_data_i, labels in dataloader:
            input_data = tensor_data_to_device(input_data_i, device)
            result = session(**input_data) if isinstance(input_data, dict) else session(input_data)
            outputs = post_func(result) if post_func else result
            # keep the outputs and results as torch tensors on cpu;
            # it is expensive to convert to list and then back to torch tensor
            preds.append(outputs.cpu())
            targets.append(labels.cpu())
            logits.append(
                result.logits.cpu()
                if not isinstance(result, torch.Tensor) and getattr(result, "logits", None) is not None
                else result.cpu()
            )
        # concatenate along the batch dimension
        preds = torch.cat(preds, dim=0)
        targets = torch.cat(targets, dim=0)
        logits = torch.cat(logits, dim=0)
        # move model to cpu
        if device:
            session.to("cpu")
        # moving to cpu alone does not release gpu memory; call cuda.empty_cache() to release it
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_accuracy(
        self,
        model: "PyTorchModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    @torch.no_grad()
    def _evaluate_raw_latency(
        self,
        model: "PyTorchModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        # pylint: disable=expression-not-assigned
        warmup_num, repeat_test_num, _ = get_latency_config_from_metric(metric)
        # pytorch model doesn't use inference_settings, so we can pass None
        session = model.prepare_session(inference_settings=None, device=device)

        input_data, _ = next(iter(dataloader))
        device = PyTorchEvaluator._device_string_to_torch_device(device)
        # the torch device type determines whether to time with CUDA events
        is_cuda = device.type == "cuda"
        if device:
            session.to(device)
            input_data = tensor_data_to_device(input_data, device)
        input_is_dict = isinstance(input_data, dict)

        # warm up
        for _ in range(warmup_num):
            session(**input_data) if input_is_dict else session(input_data)

        latencies = []
        if not is_cuda:
            for _ in range(repeat_test_num):
                t = time.perf_counter()
                # TODO(jambayk): do we care about the efficiency of if/else here?
                # probably won't add much overhead compared to the inference time
                # also we are doing the same for all models
                session(**input_data) if input_is_dict else session(input_data)
                latencies.append(time.perf_counter() - t)
        else:
            # synchronize before starting the test
            torch.cuda.synchronize()
            # cuda events for measuring latency
            starter = torch.cuda.Event(enable_timing=True)
            ender = torch.cuda.Event(enable_timing=True)
            for _ in range(repeat_test_num):
                starter.record()
                session(**input_data) if input_is_dict else session(input_data)
                ender.record()
                # synchronize after forward pass
                torch.cuda.synchronize()
                # add time in seconds, originally in milliseconds
                latencies.append(starter.elapsed_time(ender) * 1e-3)

        # move model to cpu
        if device:
            session.to("cpu")
        # moving to cpu alone does not release gpu memory; call cuda.empty_cache() to release it
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return latencies


class SNPEEvaluator(OliveEvaluator, framework=Framework.SNPE):
    def _inference(
        self,
        model: "SNPEModelHandler",
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        dataloader = self._prepare_dataloader(dataloader, model)
        inference_settings = metric.get_inference_settings(self.framework.lower())
        # `return_numpy_results` is required to be True for accuracy evaluation,
        # but not for plain model inference. We just set it to True for simple evaluation.
        inference_settings["return_numpy_results"] = True

        session = model.prepare_session(inference_settings=inference_settings, device=device)

        preds = []
        targets = []
        logits = []
        for data_dir, input_list, labels in dataloader:
            result = session(input_list, data_dir)
            # SNPE inference returns a list of outputs beyond the model output shape,
            # so we squeeze the first dimension of the output to get the right accuracy metrics
            for idx, output in enumerate(result.get("results")):
                if post_func:
                    post_output = post_func(output)
                else:
                    raise ValueError("Post processing function is required for SNPE model")
                preds.extend(post_output.tolist())
                if isinstance(labels[idx], (list, np.ndarray)):
                    targets.extend(labels[idx])
                else:
                    targets.append(labels[idx])
                # only when return_numpy_results is True is the result a dict with a "logits" key
                logits.extend(output.get("logits", np.array([])).tolist())
        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_accuracy(
        self,
        model: "SNPEModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    def _evaluate_raw_latency(
        self,
        model: "SNPEModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        dataloader = self._prepare_dataloader(dataloader, model, 1)
        warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric)
        session = model.prepare_session(
            inference_settings=metric.get_inference_settings(self.framework.lower()), device=device
        )

        data_dir, input_data, _ = next(iter(dataloader))
        total_runs = warmup_num + repeat_test_num
        results = session(input_data, data_dir, runs=total_runs, sleep=sleep_num)
        return results["latencies"]["total_inference_time"][warmup_num:]

    def _prepare_dataloader(
        self, dataloader: Union["DataLoader", FileListDataLoader], model: "SNPEModelHandler", file_chunk_size=None
    ) -> FileListDataLoader:
        if isinstance(dataloader, FileListDataLoader):
            return dataloader
        return FileListCommonDataLoader(dataloader, model.io_config, batch_size=file_chunk_size)


class OpenVINOEvaluator(OliveEvaluator, framework=Framework.OPENVINO):
    def _inference(
        self,
        model: "OpenVINOModelHandler",
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        session = model.prepare_session(
            inference_settings=metric.get_inference_settings(self.framework.lower()), device=device
        )

        preds = []
        targets = []
        logits = []
        for input_data, label in dataloader:
            session.infer({0: input_data})
            result = session.get_output_tensor(0).data
            outputs = post_func(result) if post_func else result
            preds.extend(outputs)
            targets.extend(label)
            logits.extend(result)
        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_accuracy(
        self,
        model: "OpenVINOModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    def _evaluate_raw_latency(
        self,
        model: "OpenVINOModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        session = model.prepare_session(
            inference_settings=metric.get_inference_settings(self.framework.lower()), device=device
        )

        latencies = []
        for input_data, _ in dataloader:
            t = time.perf_counter()
            session.infer(input_data)
            latencies.append(time.perf_counter() - t)
        return latencies


class QNNEvaluator(OliveEvaluator, framework=Framework.QNN):
    def _inference(
        self,
        model: "QNNModelHandler",
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> Tuple[OliveModelOutput, Any]:
        dataloader = self._prepare_dataloader(dataloader, model)
        session = model.prepare_session(
            inference_settings=metric.get_inference_settings(self.framework.lower()), device=device
        )

        preds = []
        targets = []
        logits = []
        for data_dir, input_list, labels in dataloader:
            result = session(input_list, data_dir)
            for idx, output in enumerate(result.get("result")):
                if post_func:
                    post_output = post_func(output)
                else:
                    raise ValueError("Post processing function is required for QNN model")
                preds.extend(post_output.tolist())
                if isinstance(labels[idx], (list, np.ndarray)):
                    targets.extend(labels[idx])
                else:
                    targets.append(labels[idx])
                logits.extend(output.tolist())
        return OliveModelOutput(preds=preds, logits=logits), targets

    def _evaluate_accuracy(
        self,
        model: "QNNModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> MetricResult:
        inference_output, targets = self._inference(model, metric, dataloader, post_func, device, execution_providers)
        return OliveEvaluator.compute_accuracy(metric, inference_output, targets)

    def _evaluate_raw_latency(
        self,
        model: "QNNModelHandler",
        data_root: str,
        metric: Metric,
        dataloader: "DataLoader",
        post_func=None,
        device: Device = Device.CPU,
        execution_providers: Union[str, List[str]] = None,
    ) -> List[float]:
        dataloader = self._prepare_dataloader(dataloader, model, 1)
        warmup_num, repeat_test_num, sleep_num = get_latency_config_from_metric(metric)
        session = model.prepare_session(
            inference_settings=metric.get_inference_settings(self.framework.lower()), device=device
        )

        data_dir, input_data, _ = next(iter(dataloader))
        # qnn-net-run only keeps 20 logs
        total_runs = min(warmup_num + repeat_test_num, 20)
        results = session(input_data, data_dir, runs=total_runs, sleep=sleep_num)
        return results["latencies"]["net_run"][warmup_num:]

    def _prepare_dataloader(
        self, dataloader: "DataLoader", model: "QNNModelHandler", file_chunk_size=None
    ) -> FileListDataLoader:
        if isinstance(dataloader, FileListDataLoader):
            return dataloader
        return FileListCommonDataLoader(dataloader, model.io_config, batch_size=file_chunk_size)


class OliveEvaluatorFactory:
    @staticmethod
    def create_evaluator_for_model(model: "OliveModelHandler") -> OliveEvaluator:
        evaluator_cls = OliveEvaluator.registry[str(model.framework).lower()]
        return evaluator_cls()
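
# Usage sketch (illustrative only, not part of the module). It assumes an ONNX model with a
# static io_config so dummy input data can be generated for the latency run; the model path
# and the metric definition below are hypothetical placeholders. All referenced names
# (ONNXModelHandler, Metric, MetricType, LatencySubType, Device) are already imported at the
# top of this module.
#
#     model = ONNXModelHandler(model_path="model.onnx")  # hypothetical path
#     latency_metric = Metric(
#         name="latency",
#         type=MetricType.LATENCY,
#         sub_types=[{"name": LatencySubType.AVG}],
#     )
#     evaluator = OliveEvaluatorFactory.create_evaluator_for_model(model)
#     result = evaluator.evaluate(
#         model,
#         data_root=None,  # assumption: no user data dir needed when dummy data is generated
#         metrics=[latency_metric],
#         device=Device.CPU,
#         execution_providers=["CPUExecutionProvider"],
#     )
#     # result is a flattened MetricResult keyed by joint metric/sub-type names
#     # (see flatten_metric_result above)
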

class OliveEvaluatorConfig(ConfigBase):
    metrics: List[Metric] = []  # noqa: RUF012

    @property
    def is_accuracy_drop_tolerance(self):
        for metric in self.metrics:
            for sub_metric in metric.sub_types:
                if metric.type == MetricType.ACCURACY and sub_metric.higher_is_better:
                    return sub_metric.goal is not None and sub_metric.goal.has_regression_goal()
        return False

    @validator("metrics")
    def validate_metrics(cls, v):
        metric_len = len(v)

        metric_names = {metric.name for metric in v}
        assert len(metric_names) == metric_len, "Metric names must be unique"

        sub_type_names = set()
        sub_type_with_rank = set()
        rank_set = set()
        for metric in v:
            for sub_type in metric.sub_types:
                unique_metric_name = joint_metric_key(metric.name, sub_type.name)
                sub_type_names.add(unique_metric_name)
                if sub_type.priority != -1:
                    sub_type_with_rank.add(unique_metric_name)
                    rank_set.add(sub_type.priority)

        if not rank_set and len(sub_type_names) == 1:
            logger.debug("No priority is specified, but only one sub type metric is specified. Use priority 1 for it.")
            v[0].sub_types[0].priority = 1
        elif not rank_set and len(sub_type_names) > 1:
            raise ValueError("Priority must be specified for multiple sub type metrics")

        expected_rank_set = set(range(1, len(sub_type_with_rank) + 1))
        # check that all priorities are present
        if rank_set != expected_rank_set:
            raise ValueError(f"Priorities must be unique and in the range 1 to {len(sub_type_with_rank)}")
        return v
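
# Illustrative sketch (not part of the module): how validate_metrics treats priorities. The
# metric dictionaries below are hypothetical but only use fields referenced above (name,
# type, sub_types, priority); pydantic coerces them into Metric objects.
#
#     config = OliveEvaluatorConfig(
#         metrics=[
#             {
#                 "name": "accuracy",
#                 "type": "accuracy",
#                 "sub_types": [{"name": "accuracy_score", "priority": 1}],
#             },
#             {
#                 "name": "latency",
#                 "type": "latency",
#                 "sub_types": [{"name": "avg", "priority": 2}],
#             },
#         ]
#     )
#
# The assigned priorities {1, 2} equal set(range(1, 3)), so validation passes. Duplicate or
# missing priorities raise a ValueError, and a lone sub-metric with no priority is
# automatically assigned priority 1.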