Source code for archai.discrete_search.algos.regularized_evolution

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import random
from pathlib import Path
from typing import List, Optional

from overrides import overrides
from tqdm import tqdm

from archai.common.ordered_dict_logger import OrderedDictLogger
from archai.discrete_search.api.archai_model import ArchaiModel
from archai.discrete_search.api.search_objectives import SearchObjectives
from archai.discrete_search.api.search_results import SearchResults
from archai.discrete_search.api.search_space import EvolutionarySearchSpace
from archai.discrete_search.api.searcher import Searcher
from archai.discrete_search.utils.multi_objective import get_pareto_frontier

logger = OrderedDictLogger(source=__name__)


class RegularizedEvolutionSearch(Searcher):
    """Regularized Evolution algorithm.

    It has been proposed in `Regularized Evolution for Image Classifier Architecture Search`.

    Reference:
        https://arxiv.org/abs/1802.01548v7.

    """

    def __init__(
        self,
        search_space: EvolutionarySearchSpace,
        search_objectives: SearchObjectives,
        output_dir: str,
        num_iters: Optional[int] = 10,
        init_num_models: Optional[int] = 10,
        initial_population_paths: Optional[List[str]] = None,
        pareto_sample_size: Optional[int] = 40,
        history_size: Optional[int] = 100,
        clear_evaluated_models: Optional[bool] = True,
        save_pareto_model_weights: bool = True,
        seed: Optional[int] = 1,
    ) -> None:
        """Initialize the Regularized Evolution search.

        Args:
            search_space: Discrete search space compatible with evolutionary algorithms.
            search_objectives: Search objectives.
            output_dir: Output directory.
            num_iters: Number of iterations.
            init_num_models: Number of initial models to evaluate.
            initial_population_paths: Paths to initial population models.
            pareto_sample_size: Number of models sampled from the history buffer to
                compute the Pareto frontier.
            history_size: Number of most recent models to keep in the history buffer.
            clear_evaluated_models: Optimizes memory usage by clearing the architecture
                of evaluated `ArchaiModel` instances after each iteration. Defaults to True.
            save_pareto_model_weights: If `True`, saves the weights of the Pareto models.
            seed: Random seed.

        """

        super().__init__()

        assert isinstance(
            search_space, EvolutionarySearchSpace
        ), f"{str(search_space.__class__)} is not compatible with {str(self.__class__)}"

        self.iter_num = 0
        self.search_space = search_space
        self.so = search_objectives
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True, parents=True)

        # Algorithm settings
        self.num_iters = num_iters
        self.init_num_models = init_num_models
        self.initial_population_paths = initial_population_paths
        self.pareto_sample_size = pareto_sample_size
        self.history_size = history_size

        # Utils
        self.clear_evaluated_models = clear_evaluated_models
        self.save_pareto_model_weights = save_pareto_model_weights
        self.search_state = SearchResults(search_space, self.so)
        self.seed = seed
        self.rng = random.Random(seed)
        self.seen_archs = set()
        self.num_sampled_archs = 0

        assert self.init_num_models > 0
        assert self.num_iters > 0
    def sample_models(self, num_models: int, patience: Optional[int] = 5) -> List[ArchaiModel]:
        """Sample models from the search space.

        Args:
            num_models: Number of models to sample.
            patience: Number of tries to sample a valid model.

        Returns:
            List of sampled models.

        """

        nb_tries, valid_sample = 0, []

        while len(valid_sample) < num_models and nb_tries < patience:
            nb_tries += 1  # Bounds the number of sampling rounds so `patience` is honored

            sample = [self.search_space.random_sample() for _ in range(num_models)]
            _, valid_indices = self.so.validate_constraints(sample)
            valid_sample += [sample[i] for i in valid_indices]

        return valid_sample[:num_models]
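
    # Note: regularized evolution relies on mutation as its only variation
    # operator (there is no crossover). `mutate_parents` below repeatedly
    # mutates each parent until a constraint-valid, previously unseen child is
    # produced or `patience` attempts are exhausted.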
    def mutate_parents(
        self, parents: List[ArchaiModel], mutations_per_parent: Optional[int] = 1, patience: Optional[int] = 20
    ) -> List[ArchaiModel]:
        """Mutate parents to generate new models.

        Args:
            parents: List of parent models.
            mutations_per_parent: Number of mutations to apply to each parent.
            patience: Number of tries to sample a valid model.

        Returns:
            List of mutated models.

        """

        mutations = {}

        for p in tqdm(parents, desc="Mutating parents"):
            candidates = {}
            nb_tries = 0

            while len(candidates) < mutations_per_parent and nb_tries < patience:
                # Counts every attempt (including invalid mutations), so the
                # loop cannot spin forever when the search space keeps
                # producing constraint-violating children
                nb_tries += 1

                mutated_model = self.search_space.mutate(p)
                mutated_model.metadata["parent"] = p.archid

                # Discards mutations that violate the search constraints
                if not self.so.is_model_valid(mutated_model):
                    continue

                # Only keeps architectures that have not been evaluated before
                if mutated_model.archid not in self.seen_archs:
                    mutated_model.metadata["generation"] = self.iter_num
                    candidates[mutated_model.archid] = mutated_model

            mutations.update(candidates)

        return list(mutations.values())
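
    # The loop in `search` below runs one aging-evolution cycle per iteration:
    # evaluate the current members, record the results, sample up to
    # `pareto_sample_size` models from the most recent `history_size` members
    # of the population, compute the Pareto frontier of that sample, and mutate
    # the frontier members to form the next generation.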
    @overrides
    def search(self) -> SearchResults:
        self.iter_num = 0

        if self.initial_population_paths:
            logger.info(f"Loading initial population from {len(self.initial_population_paths)} architectures ...")
            iter_members = [self.search_space.load_arch(path) for path in self.initial_population_paths]
        else:
            logger.info(f"Using {self.init_num_models} random architectures as the initial population ...")
            iter_members = self.sample_models(self.init_num_models)

        self.all_pop = iter_members

        for i in range(self.num_iters):
            self.iter_num = i + 1
            self.on_start_iteration(self.iter_num)
            logger.info(f"Iteration {i+1}/{self.num_iters}")

            if len(iter_members) == 0:
                logger.info("No models to evaluate. Stopping search ...")
                break

            # Calculates objectives
            logger.info(
                f"Calculating search objectives {list(self.so.objective_names)} for {len(iter_members)} models ..."
            )

            results = self.so.eval_all_objs(iter_members)

            self.search_state.add_iteration_results(
                iter_members,
                results,
                # Mutation info (parent of each model)
                extra_model_data={"parent": [p.metadata.get("parent", None) for p in iter_members]},
            )

            # Records evaluated archs to avoid computing the same architecture twice
            self.seen_archs.update([m.archid for m in iter_members])

            # Saves search iteration results
            self.search_state.save_search_state(str(self.output_dir / f"search_state_{self.iter_num}.csv"))
            self.search_state.save_pareto_frontier_models(
                str(self.output_dir / f"pareto_models_iter_{self.iter_num}"),
                save_weights=self.save_pareto_model_weights,
            )
            self.search_state.save_all_2d_pareto_evolution_plots(str(self.output_dir))

            # Clears models from memory if needed
            if self.clear_evaluated_models:
                logger.info("Optimizing memory usage ...")
                for model in iter_members:
                    model.clear()

            # Samples a subset of models from the history buffer (capped at the
            # buffer's current size to avoid sampling more models than exist)
            history_indices = list(range(max(0, len(self.all_pop) - self.history_size), len(self.all_pop)))
            sample_indices = self.rng.sample(history_indices, min(self.pareto_sample_size, len(history_indices)))
            logger.info(f"Sampled {len(sample_indices)} models from the history buffer ({len(history_indices)} models).")

            # Gets the pareto frontier of the history sample
            logger.info("Calculating Pareto frontier of the sample ...")
            pareto_sample = get_pareto_frontier(
                [self.all_pop[sample_idx] for sample_idx in sample_indices],
                {
                    obj_name: obj_results[sample_indices]
                    for obj_name, obj_results in self.search_state.all_evaluated_objs.items()
                },
                self.so,
            )
            logger.info(f"Found {len(pareto_sample['models'])} pareto members from the sample.")

            # Mutates the sampled Pareto members to produce the next generation,
            # ensuring the mutations fall within the desired constraint limits
            iter_members = self.mutate_parents(pareto_sample["models"], 1)
            logger.info(f"Mutation: {len(iter_members)} new models.")

            # Updates the set of architectures ever visited
            self.all_pop.extend(iter_members)

        return self.search_state
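
# Example usage (a minimal sketch, not part of the original module):
# `CustomSearchSpace` and `my_accuracy_evaluator` are hypothetical placeholders
# that must be replaced by a concrete `EvolutionarySearchSpace` implementation
# and a real model evaluator registered on `SearchObjectives`.
#
#   space = CustomSearchSpace()
#
#   objectives = SearchObjectives()
#   objectives.add_objective("accuracy", my_accuracy_evaluator, higher_is_better=True)
#
#   searcher = RegularizedEvolutionSearch(
#       search_space=space,
#       search_objectives=objectives,
#       output_dir="./reg_evo_output",
#       num_iters=5,
#       init_num_models=20,
#   )
#   search_results = searcher.search()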