Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License

# Distributed Training For Extractive Summarization on CNN/DM Dataset

## Summary
This notebook demonstrates how to use Azure Machine Learning to run distributed training for extractive summarization using Distributed Data Parallel in PyTorch. For more detailed model-related information, please see [extractive_summarization_cnndm_transformer.ipynb](extractive_summarization_cnndm_transformer.ipynb)

## Prerequisites
If you are using an Azure Machine Learning Notebook VM, you are all set. Otherwise, refer to the [Configuration Notebook](https://github.com/Azure/MachineLearningNotebooks/blob/master/configuration.ipynb) first if you haven't already to establish your connection to the AzureML Workspace. Prerequisites are:

- Azure subscription
- Azure Machine Learning Workspace
- Azure Machine Learning SDK

To run rouge evaluation, please refer to the section of compute_rouge_perl in [summarization_evaluation.ipynb](summarization_evaluation.ipynb). 

You can run this notebook on CPU-only machines.

## Import Libraries

In [None]:
%load_ext autoreload

In [None]:
%autoreload 2

In [None]:
import os
import sys
from tempfile import TemporaryDirectory
import torch

import azureml.core
from azureml.core import Experiment, Workspace, Run
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from azureml.train.dnn import PyTorch
from azureml.train.dnn import Nccl
from azureml.widgets import RunDetails

nlp_path = os.path.abspath("../../")
if nlp_path not in sys.path:
    sys.path.insert(0, nlp_path)
from utils_nlp.azureml.azureml_utils import get_or_create_workspace
from utils_nlp.dataset.cnndm import CNNDMSummarizationDataset
from utils_nlp.eval import compute_rouge_python
from utils_nlp.models.transformers.extractive_summarization import (
    ExtractiveSummarizer,
    ExtSumProcessedData,
    ExtSumProcessor,
)
# Check core SDK version number
print("SDK version:", azureml.core.VERSION)

import pprint

## Configuration 

In [None]:
# for Azure ML Workspace
SUBSCRIPTION_ID = "YOUR_SUBSCRIPTION_ID"
# Misspelled alias, kept because the workspace-creation cell refers to this name.
SUBSRIPTION_ID = SUBSCRIPTION_ID
LOCATION = "YOUR_WORKSPACE_REGION"  # example "eastus2"
RESOURCE_GROUP = "YOUR_RESOURCE_GROUP_NAME"  # modify to use your own
WORKSPACE_NAME = "YOUR_WORKSPACE_NAME"  # modify to use your own

# for creating Azure ML Compute Cluster
AMLCOMPUTE_CLUSTER_NAME = "bertsumext"  # modify to use your own
NODE_COUNT = 2
VM_SIZE = "STANDARD_NC6"  # this should be the VM that's supported by Azure and Azure ML


# for creating Azure ML Experiment
EXPERIMENT_NAME = "NLP-ExtSum"  # modify to use your own


# local folder to save the downloaded data
LOCAL_DATA_FOLDER = (
    "./bertsumext_aml/data/"
)  # modify to use your own, the penultimate level folder should exist
# local folder used as the model/tokenizer cache
LOCAL_CACHE_DIR = (
    "./bertsumext_aml/cache/"
)
# Training related parameter
MODEL_NAME = "distilbert-base-uncased"  # limited choice
ENCODER = "transformer"
# folder in the workspace where the data is uploaded to
TARGET_DATA_FOLDER = "/bertsum_processed_data"  # modify to use your own
# output dir in the workspace, one sub-folder per experiment
TARGET_OUTPUT_DIR = f"output/{EXPERIMENT_NAME}/"
# cache dir in the workspace
TARGET_CACHE_DIR = f"cache/{EXPERIMENT_NAME}/"

# file names of the preprocessed training and test splits
TRAIN_FILE = "train.pt"
TEST_FILE = "test.pt"
# file name for saving the prediction
SUMMARY_FILENAME = "generated_summaries.txt"
# file name for saving the trained model
MODEL_FILENAME = "dist_extsum.pt"


# local path to download the output from the cluster
LOCAL_OUTPUT_DIR = "./bertsumext_aml/output"  # modify to use your own, the penultimate level folder should exist


# local folder to store all the related files to be copied to the workspace
PROJECT_FOLDER = "./azureml_exp"
# conda environment name, the yaml file will be copied to the workspace
CONDA_ENV_NAME = "nlp_gpu"

# Set QUICK_RUN to False to preprocess the full dataset instead of a small sample.
QUICK_RUN = True
# The number of lines at the head of data file used for preprocessing. -1 means all the lines.
TOP_N = 100 if QUICK_RUN else -1

## Create an AML Workspace

In [None]:
# Obtain the AzureML workspace handle described by the configuration constants.
# NOTE(review): the helper's name suggests an existing workspace is reused when
# one is found — confirm in utils_nlp.azureml.azureml_utils.
ws = get_or_create_workspace(
    subscription_id=SUBSRIPTION_ID,
    resource_group=RESOURCE_GROUP,
    workspace_name=WORKSPACE_NAME,
    workspace_region=LOCATION,
)

In [None]:
# Echo the resolved workspace details, one field per line.
print(
    "\n".join(
        (
            "Workspace name: " + ws.name,
            "Azure region: " + ws.location,
            "Subscription id: " + ws.subscription_id,
            "Resource group: " + ws.resource_group,
        )
    )
)

## Create an AML GPU Compute Cluster

In [None]:
# Reuse the cluster named AMLCOMPUTE_CLUSTER_NAME when the workspace already
# has one; otherwise provision a new cluster and wait for it to come up.
try:
    gpu_compute_target = ComputeTarget(workspace=ws, name=AMLCOMPUTE_CLUSTER_NAME)
    print("Found existing compute target.")
except ComputeTargetException:
    print("Creating a new compute target...")
    compute_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        max_nodes=NODE_COUNT,
        # The SDK documents this argument as an int number of seconds.
        idle_seconds_before_scaledown=600,
    )

    # create the cluster
    gpu_compute_target = ComputeTarget.create(
        ws, AMLCOMPUTE_CLUSTER_NAME, compute_config
    )

    # block until provisioning has finished, streaming progress to the cell output
    gpu_compute_target.wait_for_completion(show_output=True)

