Source code for nanotune.classification.classifier

import json
import logging
import os
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy.typing as npt
import numpy as np

from sklearn import svm
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (accuracy_score, auc, average_precision_score,
                             brier_score_loss, confusion_matrix, roc_curve)
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.tree import DecisionTreeClassifier

import nanotune as nt
from nanotune.data.dataset import Dataset
from nanotune.data.export_data import prep_data

logger = logging.getLogger(__name__)
ALLOWED_CATEGORIES = list(dict(nt.config["core"]["features"]).keys())
DOT_LABEL_MAPPING = dict(nt.config["core"]["dot_mapping"])

RELEVANT_FEATURE_INDEXES: Dict[str, List[int]] = {
    "pinchoff": [1, 2, 3, 4],
    "outerbarriers": [1],
    "singledot": [1],
    "doubledot": [1],
    "dotregime": [1],
}

DEFAULT_CLF_PARAMETERS: Dict[str, Any] = {
    "SVC": {"kernel": "linear", "probability": True, "gamma": "auto"},
    "LogisticRegression": {"solver": "newton-cg", "max_iter": 3000},
    "MLPClassifier": {"max_iter": 3000},
    "GaussianProcessClassifier": {},
    "DecisionTreeClassifier": {"max_depth": 5},
    "RandomForestClassifier": {"max_depth": 5, "n_estimators": 10, "max_features": 1},
    "AdaBoostClassifier": {},
    "GaussianNB": {},
    "QuadraticDiscriminantAnalysis": {},
    "KNeighborsClassifier": {"n_neighbors": 2},
}

METRIC_NAMES = ["accuracy_score", "brier_score_loss", "auc", "average_precision_recall"]

DEFAULT_DATA_FILES = {
    "pinchoff": "pinchoff.npy",
    "singledot": "dots.npy",
    "doubledot": "dots.npy",
    "dotregime": "dots.npy",
}

DEFAULT_N_ITER = {
    "pinchoff": 100,
    "singledot": 20,
    "doubledot": 25,
    "dotregime": 20,
}


