Source code for archai.trainers.nlp.ds_training_args

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import json
import os
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, Dict, Optional, Union

import deepspeed
import torch

from archai.common.file_utils import get_full_path


@dataclass
class DsTrainingArguments:
    """Define arguments used in the DeepSpeed training pipeline.

    Args:
        output_dir: Output folder.
        ds_config: DeepSpeed configuration (dictionary or path to JSON file).
        do_eval: Whether to enable evaluation.
        max_steps: Maximum number of training steps.
        logging_steps: Number of steps between logs.
        save_steps: Number of steps between checkpoints.
        seed: Random seed.
        local_rank: Rank of process.
        backend: Distributed training backend.
        eval_steps: Number of steps between evaluations.
        eval_max_steps: Maximum number of steps during evaluation.
        pipe_parallel_size: Size of pipeline parallelism.
        pipe_parallel_loss_fn: Loss function for pipeline parallelism.
        pipe_parallel_partition_method: Partition method for pipeline parallelism.
        pipe_parallel_activation_checkpoint_steps: Number of steps between
            pipeline parallelism activation checkpoints.
        dataloader_pin_memory: Whether to pin the data loader memory.
        dataloader_num_workers: Number of subprocesses to use for data loading.

    """

    output_dir: str = field(metadata={"help": "Output folder."})

    ds_config: Union[dict, str] = field(
        default_factory=dict, metadata={"help": "DeepSpeed configuration (dictionary or path to JSON file)."}
    )

    do_eval: bool = field(default=True, metadata={"help": "Whether to enable evaluation."})

    max_steps: int = field(default=1, metadata={"help": "Maximum number of training steps."})

    logging_steps: int = field(default=10, metadata={"help": "Number of steps between logs."})

    save_steps: int = field(default=500, metadata={"help": "Number of steps between checkpoints."})

    seed: int = field(default=42, metadata={"help": "Random seed."})

    local_rank: int = field(default=os.getenv("LOCAL_RANK", -1), metadata={"help": "Rank of process."})

    backend: str = field(default="nccl", metadata={"help": "Distributed training backend."})

    eval_steps: int = field(default=500, metadata={"help": "Number of steps between evaluations."})

    eval_max_steps: Optional[int] = field(default=None, metadata={"help": "Maximum number of steps during evaluation."})

    pipe_parallel_size: int = field(default=1, metadata={"help": "Size of pipeline parallelism."})

    pipe_parallel_loss_fn: Optional[Callable] = field(
        default=None, metadata={"help": "Loss function for pipeline parallelism."}
    )

    pipe_parallel_partition_method: str = field(
        default="parameters", metadata={"help": "Partition method for pipeline parallelism."}
    )

    pipe_parallel_activation_checkpoint_steps: int = field(
        default=0, metadata={"help": "Number of steps between pipeline parallelism activation checkpoints."}
    )

    dataloader_pin_memory: bool = field(default=True, metadata={"help": "Whether to pin the data loader memory."})

    dataloader_num_workers: int = field(default=0, metadata={"help": "Number of subprocesses to use for data loading."})

    def __post_init__(self) -> None:
        """Override post-initialization with custom instructions."""

        self.output_dir = get_full_path(self.output_dir)

        # `ds_config` may be supplied as a path to a JSON file, in which case it is loaded into a dictionary
        if isinstance(self.ds_config, str):
            with open(self.ds_config, "r") as f:
                self.ds_config = json.load(f)

        torch.manual_seed(self.seed)
        deepspeed.runtime.utils.set_random_seed(self.seed)

        # `LOCAL_RANK` arrives from the environment as a string, so it is cast before selecting the device
        self.local_rank = int(self.local_rank)
        torch.cuda.set_device(self.local_rank)
    def to_dict(self) -> Dict[str, Any]:
        """Convert attributes into a dictionary representation.

        Returns:
            Attributes encoded as a dictionary.

        """

        return asdict(self)
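As a usage sketch only (it assumes a CUDA-capable environment with the LOCAL_RANK variable set, since the post-initialization selects a CUDA device, and the output folder name is hypothetical), the arguments can be built from an in-memory DeepSpeed configuration dictionary instead of a JSON file and serialized back with to_dict:

    from archai.trainers.nlp.ds_training_args import DsTrainingArguments

    # Minimal sketch: the `ds_config` keys below follow the standard
    # DeepSpeed JSON schema; any valid DeepSpeed config would work here.
    args = DsTrainingArguments(
        output_dir="out",  # hypothetical output folder
        ds_config={"train_batch_size": 256, "fp16": {"enabled": True}},
        max_steps=1000,
        save_steps=250,
    )

    # `to_dict` round-trips the dataclass into a plain dictionary,
    # e.g. for logging or checkpoint metadata.
    print(args.to_dict()["max_steps"])  # 1000

Passing a path such as ds_config="ds_config.json" would instead trigger the JSON-loading branch in __post_init__, leaving self.ds_config as a dictionary either way.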