Source code for olive.evaluator.olive_evaluator

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Dict, List

import numpy as np
import torch
from pydantic import validator
from torch.utils.data import Dataset

from olive.common.config_utils import ConfigBase
from olive.common.user_module_loader import UserModuleLoader
from olive.common.utils import tensor_data_to_device
from olive.constants import Framework
from olive.evaluator.accuracy import AUC, AccuracyScore, F1Score, Precision, Recall
from olive.evaluator.metric import AccuracySubType, LatencySubType, Metric, MetricType
from olive.model import OliveModel, ONNXModel, OpenVINOModel, PyTorchModel, SNPEModel
from olive.systems.common import Device

logger = logging.getLogger(__name__)


class DummyDataloader(Dataset):
    """Synthetic dataset that yields all-ones tensors shaped like a model's inputs.

    Every sample is a ``(dummy_inputs, label)`` pair where ``label`` is always ``0``.
    ``dummy_inputs`` is a single tensor when exactly one input name is configured,
    otherwise a dict mapping each input name to its tensor.

    Args:
        input_names: names of the model inputs.
        input_shapes: one shape per input name, in the same order.
        input_types: optional dtype names (keys of ``_STR_TO_DTYPE``), one per input.
            When falsy, every input defaults to ``torch.float32``.
    """

    # dtype names accepted in ``input_types``
    _STR_TO_DTYPE = {
        "float32": torch.float32,
        "float16": torch.float16,
        "int32": torch.int32,
        "int64": torch.int64,
    }
    # number of samples reported by ``__len__``
    _NUM_SAMPLES = 100

    def __init__(self, input_names, input_shapes, input_types):
        self.input_names = input_names
        self.input_shapes = input_shapes
        self.input_types = input_types

    def __len__(self):
        return self._NUM_SAMPLES

    def __getitem__(self, index):
        """Return the ``(dummy_inputs, label)`` sample for ``index``.

        Raises:
            IndexError: if ``index`` is outside ``[-len(self), len(self))``.
            KeyError: if an entry of ``input_types`` is not a supported dtype name.
        """
        # This class defines no ``__iter__``, so ``for sample in dataset`` falls back to
        # calling ``__getitem__`` with 0, 1, 2, ... until IndexError is raised. Without
        # this bounds check such a loop would never terminate.
        size = len(self)
        if not -size <= index < size:
            raise IndexError(f"index {index} is out of range for a dataset of size {size}")

        if self.input_types:
            dtypes = [self._STR_TO_DTYPE[type_name] for type_name in self.input_types]
        else:
            dtypes = [torch.float32] * len(self.input_names)

        if len(self.input_names) == 1:
            dummy_inputs = torch.ones(self.input_shapes[0], dtype=dtypes[0])
        else:
            dummy_inputs = {
                name: torch.ones(shape, dtype=dtype)
                for name, shape, dtype in zip(self.input_names, self.input_shapes, dtypes)
            }
        label = 0
        return dummy_inputs, label