# use get_status() to get a detailed status for the current AmlCompute.
print(gpu_compute_target.get_status().serialize())

## Create an Experiment

In [None]:
# Default datastore of the workspace; used below for uploads, mounts and downloads.
ds = ws.get_default_datastore()
# Experiment under which the training run is submitted.
experiment = Experiment(workspace=ws, name=EXPERIMENT_NAME)

In [None]:
# Load the CNN/DM splits (TOP_N limits how much of each file is read) and
# convert them into the extractive-summarization training format.
train_dataset, test_dataset = CNNDMSummarizationDataset(
    top_n=TOP_N, local_cache_path=LOCAL_DATA_FOLDER
)
processor = ExtSumProcessor(model_name=MODEL_NAME, cache_dir=LOCAL_CACHE_DIR)
ext_sum_train = processor.preprocess(train_dataset, oracle_mode="greedy")
ext_sum_test = processor.preprocess(test_dataset, oracle_mode="greedy")

# Persist both processed splits so they can be uploaded to the datastore.
save_path = os.path.join(LOCAL_DATA_FOLDER, "processed")
os.makedirs(save_path, exist_ok=True)
for processed_split, split_file in (
    (ext_sum_train, TRAIN_FILE),
    (ext_sum_test, TEST_FILE),
):
    torch.save(processed_split, os.path.join(save_path, split_file))

In [None]:
# Upload the locally saved processed splits to the datastore under TARGET_DATA_FOLDER.
ds.upload(src_dir=save_path, target_path=TARGET_DATA_FOLDER)

