# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
import json
import logging
import time
from collections import OrderedDict, defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type, Union
import olive.cache as cache_utils
from olive.common.config_utils import ConfigBase, validate_config
from olive.common.utils import hash_dict
from olive.engine.config import FAILED_CONFIG, INVALID_CONFIG, PRUNED_CONFIGS, EngineConfig
from olive.engine.footprint import Footprint, FootprintNode, FootprintNodeMetric
from olive.engine.packaging.packaging_config import PackagingConfig
from olive.engine.packaging.packaging_generator import generate_output_artifacts
from olive.evaluator.metric import Metric, MetricResult, joint_metric_key
from olive.evaluator.olive_evaluator import OliveEvaluatorConfig
from olive.exception import OlivePassException
from olive.hardware import AcceleratorLookup, AcceleratorSpec, Device
from olive.model import ModelConfig, OliveModel
from olive.passes.olive_pass import Pass
from olive.strategy.search_strategy import SearchStrategy
from olive.systems.common import SystemType
from olive.systems.local import LocalSystem
from olive.systems.olive_system import OliveSystem
logger = logging.getLogger(__name__)
EXCEPTIONS_TO_RAISE = (AssertionError, AttributeError, ImportError, TypeError, ValueError)
[docs]class Engine:
"""
The engine executes the registered Olive Steps, facilitate evaluation of the output models using
provided evaluation criteria and produces output model(s).
"""
def __init__(
self,
config: Union[Dict[str, Any], EngineConfig] = None,
search_strategy: Optional[SearchStrategy] = None,
host: Optional[OliveSystem] = None,
target: Optional[OliveSystem] = None,
evaluator_config: Optional[OliveEvaluatorConfig] = None,
execution_providers: Optional[List[str]] = None,
):
self._config = validate_config(config, EngineConfig)
self.no_search = False
# default search strategy
self.search_strategy = SearchStrategy({"execution_order": "joint", "search_algorithm": "exhaustive"})
if search_strategy is not None:
# if search strategy is provided, use it. It takes precedence
self.search_strategy = search_strategy
elif isinstance(self._config.search_strategy, ConfigBase) or isinstance(self._config.search_strategy, dict):
# if search strategy is provided in config, use it
self.search_strategy = SearchStrategy(self._config.search_strategy)
elif not self._config.search_strategy:
# if search strategy is None or False, disable search
self.no_search = True
# default host
if host is not None:
self.host = host
elif self._config.host is not None:
self.host = self._config.host.create_system()
else:
# host accelerator is not used, so no need to specify it
self.host = LocalSystem()
# engine target
if target is not None:
self.target = target
elif self._config.target is not None:
self.target = self._config.target.create_system()
else:
self.target = LocalSystem()
if execution_providers is None:
execution_providers = self._config.execution_providers
# verify the AzureML system have specified the execution providers
# Please note we could not use isinstance(target, AzureMLSystem) since it would import AzureML packages.
if self.target.system_type == SystemType.AzureML and execution_providers is None:
raise ValueError("AzureMLSystem requires execution providers to be specified.")
elif execution_providers is None:
if self.target.system_type in (SystemType.Local, SystemType.PythonEnvironment):
execution_providers = self.target.get_supported_execution_providers()
else:
# for docker system and python system, we default use CPUExecutionProvider
execution_providers = ["CPUExecutionProvider"]
self.execution_providers = execution_providers
# Flatten the accelerators to list of AcceleratorSpec
accelerators: List[str] = self.target.accelerators
if accelerators is None:
inferred_accelerators = AcceleratorLookup.infer_accelerators_from_execution_provider(
self.execution_providers
)
if not inferred_accelerators:
logger.warning("Cannot infer the accelerators from the target system. Use CPU as default.")
accelerators = ["CPU"]
else:
logger.debug(
f"Use inferred accelerators {inferred_accelerators} "
f"from given execution providers {self.execution_providers}."
)
accelerators = inferred_accelerators
not_supported_ep = set()
processed_ep = set()
self.accelerator_specs: List[AcceleratorSpec] = []
is_cpu_available = "cpu" in [accelerator.lower() for accelerator in accelerators]
for accelerator in accelerators:
device = Device(accelerator.lower())
supported_eps = AcceleratorLookup.get_execution_providers_for_device(device)
eps = [e for e in self.execution_providers if e not in processed_ep]
for ep in eps:
if ep not in supported_eps:
not_supported_ep.add(ep)
processed_ep.add(ep)
elif ep == "CPUExecutionProvider" and device != "cpu" and is_cpu_available:
logger.info("ignore the CPUExecutionProvider for non-cpu device")
else:
self.accelerator_specs.append(AcceleratorSpec(device, ep))
processed_ep.add(ep)
assert self.accelerator_specs, (
"No valid accelerator specified for target system. "
"Please specify the accelerators in the target system or provide valid execution providers. "
f"Given execution providers: {self.execution_providers}. "
f"Current accelerators: {accelerators}."
f"Supported execution providers: {AcceleratorLookup.EXECUTION_PROVIDERS}."
)
if not_supported_ep:
logger.warning(
f"The following execution provider is not supported: {','.join(not_supported_ep)}. "
"Please consider installing an onnxruntime build that contains the relevant execution providers. "
)
# default evaluator
self.evaluator_config = None
if evaluator_config is not None:
self.evaluator_config = evaluator_config
elif self._config.evaluator is not None:
self.evaluator_config = self._config.evaluator
# dictionary of passes
self.pass_config = OrderedDict()
# {"pass_name": {"pass": pass, "host": host, "evaluator": evaluator, "clean_run_cache": clean_run_cache}}
self.passes = OrderedDict()
self.pass_flows = None
self.pass_flows_search_spaces = None
self.footprints = defaultdict(Footprint)
self.azureml_client_config = self._config.azureml_client_config
self._initialized = False
def initialize(self):
"""
Initialize engine state. This should be done before running the registered passes.
"""
cache_dir = self._config.cache_dir
if self._config.clean_cache:
cache_utils.clean_cache(cache_dir)
if self._config.clean_evaluation_cache:
cache_utils.clean_evaluation_cache(cache_dir)
self._model_cache_path, self._run_cache_path, self._evaluation_cache_path, _ = cache_utils.get_cache_sub_dirs(
cache_dir
)
cache_utils.create_cache(cache_dir)
# initialize counters
# we do this before cleaning pass run caches to ensure we don't reuse model numbers even if the model was
# deleted from the cache
self._new_model_number = 0
# model jsons have the format <model_number>_<pass_type>-<source_model>-<pass_config_hash>.json
# model contents are stored in <model_number>_<pass_type>-<source_model>-<pass_config_hash> folder
# sometimes the folder is created with contents but the json is not created when the pass fails to run
# so we check for both when determining the new model number
model_files = list(self._model_cache_path.glob("*_*"))
if len(model_files) > 0:
self._new_model_number = max([int(model_file.stem.split("_")[0]) for model_file in model_files]) + 1
# clean pass run cache if requested
# removes all run cache for pass type and all children elements
for pass_config in self.pass_config.values():
clean_run_cache = pass_config["clean_run_cache"]
if clean_run_cache:
cache_utils.clean_pass_run_cache(pass_config["type"].__name__, cache_dir)
self.set_pass_flows(self.pass_flows)
self._initialized = True
[docs] def register(
self,
pass_type: Type[Pass],
config: Dict[str, Any] = None,
disable_search=False,
name: str = None,
host: OliveSystem = None,
evaluator_config: OliveEvaluatorConfig = None,
clean_run_cache: bool = False,
output_name: str = None,
):
"""Register a pass configuration so that it could be instantiated and executed later."""
if name is not None:
assert name not in self.passes, f"Pass with name {name} already registered"
else:
id = 0
while True:
name = pass_type.__name__
if id > 0:
name = f"{name}_{id}"
id += 1
if name not in self.pass_config:
break
self.pass_config[name] = {
"type": pass_type,
"config": config or {},
"disable_search": disable_search,
"host": host,
"evaluator": evaluator_config,
"clean_run_cache": clean_run_cache,
"output_name": output_name,
}
def register_pass(
self,
p: Pass,
name: str = None,
host: OliveSystem = None,
evaluator_config: OliveEvaluatorConfig = None,
output_name: str = None,
):
"""
Register a pass
"""
if name is not None:
assert name not in self.passes, f"Pass with name {name} already registered"
else:
id = 0
while True:
name = p.__class__.__name__
if id > 0:
name = f"{name}_{id}"
id += 1
if name not in self.passes:
break
if self.no_search and len(p.search_space()) > 0:
raise ValueError(f"Search strategy is None but pass {name} has search space")
if output_name and not self.no_search:
# In no-search mode, if output_name is provided, the output model of the pass will be saved to
# engine's output_dir with the prefix of output_name.
logger.debug(f"output_name {output_name} for pass {name} will be ignored if search strategy is None")
self.passes[name] = {
"pass": p,
"host": host,
"evaluator": evaluator_config,
"output_name": output_name,
}
def set_pass_flows(self, pass_flows: List[List[str]] = None):
"""
Construct pass flows from a list of pass names.
Args:
pass_flows: a list of pass names, each pass name is a string.
"""
if not pass_flows:
self.pass_flows = [list(self.pass_config.keys())] if self.pass_config else []
else:
self.pass_flows = pass_flows
[docs] def run(
self,
input_model: OliveModel,
data_root: str = None,
packaging_config: Optional[PackagingConfig] = None,
output_dir: str = None,
output_name: str = None,
evaluate_input_model: bool = True,
):
"""
Run all the registered Olive passes on the input model and produce one or more candidate models.
Args:
input_model: input Olive model
packaging_config: packaging configuration, if packaging_config is provided, the output
model will be packaged into a zip file.
output_dir: output directory for the output model
output_name: output name for the output model, if output_name is provided, the output
model will be saved to engine's output_dir with the prefix of output_name.
evaluate_input_model: if evaluate_input_model is True, run the evaluation on the input model.
Return:
if search strategy is None, all passes are run in the order they were registered.
1. Final model -> {output_dir}/{output_name}_{AcceleratorSpec}_model.onnx
2. JSON file -> {output_dir}/{output_name}_{AcceleratorSpec}_model.json
3. Evaluation results of the final model -> {output_dir}/{output_name}_{AcceleratorSpec}_metrics.json
Return footprint/zip(packaging_config) of the final model and evaluation results of the final model.
if search strategy is not None, run the search strategy to find candidate models.
Return footprint/zip(packaging_config) of candidate models and evaluation results.
"""
if not self._initialized:
self.initialize()
output_dir: Path = Path(output_dir) if output_dir else Path.cwd()
output_dir.mkdir(parents=True, exist_ok=True)
outputs = {}
pf_footprints = {}
for accelerator_spec in self.accelerator_specs:
# generate search space and initialize the passes for each hardware accelerator
self.setup_passes(accelerator_spec)
# hash the input model
input_model_id = self._init_input_model(input_model)
self.footprints[accelerator_spec].record(model_id=input_model_id)
try:
if evaluate_input_model:
prefix_output_name = (
f"{output_name}_{accelerator_spec}_" if output_name is not None else f"{accelerator_spec}"
)
assert self.evaluator_config is not None, "evaluate_input_model is True but no evaluator provided"
results = self._evaluate_model(
input_model, input_model_id, data_root, self.evaluator_config, accelerator_spec
)
logger.info(f"Input model evaluation results: {results}")
result_name = f"{prefix_output_name}_input_model_metrics"
results_path = output_dir / f"{result_name}.json"
with open(results_path, "w") as f:
json.dump(results.to_json(), f, indent=4)
logger.info(f"Saved evaluation results of input model to {results_path}")
outputs[accelerator_spec] = results
if not self.passes:
logger.debug("No passes registered, return input model evaluation results.")
return outputs
if self.no_search:
output, model_ids = self.run_no_search(
input_model,
input_model_id,
data_root,
accelerator_spec,
output_dir,
output_name,
)
if output:
outputs[accelerator_spec] = output
pf_footprints[accelerator_spec] = self.footprints[accelerator_spec].get_footprints_by_model_ids(
model_ids
)
else:
footprint = self.run_search(
input_model,
input_model_id,
data_root,
accelerator_spec,
output_dir,
output_name,
)
outputs[accelerator_spec] = footprint
pf_footprints[accelerator_spec] = footprint
except EXCEPTIONS_TO_RAISE:
raise
except Exception as e:
logger.warning(f"Failed to run Olive on {accelerator_spec}: {e}", exc_info=True)
if packaging_config:
logger.info(f"Package top ranked {sum([len(f.nodes) for f in pf_footprints.values()])} models as artifacts")
generate_output_artifacts(
packaging_config,
self.footprints,
pf_footprints,
output_dir,
)
else:
logger.info("No packaging config provided, skip packaging artifacts")
return outputs
def setup_passes(self, accelerator_spec: AcceleratorSpec):
# clean the passes
self.passes.clear()
for config in self.pass_config.values():
pass_cls: Type[Pass] = config["type"]
pass_cfg = config["config"]
pass_cfg = pass_cls.generate_search_space(accelerator_spec, pass_cfg, config["disable_search"])
p = pass_cls(accelerator_spec, pass_cfg, config["disable_search"])
self.register_pass(
p, host=config["host"], evaluator_config=config["evaluator"], output_name=config["output_name"]
)
# list of passes starting from the first pass with non-empty search space
# These passes will be added to the search space
self.pass_flows_search_spaces = []
for pass_flow in self.pass_flows:
self.pass_search_spaces = []
for pass_name in pass_flow:
pass_cls_name = self.pass_config[pass_name]["type"].__name__
p: Pass = self.passes[pass_cls_name]["pass"]
self.pass_search_spaces.append((pass_cls_name, p.search_space()))
self.pass_flows_search_spaces.append(self.pass_search_spaces)
def run_no_search(
self,
input_model: OliveModel,
input_model_id: str,
data_root: str,
accelerator_spec: AcceleratorSpec,
output_dir: str = None,
output_name: str = None,
):
"""
Run all the registered Olive passes in no-search model where search strategy is None.
"""
assert (
self.search_strategy._config.execution_order == "joint"
), "run_no_search only supports default joint execution order"
for pass_item in self.passes.values():
if len(pass_item["pass"].search_space()) > 0:
pass_name = pass_item["name"]
raise ValueError(f"Pass {pass_name} has search space but search strategy is None")
evaluator_config = self.evaluator_for_pass(list(self.passes.keys())[-1])
if evaluator_config is None:
# provide dummy objective
objective_dict = {"dummy": {"higher_is_better": True, "goal": 0}}
else:
objective_dict = self.resolve_objectives(
input_model, input_model_id, data_root, evaluator_config.metrics, accelerator_spec
)
# initialize the search strategy
self.search_strategy.initialize(self.pass_flows_search_spaces, input_model_id, objective_dict)
iter_num = 0
flows_output = {}
output_model_ids = []
while True:
iter_num += 1
# get the next step
next_step = self.search_strategy.next_step()
if iter_num == 1:
assert next_step is not None, "Search strategy returned None for the first step"
# if no more steps, break
if next_step is None:
break
assert iter_num <= len(self.pass_flows), "No more pass flows to run"
# get the model id of the first input model
model_id = next_step["model_id"]
if model_id == input_model_id:
model = input_model
else:
model = self._load_model(model_id)
logger.debug(f"Step no search with search point {next_step['search_point']} ...")
# run all the passes in the step
(
should_prune,
signal,
model_ids,
) = self._run_passes(next_step["passes"], model, model_id, data_root, accelerator_spec)
pass_flow = self.pass_flows[iter_num - 1]
if should_prune:
failed_pass = pass_flow[len(model_ids)]
logger.warning(f"Flow {pass_flow} is pruned due to failed or invalid config for pass '{failed_pass}'")
# names of the output models of the passes
pass_output_names = [self.passes[pass_name]["output_name"] for pass_name, _ in next_step["passes"]]
pass_output_names = [f"{name}_{accelerator_spec}" if name else None for name in pass_output_names]
# output dir with pass flow
output_dir_with_pf = Path(output_dir) / "-".join(pass_flow)
final_output_name = pass_output_names[-1]
if output_name:
# override the output name of the last pass
logger.debug("Engine output_name is provided. Will ignore output_name for final pass")
final_output_name = f"{output_name}_{accelerator_spec}"
elif not final_output_name:
# use the default output name
final_output_name = f"{accelerator_spec}"
pass_output_names[-1] = final_output_name
output_model_json = None
output = {}
for pass_output_name, pass_output_model_id in zip(pass_output_names, model_ids):
if not pass_output_name:
continue
output_model_json = cache_utils.save_model(
model_number=pass_output_model_id,
output_dir=output_dir_with_pf,
output_name=f"{pass_output_name}_model",
overwrite=True,
cache_dir=self._config.cache_dir,
)
output_model_ids.append(pass_output_model_id)
# save the evaluation results to output_dir
if signal is not None:
results_path = output_dir_with_pf / f"{final_output_name}_metrics.json"
with open(results_path, "w") as f:
json.dump(signal.to_json(), f, indent=4)
if output_model_json and not should_prune:
# output_model_json is the last model only if the flow is not pruned
output["model"] = output_model_json
if signal is not None:
output["metrics"] = signal
else:
output = None
flows_output[tuple(pass_flow)] = output
return flows_output, output_model_ids
def run_search(
self,
input_model: OliveModel,
input_model_id: str,
data_root: str,
accelerator_spec: AcceleratorSpec,
output_dir: str = None,
output_name: str = None,
):
"""
Run all the registered Olive passes in search model where search strategy is not None.
"""
prefix_output_name = f"{output_name}_{accelerator_spec}_" if output_name is not None else f"{accelerator_spec}_"
# get objective_dict
evaluator_config = self.evaluator_for_pass(list(self.passes.keys())[-1])
if evaluator_config is None:
raise ValueError("No evaluator provided for the last pass")
else:
objective_dict = self.resolve_objectives(
input_model, input_model_id, data_root, evaluator_config.metrics, accelerator_spec
)
# initialize the search strategy
self.search_strategy.initialize(self.pass_flows_search_spaces, input_model_id, objective_dict)
output_model_num = self.search_strategy.get_output_model_num()
# record start time
start_time = time.time()
iter_num = 0
while True:
iter_num += 1
# get the next step
next_step = self.search_strategy.next_step()
# if no more steps, break
if next_step is None:
break
# get the model id of the first input model
model_id = next_step["model_id"]
if model_id == input_model_id:
model = input_model
else:
model = self._load_model(model_id)
logger.debug(f"Step {iter_num} with search point {next_step['search_point']} ...")
# run all the passes in the step
should_prune, signal, model_ids = self._run_passes(
next_step["passes"], model, model_id, data_root, accelerator_spec
)
# record feedback signal
self.search_strategy.record_feedback_signal(next_step["search_point"], signal, model_ids, should_prune)
time_diff = time.time() - start_time
self.search_strategy.check_exit_criteria(iter_num, time_diff, signal)
self.footprints[accelerator_spec].to_file(output_dir / f"{prefix_output_name}footprints.json")
return self.get_pareto_frontier_footprints(
accelerator_spec, output_model_num, objective_dict, output_dir, prefix_output_name
)
def get_pareto_frontier_footprints(
self, accelerator_spec, output_model_num, objective_dict, output_dir, prefix_output_name
):
pf_footprints = self.footprints[accelerator_spec].get_pareto_frontier()
if output_model_num is None or len(pf_footprints.nodes) <= output_model_num:
logger.info(f"Output all {len(pf_footprints.nodes)} models")
else:
top_ranked_nodes = self._get_top_ranked_nodes(objective_dict, pf_footprints, output_model_num)
logger.info(f"Output top ranked {len(top_ranked_nodes)} models based on metric priorities")
pf_footprints.update_nodes(top_ranked_nodes)
pf_footprints.to_file(output_dir / f"{prefix_output_name}pareto_frontier_footprints.json")
if self._config.plot_pareto_frontier:
pf_footprints.plot_pareto_frontier_to_html(
save_path=output_dir / f"{prefix_output_name}pareto_frontier_footprints_chart.html"
)
return pf_footprints
def resolve_objectives(
self,
input_model: OliveModel,
input_model_id: str,
data_root: str,
metrics: List[Metric],
accelerator_spec: AcceleratorSpec,
) -> Dict[str, Dict[str, Any]]:
"""
Return a dictionary of objectives and their higher_is_better and goal values.
{objective_name: {"higher_is_better": bool, "goal": float}}
"""
goals = self.resolve_goals(input_model, input_model_id, data_root, metrics, accelerator_spec)
objective_dict = {}
for metric in metrics:
for sub_type in metric.sub_types:
if sub_type.priority <= 0:
continue
metric_key = joint_metric_key(metric.name, sub_type.name)
objective_dict[metric_key] = {
"higher_is_better": sub_type.higher_is_better,
"goal": goals.get(metric_key),
"priority": sub_type.priority,
}
self.footprints[accelerator_spec].record_objective_dict(objective_dict)
ranked_objective_dict = dict(sorted(objective_dict.items(), key=lambda x: x[1]["priority"]))
return ranked_objective_dict
def resolve_goals(
self,
input_model: OliveModel,
input_model_id: str,
data_root: str,
metrics: List[Metric],
accelerator_spec: AcceleratorSpec,
) -> Dict[str, float]:
"""
Resolve the goals of the given metrics into thresholds for the given model.
"""
goals = {}
multipliers = {}
for metric in metrics:
# only resolve sub metrics whose priority > 0
goals[metric.name] = metric.get_sub_type_info("goal")
multipliers[metric.name] = metric.get_sub_type_info(
info_name="higher_is_better",
callback=lambda x: 1 if x else -1,
)
if goals:
logger.debug(f"Resolving goals: {goals}")
baseline = None
for goal in goals.values():
_evaluated = False
for sub_goal in goal.values():
if not sub_goal:
break
if sub_goal.type != "threshold":
assert self.evaluator_config is not None, "Default evaluator must be provided to resolve goals"
logger.debug("Computing baseline for metrics ...")
baseline = self._evaluate_model(
input_model, input_model_id, data_root, self.evaluator_config, accelerator_spec
)
_evaluated = True
break
if _evaluated:
break
if not baseline:
logger.debug("No baseline got as no goal is provided the the goal is threshold")
return {}
if baseline:
logger.debug(f"Baseline: {baseline}")
# resolve goals to thresholds
resolved_goals = {}
for metric_name, sub_type_goals in goals.items():
for sub_type_name, goal in sub_type_goals.items():
# TODO: make the logic cleaner
resolved_goal_value = None
baseline_sub_type = baseline.get_value(metric_name, sub_type_name)
multiplier = multipliers[metric_name][sub_type_name]
if goal.type == "threshold":
resolved_goal_value = goal.value
elif goal.type == "max-degradation":
resolved_goal_value = baseline_sub_type - multiplier * goal.value
elif goal.type == "min-improvement":
resolved_goal_value = baseline_sub_type + multiplier * goal.value
elif goal.type == "percent-max-degradation":
resolved_goal_value = baseline_sub_type * (1 - multiplier * goal.value / 100)
elif goal.type == "percent-min-improvement":
resolved_goal_value = baseline_sub_type * (1 + multiplier * goal.value / 100)
resolved_goals[joint_metric_key(metric_name, sub_type_name)] = resolved_goal_value
if len(resolved_goals) > 0:
logger.debug(f"Resolved goals: {resolved_goals}")
return resolved_goals
def host_for_pass(self, pass_id: str):
host = self.passes[pass_id]["host"]
if host is None:
return self.host
return host
def evaluator_for_pass(self, pass_id: str):
"""
Return evaluator for the given pass.
"""
e = self.passes[pass_id]["evaluator"]
if e is None:
return self.evaluator_config
return e
def _get_new_model_number(self):
"""
Get a new model number.
"""
while True:
new_model_number = self._new_model_number
self._new_model_number += 1
if list(self._model_cache_path.glob(f"{new_model_number}_*")) == []:
break
return new_model_number
def get_model_json_path(self, model_id: str) -> Path:
"""
Get the path to the model json file.
"""
return self._model_cache_path / f"{model_id}.json"
def _cache_model(self, model: Union[OliveModel, str], model_id: str, check_object: bool = True):
"""
Cache the model in the cache directory.
"""
# TODO move model/pass run/evaluation cache into footprints
if model == FAILED_CONFIG:
model_json = {}
else:
model_json = model.to_json(check_object=check_object)
model_json_path = self.get_model_json_path(model_id)
try:
with open(model_json_path, "w") as f:
json.dump(model_json, f, indent=4)
except Exception as e:
logger.error(f"Failed to cache model: {e}", exc_info=True)
def _load_model(self, model_id: str) -> Union[OliveModel, str]:
"""
Load the model from the cache directory.
"""
model_json_path = self.get_model_json_path(model_id)
try:
with open(model_json_path, "r") as f:
model_json = json.load(f)
except Exception as e:
logger.error(f"Failed to load model: {e}", exc_info=True)
return None
if model_json == {}:
return FAILED_CONFIG
model = ModelConfig.from_json(model_json).create_model()
return model
def _prepare_non_local_model(self, model: OliveModel) -> OliveModel:
"""
Prepare models with non-local model path for local run by downloading the model resource to cache
"""
model_resource_path = model.model_resource_path
if (
model_resource_path is None
or model_resource_path.is_local_resource()
or model_resource_path.is_string_name()
):
logger.debug("Model path is None, local or string name. No need to prepare")
return model
# download and cache the model resource
logger.debug("Downloading non local model resource to cache")
local_model_resource_path = cache_utils.download_resource(model_resource_path, self._config.cache_dir)
# set local model resource path
model.set_local_model_path(local_model_resource_path)
return model
def _init_input_model(self, input_model: OliveModel):
"""
Initialize the input model.
"""
model_hash = hash_dict(input_model.to_json())
# cache the model
self._cache_model(input_model, model_hash, check_object=False)
return model_hash
def get_run_json_path(
self,
pass_name: int,
input_model_number: str,
pass_config: dict,
accelerator_spec: AcceleratorSpec,
):
"""
Get the path to the run json.
"""
pass_config_hash = hash_dict(pass_config)
if not accelerator_spec:
run_json_path = self._run_cache_path / f"{pass_name}-{input_model_number}-{pass_config_hash}.json"
else:
run_json_path = (
self._run_cache_path / f"{pass_name}-{input_model_number}-{pass_config_hash}-{accelerator_spec}.json"
)
return run_json_path
def _cache_run(
self,
pass_name: int,
pass_config: dict,
input_model_id: str,
output_model_id: str,
accelerator_spec: AcceleratorSpec,
):
"""
Cache the run in the cache directory.
"""
run_json = {
"pass_name": pass_name,
"pass_config": pass_config,
"input_model_id": input_model_id,
"output_model_id": output_model_id,
}
input_model_number = input_model_id.split("_")[0]
run_json_path = self.get_run_json_path(pass_name, input_model_number, pass_config, accelerator_spec)
try:
with open(run_json_path, "w") as f:
json.dump(run_json, f, indent=4)
except Exception as e:
logger.error(f"Failed to cache run: {e}", exc_info=True)
def _load_run(self, input_model_id: str, pass_name: int, pass_config: dict, accelerator_spec: AcceleratorSpec):
"""
Load the run from the cache directory.
"""
input_model_number = input_model_id.split("_")[0]
run_json_path = self.get_run_json_path(pass_name, input_model_number, pass_config, accelerator_spec)
if run_json_path.exists():
try:
with open(run_json_path, "r") as f:
run_json = json.load(f)
output_model_id = run_json["output_model_id"]
except Exception as e:
logger.error(f"Failed to load run: {e}", exc_info=True)
output_model_id = None
return output_model_id
else:
return None
def _run_passes(
self,
passes: List[Tuple[str, Dict[str, Any]]],
model: OliveModel,
model_id: str,
data_root: str,
accelerator_spec: AcceleratorSpec,
):
"""
Run all the passes in the order they were registered.
the passes is the list of (pass_name, pass_search_point) tuples
"""
should_prune = False
# run all the passes in the step
model_ids = []
for pass_id, pass_search_point in passes:
model, model_id = self._run_pass(pass_id, pass_search_point, model, model_id, data_root, accelerator_spec)
if model in PRUNED_CONFIGS:
should_prune = True
logger.debug("Pruned")
break
model_ids.append(model_id)
if not should_prune:
# evaluate the model
evaluator_config = self.evaluator_for_pass(pass_id)
if self.no_search and evaluator_config is None:
# skip evaluation if no search and no evaluator
signal = None
else:
signal = self._evaluate_model(model, model_id, data_root, evaluator_config, accelerator_spec)
logger.debug(f"Signal: {signal}")
else:
signal = None
logger.warning("Skipping evaluation as model was pruned")
return should_prune, signal, model_ids
def _run_pass(
self,
pass_id: str,
pass_search_point: Dict[str, Any],
input_model: OliveModel,
input_model_id: str,
data_root: str,
accelerator_spec: AcceleratorSpec,
):
"""
Run a pass on the input model.
"""
# pass
p: Pass = self.passes[pass_id]["pass"]
pass_name = p.__class__.__name__
logger.info(f"Running pass {pass_name}")
pass_config = p.config_at_search_point(pass_search_point)
pass_config = p.serialize_config(pass_config)
# check whether the config is valid
if not p.validate_search_point(pass_search_point, accelerator_spec, with_fixed_value=True):
logger.debug("Invalid search point, prune")
output_model = INVALID_CONFIG
# no need to record in footprint since there was no run and thus no valid/failed model
# invalid configs are also not cached since the same config can be valid for other accelerator specs
# a pass can be accelerator agnostic but still have accelerator specific invalid configs
# this helps reusing cached models for different accelerator specs
return output_model, None
# load run from cache if it exists
run_accel = None if p.is_accelerator_agnostic(accelerator_spec) else accelerator_spec
output_model_id = self._load_run(input_model_id, pass_name, pass_config, run_accel)
if output_model_id is not None:
logger.debug("Loading model from cache ...")
output_model = self._load_model(output_model_id)
if output_model is not None:
# footprint model and run
self.footprints[accelerator_spec].record(
model_id=output_model_id,
model_config=output_model.to_json() if output_model != FAILED_CONFIG else {"is_pruned": True},
parent_model_id=input_model_id,
from_pass=pass_name,
pass_run_config=pass_config,
)
return output_model, output_model_id
# new model id
input_model_number = input_model_id.split("_")[0]
# Note: the final output model id need contains the accelerator information
# if the output model is accelerator dependent.
output_model_id_parts = [
f"{self._get_new_model_number()}_{pass_name}",
input_model_number,
hash_dict(pass_config),
]
if not p.is_accelerator_agnostic(accelerator_spec):
output_model_id_parts.append(f"{accelerator_spec}")
output_model_id = "-".join(map(str, output_model_id_parts))
output_model_path = self._model_cache_path / f"{output_model_id}" / "output_model"
output_model_path.parent.mkdir(parents=True, exist_ok=True)
output_model_path = str(output_model_path)
# run pass
host = self.host_for_pass(pass_id)
if host.system_type != SystemType.AzureML:
input_model = self._prepare_non_local_model(input_model)
try:
output_model = host.run_pass(p, input_model, data_root, output_model_path, pass_search_point)
except OlivePassException as e:
logger.error(f"Pass run_pass failed: {e}", exc_info=True)
output_model = FAILED_CONFIG
except EXCEPTIONS_TO_RAISE:
# Don't catch these errors since most of time, it is caused by the user errors and need not retry.
raise
except Exception:
output_model = FAILED_CONFIG
# TODO: from the time being, we need to catch all exceptions to make the
# search process robust. We need rethrow the exception only when
# it is not pass specific. For example, for olive bugs and user errors
logger.error("Pass run failed.", exc_info=True)
if self.no_search:
raise # rethrow the exception if no search is performed
# cache model
self._cache_model(output_model, output_model_id)
# cache run
self._cache_run(pass_name, pass_config, input_model_id, output_model_id, run_accel)
# footprint model and run
self.footprints[accelerator_spec].record(
model_id=output_model_id,
model_config=output_model.to_json() if output_model != FAILED_CONFIG else {"is_pruned": True},
parent_model_id=input_model_id,
from_pass=pass_name,
pass_run_config=pass_config,
)
return output_model, output_model_id
def get_evaluation_json_path(self, model_id: str):
"""
Get the path to the evaluation json.
"""
evaluation_json_path = self._evaluation_cache_path / f"{model_id}.json"
return evaluation_json_path
def _cache_evaluation(self, model_id: str, signal: MetricResult):
"""
Cache the evaluation in the cache directory.
"""
evaluation_json = {
"model_id": model_id,
"signal": signal.dict(),
}
evaluation_json_path = self.get_evaluation_json_path(model_id)
try:
with open(evaluation_json_path, "w") as f:
json.dump(evaluation_json, f, indent=4)
except Exception as e:
logger.error(f"Failed to cache evaluation: {e}", exc_info=True)
def _load_evaluation(self, model_id: str):
"""
Load the evaluation from the cache directory.
"""
evaluation_json_path = self.get_evaluation_json_path(model_id)
if evaluation_json_path.exists():
try:
with open(evaluation_json_path, "r") as f:
evaluation_json = json.load(f)
signal = evaluation_json["signal"]
signal = MetricResult(**signal)
except Exception as e:
logger.error(f"Failed to load evaluation: {e}", exc_info=True)
signal = None
return signal
else:
return None
def _evaluate_model(
self,
model: OliveModel,
model_id: str,
data_root: str,
evaluator_config: OliveEvaluatorConfig,
accelerator_spec: AcceleratorSpec,
):
"""
Evaluate a model.
"""
logger.debug("Evaluating model ...")
accelerator_suffix = f"-{accelerator_spec}" if accelerator_spec else ""
if not model_id.endswith(accelerator_suffix):
# append the suffix if the model is accelerator independent
model_id_with_accelerator = f"{model_id}{accelerator_suffix}"
else:
model_id_with_accelerator = model_id
# load evaluation from cache if it exists
signal = self._load_evaluation(model_id_with_accelerator)
if signal is not None:
logger.debug("Loading evaluation from cache ...")
# footprint evaluation
self.footprints[accelerator_spec].record(
model_id=model_id,
metrics=FootprintNodeMetric(
value=signal,
is_goals_met=False,
),
)
return signal
# evaluate model
metrics = evaluator_config.metrics if evaluator_config else []
if self.target.system_type != SystemType.AzureML:
model = self._prepare_non_local_model(model)
signal = self.target.evaluate_model(model, data_root, metrics, accelerator_spec)
# cache evaluation
self._cache_evaluation(model_id_with_accelerator, signal)
# footprint evaluation
self.footprints[accelerator_spec].record(
model_id=model_id,
metrics=FootprintNodeMetric(
value=signal,
is_goals_met=False,
),
)
return signal
def _get_top_ranked_nodes(
self, objective_dict: Dict[str, Any], footprint: Footprint, k: int
) -> List[FootprintNode]:
footprint_node_list = footprint.nodes.values()
sorted_footprint_node_list = sorted(
footprint_node_list,
key=lambda x: tuple(
x.metrics.value[metric].value
if x.metrics.cmp_direction[metric] == 1
else -x.metrics.value[metric].value
for metric in objective_dict.keys()
),
reverse=True,
)
selected_footprint_nodes = sorted_footprint_node_list[:k]
return selected_footprint_nodes