[docs]class Classifier: """Emulates binary classifiers implemented in scikit-learn. It includes methods loading labelled data saved to numpy files, splitting it into balanced sets, training and evaluating the classifier. The data to load is expected to be in the same format as outputted by `nanotune.data.export_data.export_data`. Attributes: hyper_parameters: hyperparameters/keyword arguments to pass to the binary classifier. category: which measurements wil be classified, e.g. 'pinchoff', 'singledot', 'dotregime'. Supported categories correspond to keys of nt.config['core']['features']. file_fractions: fraction of all data loaded that should be used. For a value less than 1, data is chosen at random. It can be used to calculate accuracy variation when different random sub-sets of avaliable data are used for training data. folder_path: path to folder where numpy data files are located.s file_paths: list of paths of numpy data files. classifier_type: string indicating which binary classifier to use. E.g 'SVG', 'MLPClassifier', .... For a full list see the keys of `DEFAULT_CLF_PARAMETERS`. retained_variance: variance to retain when calculating principal components. data_types: data types, one or more of: 'signal', 'gradient', 'frequencies', 'features'. test_size: size of test set; it's the relative proportional of all data loaded. clf: instance of a scikit-learn binary classifier. original_data: all data loaded. labels: labels of original data. """ def __init__( self, data_filenames: List[str], category: str, data_types: Optional[List[str]] = None, folder_path: Optional[str] = None, test_size: float = 0.2, classifier_type: Optional[str] = None, hyper_parameters: Optional[Dict[str, Union[str, float, int]]] = None, retained_variance: float = 0.99, file_fractions: Optional[List[float]] = None, ) -> None: if folder_path is None: folder_path = nt.config["db_folder"] if hyper_parameters is None: hyper_parameters = {} if category not in ALLOWED_CATEGORIES: logger.error( "Classifier category must be one of {}".format(ALLOWED_CATEGORIES) ) raise ValueError self.category = category if data_types is None: data_types = ["signal", "frequencies"] if "features" in data_types: self._feature_indexes = RELEVANT_FEATURE_INDEXES[category] else: self._feature_indexes = [] if category in DOT_LABEL_MAPPING.keys(): relevant_labels = DOT_LABEL_MAPPING[category] else: relevant_labels = [0, 1] self._relevant_labels = sorted(relevant_labels) if file_fractions is None: file_fractions = [1.0] * len(data_filenames) self.file_fractions = file_fractions # Add default values to parameter dict in case some have not been # specified if classifier_type is None: default_params = {} else: default_params = DEFAULT_CLF_PARAMETERS[classifier_type] default_params.update(hyper_parameters) self.hyper_parameters = default_params self.folder_path = folder_path self.file_paths, name_addon = self._list_paths(data_filenames) self.name = name_addon + category + "_" if classifier_type is not None: self.name += classifier_type self.classifier_type = classifier_type self.retained_variance = retained_variance self.data_types = data_types self.test_size = test_size if classifier_type == "SVC": self.clf = svm.SVC(**self.hyper_parameters) elif classifier_type == "LogisticRegression": self.clf = LogisticRegression(**self.hyper_parameters) elif classifier_type == "LinearSVC": self.clf = svm.LinearSVC(**self.hyper_parameters) elif classifier_type == "MLPClassifier": self.clf = MLPClassifier(**self.hyper_parameters) elif classifier_type == "GaussianProcessClassifier": self.clf = GaussianProcessClassifier(**self.hyper_parameters) elif classifier_type == "DecisionTreeClassifier": self.clf = DecisionTreeClassifier(**self.hyper_parameters) elif classifier_type == "RandomForestClassifier": self.clf = RandomForestClassifier(**self.hyper_parameters) elif classifier_type == "AdaBoostClassifier": self.clf = AdaBoostClassifier(**self.hyper_parameters) elif classifier_type == "GaussianNB": self.clf = GaussianNB(**self.hyper_parameters) elif classifier_type == "QuadraticDiscriminantAnalysis": self.clf = QuadraticDiscriminantAnalysis(**self.hyper_parameters) elif classifier_type == "KNeighborsClassifier": self.clf = KNeighborsClassifier(**self.hyper_parameters) else: self.clf = None (self.original_data, self.labels) = self.load_data( self.file_paths, self.data_types, file_fractions=self.file_fractions )
[docs] def load_data( self, file_paths: List[str], data_types: List[str], file_fractions: Optional[List[float]] = [1.0], ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.int64]]: """Loads data including labels from numpy files. Args: file_paths: list of paths of numpy data files. data_types: data types, one or more of: 'signal', 'gradient', 'frequencies', 'features'. file_fractions: fraction of all data loaded that should be used. For a value less than 1, data is chosen at random. It can be used to calculate accuracy variation when different random sub-sets of avaliable data are used for training data. Returns: np.array: data np.array: labels """ DATA_TYPE_MAPPING = dict(nt.config["core"]["data_types"]) len_2d = np.prod(nt.config["core"]["standard_shapes"]["2"]) + 1 all_the_stuff = np.empty([len(DATA_TYPE_MAPPING), 0, len_2d]) try: for ip in range(len(file_paths)): sub_data = np.array( np.load(file_paths[ip], allow_pickle=True), dtype=np.float64 ) frac = file_fractions[ip] # type: ignore n_samples = int(round(sub_data.shape[1] * frac)) select = np.random.choice(sub_data.shape[1], n_samples, replace=False) sub_data = sub_data[:, select, :] all_the_stuff = np.concatenate([all_the_stuff, sub_data], axis=1) except ValueError: len_1d = np.prod(nt.config["core"]["standard_shapes"]["1"]) + 1 all_the_stuff = np.empty([len(DATA_TYPE_MAPPING), 0, len_1d]) for ip in range(len(file_paths)): sub_data = np.array( np.load(file_paths[ip], allow_pickle=True), dtype=np.float64 ) frac = file_fractions[ip] # type: ignore n_samples = int(round(sub_data.shape[1] * frac)) select = np.random.choice(sub_data.shape[1], n_samples, replace=False) sub_data = sub_data[:, select, :] all_the_stuff = np.concatenate([all_the_stuff, sub_data], axis=1) labels = all_the_stuff[0, :, -1] data = all_the_stuff[:, :, :-1] relevant_data = np.empty((data.shape[1], 0)) for data_type in data_types: to_append = data[DATA_TYPE_MAPPING[data_type]] if data_type == "features": to_append[to_append == nt.config["core"]["fill_value"]] = np.nan to_append = to_append[:, np.isfinite(to_append).any(axis=0)] try: to_append = to_append[:, self._feature_indexes] except IndexError: logger.warning( "Some data in {} ".format(file_paths) + "does not have the" + "feature requested. Make sure all data " + "has been fitted with appropriate" + " fit classes." ) relevant_data = np.append(relevant_data, to_append, axis=1) # remove NaNs in labels mask = ~np.isnan(labels) relevant_data = relevant_data[mask, :] labels = labels[mask] # remove NaNs in data mask = np.isfinite(relevant_data).any(axis=-1) relevant_data = relevant_data[mask] labels = labels[mask].astype(int) relevant_data, labels = self.select_relevant_data(relevant_data, labels) return relevant_data, labels
[docs] def select_relevant_data( self, data: npt.NDArray[np.float64], labels: npt.NDArray[np.int64], ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.int64]]: """Extract a subset data depending on which labels we would like to predict. Only relevant for dot data as the data file may contain four different labels. Args: data: original data label: original labels """ shape = data.shape relevant_data = np.empty((0, shape[1])) relevant_labels = np.empty([0]) for il, label in enumerate(self._relevant_labels): relevant_indx = np.where(labels == label)[0] relevant_data = np.concatenate( [relevant_data, data[relevant_indx, :]], axis=0 ) relevant_labels = np.concatenate( [relevant_labels, np.ones(len(relevant_indx)) * il], axis=0 ) return relevant_data, relevant_labels
[docs] def train( self, data: Optional[npt.NDArray[np.float64]] = None, labels: Optional[npt.NDArray[np.int64]] = None, ) -> None: """Trains binary classifier. Equal populations of data are selected before the sklearn classifier is fitted. Args: data: prepped data label: corresponding labels """ if data is None: data = self.original_data if labels is None: labels = self.labels (data_to_use, labels_to_use) = self.select_equal_populations( data, labels) X_train, _ = self.prep_data(train_data=data_to_use) self.clf.fit(X_train, labels_to_use)
[docs] def prep_data( self, train_data: Optional[npt.NDArray[np.float64]] = None, test_data: Optional[npt.NDArray[np.float64]] = None, perform_pca: Optional[bool] = False, scale_pc: Optional[bool] = False, ) -> Tuple[Optional[npt.NDArray[np.float64]], Optional[npt.NDArray[np.float64]]]: """Prepares data for training and testing. It scales the data and extracts principle components is desired. Transformations are applied to both train and test data, although transformations are first fitted on training data and then applied to test data. Args: train_data: dataset to be used for training test_data: dataset to be used for testing perform_pca: whether or not to perform a PCA and thus use principal components for training and testing. scale_pc: whether to scale principal components. Returns: np.array: prepped training data np.array: prepped testing data """ (train_data, test_data) = self.scale_raw_data( train_data=train_data, test_data=test_data ) if perform_pca: (train_data, test_data) = self.get_principle_components( train_data=train_data, test_data=test_data ) if scale_pc: (train_data, test_data) = self.scale_compressed_data( train_data=train_data, test_data=test_data ) return train_data, test_data
[docs] def select_equal_populations( self, data: npt.NDArray[np.float64], labels: npt.NDArray[np.int64], ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.int64]]: """Selects a subset of data at random so that each population/label appears equally often. Makes for a balanced dataset. Args: data: the data to subsample and balance labels: labels corresponding to `data`. """ populations_labels, population_counts = np.unique(labels, return_counts=True) n_each = int(np.min(population_counts)) new_data = np.empty([n_each * len(populations_labels), data.shape[-1]]) new_labels = np.empty(n_each * len(populations_labels), int) for ii, label in enumerate(populations_labels): idx = np.where(labels == int(label)) idx = np.random.choice(idx[0], n_each, replace=False) idx = idx.astype(int) dat = data[idx] new_data[ii * n_each : (ii + 1) * n_each] = dat label_array = np.ones(n_each, dtype=int) * int(label) new_labels[ii * n_each : (ii + 1) * n_each] = label_array p = np.random.permutation(len(new_labels)) return new_data[p], new_labels[p]
[docs] def split_data( self, data: npt.NDArray[np.float64], labels: npt.NDArray[np.int64], ) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.int64], npt.NDArray[np.int64]]: """Splits data into train and test set. Emulates sklearn's `train_test_split`. Args: data: the data to split labels: labels of `data` Returns: np.array: train data np.array: test data np.array: train labels np.array: test labels """ (train_data, test_data, train_labels, test_labels) = train_test_split( data, labels, test_size=self.test_size, random_state=0 ) return train_data, test_data, train_labels, test_labels
[docs] def scale_raw_data( self, train_data: Optional[npt.NDArray[np.float64]] = None, test_data: Optional[npt.NDArray[np.float64]] = None, ) -> Tuple[Optional[npt.NDArray[np.float64]], Optional[npt.NDArray[np.float64]]]: """Scales data loaded from numpy files by emulating sklearn's `StandardScaler`. The scaler is first fitted using the train set before also scaling the test set. Args: train_data: dataset to be used for training test_data: dataset to be used for testing Returns: np.array: scaled training data np.array: scaled test data """ if train_data is not None: self.raw_scaler = StandardScaler() self.raw_scaler.fit(train_data) train_data = self.raw_scaler.transform(train_data) if test_data is not None: if hasattr(self, "raw_scaler"): test_data = self.raw_scaler.transform(test_data) else: logger.error("Scale train data before scaling test data.") raise AttributeError return train_data, test_data
[docs] def scale_compressed_data( self, train_data: Optional[npt.NDArray[np.float64]] = None, test_data: Optional[npt.NDArray[np.float64]] = None, ) -> Tuple[Optional[npt.NDArray[np.float64]], Optional[npt.NDArray[np.float64]]]: """Scales previously computed principal components. Args: train_data: dataset to be used for training, containing principal components. test_data: dataset to be used for testing, containing principal components. Returns: np.array: scaled training data containing scaled principal components. np.array: scaled test data containing scaled principal components. """ if train_data is not None: self.compressed_scaler = StandardScaler() self.compressed_scaler.fit(train_data) train_data = self.compressed_scaler.transform(train_data) if test_data is not None: if hasattr(self, "compressed_scaler"): test_data = self.compressed_scaler.transform(test_data) else: logger.error( "Train data principle components has not been \ have to be scaled before scaling test PC." ) raise AttributeError return train_data, test_data
[docs] def get_principle_components( self, train_data: Optional[npt.NDArray[np.float64]] = None, test_data: Optional[npt.NDArray[np.float64]] = None, ) -> Tuple[Optional[npt.NDArray[np.float64]], Optional[npt.NDArray[np.float64]]]: """Computes principal components. Args: train_data: dataset to be used for training test_data: dataset to be used for testing Returns: np.array: scaled training data containing principal components. np.array: scaled test data containing principal components. """ if train_data is not None: self.pca = PCA(self.retained_variance) self.pca.fit(train_data) train_data = self.pca.transform(train_data) if test_data is not None: if hasattr(self, "pca"): test_data = self.pca.transform(test_data) else: logger.error( "Compress train data before compressing test data.") raise AttributeError return train_data, test_data
[docs] def score(self, test_data: npt.NDArray[np.float64], test_labels: npt.NDArray[np.float64], ) -> float: """Emulates the binary classifiers `score` method and returns the mean accuracy for the given test set. Args: test_data: prepped test data test_labels: labels of `test_data`. Returns: float: mean accuracy. """ self.clf_score = self.clf.score(test_data, test_labels) return self.clf_score
[docs] def predict( self, dataid: int, db_name: str, db_folder: Optional[str] = None, readout_method_to_use: str = 'transport', ) -> List[Any]: """Classifies the trace of a QCoDeS dataset. Ideally, this data has been measured with nanotune and/or normalization constants saved to metadata under the `nt.meta_tag` key. Otherwise correct classification can not be guaranteed. Args: dataid: QCoDeS run ID. db_name: name of database db_folder: path to folder where database is located. Returns: array: containing the result as integers. """ if db_folder is None: db_folder = nt.config["db_folder"] DATA_TYPE_MAPPING = dict(nt.config["core"]["data_types"]) df = Dataset(dataid, db_name) condensed_data_all = prep_data( df, self.category, readout_method_to_use=readout_method_to_use) predictions = [] for condensed_data in condensed_data_all: relevant_data = np.empty((1, 0)) for data_type in self.data_types: to_append = condensed_data[DATA_TYPE_MAPPING[data_type]] if data_type == "features": to_append[to_append == nt.config["core"]["fill_value"]] = np.nan to_append = to_append[:, np.isfinite(to_append).any(axis=0)] # type: ignore try: to_append = to_append[:, self.feature_indexes] # type: ignore except IndexError: logger.warning( f"Some data in {dataid} does not have the requested \ features. Make sure all data has been fitted with \ appropriate fit classes." ) relevant_data = np.append(relevant_data, to_append, axis=1) _, relevant_data = self.prep_data(test_data=relevant_data) # type: ignore predictions.append(self.clf.predict(relevant_data)) return predictions
[docs] def compute_metrics( self, n_iter: Optional[int] = None, save_to_file: bool = True, filename: str = "", supp_train_data: Optional[List[str]] = None, n_supp_train: Optional[int] = None, perform_pca: bool = False, scale_pc: bool = False, ) -> Tuple[Dict[str, Dict[str, Any]], npt.NDArray[np.float64]]: """Computes different metrics of a classifier, averaging over a number of iterations. Beside metrics, the training time is tracked too. All information extracted is saved to a dict. Metrics computed are `accuracy_score`, `brier_score_loss`, `auc` (area under curve), `average_precision_recall` and the confusion matrix - all implemented in sklearn.metrics. Args: n_iter: number of train and test iterations over which the metrics statistic should be computed. save_to_file: whether to save metrics info to file. filename: name of json file if metrics are saved. supp_train_data: list of paths to files with additional training data. n_supp_train: number of datasets which should be added to the train set from additional data. perform_pca: whether the metrics should be computed using principal components for training and testing. scale_pc: whether principal components should be scaled. Returns: dict: summary od results, mapping the a string indicating the quantity onto the quantity itself. Containt mean and std each metric. np.array: metric results of all iterations. Shape is (len(METRIC_NAMES), n_iter), where the metrics appear in the order defined by METRIC_NAMES. """ if n_iter is None: n_iter = DEFAULT_N_ITER[self.category] metrics = np.empty([len(METRIC_NAMES), n_iter]) conf_matrix = [] start_time = time.time() train_times = [] if supp_train_data is not None: supp_train_data, name_addon = self._list_paths(supp_train_data) f_frac = [1.0] * len(supp_train_data) (train_data_addon, train_labels_addon) = self.load_data( supp_train_data, self.data_types, file_fractions=f_frac ) if n_supp_train is None: n_supp_train = train_data_addon.shape[0] mask = [1] * n_supp_train mask = mask + [0] * (train_data_addon.shape[0] - n_supp_train) mask_np = np.array(mask, dtype=bool) np.random.shuffle(mask_np) train_data_addon = train_data_addon[mask_np] train_labels_addon = train_labels_addon[mask_np] else: train_data_addon = None train_labels_addon = None for curr_iter in range(n_iter): start_time_train = time.time() (data_to_use, labels_to_use) = self.select_equal_populations( self.original_data, self.labels ) (train_data, test_data, train_labels, test_labels) = self.split_data(data_to_use, labels_to_use) if train_data_addon is not None: train_data = np.concatenate([train_data, train_data_addon], axis=0) train_labels = np.concatenate( [train_labels, train_labels_addon], axis=0 ) X_train, X_test = self.prep_data( train_data=train_data, test_data=test_data, perform_pca=perform_pca, scale_pc=scale_pc, ) probas = self.clf.fit(X_train, train_labels).predict_proba(X_test) train_times.append(time.time() - start_time_train) pred_labels = self.clf.predict(X_test) m_in = METRIC_NAMES.index("accuracy_score") metrics[m_in, curr_iter] = accuracy_score(test_labels, pred_labels) m_in = METRIC_NAMES.index("brier_score_loss") metrics[m_in, curr_iter] = brier_score_loss( test_labels, pred_labels) fpr, tpr, thresholds = roc_curve(test_labels, probas[:, 1]) m_in = METRIC_NAMES.index("auc") metrics[m_in, curr_iter] = auc(fpr, tpr) if hasattr(self.clf, "decision_function"): y_score = self.clf.decision_function(X_test) else: y_score = self.clf.predict_proba(X_test)[:, 1] m_in = METRIC_NAMES.index("average_precision_recall") metrics[m_in, curr_iter] = average_precision_score(test_labels, y_score) conf_matrix.append(confusion_matrix(test_labels, pred_labels)) elapsed_time = (time.time() - start_time) / n_iter conf_matrix_np: npt.NDArray[np.float64] = np.array(conf_matrix) info_dict: Dict[str, Any] = { "n_iter": n_iter, "classifier": self.classifier_type, "category": self.category, "data_files": self.file_paths, "data_types": self.data_types, "hyper_parameters": self.clf.get_params(), "metric_names": METRIC_NAMES, "elapsed_time [s/iter]": elapsed_time, "n_test": test_data.shape[0], "n_train": train_data.shape[0], "mean_train_time": np.mean(train_times), "std_train_time": np.std(train_times), "perform_pca": perform_pca, "scale_pc": scale_pc, "metadata": {}, "supp_train_data": supp_train_data, } for im, metric_name in enumerate(METRIC_NAMES): info_dict[metric_name] = { "std": np.std(metrics[im]), "mean": np.mean(metrics[im]), } info_dict["confusion_matrix"] = { "std": np.std(conf_matrix_np, axis=0).tolist(), "mean": np.mean(conf_matrix_np, axis=0).tolist(), } if save_to_file: if not filename: filename = self.name + "_" if supp_train_data is not None: filename = filename + name_addon + "_" filename += "_".join(self.data_types) if perform_pca: filename += "_PCA" if scale_pc: filename += "_scaled" filename += ".json" path = os.path.join(nt.config["db_folder"], "classifier_metrics") if not os.path.exists(path): os.makedirs(path) path = os.path.join(path, filename) with open(path, "w") as f: json.dump(info_dict, f) return info_dict, metrics
def _list_paths(self, filenames: List[str]) -> Tuple[List[str], str]: """Adds path to file names.""" file_paths = [] name = "" for filename in filenames: p = os.path.join(self.folder_path, filename) file_paths.append(p) name = name + os.path.splitext(filename)[0] + "_" return file_paths, name