## Prepare for the Experiment Run
Prepare the local project folder, which is mirrored to the workspace for the experiment.

In [None]:
import shutil
import subprocess
import sys

# Training script executed on every node of the cluster.
ENTRY_SCRIPT = "extractive_summarization_cnndm_distributed_train.py"
os.makedirs(PROJECT_FOLDER, exist_ok=True)

# Generate the conda environment file with the interpreter running this
# notebook; check=True surfaces a failure here instead of at submission time.
subprocess.run(
    [
        sys.executable,
        "../../tools/generate_conda_file.py",
        "--gpu",
        "--name",
        CONDA_ENV_NAME,
    ],
    check=True,
)

# The estimator is configured with f"{CONDA_ENV_NAME}.yaml", so copy the file
# under that name rather than a hard-coded one.
# NOTE(review): assumes generate_conda_file.py writes "<name>.yaml" into the
# current directory — confirm against the tool.
shutil.copy(f"{CONDA_ENV_NAME}.yaml", PROJECT_FOLDER)
shutil.copy(ENTRY_SCRIPT, PROJECT_FOLDER)

# Replace any earlier copy of utils_nlp so the snapshot matches the current
# sources (copytree requires that the destination does not exist yet).
utils_nlp_snapshot = os.path.join(PROJECT_FOLDER, "utils_nlp")
shutil.rmtree(utils_nlp_snapshot, ignore_errors=True)
shutil.copytree("../../utils_nlp", utils_nlp_snapshot)

## Submit Run

In [None]:
# Make sure the local download folder and its per-experiment sub-folder exist.
for output_folder in (
    LOCAL_OUTPUT_DIR,
    os.path.join(LOCAL_OUTPUT_DIR, EXPERIMENT_NAME),
):
    os.makedirs(output_folder, exist_ok=True)

In [None]:
# Distributed-training configuration object passed to the estimator.
NcclConfig = Nccl()
# The flag is handed to the entry script as text.
quick_run = "true" if QUICK_RUN else "false"

# Command-line arguments for ENTRY_SCRIPT. The "$AZ_BATCHAI_*" values are
# passed as literal strings here — presumably resolved on the compute nodes
# (TODO confirm). The three directories are datastore paths mounted on the nodes.
script_params = {
    "--dist_url": "$AZ_BATCHAI_PYTORCH_INIT_METHOD",
    "--rank": "$AZ_BATCHAI_TASK_INDEX",
    "--node_count": NODE_COUNT,
    "--data_dir": ds.path(TARGET_DATA_FOLDER).as_mount(),
    "--cache_dir": ds.path(TARGET_CACHE_DIR).as_mount(),
    "--output_dir": ds.path(TARGET_OUTPUT_DIR).as_mount(),
    "--quick_run": quick_run,
    "--summary_filename": SUMMARY_FILENAME,
    "--model_filename": MODEL_FILENAME,
    "--model_name": MODEL_NAME,
    "--encoder": ENCODER,
    "--train_file": TRAIN_FILE,
    "--test_file": TEST_FILE,
}

estimator = PyTorch(
    source_directory=PROJECT_FOLDER,
    compute_target=gpu_compute_target,
    script_params=script_params,
    entry_script=ENTRY_SCRIPT,
    node_count=NODE_COUNT,
    distributed_training=NcclConfig,
    conda_dependencies_file=f"{CONDA_ENV_NAME}.yaml",
    use_gpu=True,
)

In [None]:
# Submit the estimator to the experiment; `run` is the handle used to monitor it.
run = experiment.submit(estimator)

In [None]:
# Display the notebook widget that tracks the submitted run.
RunDetails(run).show()

In [None]:
"""
If you stop the notebook and come back, 
you'll need to use the run_id in the output of the previous cell 
to get run details.
"""
# Example (uncomment and substitute your own run id):
# fetched_run = Run(experiment, "NLP-ExtSum_1579816237_ea238f69")
# RunDetails(fetched_run).show()

## Download Generated Summaries 

