Source code for archai.discrete_search.algos.bananas

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
from overrides import overrides

from archai.api.dataset_provider import DatasetProvider
from archai.common.ordered_dict_logger import OrderedDictLogger
from archai.discrete_search.api.archai_model import ArchaiModel
from archai.discrete_search.api.predictor import MeanVar, Predictor
from archai.discrete_search.api.search_objectives import SearchObjectives
from archai.discrete_search.api.search_results import SearchResults
from archai.discrete_search.api.search_space import (
    BayesOptSearchSpace,
    EvolutionarySearchSpace,
)
from archai.discrete_search.api.searcher import Searcher
from archai.discrete_search.predictors.dnn_ensemble import PredictiveDNNEnsemble
from archai.discrete_search.utils.multi_objective import get_non_dominated_sorting

logger = OrderedDictLogger(source=__name__)


class MoBananasSearch(Searcher):
    """Multi-objective version of BANANAS algorithm.

    It has been proposed in `Bag of Baselines for Multi-objective Joint Neural Architecture
    Search and Hyperparameter Optimization`.

    Reference:
        https://arxiv.org/abs/2105.01015

    """

    def __init__(
        self,
        search_space: BayesOptSearchSpace,
        search_objectives: SearchObjectives,
        output_dir: str,
        surrogate_model: Optional[Predictor] = None,
        num_iters: Optional[int] = 10,
        init_num_models: Optional[int] = 10,
        num_parents: Optional[int] = 10,
        mutations_per_parent: Optional[int] = 5,
        num_candidates: Optional[int] = 10,
        clear_evaluated_models: bool = True,
        save_pareto_weights: bool = False,
        seed: Optional[int] = 1,
    ) -> None:
        """Initialize the multi-objective BANANAS.

        Args:
            search_space: Discrete search space compatible with Bayesian Optimization algorithms.
                It must also be an `EvolutionarySearchSpace`, since parents are mutated.
            search_objectives: Search objectives. Expensive objectives (registered with
                `compute_intensive=True`) will be estimated using a surrogate model during
                certain parts of the search. Cheap objectives will be always evaluated directly.
            output_dir: Output directory. Created (including missing parents) if needed.
            surrogate_model: Surrogate model. If `None`, a `PredictiveDNNEnsemble` will be used.
            num_iters: Number of iterations.
            init_num_models: Number of initial models to evaluate.
            num_parents: Number of parents to select for each iteration.
            mutations_per_parent: Number of mutations to apply to each parent.
            num_candidates: Number of selected models to add to evaluate in the next iteration.
            clear_evaluated_models: Optimizes memory usage by clearing the architecture
                of `ArchaiModel` after each iteration. Defaults to True.
            save_pareto_weights: If `True`, saves the weights of the pareto models.
                Not implemented yet: passing `True` raises `NotImplementedError`.
            seed: Random seed.

        Raises:
            NotImplementedError: If `save_pareto_weights` is `True`.

        """

        super().__init__()

        # Asserts are kept (rather than raising TypeError) so that the exception
        # type seen by existing callers does not change.
        assert isinstance(
            search_space, BayesOptSearchSpace
        ), "`search_space` must be an instance of `BayesOptSearchSpace`."
        assert isinstance(
            search_space, EvolutionarySearchSpace
        ), "`search_space` must be an instance of `EvolutionarySearchSpace`."

        # Explicit `None` check: do not rely on the truthiness of a predictor object
        if surrogate_model is None:
            surrogate_model = PredictiveDNNEnsemble()
        else:
            assert isinstance(
                surrogate_model, Predictor
            ), "`surrogate_model` must be an instance of `Predictor`."

        self.output_dir = Path(output_dir)
        # `parents=True` lets nested output paths be created in one go
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.search_space = search_space
        self.surrogate_model = surrogate_model

        # Objectives
        self.so = search_objectives

        # Algorithm parameters
        self.num_iters = num_iters
        self.init_num_models = init_num_models
        self.num_parents = num_parents
        self.mutations_per_parent = mutations_per_parent
        self.num_candidates = num_candidates

        # Utils
        self.clear_evaluated_models = clear_evaluated_models
        self.save_pareto_weights = save_pareto_weights
        self.seen_archs = set()
        self.seed = seed
        self.rng = np.random.RandomState(self.seed)
        self.surrogate_dataset = []
        self.search_state = SearchResults(search_space, search_objectives)

        if self.save_pareto_weights:
            raise NotImplementedError

    def get_surrogate_iter_dataset(self, all_pop: List[ArchaiModel]) -> Tuple[np.ndarray, np.ndarray]:
        """Get the surrogate dataset for the current iteration.

        Args:
            all_pop: All population models.

        Returns:
            Tuple of encoded architectures and target values. Targets are built from
            `self.search_state.all_evaluated_objs`, one column per expensive objective.

        """

        encoded_archs = np.vstack([self.search_space.encode(m) for m in all_pop])
        # One row per expensive objective, transposed so each objective becomes a column
        target = np.array([self.search_state.all_evaluated_objs[obj] for obj in self.so.expensive_objectives]).T

        return encoded_archs, target

    def sample_models(self, num_models: int, patience: Optional[int] = 30) -> List[ArchaiModel]:
        """Sample models from the search space.

        Args:
            num_models: Number of models to sample.
            patience: Maximum number of sampling rounds. Each round draws `num_models`
                random samples and keeps the ones that satisfy the constraints.

        Returns:
            List of sampled models. It may contain fewer than `num_models` models
            (possibly none) if `patience` rounds were not enough.

        """

        nb_tries, valid_sample = 0, []

        while len(valid_sample) < num_models and nb_tries < patience:
            # Count the round up-front so that `patience` actually bounds the loop
            nb_tries += 1

            sample = [self.search_space.random_sample() for _ in range(num_models)]

            _, valid_indices = self.so.validate_constraints(sample)
            valid_sample += [sample[i] for i in valid_indices]

        if len(valid_sample) < num_models:
            logger.warn(f"Only {len(valid_sample)} of {num_models} valid models were sampled after {nb_tries} tries.")

        return valid_sample[:num_models]

    def mutate_parents(
        self, parents: List[ArchaiModel], mutations_per_parent: Optional[int] = 1, patience: Optional[int] = 30
    ) -> List[ArchaiModel]:
        """Mutate parents to generate new models.

        Args:
            parents: List of parent models.
            mutations_per_parent: Number of mutations to apply to each parent.
            patience: Maximum number of mutation attempts per parent. Invalid and
                already-seen mutations count as attempts.

        Returns:
            List of mutated models, de-duplicated by `archid`.

        """

        mutations = {}

        for p in parents:
            candidates = {}
            nb_tries = 0

            while len(candidates) < mutations_per_parent and nb_tries < patience:
                # Every attempt counts, including those that fail validation;
                # otherwise a parent that only yields invalid mutations loops forever.
                nb_tries += 1

                mutated_model = self.search_space.mutate(p)
                mutated_model.metadata["parent"] = p.archid

                if not self.so.is_model_valid(mutated_model):
                    continue

                if mutated_model.archid not in self.seen_archs:
                    candidates[mutated_model.archid] = mutated_model

            mutations.update(candidates)

        if len(mutations) == 0:
            logger.warn(f"No mutations found after {patience} tries for each one of the {len(parents)} parents.")

        return list(mutations.values())

    def predict_expensive_objectives(self, archs: List[ArchaiModel]) -> Dict[str, MeanVar]:
        """Predict expensive objectives for `archs` using surrogate model.

        Args:
            archs: List of architectures.

        Returns:
            Dictionary of predicted expensive objectives, keyed by objective name.

        """

        encoded_archs = np.vstack([self.search_space.encode(m) for m in archs])
        pred_results = self.surrogate_model.predict(encoded_archs)

        # Column `i` of the prediction corresponds to the `i`-th expensive objective
        return {
            obj_name: MeanVar(pred_results.mean[:, i], pred_results.var[:, i])
            for i, obj_name in enumerate(self.so.expensive_objectives)
        }

    def thompson_sampling(
        self,
        archs: List[ArchaiModel],
        sample_size: int,
        pred_expensive_objs: Dict[str, MeanVar],
        cheap_objs: Dict[str, np.ndarray],
    ) -> List[int]:
        """Get the selected architecture list indices from Thompson Sampling.

        Args:
            archs: List of architectures.
            sample_size: Number of architectures to select.
            pred_expensive_objs: Predicted expensive objectives.
            cheap_objs: Cheap objectives. This dictionary is not modified.

        Returns:
            List of selected architecture indices.

        """

        # Work on a shallow copy so the caller's `cheap_objs` is left untouched
        simulation_results = dict(cheap_objs)

        # Simulates results from surrogate model assuming N(pred_mean, pred_std)
        simulation_results.update(
            {
                obj_name: self.rng.randn(len(archs)) * np.sqrt(pred.var) + pred.mean
                for obj_name, pred in pred_expensive_objs.items()
            }
        )

        # Performs non-dominated sorting
        nds_frontiers = get_non_dominated_sorting(archs, simulation_results, self.so)

        # Shuffle elements inside each frontier to avoid giving advantage to a specific
        # part of the nds frontiers
        for frontier in nds_frontiers:
            self.rng.shuffle(frontier["indices"])

        return [idx for frontier in nds_frontiers for idx in frontier["indices"]][:sample_size]

    @overrides
    def search(self) -> SearchResults:
        """Run the search loop.

        Each iteration evaluates the pending architectures, refits the surrogate
        model, mutates the best parents and picks the next candidates with
        Thompson Sampling. The loop stops early when no new architectures are found.

        Returns:
            The accumulated search results.

        """

        all_pop, selected_indices, pred_expensive_objs = [], [], {}
        unseen_pop = self.sample_models(self.init_num_models)

        if not unseen_pop:
            # Nothing to evaluate: stacking an empty population for the surrogate would fail
            logger.warn("No valid initial architectures could be sampled. Stopping search ...")
            return self.search_state

        for i in range(self.num_iters):
            self.on_start_iteration(i + 1)
            logger.info(f"Iteration {i+1}/{self.num_iters}")

            all_pop.extend(unseen_pop)

            logger.info(f"Evaluating objectives for {len(unseen_pop)} architectures ...")
            iter_results = self.so.eval_all_objs(unseen_pop)

            self.seen_archs.update([m.archid for m in unseen_pop])

            # Adds iteration results and predictions from the previous iteration for comparison
            # (empty on the first iteration, when there are no predictions yet)
            extra_model_data = {
                f"Predicted {obj_name} {c}": getattr(obj_results, c)[selected_indices]
                for obj_name, obj_results in pred_expensive_objs.items()
                for c in ["mean", "var"]
            }
            self.search_state.add_iteration_results(unseen_pop, iter_results, extra_model_data)

            # Clears models from memory if needed
            if self.clear_evaluated_models:
                for m in unseen_pop:
                    m.clear()

            # Updates surrogate
            logger.info("Updating surrogate model ...")
            X, y = self.get_surrogate_iter_dataset(all_pop)
            self.surrogate_model.fit(X, y)

            # Selects top-`num_parents` models from non-dominated sorted results
            nds_frontiers = get_non_dominated_sorting(all_pop, self.search_state.all_evaluated_objs, self.so)
            parents = [model for frontier in nds_frontiers for model in frontier["models"]]
            parents = parents[: self.num_parents]

            # Mutates top models
            logger.info(f"Generating mutations for {len(parents)} parent architectures ...")
            mutated = self.mutate_parents(parents, self.mutations_per_parent)
            logger.info(f"Found {len(mutated)} new architectures satisfying constraints.")

            if not mutated:
                logger.info("No new architectures found. Stopping search ...")
                break

            # Predicts expensive objectives using surrogate model
            # and calculates cheap objectives for mutated architectures
            logger.info(f"Predicting {self.so.expensive_objective_names} for new architectures using surrogate model ...")
            pred_expensive_objs = self.predict_expensive_objectives(mutated)

            logger.info(f"Calculating cheap objectives {self.so.cheap_objective_names} for new architectures ...")
            cheap_objs = self.so.eval_cheap_objs(mutated)

            # Selects `num_candidates`-architectures for next iteration using Thompson Sampling
            selected_indices = self.thompson_sampling(mutated, self.num_candidates, pred_expensive_objs, cheap_objs)
            unseen_pop = [mutated[i] for i in selected_indices]

            logger.info(f"Best {self.num_candidates} candidate architectures were selected for the next iteration.")

            # Save plots and reports
            self.search_state.save_all_2d_pareto_evolution_plots(self.output_dir)
            self.search_state.save_search_state(self.output_dir / f"search_state_{i}.csv")

        return self.search_state