class OliveEvaluator(ABC):
    """Abstract base class for framework-specific model evaluators.

    A subclass supplies a ``framework`` keyword in its class statement; it is then
    stored in ``registry`` under ``str(framework).lower()``.
    """

    registry: Dict[str, "OliveEvaluator"] = {}

    @classmethod
    def __init_subclass__(cls, framework: Framework, **kwargs) -> None:
        """Remember the subclass's framework and add the subclass to the registry."""
        super().__init_subclass__(**kwargs)
        registry_key = str(framework).lower()
        cls.framework = framework
        cls.registry[registry_key] = cls

    def __init__(self):
        pass

    def get_inference_settings(self, metric: Metric) -> Dict[str, Any]:
        """Return the user-configured inference settings for this framework, or None."""
        # Priority: user.config.inference_settings > model.inference_settings > default
        # inference_settings. When the user config has none, None is returned here and
        # the model.inference_settings will be used in model.prepare_session(..)
        user_settings = metric.user_config.inference_settings
        if not user_settings:
            return None
        return user_settings.get(self.framework.lower())

    @abstractmethod
    def _evaluate_accuracy(
        self, model: OliveModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Evaluate an accuracy metric; implemented by each framework subclass."""
        raise NotImplementedError()

    @abstractmethod
    def _evaluate_latency(
        self, model: OliveModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Evaluate a latency metric; implemented by each framework subclass."""
        raise NotImplementedError()

    def _evaluate_custom(
        self,
        model: OliveModel,
        metric: Metric,
        dataloader: Dataset,
        eval_func,
        device: Device = Device.CPU,
        post_func=None,
    ) -> Dict[str, Any]:
        """Run the user-supplied ``eval_func`` and return its result.

        ``dataloader`` and ``post_func`` are accepted but not forwarded to ``eval_func``.
        """
        # TODO: Change the evaluate function to accept the metric rather than
        # breaking it into multiple arguments
        # return eval_func(model, metric, dataloader, device, post_func)
        user_config = metric.user_config
        return eval_func(model, user_config.data_dir, user_config.batch_size, device)

    def evaluate(self, model: OliveModel, metrics: List[Metric], device: Device = Device.CPU) -> Dict[str, Any]:
        """Evaluate every metric on ``model`` and return a dict keyed by metric name.

        Raises:
            TypeError: if a metric's type is not accuracy, latency or custom.
        """
        results = {}
        for metric in metrics:
            dataloader, eval_func, post_func = OliveEvaluator.get_user_config(metric)
            metric_type = metric.type
            if metric_type == MetricType.ACCURACY:
                value = self._evaluate_accuracy(model, metric, dataloader, device, post_func)
            elif metric_type == MetricType.LATENCY:
                value = self._evaluate_latency(model, metric, dataloader, device, post_func)
            elif metric_type == MetricType.CUSTOM:
                value = self._evaluate_custom(model, metric, dataloader, eval_func, device, post_func)
            else:
                raise TypeError(f"{metric.type} is not a supported metric type")
            results[metric.name] = value
        return results

    @staticmethod
    def get_user_config(metric: Metric):
        """Resolve the ``(dataloader, eval_func, post_func)`` triple for ``metric``.

        Objects named in the user config are loaded from the user script first. A
        ``DummyDataloader`` is substituted when input names and shapes are configured
        but neither a dataloader nor an evaluate function is available. Whatever is
        still missing among dataloader and post-processing function is taken from the
        metric's data container.
        """
        user_config = metric.user_config
        user_module = UserModuleLoader(user_config.user_script, user_config.script_dir)

        post_func = user_module.load_object(getattr(user_config, "post_processing_func", None))
        dataloader = user_module.call_object(
            getattr(user_config, "dataloader_func", None), user_config.data_dir, user_config.batch_size
        )
        eval_func = user_module.load_object(getattr(user_config, "evaluate_func", None))

        if user_config.input_names and user_config.input_shapes and not dataloader and not eval_func:
            dataloader = DummyDataloader(user_config.input_names, user_config.input_shapes, user_config.input_types)

        if not dataloader or not post_func:
            data_container = metric.data_config.to_data_container()
            # TODO remove user_scripts dataloader: the user-script dataloader is still
            # respected for backward compatibility for the time being.
            dataloader = dataloader or data_container.create_dataloader()
            post_func = post_func or data_container.config.post_process

        return dataloader, eval_func, post_func

    @staticmethod
    def compute_accuracy(metric: Metric, preds: Any, targets: Any) -> Dict[str, Any]:
        """Measure ``preds`` against ``targets`` with the scorer chosen by ``metric.sub_type``.

        Raises:
            TypeError: if ``metric.sub_type`` is not a supported accuracy sub type.
        """
        # Ordered pairs compared with ``==`` so matching behaves like an if/elif chain.
        scorers = (
            (AccuracySubType.ACCURACY_SCORE, AccuracyScore),
            (AccuracySubType.F1_SCORE, F1Score),
            (AccuracySubType.PRECISION, Precision),
            (AccuracySubType.RECALL, Recall),
            (AccuracySubType.AUC, AUC),
        )
        for sub_type, scorer_cls in scorers:
            if metric.sub_type == sub_type:
                return scorer_cls(metric.metric_config).measure(preds, targets)
        raise TypeError(f"{metric.sub_type} is not a supported accuracy metric")

    @staticmethod
    def compute_latency(metric: Metric, latencies: Any) -> float:
        """Aggregate ``latencies`` per ``metric.sub_type``, scale by 1000, round to 5 decimals.

        Raises:
            TypeError: if ``metric.sub_type`` is not a supported latency sub type.
        """
        sub_type = metric.sub_type
        if sub_type == LatencySubType.AVG:
            aggregated = sum(latencies) / len(latencies)
        elif sub_type == LatencySubType.MAX:
            aggregated = max(latencies)
        elif sub_type == LatencySubType.MIN:
            aggregated = min(latencies)
        else:
            percentiles = (
                (LatencySubType.P50, 50),
                (LatencySubType.P75, 75),
                (LatencySubType.P90, 90),
                (LatencySubType.P95, 95),
                (LatencySubType.P99, 99),
                (LatencySubType.P999, 99.9),
            )
            for candidate, quantile in percentiles:
                if sub_type == candidate:
                    aggregated = np.percentile(latencies, quantile)
                    break
            else:
                raise TypeError(f"{metric.sub_type} is not a supported latency metric")
        return round(aggregated * 1000, 5)
class OnnxEvaluator(OliveEvaluator, framework=Framework.ONNX):
    """Evaluator registered for ``Framework.ONNX`` models."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def format_input(input_data, io_config):
        """Format input data to ONNX input format.

        Args:
            input_data: a dict keyed by input name, or a single non-dict input which is
                paired with the first name in ``io_config["input_names"]``.
            io_config: mapping providing ``"input_names"`` and ``"input_types"``.

        Returns:
            Dict of input name to contiguous numpy array cast to the configured type.
            Keys of ``input_data`` that are not model input names are dropped.
        """
        input_names = io_config["input_names"]
        name_to_type = dict(zip(input_names, io_config["input_types"]))
        if not isinstance(input_data, dict):
            input_data = dict(zip(input_names, [input_data]))
        input_dict = {
            k: np.ascontiguousarray(
                input_data[k].cpu().numpy() if isinstance(input_data[k], torch.Tensor) else input_data[k],
                dtype=name_to_type[k],
            )
            for k in input_data.keys()
            if k in input_names
        }
        return input_dict

    def _evaluate_accuracy(
        self, model: ONNXModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Run the session over every batch of ``dataloader`` and compute the accuracy metric."""
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)
        io_config = model.get_io_config()

        preds = []
        targets = []
        output_names = io_config["output_names"]
        for input_data, labels in dataloader:
            input_dict = OnnxEvaluator.format_input(input_data, io_config)
            res = session.run(input_feed=input_dict, output_names=None)
            # a single-output model yields the bare output rather than a stacked list
            result = torch.Tensor(res[0]) if len(output_names) == 1 else torch.Tensor(res)
            outputs = post_func(result) if post_func else result
            preds.extend(outputs.tolist())
            targets.extend(labels.data.tolist())
        return OliveEvaluator.compute_accuracy(metric, preds, targets)

    def _evaluate_latency(
        self, model: OliveModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Time repeated session runs on the first batch of ``dataloader``.

        Runs ``warmup_num`` untimed iterations, then ``repeat_test_num`` timed ones,
        sleeping ``sleep_num`` seconds after each timed run.
        """
        warmup_num = metric.metric_config.warmup_num
        repeat_test_num = metric.metric_config.repeat_test_num
        sleep_num = metric.metric_config.sleep_num

        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)
        io_config = model.get_io_config()

        input_data, _ = next(iter(dataloader))
        input_dict = OnnxEvaluator.format_input(input_data, io_config)

        if metric.user_config.io_bind:
            io_bind_op = session.io_binding()
            # NOTE(review): compares against the raw string while PyTorchEvaluator
            # compares against Device.GPU; confirm Device members equal their strings.
            io_bind_device = "cuda" if device == "gpu" else "cpu"
            for k, v in input_dict.items():
                io_bind_op.bind_cpu_input(k, v)
            for item in session.get_outputs():
                io_bind_op.bind_output(item.name, io_bind_device)

        for _ in range(warmup_num):
            if metric.user_config.io_bind:
                session.run_with_iobinding(io_bind_op)
            else:
                session.run(input_feed=input_dict, output_names=None)

        latencies = []
        for _ in range(repeat_test_num):
            if metric.user_config.io_bind:
                t = time.perf_counter()
                session.run_with_iobinding(io_bind_op)
                latencies.append(time.perf_counter() - t)
            else:
                t = time.perf_counter()
                session.run(input_feed=input_dict, output_names=None)
                latencies.append(time.perf_counter() - t)
            time.sleep(sleep_num)

        return OliveEvaluator.compute_latency(metric, latencies)


class PyTorchEvaluator(OliveEvaluator, framework=Framework.PYTORCH):
    """Evaluator registered for ``Framework.PYTORCH`` models."""

    def __init__(self):
        super().__init__()

    @staticmethod
    def _device_string_to_torch_device(device: Device):
        """Map ``Device.GPU`` to ``torch.device("cuda")``; pass any other value to ``torch.device``."""
        return torch.device("cuda") if device == Device.GPU else torch.device(device)

    def _evaluate_accuracy(
        self, model: PyTorchModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Run the model over every batch of ``dataloader`` and compute the accuracy metric."""
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        preds = []
        targets = []
        device = PyTorchEvaluator._device_string_to_torch_device(device)
        if device:
            session.to(device)
        # Evaluation never back-propagates, so disable autograd to avoid building
        # and retaining a graph for every batch.
        with torch.no_grad():
            for input_data, labels in dataloader:
                input_data = tensor_data_to_device(input_data, device)
                result = session(**input_data) if isinstance(input_data, dict) else session(input_data)
                outputs = post_func(result) if post_func else result
                # use list.extend instead of list.append: when the last batch is smaller
                # than the batch size, appended sub-arrays would have different sizes and
                # trigger errors such as
                # ValueError: expected sequence of length 128 at dim 1 (got 3)
                preds.extend(outputs.tolist())
                targets.extend(labels.data.tolist())
        return OliveEvaluator.compute_accuracy(metric, preds, targets)

    def _evaluate_latency(
        self, model: PyTorchModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Time repeated forward passes on the first batch of ``dataloader``.

        Runs ``warmup_num`` untimed iterations followed by ``repeat_test_num`` timed
        ones. Autograd is disabled, and on CUDA devices the timer is bracketed by
        ``torch.cuda.synchronize`` so that it covers kernel execution and not only
        the asynchronous launch.
        """
        warmup_num = metric.metric_config.warmup_num
        repeat_test_num = metric.metric_config.repeat_test_num

        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        input_data, _ = next(iter(dataloader))
        device = PyTorchEvaluator._device_string_to_torch_device(device)
        if device:
            session.to(device)
        input_data = tensor_data_to_device(input_data, device)

        takes_kwargs = isinstance(input_data, dict)
        on_cuda = device.type == "cuda"

        def run_once():
            # dict inputs are expanded into keyword arguments, anything else is positional
            if takes_kwargs:
                session(**input_data)
            else:
                session(input_data)

        def wait_for_device():
            # CUDA kernels are launched asynchronously; block until queued work is done
            if on_cuda:
                torch.cuda.synchronize(device)

        latencies = []
        with torch.no_grad():
            for _ in range(warmup_num):
                run_once()
            for _ in range(repeat_test_num):
                wait_for_device()
                t = time.perf_counter()
                run_once()
                wait_for_device()
                latencies.append(time.perf_counter() - t)

        return OliveEvaluator.compute_latency(metric, latencies)


class SNPEEvaluator(OliveEvaluator, framework=Framework.SNPE):
    """Evaluator registered for ``Framework.SNPE`` models."""

    def __init__(self):
        super().__init__()

    def _evaluate_accuracy(
        self, model: SNPEModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Run the session over every item of ``dataloader`` and compute the accuracy metric.

        Raises:
            ValueError: if ``post_func`` is not provided.
        """
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        preds = []
        targets = []
        for data_dir, input_list, labels in dataloader:
            result = session(input_list, data_dir)
            if post_func:
                outputs = post_func(result)
            else:
                raise ValueError("Post processing function is required for SNPE model")
            preds.extend(outputs.tolist())
            targets.extend(labels.tolist())
        return OliveEvaluator.compute_accuracy(metric, preds, targets)

    def _evaluate_latency(
        self, model: SNPEModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Request warmup plus test runs in one session call and drop the warmup timings."""
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        data_dir, input_data, _ = next(iter(dataloader))
        total_runs = metric.metric_config.warmup_num + metric.metric_config.repeat_test_num
        results = session(input_data, data_dir, runs=total_runs, sleep=metric.metric_config.sleep_num)
        # skip the first warmup_num entries so only the test runs are aggregated
        latencies = results["latencies"]["total_inference_time"][metric.metric_config.warmup_num:]  # fmt: skip

        return OliveEvaluator.compute_latency(metric, latencies)


class OpenVINOEvaluator(OliveEvaluator, framework=Framework.OPENVINO):
    """Evaluator registered for ``Framework.OPENVINO`` models."""

    def __init__(self):
        super().__init__()

    def _evaluate_accuracy(
        self, model: OpenVINOModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Run an inference request per item of ``dataloader`` and compute the accuracy metric."""
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        preds = []
        targets = []
        for input_data, labels in dataloader:
            result = session.infer_new_request({0: input_data})
            outputs = post_func(result) if post_func else result
            # wrap a non-list label so it can be extended into the target list
            if not isinstance(labels, list):
                labels = [labels]
            preds.extend(outputs)
            targets.extend(labels)
        return OliveEvaluator.compute_accuracy(metric, preds, targets)

    def _evaluate_latency(
        self, model: OpenVINOModel, metric: Metric, dataloader: Dataset, device: Device = Device.CPU, post_func=None
    ) -> Dict[str, Any]:
        """Time one session call per item of ``dataloader``.

        Unlike the other evaluators, this does not read ``warmup_num`` or
        ``repeat_test_num``; the number of timed runs equals the number of items
        the dataloader yields.
        """
        session = model.prepare_session(inference_settings=self.get_inference_settings(metric), device=device)

        latencies = []
        for input_data, _ in dataloader:
            t = time.perf_counter()
            session(input_data)
            latencies.append(time.perf_counter() - t)
        return OliveEvaluator.compute_latency(metric, latencies)


class OliveEvaluatorFactory:
    """Creates the evaluator registered for a model's framework."""

    @staticmethod
    def create_evaluator_for_model(model: OliveModel) -> OliveEvaluator:
        """Instantiate the evaluator class registered under ``str(model.framework).lower()``.

        Raises:
            KeyError: if no evaluator is registered for the model's framework.
        """
        evaluator_cls = OliveEvaluator.registry[str(model.framework).lower()]
        return evaluator_cls()
class OliveEvaluatorConfig(ConfigBase):
    """Evaluator configuration holding the list of metrics to evaluate."""

    metrics: List[Metric] = []

    @validator("metrics")
    def validate_metrics(cls, v):
        """Check that metric names are unique and priority ranks are exactly 1..N.

        A list with exactly one metric is accepted without further checks.

        Raises:
            ValueError: if names are duplicated or the ranks are not the set 1..N.
        """
        metric_len = len(v)
        if metric_len == 1:
            return v

        # Raise explicitly instead of using ``assert`` so the check still runs when
        # Python is started with -O, which strips assert statements.
        metric_names = {metric.name for metric in v}
        if len(metric_names) != metric_len:
            raise ValueError("Metric names must be unique")

        rank_set = {metric.priority_rank for metric in v}
        expected_rank_set = set(range(1, metric_len + 1))
        # Check if all ranks are present
        if rank_set != expected_rank_set:
            raise ValueError(f"Priority ranks must be unique and in the range 1 to {metric_len}")
        return v