# Source code for archai.supergraph.utils.metrics

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import copy
import math
import pathlib
import statistics
import time
from collections import defaultdict
from typing import List, Mapping, Optional, Tuple

import yaml
from torch import Tensor

from archai.common import ml_utils, utils
from archai.common.apex_utils import ApexUtils
from archai.common.common import get_tb_writer
from archai.common.ordered_dict_logger import get_global_logger

logger = get_global_logger()


[docs]class Metrics: """Record top1, top5, loss metrics, track best so far. There are 3 levels of metrics: 1. Run level - these for the one call of 'fit', example, best top1 2. Epoch level - these are the averages maintained top1, top5, loss 3. Step level - these are for every step in epoch The pre_run must be called before fit call which will reset all metrics. Similarly pre_epoch will reset running averages and pre_step will reset step level metrics like average step time. The post_step will simply update the running averages while post_epoch updates best we have seen for each epoch. """ def __init__(self, title:str, apex:Optional[ApexUtils], logger_freq:int=50) -> None: """Create the metrics object to maintain epoch stats Arguments: title {str} -- descriptive name of the stage for which metrics are collected Keyword Arguments: logger_freq {int} -- Must be > 0 for epoch level logging, the step level logging is decided by this number (default: {50}) """ self.logger_freq = logger_freq self.title = title self._apex = apex self._reset_run() def _reset_run(self)->None: self.run_metrics = RunMetrics() self.global_step = -1 self._tb_path = logger.current_path
[docs] def pre_run(self)->None: self._reset_run() self.run_metrics.pre_run()
[docs] def post_run(self, test_metrics:Optional['Metrics']=None)->None: self.run_metrics.post_run(test_metrics) # logging if self.logger_freq > 0: with logger.pushd('timings'): logger.info({'epoch':self.run_metrics.epoch_time_avg(), 'step': self.run_metrics.step_time_avg(), 'run': self.run_metrics.duration()}) if self.is_dist(): logger.info({'dist_epoch_sum': self.reduce_sum(self.run_metrics.epoch_time_avg()), 'dist_step': self.reduce_mean(self.run_metrics.step_time_avg()), 'dist_run_sum': self.reduce_sum(self.run_metrics.duration())}) best_train, best_val, best_test = self.run_metrics.best_epoch() with logger.pushd('best_train'): logger.info({'epoch': best_train.index, 'top1': best_train.top1.avg}) if self.is_dist(): logger.info({'dist_epoch': self.reduce_mean(best_train.index), 'dist_top1': self.reduce_mean(best_train.top1.avg)}) if best_val: with logger.pushd('best_val'): logger.info({'epoch': best_val.index, 'top1': best_val.top1.avg}) if self.is_dist(): logger.info({'dist_epoch': self.reduce_mean(best_val.index), 'dist_top1': self.reduce_mean(best_val.top1.avg)}) if best_test: with logger.pushd('best_test'): logger.info({'epoch': best_test.index, 'top1': best_test.top1.avg}) if self.is_dist(): logger.info({'dist_epoch': self.reduce_mean(best_test.index), 'dist_top1': self.reduce_mean(best_test.top1.avg)})
[docs] def pre_step(self, x: Tensor, y: Tensor): self.run_metrics.cur_epoch().pre_step() self.global_step += 1
[docs] def post_step(self, x: Tensor, y: Tensor, logits: Tensor, loss: Tensor, steps: int) -> None: assert len(x)==len(y) and len(y)==len(logits) and len(loss.shape)==0 # update metrics after optimizer step batch_size = x.size(0) # TODO: code for more than 5 classes? top1, top5 = ml_utils.accuracy(logits, y, topk=(1, 5)) epoch = self.run_metrics.cur_epoch() epoch.post_step(top1.item(), top5.item(), loss.item(), batch_size) if self.logger_freq > 0 and \ ((epoch.step+1) % self.logger_freq == 0): logger.info({'top1': epoch.top1.avg, 'top5': epoch.top5.avg, 'loss': epoch.loss.avg, 'step_time': epoch.step_time.last}) if self.is_dist(): logger.info({'dist_top1': self.reduce_mean(epoch.top1.avg), 'dist_top5': self.reduce_mean(epoch.top5.avg), 'dist_loss': self.reduce_mean(epoch.loss.avg), 'dist_step_time': self.reduce_mean(epoch.step_time.last)})
# NOTE: Tensorboard step-level logging is removed as it becomes exponentially expensive on Azure blobs # writer = get_tb_writer() # writer.add_scalar(f'{self._tb_path}/train_steps/loss', # epoch.loss.avg, self.global_step) # writer.add_scalar(f'{self._tb_path}/train_steps/top1', # epoch.top1.avg, self.global_step) # writer.add_scalar(f'{self._tb_path}/train_steps/top5', # epoch.top5.avg, self.global_step)
[docs] def pre_epoch(self, lr:float=math.nan)->None: epoch = self.run_metrics.add_epoch() epoch.pre_epoch(lr) if lr is not None: writer = get_tb_writer() if writer is not None: if self.logger_freq > 0 and not math.isnan(lr): logger.debug({'start_lr': lr}) writer.add_scalar(f'{self._tb_path}/train_steps/lr', lr, self.global_step)
[docs] def post_epoch(self, lr:float=math.nan, val_metrics:Optional['Metrics']=None): epoch = self.run_metrics.cur_epoch() epoch.post_epoch(lr, val_metrics) val_epoch_metrics = None if val_metrics: val_epoch_metrics = val_metrics.run_metrics.epochs_metrics[-1] if self.logger_freq > 0: with logger.pushd('train'): logger.info({'top1': epoch.top1.avg, 'top5': epoch.top5.avg, 'loss': epoch.loss.avg, 'duration': epoch.duration(), 'step_time': epoch.step_time.avg, 'end_lr': lr}) if self.is_dist(): logger.info({'dist_top1': self.reduce_mean(epoch.top1.avg), 'dist_top5': self.reduce_mean(epoch.top5.avg), 'dist_loss': self.reduce_mean(epoch.loss.avg), 'dist_duration': self.reduce_mean(epoch.duration()), 'dist_step_time': self.reduce_mean(epoch.step_time.avg), 'dist_end_lr': self.reduce_mean(lr)}) if val_epoch_metrics: with logger.pushd('val'): logger.info({'top1': val_epoch_metrics.top1.avg, 'top5': val_epoch_metrics.top5.avg, 'loss': val_epoch_metrics.loss.avg, 'duration': val_epoch_metrics.duration()}) if self.is_dist(): logger.info({'dist_top1': self.reduce_mean(val_epoch_metrics.top1.avg), 'dist_top5': self.reduce_mean(val_epoch_metrics.top5.avg), 'dist_loss': self.reduce_mean(val_epoch_metrics.loss.avg), 'dist_duration': self.reduce_mean(val_epoch_metrics.duration())})
# writer = get_tb_writer() # writer.add_scalar(f'{self._tb_path}/train_epochs/loss', # epoch.loss.avg, epoch.index) # writer.add_scalar(f'{self._tb_path}/train_epochs/top1', # epoch.top1.avg, epoch.index) # writer.add_scalar(f'{self._tb_path}/train_epochs/top5', # epoch.top5.avg, epoch.index) # if test_epoch: # writer.add_scalar(f'{self._tb_path}/val_epochs/loss', # test_epoch.loss.avg, epoch.index) # writer.add_scalar(f'{self._tb_path}/val_epochs/top1', # test_epoch.top1.avg, epoch.index) # writer.add_scalar(f'{self._tb_path}/val_epochs/top5', # test_epoch.top5.avg, epoch.index)
[docs] def state_dict(self)->Mapping: return utils.state_dict(self)
[docs] def load_state_dict(self, state_dict:dict)->None: utils.load_state_dict(self, state_dict)
def __getstate__(self): state = self.__dict__.copy() if '_apex' in state: del state['_apex'] # cannot serialize this return state # no need to define __setstate__ because _apex should be set from constructor
[docs] def save(self, filepath:str)->Optional[str]: if filepath: filepath = utils.full_path(filepath) pathlib.Path(filepath).write_text(yaml.dump(self)) return filepath
[docs] def epochs(self)->int: """Returns epochs recorded so far""" return len(self.run_metrics.epochs_metrics)
[docs] def cur_epoch(self)->'EpochMetrics': return self.run_metrics.cur_epoch()
[docs] def reduce_min(self, val): if not self._apex: return val return self._apex.reduce(val, op='min')
[docs] def reduce_max(self, val): if not self._apex: return val return self._apex.reduce(val, op='max')
[docs] def reduce_sum(self, val): if not self._apex: return val return self._apex.reduce(val, op='sum')
[docs] def reduce_mean(self, val): if not self._apex: return val return self._apex.reduce(val, op='mean')
[docs] def is_dist(self)->bool: if not self._apex: return False return self._apex.is_dist()
[docs] def best_train_top1(self)->float: return self.run_metrics.best_epoch()[0].top1.avg
[docs] def best_val_top1(self)->float: val_epoch_metrics = self.run_metrics.best_epoch()[1] return val_epoch_metrics.top1.avg if val_epoch_metrics is not None else math.nan
[docs] def best_test_top1(self)->float: test_epoch_metrics = self.run_metrics.best_epoch()[2] return test_epoch_metrics.top1.avg if test_epoch_metrics is not None else math.nan
[docs]class EpochMetrics: """Stores the metrics for each epoch. Training metrics is in top1, top5 etc while validation metrics is in val_metrics""" def __init__(self, index:int) -> None: self.index = index self.top1 = utils.AverageMeter() self.top5 = utils.AverageMeter() self.loss = utils.AverageMeter() self.step_time = utils.AverageMeter() self.start_time = math.nan self.end_time = math.nan self.step = -1 self.start_lr = math.nan self.end_lr = math.nan self.val_metrics:Optional[EpochMetrics] = None
[docs] def pre_step(self): self._step_start_time = time.time() self.step += 1
[docs] def post_step(self, top1:float, top5:float, loss:float, batch:int): self.step_time.update(time.time() - self._step_start_time) self.top1.update(top1, batch) self.top5.update(top5, batch) self.loss.update(loss, batch)
[docs] def pre_epoch(self, lr:float): self.start_time = time.time() self.start_lr = lr
[docs] def post_epoch(self, lr:float, val_metrics:Optional[Metrics]): self.end_time = time.time() self.end_lr = lr if val_metrics is not None: assert len(val_metrics.run_metrics.epochs_metrics)==1, 'Number of epochs in val metrics should be 1' self.val_metrics = val_metrics.run_metrics.epochs_metrics[-1]
[docs] def duration(self): return self.end_time-self.start_time
[docs]class RunMetrics: """Metrics for the entire run. It mainly consist of metrics for each epoch""" def __init__(self) -> None: self.epochs_metrics:List[EpochMetrics] = [] self.start_time = math.nan self.end_time = math.nan self.epoch = -1 self.test_metrics:Optional['Metrics'] = None
[docs] def pre_run(self): self.start_time = time.time()
[docs] def post_run(self, test_metrics:Optional['Metrics']=None): self.end_time = time.time() self.test_metrics = test_metrics # test should have only one epoch assert test_metrics is None or len(test_metrics.run_metrics.epochs_metrics)==1
[docs] def add_epoch(self)->EpochMetrics: self.epoch = len(self.epochs_metrics) epoch_metrics = EpochMetrics(self.epoch) self.epochs_metrics.append(epoch_metrics) return epoch_metrics
[docs] def cur_epoch(self)->EpochMetrics: return self.epochs_metrics[self.epoch]
[docs] def best_epoch(self)->Tuple[EpochMetrics, Optional[EpochMetrics], Optional[EpochMetrics]]: # [train, val, test] best_train = max(self.epochs_metrics, key=lambda e:e.top1.avg) best_val = max(self.epochs_metrics, key=lambda e:e.val_metrics.top1.avg if e.val_metrics else -1) best_val = best_val.val_metrics if best_val.val_metrics else None best_test = self.test_metrics.run_metrics.epochs_metrics[-1] \ if self.test_metrics else None return best_train, best_val, best_test
[docs] def epoch_time_avg(self): return statistics.mean((e.duration() for e in self.epochs_metrics))
[docs] def step_time_avg(self): return statistics.mean((e.step_time.avg for e in self.epochs_metrics))
[docs] def duration(self): return self.end_time-self.start_time