Source code for olive.engine.engine

# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import json
import logging
import time
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union

import olive.cache as cache_utils
from olive.common.config_utils import validate_config
from olive.common.utils import hash_dict
from olive.engine.config import FAILED_CONFIG, INVALID_CONFIG, PRUNED_CONFIGS
from olive.engine.footprint import Footprint, FootprintNodeMetric
from olive.engine.packaging.packaging_generator import generate_output_artifacts
from olive.evaluator.metric import Metric
from olive.evaluator.metric_result import MetricResult, joint_metric_key
from olive.evaluator.olive_evaluator import OliveEvaluatorConfig
from olive.exception import EXCEPTIONS_TO_RAISE, OlivePassError
from olive.hardware import AcceleratorSpec
from olive.model import ModelConfig
from olive.strategy.search_strategy import SearchStrategy, SearchStrategyConfig
from olive.systems.common import SystemType
from olive.systems.system_config import SystemConfig
from olive.systems.utils import create_managed_system_with_cache

if TYPE_CHECKING:
    from olive.engine.packaging.packaging_config import PackagingConfig
    from olive.passes.olive_pass import Pass
    from olive.systems.olive_system import OliveSystem

logger = logging.getLogger(__name__)


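# Illustrative usage sketch (not part of the module): the typical Engine lifecycle is construct,
# register passes, then run on one or more accelerators. The pass type, import path, model-config
# dict, and paths below are hypothetical assumptions for demonstration only.
#
#     from olive.hardware import AcceleratorSpec
#     from olive.model import ModelConfig
#     from olive.passes.onnx import OrtTransformersOptimization  # hypothetical pass choice/import path
#
#     engine = Engine(search_strategy=None, cache_dir=".olive-cache")  # no-search mode
#     engine.register(OrtTransformersOptimization, clean_run_cache=True)
#     outputs = engine.run(
#         input_model_config=ModelConfig.from_json({"type": "ONNXModel", "config": {"model_path": "model.onnx"}}),
#         accelerator_specs=[AcceleratorSpec("cpu", "CPUExecutionProvider")],
#         output_dir="outputs",
#         evaluate_input_model=False,
#     )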
class Engine:
    """The engine executes the registered Olive Steps.

    It facilitates evaluation of the output models using provided evaluation criteria and produces output model(s).
    """

    def __init__(
        self,
        search_strategy: Optional[Union[Dict[str, Any], SearchStrategyConfig]] = None,
        host: Optional[Union[Dict[str, Any], "SystemConfig"]] = None,
        target: Optional[Union[Dict[str, Any], "SystemConfig"]] = None,
        evaluator: Optional[Union[Dict[str, Any], "OliveEvaluatorConfig"]] = None,
        cache_dir=".olive-cache",
        clean_cache=False,
        clean_evaluation_cache=False,
        plot_pareto_frontier=False,
        *,
        azureml_client_config=None,
    ):
        self.no_search = False
        if not search_strategy:
            # if search strategy is None or False, disable search
            self.no_search = True
            self.search_strategy = None
        else:
            # if search strategy is provided in config, use it
            self.search_strategy = SearchStrategy(search_strategy)

        # default host
        host = host or {"type": SystemType.Local}
        self.host_config = validate_config(host, SystemConfig)
        self.host = None

        # engine target
        target = target or {"type": SystemType.Local}
        self.target_config = validate_config(target, SystemConfig)
        self.target = None

        # default evaluator
        self.evaluator_config = validate_config(evaluator, OliveEvaluatorConfig) if evaluator else None

        self.cache_dir = cache_dir
        self.clean_cache = clean_cache
        self.clean_evaluation_cache = clean_evaluation_cache
        self.plot_pareto_frontier = plot_pareto_frontier
        self.azureml_client_config = azureml_client_config

        # dictionary of passes
        self.pass_config = OrderedDict()
        # {"pass_name": {"pass": pass, "host": host, "evaluator": evaluator, "clean_run_cache": clean_run_cache}}
        self.passes = OrderedDict()
        self.pass_flows = None
        self.pass_flows_search_spaces = None

        self.footprints = defaultdict(Footprint)

        self._initialized = False

    def initialize(self):
        """Initialize engine state. This should be done before running the registered passes."""
        # pylint: disable=attribute-defined-outside-init
        if self.clean_cache:
            cache_utils.clean_cache(self.cache_dir)
        if self.clean_evaluation_cache:
            cache_utils.clean_evaluation_cache(self.cache_dir)

        logger.info("Using cache directory: %s", self.cache_dir)
        self._model_cache_path, self._run_cache_path, self._evaluation_cache_path, _ = cache_utils.get_cache_sub_dirs(
            self.cache_dir
        )
        cache_utils.create_cache(self.cache_dir)

        # initialize counters
        # we do this before cleaning pass run caches to ensure we don't reuse model numbers even if the model was
        # deleted from the cache
        self._new_model_number = 0
        # model jsons have the format <model_number>_<pass_type>-<source_model>-<pass_config_hash>.json
        # model contents are stored in the <model_number>_<pass_type>-<source_model>-<pass_config_hash> folder
        # sometimes the folder is created with contents but the json is not created when the pass fails to run
        # so we check for both when determining the new model number
        model_files = list(self._model_cache_path.glob("*_*"))
        if len(model_files) > 0:
            self._new_model_number = max(int(model_file.stem.split("_")[0]) for model_file in model_files) + 1

        # clean pass run cache if requested
        # removes all run cache for the pass type and all child elements
        for pass_config in self.pass_config.values():
            clean_run_cache = pass_config["clean_run_cache"]
            if clean_run_cache:
                cache_utils.clean_pass_run_cache(pass_config["type"].__name__, self.cache_dir)

        self.set_pass_flows(self.pass_flows)
        self._initialized = True
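    # Illustrative note (an assumption, not confirmed by this module): search_strategy may be passed
    # as a config dict that validates into SearchStrategyConfig, e.g. something like
    #     Engine(search_strategy={"execution_order": "joint", "search_algorithm": "tpe"})
    # whereas search_strategy=None (the default) runs every registered pass once in registration order
    # (no-search mode).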
    def register(
        self,
        pass_type: Type["Pass"],
        config: Dict[str, Any] = None,
        disable_search=False,
        name: str = None,
        host: "OliveSystem" = None,
        evaluator_config: "OliveEvaluatorConfig" = None,
        clean_run_cache: bool = False,
        output_name: str = None,
    ):
        """Register a pass configuration so that it can be instantiated and executed later."""
        if name is not None:
            assert name not in self.pass_config, f"Pass with name {name} already registered"
        else:
            idx = 0
            while True:
                name = pass_type.__name__
                if idx > 0:
                    name = f"{name}_{idx}"
                idx += 1
                if name not in self.pass_config:
                    break

        self.pass_config[name] = {
            "type": pass_type,
            "config": config or {},
            "disable_search": disable_search,
            "host": host,
            "evaluator": evaluator_config,
            "clean_run_cache": clean_run_cache,
            "output_name": output_name,
        }
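    # Example of the auto-naming above: registering the same pass type twice without explicit names
    # yields unique keys derived from the class name (illustrative pass type):
    #     engine.register(OnnxConversion)   # registered as "OnnxConversion"
    #     engine.register(OnnxConversion)   # registered as "OnnxConversion_1"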
    def register_pass(
        self,
        p: "Pass",
        name: str = None,
        host: "OliveSystem" = None,
        evaluator_config: "OliveEvaluatorConfig" = None,
        output_name: str = None,
    ):
        """Register a pass instance."""
        if name is not None:
            assert name not in self.passes, f"Pass with name {name} already registered"
        else:
            idx = 0
            while True:
                name = p.__class__.__name__
                if idx > 0:
                    name = f"{name}_{idx}"
                idx += 1
                if name not in self.passes:
                    break

        if self.no_search and len(p.search_space) > 0:
            raise ValueError(f"Search strategy is None but pass {name} has search space")
        if output_name and not self.no_search:
            # output_name is only used in no-search mode, where the output model of the pass is saved to
            # the engine's output_dir with the prefix of output_name
            logger.debug(
                "output_name %s for pass %s will be ignored since search strategy is not None", output_name, name
            )

        self.passes[name] = {
            "pass": p,
            "host": host,
            "evaluator": evaluator_config,
            "output_name": output_name,
        }

    def set_pass_flows(self, pass_flows: List[List[str]] = None):
        """Construct pass flows from a list of pass names.

        Args:
            pass_flows: a list of pass flows, each of which is a list of registered pass names.
        """
        if not pass_flows:
            self.pass_flows = [list(self.pass_config.keys())] if self.pass_config else []
        else:
            self.pass_flows = pass_flows
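    # Example of pass flows (illustrative pass names): each inner list is an independent sequence of
    # registered passes to run; by default a single flow containing all registered passes is used.
    #     engine.set_pass_flows([["conversion", "transformers_optimization"], ["conversion", "quantization"]])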
    def run(
        self,
        input_model_config: ModelConfig,
        accelerator_specs: List["AcceleratorSpec"],
        data_root: str = None,
        packaging_config: Optional[Union["PackagingConfig", List["PackagingConfig"]]] = None,
        output_dir: str = None,
        output_name: str = None,
        evaluate_input_model: bool = True,
    ):
        """Run all the registered Olive passes on the input model and produce one or more candidate models.

        Args:
            input_model_config: input Olive model configuration
            accelerator_specs: list of accelerator specs
            data_root: data root for the input data
            packaging_config: packaging configuration. If provided, the output model will be packaged into a
                zip file.
            output_dir: output directory for the output model
            output_name: output name for the output model. If provided, the output model will be saved to the
                engine's output_dir with the prefix of output_name.
            evaluate_input_model: if True, run the evaluation on the input model.

        Return:
            If search strategy is None, all passes are run in the order they were registered and the following
            artifacts are produced:
                1. Final model -> {output_dir}/{output_name}_{AcceleratorSpec}_model.onnx
                2. JSON file -> {output_dir}/{output_name}_{AcceleratorSpec}_model.json
                3. Evaluation results of the final model -> {output_dir}/{output_name}_{AcceleratorSpec}_metrics.json
            Returns the footprint/zip (packaging_config) of the final model and its evaluation results.

            If search strategy is not None, run the search strategy to find candidate models.
            Returns the footprint/zip (packaging_config) of candidate models and their evaluation results.
        """
        if not accelerator_specs:
            raise ValueError("No accelerator specified")

        if not self._initialized:
            self.initialize()

        output_dir: Path = Path(output_dir) if output_dir else Path.cwd()
        output_dir.mkdir(parents=True, exist_ok=True)

        outputs = {}

        for accelerator_spec in accelerator_specs:
            logger.info("Running Olive on accelerator: %s", accelerator_spec)
            with self._create_system(accelerator_spec):
                run_result = self.run_accelerator(
                    input_model_config,
                    data_root,
                    output_dir,
                    output_name,
                    evaluate_input_model,
                    accelerator_spec,
                )
                if run_result is None:
                    continue
                outputs[accelerator_spec] = run_result

        for accelerator_spec in self.footprints:
            logger.info("Run history for %s:", accelerator_spec)
            run_history = self.footprints[accelerator_spec].summarize_run_history()
            self.dump_run_history(run_history, output_dir / f"run_history_{accelerator_spec}.txt")

        if packaging_config and self.passes:
            # TODO(trajep): should we support packaging pytorch model?
            logger.info("Package top ranked %d models as artifacts", sum(len(f.nodes) for f in outputs.values()))
            generate_output_artifacts(
                packaging_config,
                self.footprints,
                outputs,
                output_dir,
                self.azureml_client_config,
            )
        else:
            logger.info("No packaging config provided, skip packaging artifacts")

        return outputs
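    # Example of a multi-accelerator run (illustrative specs): run() returns a dict keyed by accelerator
    # spec, and footprint/run-history files are written per accelerator under output_dir.
    #     outputs = engine.run(
    #         input_model_config,
    #         accelerator_specs=[
    #             AcceleratorSpec("cpu", "CPUExecutionProvider"),
    #             AcceleratorSpec("gpu", "CUDAExecutionProvider"),
    #         ],
    #         output_dir="outputs",
    #     )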
    def run_accelerator(
        self,
        input_model_config: ModelConfig,
        data_root: str,
        output_dir: Path,
        output_name: str,
        evaluate_input_model: bool,
        accelerator_spec: "AcceleratorSpec",
    ):
        # generate search space and initialize the passes for each hardware accelerator
        self.setup_passes(accelerator_spec)
        # hash the input model
        input_model_id = self._init_input_model(input_model_config)
        self.footprints[accelerator_spec].record(model_id=input_model_id)
        prefix_output_name = Engine._get_prefix_output_name(output_name, accelerator_spec)

        try:
            if evaluate_input_model and not self.evaluator_config:
                logger.debug(
                    "evaluate_input_model is True but no evaluator provided in no-search mode. Skipping input model"
                    " evaluation."
                )
            elif evaluate_input_model:
                results = self._evaluate_model(
                    input_model_config, input_model_id, data_root, self.evaluator_config, accelerator_spec
                )
                logger.info("Input model evaluation results: %s", results)
                result_name = f"{prefix_output_name}_input_model_metrics"
                results_path = output_dir / f"{result_name}.json"
                with results_path.open("w") as f:
                    json.dump(results.to_json(), f, indent=4)
                logger.info("Saved evaluation results of input model to %s", results_path)
                if not self.passes:
                    logger.debug("No passes registered, return input model evaluation results.")
                    return results

            if self.no_search:
                logger.debug("Running Olive in no-search mode ...")
                output_footprint = self.run_no_search(
                    input_model_config,
                    input_model_id,
                    data_root,
                    accelerator_spec,
                    output_dir,
                    output_name,
                )
            else:
                logger.debug("Running Olive in search mode ...")
                output_footprint = self.run_search(
                    input_model_config,
                    input_model_id,
                    data_root,
                    accelerator_spec,
                    output_dir,
                    output_name,
                )
        except EXCEPTIONS_TO_RAISE:
            raise
        except Exception:
            logger.warning("Failed to run Olive on %s.", accelerator_spec, exc_info=True)
            return None

        output_fp_path = output_dir / f"{prefix_output_name}_footprints.json"
        logger.info("Save footprint to %s.", output_fp_path)
        self.footprints[accelerator_spec].to_file(output_fp_path)
        logger.debug("run_accelerator done")
        return output_footprint

    def get_host_device(self):
        if self.host_config.config.accelerators:
            # for host device, we will always use the first accelerator device
            return self.host_config.config.accelerators[0].device
        else:
            return None

    def setup_passes(self, accelerator_spec: "AcceleratorSpec"):
        host_device = self.get_host_device()
        # clean the passes
        self.passes.clear()
        for name, config in self.pass_config.items():
            pass_cls: Type["Pass"] = config["type"]
            pass_cfg = config["config"]
            pass_cfg = pass_cls.generate_search_space(accelerator_spec, pass_cfg, config["disable_search"])
            p = pass_cls(accelerator_spec, pass_cfg, config["disable_search"], host_device)
            self.register_pass(
                p,
                name=name,
                host=config["host"],
                evaluator_config=config["evaluator"],
                output_name=config["output_name"],
            )

        # list of passes starting from the first pass with non-empty search space
        # these passes will be added to the search space
        self.pass_flows_search_spaces = []
        for pass_flow in self.pass_flows:
            pass_search_spaces = []
            for pass_name in pass_flow:
                p: "Pass" = self.passes[pass_name]["pass"]
                pass_search_spaces.append((pass_name, p.search_space))
            self.pass_flows_search_spaces.append(pass_search_spaces)

    def reset_passes(self):
        """Cleanup the passes."""
        self.passes.clear()
        self.pass_config.clear()
        self.pass_flows = []
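    # In no-search mode (below), every pass must have an empty search space (a fixed config);
    # _run_passes is then invoked once per flow with empty search points, e.g.
    #     [("conversion", {}), ("quantization", {})]
    # for a two-pass flow (illustrative names).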
    def run_no_search(
        self,
        input_model_config: ModelConfig,
        input_model_id: str,
        data_root: str,
        accelerator_spec: "AcceleratorSpec",
        output_dir: str = None,
        output_name: str = None,
    ):
        """Run all the registered Olive pass flows in no-search mode."""
        for pass_name, pass_item in self.passes.items():
            if len(pass_item["pass"].search_space) > 0:
                raise ValueError(f"Pass {pass_name} has search space but search strategy is None")

        output_models = {}
        for pass_flow in self.pass_flows:
            # search point is empty since there is no search
            passes_to_run = [(pass_id, {}) for pass_id in pass_flow]

            # run all the passes in the pass flow
            logger.debug("Running %s with no search ...", pass_flow)
            should_prune, signal, model_ids = self._run_passes(
                passes_to_run, input_model_config, input_model_id, data_root, accelerator_spec
            )

            if should_prune:
                failed_pass = pass_flow[len(model_ids)]
                logger.warning(
                    "Flow %s is pruned due to failed or invalid config for pass '%s'", pass_flow, failed_pass
                )
                continue

            # names of the output models of the passes
            pass_output_names = [self.passes[pass_id]["output_name"] for pass_id in pass_flow]
            pass_output_names = [f"{name}_{accelerator_spec}" if name else None for name in pass_output_names]

            # output dir with pass flow
            output_dir_with_pf = Path(output_dir) / "-".join(pass_flow)

            if not pass_output_names[-1] or output_name:
                # if the last pass does not have an output name, use the prefix output name
                pass_output_names[-1] = Engine._get_prefix_output_name(output_name, accelerator_spec)
            final_output_name = pass_output_names[-1]

            output_model_json = None
            for pass_output_name, pass_output_model_id in zip(pass_output_names, model_ids):
                if not pass_output_name:
                    continue
                output_model_json = cache_utils.save_model(
                    model_number=pass_output_model_id,
                    output_dir=output_dir_with_pf,
                    output_name=f"{pass_output_name}_model",
                    overwrite=True,
                    cache_dir=self.cache_dir,
                )
                # saving CompositePyTorchModel/CompositeModel again is not supported,
                # so output_model_json could be None
                output_models[pass_output_model_id] = output_model_json

            # save the evaluation results to output_dir
            if signal is not None:
                results_path = output_dir_with_pf / f"{final_output_name}_metrics.json"
                with results_path.open("w") as f:
                    json.dump(signal.to_json(), f, indent=4)

        output_model_ids = list(output_models.keys())
        fp_outputs = self.footprints[accelerator_spec].create_footprints_by_model_ids(output_model_ids)
        # update the output model config
        for model_id, model_config in output_models.items():
            if model_config:
                fp_outputs.nodes[model_id].model_config = model_config

        return fp_outputs
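    # Shape of the steps consumed by run_search below (keys are the ones accessed in this module;
    # values are illustrative):
    #     next_step = {
    #         "model_id": "0a1b2c3d",                        # model to start this step from
    #         "search_point": {...},                         # search point chosen by the strategy
    #         "passes": [("OnnxQuantization", {...}), ...],  # (pass_name, pass_search_point) pairs
    #     }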
    def run_search(
        self,
        input_model_config: ModelConfig,
        input_model_id: str,
        data_root: str,
        accelerator_spec: "AcceleratorSpec",
        output_dir: str = None,
        output_name: str = None,
    ):
        """Run all the registered Olive passes in search mode, where the search strategy is not None."""
        prefix_output_name = Engine._get_prefix_output_name(output_name, accelerator_spec)

        # get objective_dict
        evaluator_config = self.evaluator_for_pass(list(self.passes.keys())[-1])

        if evaluator_config is None:
            raise ValueError("No evaluator provided for the last pass")
        else:
            objective_dict = self.resolve_objectives(
                input_model_config, input_model_id, data_root, evaluator_config.metrics, accelerator_spec
            )
            self.footprints[accelerator_spec].record_objective_dict(objective_dict)

        # initialize the search strategy
        self.search_strategy.initialize(self.pass_flows_search_spaces, input_model_id, objective_dict)
        output_model_num = self.search_strategy.get_output_model_num()

        # record start time
        start_time = time.time()
        iter_num = 0
        while True:
            iter_num += 1

            # get the next step
            next_step = self.search_strategy.next_step()

            # if no more steps, break
            if next_step is None:
                break

            # get the model id of the first input model
            model_id = next_step["model_id"]
            if model_id == input_model_id:
                model_config = input_model_config
            else:
                model_config = self._load_model(model_id)

            logger.debug("Step %d with search point %s ...", iter_num, next_step["search_point"])

            # run all the passes in the step
            should_prune, signal, model_ids = self._run_passes(
                next_step["passes"], model_config, model_id, data_root, accelerator_spec
            )

            # record feedback signal
            self.search_strategy.record_feedback_signal(next_step["search_point"], signal, model_ids, should_prune)

            time_diff = time.time() - start_time
            self.search_strategy.check_exit_criteria(iter_num, time_diff, signal)

        return self.create_pareto_frontier_footprints(
            accelerator_spec, output_model_num, output_dir, prefix_output_name
        )

    def create_pareto_frontier_footprints(self, accelerator_spec, output_model_num, output_dir, prefix_output_name):
        pf_footprints = self.footprints[accelerator_spec].create_pareto_frontier(output_model_num)
        if not pf_footprints:
            return None
        pf_footprints.to_file(output_dir / f"{prefix_output_name}_pareto_frontier_footprints.json")

        if self.plot_pareto_frontier:
            pf_footprints.plot_pareto_frontier_to_html(
                save_path=output_dir / f"{prefix_output_name}_pareto_frontier_footprints_chart.html"
            )

        return pf_footprints

    def dump_run_history(self, run_history, output_path: str = None):
        if not run_history:
            logger.info("No run history to dump!")
            return
        headers = run_history[0]._fields
        try:
            from tabulate import tabulate

            formatted_rls = tabulate([tuple(rh) for rh in run_history], headers=headers, tablefmt="grid")
            logger.info("run history:\n%s", formatted_rls)
        except ImportError:
            logger.info("Please install tabulate for better run history output")
            formatted_rls = run_history
        with Path(output_path).open("w") as f:
            f.write(f"{formatted_rls}")
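    # Worked example for the goal resolution below (illustrative numbers): for a metric sub-type with
    # higher_is_better=True (multiplier = 1), baseline value 0.90, and a goal of type "max-degradation"
    # with value 0.02, the resolved threshold is 0.90 - 1 * 0.02 = 0.88. For "percent-min-improvement"
    # with value 5, it is 0.90 * (1 + 1 * 5 / 100) = 0.945. A "threshold" goal is used as-is and needs
    # no baseline evaluation.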
    def resolve_objectives(
        self,
        input_model_config: ModelConfig,
        input_model_id: str,
        data_root: str,
        metrics: List[Metric],
        accelerator_spec: "AcceleratorSpec",
    ) -> Dict[str, Dict[str, Any]]:
        """Return a dictionary of objectives and their higher_is_better and goal values.

        {objective_name: {"higher_is_better": bool, "goal": float}}
        """
        goals = self.resolve_goals(input_model_config, input_model_id, data_root, metrics, accelerator_spec)
        objective_dict = {}
        for metric in metrics:
            for sub_type in metric.sub_types:
                if sub_type.priority <= 0:
                    continue
                metric_key = joint_metric_key(metric.name, sub_type.name)
                objective_dict[metric_key] = {
                    "higher_is_better": sub_type.higher_is_better,
                    "goal": goals.get(metric_key),
                    "priority": sub_type.priority,
                }
        return dict(sorted(objective_dict.items(), key=lambda x: x[1]["priority"]))

    def resolve_goals(
        self,
        input_model_config: ModelConfig,
        input_model_id: str,
        data_root: str,
        metrics: List[Metric],
        accelerator_spec: "AcceleratorSpec",
    ) -> Dict[str, float]:
        """Resolve the goals of the given metrics into thresholds for the given model."""
        goals = {}
        multipliers = {}
        for metric in metrics:
            # only resolve sub metrics whose priority > 0
            goals[metric.name] = metric.get_sub_type_info("goal")
            multipliers[metric.name] = metric.get_sub_type_info(
                info_name="higher_is_better",
                callback=lambda x: 1 if x else -1,
            )

        if goals:
            logger.debug("Resolving goals: %s", goals)

        baseline = None
        for goal in goals.values():
            _evaluated = False
            for sub_goal in goal.values():
                if not sub_goal:
                    break
                if sub_goal.type != "threshold":
                    assert self.evaluator_config is not None, "Default evaluator must be provided to resolve goals"
                    logger.debug("Computing baseline for metrics ...")
                    baseline = self._evaluate_model(
                        input_model_config, input_model_id, data_root, self.evaluator_config, accelerator_spec
                    )
                    _evaluated = True
                    break
            if _evaluated:
                break
        if not baseline:
            logger.debug("No baseline computed since no goals are provided or all goals are of type threshold")
            return {}

        if baseline:
            logger.debug("Baseline: %s", baseline)

        # resolve goals to thresholds
        resolved_goals = {}
        for metric_name, sub_type_goals in goals.items():
            for sub_type_name, goal in sub_type_goals.items():
                # TODO(trajep): make the logic cleaner
                resolved_goal_value = None
                if goal is not None:
                    baseline_sub_type = baseline.get_value(metric_name, sub_type_name)
                    multiplier = multipliers[metric_name][sub_type_name]
                    if goal.type == "threshold":
                        resolved_goal_value = goal.value
                    elif goal.type == "max-degradation":
                        resolved_goal_value = baseline_sub_type - multiplier * goal.value
                    elif goal.type == "min-improvement":
                        resolved_goal_value = baseline_sub_type + multiplier * goal.value
                    elif goal.type == "percent-max-degradation":
                        resolved_goal_value = baseline_sub_type * (1 - multiplier * goal.value / 100)
                    elif goal.type == "percent-min-improvement":
                        resolved_goal_value = baseline_sub_type * (1 + multiplier * goal.value / 100)

                resolved_goals[joint_metric_key(metric_name, sub_type_name)] = resolved_goal_value
        if len(resolved_goals) > 0:
            logger.debug("Resolved goals: %s", resolved_goals)

        return resolved_goals

    def host_for_pass(self, pass_id: str):
        host = self.passes[pass_id]["host"]
        if host is None:
            return self.host
        return host

    def evaluator_for_pass(self, pass_id: str):
        """Return the evaluator for the given pass."""
        e = self.passes[pass_id]["evaluator"]
        if e is None:
            return self.evaluator_config
        return e

    def _get_new_model_number(self):
        """Get a new model number."""
        while True:
            new_model_number = self._new_model_number
            self._new_model_number += 1
            if not list(self._model_cache_path.glob(f"{new_model_number}_*")):
                break
        return new_model_number

    def get_model_json_path(self, model_id: str) -> Path:
        """Get the path to the model json file."""
        return self._model_cache_path / f"{model_id}.json"
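    # Illustrative cache layout (hypothetical ids): an output model produced by a pass gets an id like
    #     "1_OnnxConversion-0-1a2b3c4d"  # <model_number>_<pass_type>-<source_model_number>-<pass_config_hash>
    # so its json is cached as 1_OnnxConversion-0-1a2b3c4d.json in the model cache directory and its
    # contents in the folder of the same name (the actual sub-directory paths come from
    # cache_utils.get_cache_sub_dirs in initialize()).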
    def _cache_model(self, model: Union[ModelConfig, str], model_id: str, check_object: bool = True):
        """Cache the model in the cache directory."""
        # TODO(trajep): move model/pass run/evaluation cache into footprints
        if model == FAILED_CONFIG:
            model_json = {}
        else:
            model_json = model.to_json(check_object=check_object)
        model_json_path = self.get_model_json_path(model_id)
        try:
            with model_json_path.open("w") as f:
                json.dump(model_json, f, indent=4)
            logger.debug("Cached model %s to %s", model_id, model_json_path)
        except Exception:
            logger.exception("Failed to cache model")

    def _load_model(self, model_id: str) -> Union[ModelConfig, str]:
        """Load the model from the cache directory."""
        model_json_path = self.get_model_json_path(model_id)
        try:
            with model_json_path.open() as f:
                model_json = json.load(f)
        except Exception:
            logger.exception("Failed to load model.")
            return None

        if model_json == {}:
            return FAILED_CONFIG

        return ModelConfig.from_json(model_json)

    def _prepare_non_local_model(self, model_config: ModelConfig) -> ModelConfig:
        """Prepare models with a non-local model path for local runs by downloading the model resources to cache."""
        # TODO(myguo): maybe we can move this method into OliveSystem?
        resource_paths = model_config.get_resource_paths()
        for resource_name, resource_path in resource_paths.items():
            if not resource_path or resource_path.is_local_resource_or_string_name():
                continue
            downloaded_resource_path = cache_utils.download_resource(resource_path, self.cache_dir)
            if downloaded_resource_path:
                # set local resource path
                model_config.config[resource_name] = downloaded_resource_path
        return model_config

    def _init_input_model(self, input_model_config: ModelConfig):
        """Initialize the input model."""
        model_hash = hash_dict(input_model_config.to_json())[:8]
        # cache the model
        self._cache_model(input_model_config, model_hash, check_object=False)
        return model_hash

    def get_run_json_path(
        self,
        pass_name: str,
        input_model_number: str,
        pass_config: dict,
        accelerator_spec: "AcceleratorSpec",
    ):
        """Get the path to the run json."""
        pass_config_hash = hash_dict(pass_config)[:8]
        if not accelerator_spec:
            run_json_path = self._run_cache_path / f"{pass_name}-{input_model_number}-{pass_config_hash}.json"
        else:
            run_json_path = (
                self._run_cache_path / f"{pass_name}-{input_model_number}-{pass_config_hash}-{accelerator_spec}.json"
            )
        return run_json_path

    def _cache_run(
        self,
        pass_name: str,
        pass_config: dict,
        input_model_id: str,
        output_model_id: str,
        accelerator_spec: "AcceleratorSpec",
        run_start_time: float = 0,
        run_end_time: float = 0,
    ):
        """Cache the run in the cache directory."""
        run_json = {
            "pass_name": pass_name,
            "pass_config": pass_config,
            "input_model_id": input_model_id,
            "output_model_id": output_model_id,
            "run_start_time": run_start_time,
            "run_end_time": run_end_time,
        }
        input_model_number = input_model_id.split("_")[0]
        run_json_path = self.get_run_json_path(pass_name, input_model_number, pass_config, accelerator_spec)
        try:
            with run_json_path.open("w") as f:
                json.dump(run_json, f, indent=4)
            logger.debug("Cached run for %s->%s into %s", input_model_id, output_model_id, run_json_path)
        except Exception:
            logger.exception("Failed to cache run")
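    # Example of a cached run json as written above (illustrative values):
    #     {
    #         "pass_name": "OnnxConversion",
    #         "pass_config": {...},
    #         "input_model_id": "0a1b2c3d",
    #         "output_model_id": "1_OnnxConversion-0a1b2c3d-9f8e7d6c",
    #         "run_start_time": 1700000000.0,
    #         "run_end_time": 1700000012.5,
    #     }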
    def _load_run(self, input_model_id: str, pass_name: str, pass_config: dict, accelerator_spec: "AcceleratorSpec"):
        """Load the run from the cache directory."""
        input_model_number = input_model_id.split("_")[0]
        run_json_path = self.get_run_json_path(pass_name, input_model_number, pass_config, accelerator_spec)
        run_json = {}
        if run_json_path.exists():
            try:
                with run_json_path.open() as f:
                    run_json = json.load(f)
            except Exception:
                logger.exception("Failed to load run")
                run_json = {}
        return run_json

    def _run_passes(
        self,
        passes: List[Tuple[str, Dict[str, Any]]],
        model_config: ModelConfig,
        model_id: str,
        data_root: str,
        accelerator_spec: "AcceleratorSpec",
    ):
        """Run all the passes in the order they were registered.

        passes is a list of (pass_name, pass_search_point) tuples.
        """
        should_prune = False
        # run all the passes in the step
        model_ids = []
        pass_id = None
        for pass_id, pass_search_point in passes:
            model_config, model_id = self._run_pass(
                pass_id, pass_search_point, model_config, model_id, data_root, accelerator_spec
            )
            if model_config in PRUNED_CONFIGS:
                should_prune = True
                logger.debug("Pruned for pass %s", pass_id)
                break
            model_ids.append(model_id)

        if not should_prune:
            # evaluate the model
            evaluator_config = self.evaluator_for_pass(pass_id)
            if self.no_search and evaluator_config is None:
                # skip evaluation if no search and no evaluator
                signal = None
            else:
                logger.info("Run model evaluation for the final model...")
                signal = self._evaluate_model(model_config, model_id, data_root, evaluator_config, accelerator_spec)
                logger.debug("Signal: %s", signal)
        else:
            signal = None
            logger.warning("Skipping evaluation as model was pruned")

        return should_prune, signal, model_ids
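    # Note (inferred from the imports at the top of this module): PRUNED_CONFIGS groups the
    # FAILED_CONFIG and INVALID_CONFIG sentinels, so the membership check above prunes a flow whenever
    # _run_pass returns either a failed run or an invalid search point.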
    def _run_pass(
        self,
        pass_id: str,
        pass_search_point: Dict[str, Any],
        input_model_config: ModelConfig,
        input_model_id: str,
        data_root: str,
        accelerator_spec: "AcceleratorSpec",
    ):
        """Run a pass on the input model."""
        # pass
        p: "Pass" = self.passes[pass_id]["pass"]
        pass_name = p.__class__.__name__
        logger.info("Running pass %s:%s", pass_id, pass_name)
        pass_config = p.config_at_search_point(pass_search_point)
        pass_config = p.serialize_config(pass_config)

        # check whether the config is valid
        if not p.validate_search_point(pass_search_point, accelerator_spec, with_fixed_value=True):
            logger.warning("Invalid search point, prune")
            output_model_config = INVALID_CONFIG
            # no need to record in footprint since there was no run and thus no valid/failed model
            # invalid configs are also not cached since the same config can be valid for other accelerator specs
            # a pass can be accelerator agnostic but still have accelerator specific invalid configs
            # this helps reusing cached models for different accelerator specs
            return output_model_config, None

        # load run from cache if it exists
        run_accel = None if p.is_accelerator_agnostic(accelerator_spec) else accelerator_spec
        run_cache = self._load_run(input_model_id, pass_name, pass_config, run_accel)
        output_model_id = run_cache.get("output_model_id", None)
        if output_model_id is not None:
            logger.debug("Loading model from cache ...")
            output_model_config = self._load_model(output_model_id)
            if output_model_config is not None:
                # footprint model and run
                self.footprints[accelerator_spec].record(
                    model_id=output_model_id,
                    model_config=(
                        output_model_config.to_json() if output_model_config != FAILED_CONFIG else {"is_pruned": True}
                    ),
                    parent_model_id=input_model_id,
                    from_pass=pass_name,
                    pass_run_config=pass_config,
                    start_time=run_cache.get("run_start_time", 0),
                    end_time=run_cache.get("run_end_time", 0),
                )
                logger.info("Loaded model from cache: %s from %s", output_model_id, self._run_cache_path)
                return output_model_config, output_model_id

        # new model id
        input_model_number = input_model_id.split("_")[0]
        # Note: the final output model id needs to contain the accelerator information
        # if the output model is accelerator dependent.
        output_model_id_parts = [
            f"{self._get_new_model_number()}_{pass_name}",
            input_model_number,
            hash_dict(pass_config)[:8],
        ]

        if not p.is_accelerator_agnostic(accelerator_spec):
            output_model_id_parts.append(f"{accelerator_spec}")

        output_model_id = "-".join(map(str, output_model_id_parts))
        output_model_path = self._model_cache_path / f"{output_model_id}" / "output_model"
        output_model_path.parent.mkdir(parents=True, exist_ok=True)
        output_model_path = str(output_model_path)

        # run pass
        host = self.host_for_pass(pass_id)
        if host.system_type != SystemType.AzureML:
            input_model_config = self._prepare_non_local_model(input_model_config)

        run_start_time = datetime.now().timestamp()
        try:
            if p.run_on_target:
                if self.target.system_type == SystemType.IsolatedORT:
                    logger.warning(
                        "Cannot run pass %s on IsolatedORT target, will use the host to run the pass.", pass_id
                    )
                else:
                    host = self.target

            output_model_config = host.run_pass(p, input_model_config, data_root, output_model_path, pass_search_point)
        except OlivePassError:
            logger.exception("Pass run_pass failed")
            output_model_config = FAILED_CONFIG
        except EXCEPTIONS_TO_RAISE:
            # Don't catch these errors since most of the time they are caused by user errors and a retry won't help.
            raise
        except Exception:
            output_model_config = FAILED_CONFIG
            # TODO(jambayk): for the time being, we need to catch all exceptions to make the
            # search process robust. We need to rethrow the exception only when
            # it is not pass specific. For example, for Olive bugs and user errors
            logger.exception("Pass run failed.")
            if self.no_search:
                raise  # rethrow the exception if no search is performed

        run_end_time = datetime.now().timestamp()
        logger.info("Pass %s:%s finished in %f seconds", pass_id, pass_name, run_end_time - run_start_time)

        # cache model
        self._cache_model(output_model_config, output_model_id)

        # cache run
        self._cache_run(
            pass_name, pass_config, input_model_id, output_model_id, run_accel, run_start_time, run_end_time
        )

        # footprint model and run
        self.footprints[accelerator_spec].record(
            model_id=output_model_id,
            model_config=(
                output_model_config.to_json() if output_model_config != FAILED_CONFIG else {"is_pruned": True}
            ),
            parent_model_id=input_model_id,
            from_pass=pass_name,
            pass_run_config=pass_config,
            start_time=run_start_time,
            end_time=run_end_time,
        )
        return output_model_config, output_model_id

    def get_evaluation_json_path(self, model_id: str):
        """Get the path to the evaluation json."""
        return self._evaluation_cache_path / f"{model_id}.json"

    def _cache_evaluation(self, model_id: str, signal: MetricResult):
        """Cache the evaluation in the cache directory."""
        evaluation_json = {
            "model_id": model_id,
            "signal": signal.dict(),
        }
        evaluation_json_path = self.get_evaluation_json_path(model_id)
        try:
            with evaluation_json_path.open("w") as f:
                json.dump(evaluation_json, f, indent=4)
        except Exception:
            logger.exception("Failed to cache evaluation")

    def _load_evaluation(self, model_id: str):
        """Load the evaluation from the cache directory."""
        evaluation_json_path = self.get_evaluation_json_path(model_id)
        if evaluation_json_path.exists():
            try:
                with evaluation_json_path.open() as f:
                    evaluation_json = json.load(f)
                signal = evaluation_json["signal"]
                signal = MetricResult(**signal)
            except Exception:
                logger.exception("Failed to load evaluation")
                signal = None
            return signal
        else:
            return None
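    # Note on evaluation cache keys (inferred from _evaluate_model below): results are cached per model
    # and accelerator, e.g. a model id "0a1b2c3d" evaluated on a spec whose string form is
    # "cpu-CPUExecutionProvider" (illustrative) would be stored as 0a1b2c3d-cpu-CPUExecutionProvider.json
    # in the evaluation cache.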
...") accelerator_suffix = f"-{accelerator_spec}" if accelerator_spec else "" if not model_id.endswith(accelerator_suffix): # append the suffix if the model is accelerator independent model_id_with_accelerator = f"{model_id}{accelerator_suffix}" else: model_id_with_accelerator = model_id # load evaluation from cache if it exists signal = self._load_evaluation(model_id_with_accelerator) if signal is not None: logger.debug("Loading evaluation from cache ...") # footprint evaluation self.footprints[accelerator_spec].record( model_id=model_id, metrics=FootprintNodeMetric( value=signal, if_goals_met=False, ), ) return signal # evaluate model metrics = evaluator_config.metrics if evaluator_config else [] if self.target.system_type != SystemType.AzureML: model_config = self._prepare_non_local_model(model_config) signal = self.target.evaluate_model(model_config, data_root, metrics, accelerator_spec) # cache evaluation self._cache_evaluation(model_id_with_accelerator, signal) # footprint evaluation self.footprints[accelerator_spec].record( model_id=model_id, metrics=FootprintNodeMetric( value=signal, if_goals_met=False, ), ) return signal @staticmethod def _get_prefix_output_name(output_name: str, accelerator_spec: "AcceleratorSpec"): return f"{output_name}_{accelerator_spec}" if output_name else str(accelerator_spec) @contextmanager def _create_system(self, accelerator_spec): def create_system(config: "SystemConfig", accelerator_spec): assert config, "System config is not provided" if config.olive_managed_env: logger.debug( "Creating olive_managed_env %s with EP %s", config.type, accelerator_spec.execution_provider ) return create_managed_system_with_cache(config, accelerator_spec) else: logger.debug("create native OliveSystem %s", config.type) return config.create_system() if not self.target: logger.info("Creating target system ...") target_start_time = time.time() self.target = create_system(self.target_config, accelerator_spec) logger.info("Target system created in %f seconds", time.time() - target_start_time) if not self.host: host_accelerators = self.host_config.config.accelerators if host_accelerators and host_accelerators[0].execution_providers: host_accelerator_spec = AcceleratorSpec( host_accelerators[0].device, host_accelerators[0].execution_providers[0] ) else: host_accelerator_spec = None logger.info("Creating host system ...") host_start_time = time.time() self.host = create_system(self.host_config, host_accelerator_spec) logger.info("Host system created in %f seconds", time.time() - host_start_time) yield if self.target_config.olive_managed_env: # could we put it under cache system for reusing? logger.info("Removing target system ...") self.target.remove() self.target = None if self.host_config.olive_managed_env: logger.info("Removing host system ...") self.host.remove() self.host = None create_managed_system_with_cache.cache_clear()