In [None]:
import shutil

# need to clear the local output dir as the ds.download won't download if the path exists
# Remove the folder with shutil instead of a shell "rm -rf <dir>/*": that is
# portable and cannot expand to "/*" if LOCAL_OUTPUT_DIR is ever empty.
shutil.rmtree(LOCAL_OUTPUT_DIR, ignore_errors=True)
# Recreate the (now empty) folder so it is still present as the download target.
os.makedirs(LOCAL_OUTPUT_DIR, exist_ok=True)

In [None]:
# Download the generated-summaries file written by the run from the datastore
# into LOCAL_OUTPUT_DIR.
ds.download(target_path=LOCAL_OUTPUT_DIR,
                   prefix=f'{TARGET_OUTPUT_DIR}{SUMMARY_FILENAME}',
                   show_progress=True)

In [None]:
# The script uses <q> as the sentence separator so that each prediction fits on
# one line of the file. Here we replace <q> with "\n" to prepare for evaluation.
# The trailing "\n" of each line is removed as well; rstrip is used instead of
# slicing so a final line without a newline does not lose its last character.
summary_path = os.path.join(LOCAL_OUTPUT_DIR, f'{TARGET_OUTPUT_DIR}{SUMMARY_FILENAME}')
with open(summary_path, "r") as filehandle:
    prediction = [line.rstrip("\n").replace("<q>", "\n") for line in filehandle]

In [None]:
# Show the first summary read from the downloaded file.
prediction[0]

Compare with gold summaries

In [None]:
# Collect the source text and the gold summary of every test example.
# Each entry of example["tgt"] is a sequence of tokens: tokens are joined with
# spaces into a sentence, and sentences are joined with "\n" so the result has
# the same layout as the predictions.
source = []
target = []
for example in ext_sum_test:
    source.append(example["src_txt"])
    target.append("\n".join(" ".join(sentence) for sentence in example["tgt"]))

In [None]:
# Show the first gold summary.
target[0]

In [None]:
# Show the source text of the first test example.
source[0]

## Download and Evaluate the Trained Model

In [None]:
# You can also download the saved model and run prediction locally
# if you are running the notebook on a GPU machine.
ds.download(target_path=LOCAL_OUTPUT_DIR,
               prefix=f'{TARGET_OUTPUT_DIR}{MODEL_FILENAME}',
               show_progress=True)

In [None]:
BATCH_SIZE = 32

# Build a summarizer and load the downloaded weights into its model.
# map_location="cpu" loads the tensors onto the CPU.
summarizer = ExtractiveSummarizer(processor, encoder=ENCODER, cache_dir=LOCAL_CACHE_DIR)
model_path = os.path.join(LOCAL_OUTPUT_DIR, f'{TARGET_OUTPUT_DIR}{MODEL_FILENAME}')
summarizer.model.load_state_dict(torch.load(model_path, map_location="cpu"))

# Predict on the processed test split with every GPU torch reports
# (device_count() is 0 on a CPU-only machine); sentences are separated by "\n".
prediction = summarizer.predict(
    ext_sum_test,
    num_gpus=torch.cuda.device_count(),
    batch_size=BATCH_SIZE,
    sentence_separator="\n",
)

In [None]:
# Show the first summary produced by the locally loaded model.
prediction[0]

In [None]:
# Score the predictions (candidates) against the gold summaries (references)
# and pretty-print the result.
rouge_scores = compute_rouge_python(cand=prediction, ref=target)
pprint.pprint(rouge_scores)

## Cleanup

In [None]:
import shutil

# Remove every local folder this notebook created. ignore_errors=True makes
# rmtree a silent no-op for a missing path, so no existence check is needed.
for leftover_folder in (
    LOCAL_DATA_FOLDER,
    LOCAL_OUTPUT_DIR,
    LOCAL_CACHE_DIR,
    PROJECT_FOLDER,
):
    shutil.rmtree(leftover_folder, ignore_errors=True)