*Copyright (c) Microsoft Corporation. All rights reserved.*

*Licensed under the MIT License.*

# Text Classification of MultiNLI Sentences using BERT with Azure ML Pipelines

![Impressions](https://PixelServer20190423114238.azurewebsites.net/api/impressions/nlp/examples/text_classification/tc_bert_azureml.png)

## 0. Introduction

In this notebook, we fine-tune and evaluate a pretrained [BERT](https://arxiv.org/abs/1810.04805) model on a subset of the [MultiNLI](https://www.nyu.edu/projects/bowman/multinli/) dataset using [AzureML](https://azure.microsoft.com/en-us/services/machine-learning-service/) Pipelines.

We use a [distributed sequence classifier](../../utils_nlp/bert/sequence_classification_distributed.py) that wraps [Hugging Face's PyTorch implementation](https://github.com/huggingface/pytorch-pretrained-BERT) of Google's [BERT](https://github.com/google-research/bert).

The notebooks acts as a template to,
1. Process a massive dataset in parallel by dividing the dataset into chunks using [DASK](https://dask.org/) .
2. Perform distributed training on AzureML compute on these processed chunks.

We create an [AzureML Pipeline](https://docs.microsoft.com/en-us/azure/machine-learning/service/concept-ml-pipelines) for the two steps mentioned above. With this pipeline, the notebook can be scheduled regularly to fine tune BERT with new data and get a model which can be further deployed on [Azure Container Instance](https://docs.microsoft.com/en-us/azure/container-service/).

AzureML Pipeline define reusable machine learning workflows that can be used as a template for your machine learning scenarios. Pipelines allow you to optimize your workflow and spend time on machine learning rather than infrastructure. If you are new to the concept of pipelines, [this would be a good place to get started](https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines).

**Note: To learn how to do pre-training on your own, please reference the [AzureML-BERT repo](https://github.com/microsoft/AzureML-BERT) created by Microsoft.**

In [29]:
import sys
sys.path.append("../../")
import os
import json
import random
import shutil
import pandas as pd

from utils_nlp.azureml import azureml_utils
from utils_nlp.dataset.multinli import get_generator

from sklearn.preprocessing import LabelEncoder
import azureml.core
from azureml.core import Datastore, Experiment,  get_run
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import RunConfiguration
from azureml.core.compute import ComputeTarget,  AmlCompute
from azureml.exceptions import ComputeTargetException
from azureml.data.data_reference import DataReference
from azureml.pipeline.steps import PythonScriptStep
from azureml.pipeline.core import Pipeline, PipelineData
from azureml.widgets import RunDetails
from azureml.train.dnn import PyTorch
from azureml.core.runconfig import MpiConfiguration
from azureml.pipeline.steps import EstimatorStep

print("System version: {}".format(sys.version))
print("Azure ML SDK Version:", azureml.core.VERSION)

System version: 3.6.8 |Anaconda, Inc.| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
Azure ML SDK Version: 1.0.48


Let's define a few variables before we get started, these variables define the folder where the data would reside, the batch size and the number of epochs we are training for. 
We also define the variables for AzureML workspace, which you can use to create a new workspace. You can ignore these variables if you have `config.json` in `.azureml` directory.

In [30]:
LABEL_COL = "genre"
DATA_FOLDER = "../../data/temp"
TRAIN_FOLDER = "../../data/temp/train"
TEST_FOLDER = "../../data/temp/test"
ENCODED_LABEL_COL = "label"
NUM_PARTITIONS = None
LABELS = ['telephone', 'government', 'travel', 'slate', 'fiction']
PROJECT_FOLDER = "../../"
NODE_COUNT = 4

config_path = (
    "./.azureml"
)  # Path to the directory containing config.json with azureml credentials

# Azure resources
subscription_id = "YOUR_SUBSCRIPTION_ID"
resource_group = "YOUR_RESOURCE_GROUP_NAME"  
workspace_name = "YOUR_WORKSPACE_NAME"  
workspace_region = "YOUR_WORKSPACE_REGION" #Possible values eastus, eastus2 and so on.
cluster_name = "pipelines-tc-12"

In this example we will use AzureML pipelines to execute training pipelines. Each preprocessing step is included as a step in the pipeline. For a more detailed walkthrough of what pipelines are with a getting started guidelines check this [notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/how-to-use-azureml/machine-learning-pipelines/intro-to-pipelines/aml-pipelines-getting-started.ipynb). We start by doing some AzureML related setup below.

### 0.1 Initialize a workspace

The following cell looks to set up the connection to your [Azure Machine Learning service Workspace](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#workspace). You can choose to connect to an existing workspace or create a new one. 

**To access an existing workspace:**
1. If you have a `config.json` file, you do not need to provide the workspace information; you will only need to update the `config_path` variable that is defined above which contains the file.
2. Otherwise, you will need to supply the following:
    * The name of your workspace
    * Your subscription id
    * The resource group name

**To create a new workspace:**

Set the following information:
* A name for your workspace
* Your subscription id
* The resource group name
* [Azure region](https://azure.microsoft.com/en-us/global-infrastructure/regions/) to create the workspace in, such as `eastus2`. 

This will automatically create a new resource group for you in the region provided if a resource group with the name given does not already exist. 

In [31]:
ws = azureml_utils.get_or_create_workspace(
    config_path=config_path,
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
    workspace_region=workspace_region,
)

### 0.2 Create a compute target
We create and attach a [compute target](https://docs.microsoft.com/azure/machine-learning/service/concept-azure-machine-learning-architecture#compute-target) for training the model. Here we use the AzureML-managed compute target ([AmlCompute](https://docs.microsoft.com/azure/machine-learning/service/how-to-set-up-training-targets#amlcompute)) as our remote training compute resource. Our cluster autoscales from 0 to 8 `STANDARD_NC12` GPU nodes.

Creating and configuring the AmlCompute cluster takes approximately 5 minutes the first time around. Once a cluster with the given configuration is created, it does not need to be created again.

As with other Azure services, there are limits on certain resources (e.g. AmlCompute) associated with the Azure Machine Learning service. Read more about the default limits and how to request more quota [here](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-manage-quotas).

In [32]:
try:
    compute_target = ComputeTarget(workspace=ws, name=cluster_name)
    print("Found existing compute target.")
except ComputeTargetException:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size="STANDARD_NC12", max_nodes=8
    )

    # create the cluster
    compute_target = ComputeTarget.create(ws, cluster_name, compute_config)

    compute_target.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current AmlCompute.
print(compute_target.get_status().serialize())

Found existing compute target.
{'currentNodeCount': 0, 'targetNodeCount': 0, 'nodeStateCounts': {'preparingNodeCount': 0, 'runningNodeCount': 0, 'idleNodeCount': 0, 'unusableNodeCount': 0, 'leavingNodeCount': 0, 'preemptedNodeCount': 0}, 'allocationState': 'Steady', 'allocationStateTransitionTime': '2019-08-11T08:53:18.284000+00:00', 'errors': None, 'creationTime': '2019-07-25T04:16:20.598768+00:00', 'modifiedTime': '2019-08-05T06:40:12.292030+00:00', 'provisioningState': 'Succeeded', 'provisioningStateTransitionTime': None, 'scaleSettings': {'minNodeCount': 0, 'maxNodeCount': 10, 'nodeIdleTimeBeforeScaleDown': 'PT120S'}, 'vmPriority': 'Dedicated', 'vmSize': 'STANDARD_NC12'}


## 1. Preprocessing

The pipeline is defined by a series of steps, the first being a PythonScriptStep which utilizes [DASK](https://dask.org/) to load dataframes in partitions allowing us to load and preprocess different sets of data in parallel.

### 1.1 Read Dataset

In [33]:
train_batches = get_generator(DATA_FOLDER, "train", num_batches=NUM_PARTITIONS, batch_size=10e6)
test_batches = get_generator(DATA_FOLDER, "dev_matched", num_batches=NUM_PARTITIONS, batch_size=10e6)

### 1.2 Preprocess and Tokenize

In the classification task, we use the first sentence only as the text input, and the corresponding genre as the label. Select the examples corresponding to one of the entailment labels (*neutral* in this case) to avoid duplicate rows, as the sentences are not unique, whereas the sentence pairs are.

Once filtered, we encode the labels. To do this, fit a label encoder with the known labels in a MNLI dataset.

In [34]:
os.makedirs(TRAIN_FOLDER, exist_ok=True)
os.makedirs(TEST_FOLDER, exist_ok=True)

labels = LABELS
label_encoder = LabelEncoder()
label_encoder.fit(labels)

num_train_partitions = 0
for batch in train_batches:
    batch = batch[batch["gold_label"]=="neutral"]
    batch[ENCODED_LABEL_COL] = label_encoder.transform(batch[LABEL_COL])
    batch.to_csv(TRAIN_FOLDER+"/batch{}.csv".format(str(num_train_partitions)))
    num_train_partitions += 1
    
num_test_partitions = 0
for batch in test_batches:
    batch = batch[batch["gold_label"]=="neutral"]
    batch[ENCODED_LABEL_COL] = label_encoder.transform(batch[LABEL_COL])
    batch.to_csv(TEST_FOLDER+"/batch{}.csv".format(str(num_test_partitions)))
    num_test_partitions += 1

Once we have the partitions of data ready they are uploaded to the datastore.

In [35]:
ds = ws.get_default_datastore()
ds.upload(src_dir=TRAIN_FOLDER, target_path="mnli_data/train", overwrite=True, show_progress=False)
ds.upload(src_dir=TEST_FOLDER, target_path="mnli_data/test", overwrite=True, show_progress=False)

$AZUREML_DATAREFERENCE_9609849b541244d396d06017b5729edb

In [36]:
shutil.rmtree(TRAIN_FOLDER)
shutil.rmtree(TEST_FOLDER)

We can now parallely operate on each batch to tokenize the data and preprocess the tokens. To do this, we create a PythonScript step below.

In [37]:
%%writefile preprocess.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import argparse
import logging
import os

import pandas as pd

from utils_nlp.models.bert.common import Language, Tokenizer

LABEL_COL = "genre"
TEXT_COL = "sentence1"
LANGUAGE = Language.ENGLISH
TO_LOWER = True
MAX_LEN = 150

logger = logging.getLogger(__name__)


def tokenize(df):
    """Tokenize the text documents and convert them to lists of tokens using the BERT tokenizer.
    Args:
        df(pd.Dataframe): Dataframe with training or test samples

    Returns:

        list: List of lists of tokens for train set.

    """
    tokenizer = Tokenizer(
        LANGUAGE, to_lower=TO_LOWER)
    tokens = tokenizer.tokenize(list(df[TEXT_COL]))

    return tokens


def preprocess(tokens):
    """ Preprocess method that does the following,
            Convert the tokens into token indices corresponding to the BERT tokenizer's vocabulary
            Add the special tokens [CLS] and [SEP] to mark the beginning and end of a sentence
            Pad or truncate the token lists to the specified max length
            Return mask lists that indicate paddings' positions
            Return token type id lists that indicate which sentence the tokens belong to (not needed
            for one-sequence classification)

    Args:
        tokens(pd.Dataframe): Dataframe with tokens for train set.

    Returns:
        list: List of lists of tokens for train or test set with special tokens added.
        list: Input mask.
    """
    tokenizer = Tokenizer(
        LANGUAGE, to_lower=TO_LOWER)
    tokens, mask, _ = tokenizer.preprocess_classification_tokens(
        tokens, MAX_LEN
    )

    return tokens, mask


parser = argparse.ArgumentParser()
parser.add_argument("--input_data", type=str, help="input data")
parser.add_argument("--output_data", type=str, help="Path to the output file.")

args = parser.parse_args()
input_data = args.input_data
output_data = args.output_data
output_dir = os.path.dirname(os.path.abspath(output_data))

if output_dir is not None:
    os.makedirs(output_dir, exist_ok=True)
    logger.info("%s created" % output_dir)

df = pd.read_csv(args.input_data)
tokens_array = tokenize(df)
tokens_array, mask_array = preprocess(tokens_array)

df['tokens'] = tokens_array
df['mask'] = mask_array

# Filter columns
cols = ['tokens', 'mask', 'label']
df = df[cols]
df.to_csv(output_data, header=False, index=False)
logger.info("Completed")

Writing preprocess.py


In [38]:
preprocess_file = os.path.join(PROJECT_FOLDER,'utils_nlp/models/bert/preprocess.py')
shutil.move('preprocess.py',preprocess_file)

'../../utils_nlp/models/bert/preprocess.py'

Create a conda environment for the steps below.

In [39]:
conda_dependencies = CondaDependencies.create(
    conda_packages=[
        "numpy",
        "scikit-learn",
        "pandas",
    ],
    pip_packages=["azureml-sdk==1.0.43.*", 
                  "torch==1.1", 
                  "tqdm==4.31.1",
                 "pytorch-pretrained-bert>=0.6"],
    python_version="3.6.8",
)
run_config = RunConfiguration(conda_dependencies=conda_dependencies)
run_config.environment.docker.enabled = True

Then create the list of steps that use the preprocess.py created above. We use the output of these steps as input to training in the next section.

In [40]:
processed_train_files = []
processed_test_files = []
ds = ws.get_default_datastore()

for i in range(num_train_partitions):
        input_data = DataReference(datastore=ds, 
                                   data_reference_name='train_batch_{}'.format(str(i)), 
                                   path_on_datastore='mnli_data/train/batch{}.csv'.format(str(i)),
                                   overwrite=False)

        output_data = PipelineData(name="train{}".format(str(i)), datastore=ds,
                       output_path_on_compute='mnli_data/processed_train/batch{}.csv'.format(str(i)))

        step = PythonScriptStep(
            name='preprocess_step_train_{}'.format(str(i)),
            arguments=["--input_data", input_data, "--output_data", output_data],
            script_name= 'utils_nlp/models/bert/preprocess.py',
            inputs=[input_data],
            outputs=[output_data],
            source_directory=PROJECT_FOLDER,
            compute_target=compute_target,
            runconfig=run_config,
            allow_reuse=False,
        )
        
        processed_train_files.append(output_data)         
            
for i in range(num_test_partitions):
            input_data = DataReference(datastore=ds, 
                                       data_reference_name='test_batch_{}'.format(str(i)), 
                                       path_on_datastore='mnli_data/test/batch{}.csv'.format(str(i)),
                                       overwrite=False)
        
            output_data = PipelineData(name="test{}".format(str(i)), datastore=ds,
                        output_path_on_compute='mnli_data/processed_test/batch{}.csv'.format(str(i)))
            
            step = PythonScriptStep(
                name='preprocess_step_test_{}'.format(str(i)),
                arguments=["--input_data", input_data, "--output_data", output_data],
                script_name= 'utils_nlp/models/bert/preprocess.py',
                inputs=[input_data],
                outputs=[output_data],
                source_directory=PROJECT_FOLDER,
                compute_target=compute_target,
                runconfig=run_config,
                allow_reuse=False,
            )
            
            processed_test_files.append(output_data)

## 2. Train and Score

Once the data is processed and available on datastore, we  train the classifier using the training examples. This involves fine-tuning the BERT Transformer and learning a linear classification layer on top of that. After training is complete we score the performance of the model on the test dataset

The training is distributed and is done AzureML's capability to support distributed using MPI with horovod. 

**Please note** that training requires a GPU enabled cluster in AzureML Compute. We suggest using NC12. If you would like to change the GPU configuration, please changes `NUM_GPUS` variable accordingly.


### 2.1 Setup training script

In [41]:
%%writefile train.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import argparse
import json
import logging
import os
import torch

from sklearn.metrics import classification_report

from utils_nlp.common.timer import Timer
from utils_nlp.models.bert.common import Language, get_dataset_multiple_files
from utils_nlp.models.bert.sequence_classification_distributed import (
    BERTSequenceClassifier,
)

BATCH_SIZE = 32
NUM_GPUS = 2
NUM_EPOCHS = 1
LABELS = ["telephone", "government", "travel", "slate", "fiction"]
OUTPUT_DIR = "./outputs/"

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser()
parser.add_argument(
    "--train_files",
    nargs="+",
    default=[],
    help="List of file paths to all the files in train dataset.",
)

parser.add_argument(
    "--test_files",
    nargs="+",
    default=[],
    help="List of file paths to all the files in test dataset.",
)

args = parser.parse_args()
train_files = [file.strip() for file in args.train_files]
test_files = [file.strip() for file in args.test_files]

# Handle square brackets from train list
train_files[0] = train_files[0][1:]
train_files[len(train_files) - 1] = train_files[len(train_files) - 1][:-1]
train_dataset = get_dataset_multiple_files(train_files)

# Handle square brackets from test list
test_files[0] = test_files[0][1:]
test_files[len(test_files) - 1] = test_files[len(test_files) - 1][:-1]
test_dataset = get_dataset_multiple_files(test_files)

# Train
classifier = BERTSequenceClassifier(
    language=Language.ENGLISH, num_labels=len(LABELS), use_distributed=True
)

# Create data loaders.
kwargs = (
    {"num_workers": NUM_GPUS, "pin_memory": True} if torch.cuda.is_available() else {}
)
train_data_loader = classifier.create_data_loader(
    train_dataset, batch_size=BATCH_SIZE, **kwargs
)
test_data_loader = classifier.create_data_loader(
    test_dataset, batch_size=BATCH_SIZE, mode="test", **kwargs
)

# Create optimizer
num_examples = len(train_dataset)
num_batches = int(num_examples / BATCH_SIZE)
num_train_optimization_steps = num_batches * NUM_EPOCHS
optimizer = classifier.create_optimizer(num_train_optimization_steps)

with Timer() as t:
    for epoch in range(1, NUM_EPOCHS + 1):
        train_data_loader.sampler.set_epoch(epoch)
        classifier.fit(
            train_data_loader,
            epoch=epoch,
            bert_optimizer=optimizer,
            num_gpus=NUM_GPUS,
            num_epochs=NUM_EPOCHS,
        )

# Predict
preds, labels_test = classifier.predict(test_data_loader, num_gpus=NUM_GPUS)

# Evaluate
results = classification_report(
    labels_test, preds, target_names=LABELS, output_dict=True
)

# Write out results.
classifier.save_model()
result_file = os.path.join(OUTPUT_DIR, "results.json")
with open(result_file, "w+") as fp:
    json.dump(results, fp)

Writing train.py


In [42]:
train_file = os.path.join(PROJECT_FOLDER,'utils_nlp/models/bert/train.py')
shutil.move('train.py',train_file)

'../../utils_nlp/models/bert/train.py'

### 2.2 Create a Pytorch Estimator

We create a Pytorch Estimator using AzureML SDK and additonally define an EstimatorStep to run it on AzureML pipelines.

The Azure ML SDK's PyTorch Estimator allows us to submit PyTorch training jobs for both single-node and distributed runs. For more information on the PyTorch estimator, refer [here](https://docs.microsoft.com/azure/machine-learning/service/how-to-train-pytorch).

This Estimator specifies that the training script will run on 4 nodes, with 2 worker per node. In order to execute a distributed run using GPU, we must define `use_gpu` and `distributed_backend` to use MPI/Horovod. PyTorch, Horovod, and other necessary dependencies are installed automatically.

In [43]:
estimator = PyTorch(source_directory=PROJECT_FOLDER,
                    compute_target=compute_target,
                    entry_script='utils_nlp/models/bert/train.py',
                    node_count= NODE_COUNT,
                    distributed_training= MpiConfiguration(),
                    use_gpu=True,
                    conda_packages=['scikit-learn=0.20.3', 'numpy>=1.16.0', 'pandas'],
                    pip_packages=["tqdm==4.31.1","pytorch-pretrained-bert>=0.6"]
                   )



In [44]:
inputs = processed_train_files + processed_test_files

est_step = EstimatorStep(name="Estimator-Train", 
                         estimator=estimator, 
                         estimator_entry_script_arguments=[
                             '--train_files',  str(processed_train_files),
                             '--test_files', str(processed_test_files)],
                         inputs = inputs,
                         runconfig_pipeline_params=None, 
                         compute_target=compute_target)

### 2.3 Submit the pipeline

The model is fine tuned on AML Compute and takes **45 minutes** to train. The total time to run the pipeline will be around **1h 30 minutes** if you use the default value `max_epoch=1`.

In [None]:
pipeline = Pipeline(workspace=ws, steps=[est_step])
experiment = Experiment(ws, 'NLP-TC-BERT-distributed')
pipeline_run = experiment.submit(pipeline)

In [46]:
RunDetails(pipeline_run).show()

_PipelineWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', â€¦

In [47]:
#If you would like to cancel the job for any reasons uncomment the code below.
#pipeline_run.cancel()

In [None]:
#wait for the run to complete before continuing in the notebook
pipeline_run.wait_for_completion()

### 2.4 Download and analyze results

In [49]:
step_run = pipeline_run.find_step_run("Estimator-Train")[0]
file_names = ['outputs/results.json', 'outputs/bert-large-uncased', 'outputs/bert_config.json' ]
azureml_utils.get_output_files(step_run, './outputs', file_names=file_names)

Downloading file outputs/results.json to ./outputs\results.json...
Downloading file outputs/bert-large-uncased to ./outputs\bert-large-uncased...
Downloading file outputs/bert_config.json to ./outputs\bert_config.json...


In [50]:
with open('outputs/results.json', 'r') as handle:
    parsed = json.load(handle)
    print(pd.DataFrame.from_dict(parsed).transpose())

              f1-score  precision    recall  support
telephone     0.904130   0.843191  0.974563    629.0
government    0.955857   0.972366  0.939900    599.0
travel        0.839966   0.935849  0.761905    651.0
slate         0.986411   0.974724  0.998382    618.0
fiction       0.938871   0.918712  0.959936    624.0
micro avg     0.925344   0.925344  0.925344   3121.0
macro avg     0.925047   0.928968  0.926937   3121.0
weighted avg  0.923913   0.928455  0.925344   3121.0


From the above chart we can notice the performance of the model trained on a distributed setup in AzureML Compute. From our comparison to fine tuning the same model on MNLI dataset on a `STANDARD_NC12` machine [here](tc_mnli_bert.ipynb) we notice a gain of 20% in the model training time with no drop in performance for AzureML Compute. We present the comparison of weight avg of the metrics along with the training time below,

| Training Setup | F1-Score | Precision | Recall | Training Time |
| --- | --- | --- | --- | --- |
|Standard NC12 | 0.93 |0.93 |0.93 | 58 min |
|AzureML Compute*|0.934| 0.934 | 0.934| 46 min |

* AzureML Compute - The setup used 4 nodes with `STANDARD_NC12` machines.

We also observe common tradeoffs associated with distributed training. We make use of [Horovod](https://github.com/horovod/horovod), a distributed training tool for many popular deep learning frameworks that enables parallelization of work across the nodes in the cluster. Distributed training decreases the time it takes for the model to converge in theory, but the model may also take more time in communicating with each node. Note that the communication time will eventually become negligible when training on larger and larger datasets, but being aware of this tradeoff is helpful for choosing the node configuration when training on smaller datasets. We expect the gains of using AzureML to increase with increased dataset size.

Finally clean up any intermediate files we created.

In [51]:
os.remove(train_file)
os.remove(preprocess_file)