From PyTorch DDP to Accelerate to Trainer, mastery of distributed training with ease

Zachary Mueller

This tutorial assumes you have a basic understanding of PyTorch and how to train a simple model. It will showcase training on multiple GPUs through a process called Distributed Data Parallelism (DDP) at three levels of increasing abstraction:

  • Native PyTorch DDP through the torch.distributed module
  • Utilizing 🤗 Accelerate’s light wrapper around torch.distributed, which also helps ensure the code can be run on a single GPU and on TPUs with zero code changes and minimal changes to the original code
  • Utilizing 🤗 Transformers’ high-level Trainer API, which abstracts all the boilerplate code and supports various devices and distributed scenarios



What is “Distributed” training and why does it matter?

Take the very basic PyTorch training code below, which sets up and trains a model on MNIST based on the official MNIST example:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

We define the training device (cuda):

device = "cuda"

Construct some PyTorch DataLoaders:

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

Move the model to the CUDA device:

model = BasicNet().to(device)

Construct a PyTorch optimizer:

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

Finally, write a simple training and evaluation loop that performs one full pass over the dataset and calculates the test accuracy:

model.train()
for batch_idx, (data, goal) in enumerate(train_loader):
    data, goal = data.to(device), goal.to(device)
    output = model(data)
    loss = F.nll_loss(output, goal)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, goal in test_loader:
        data, goal = data.to(device), goal.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(goal.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

Typically from here, one could either put all of this into a Python script or run it in a Jupyter Notebook.

However, how would you then get this script to run on, say, two GPUs or on multiple machines if those resources are available, which could improve training speed through distributed training? Just running python myscript.py will only ever use a single GPU. This is where torch.distributed comes into play.



PyTorch Distributed Data Parallelism

As the name implies, torch.distributed is meant to work on distributed setups. This can include multi-node, where you have a number of machines each with a single GPU, or multi-GPU, where a single system has multiple GPUs, or some combination of both.

To convert the code above to work in a distributed setup, a few setup configurations must first be defined, as detailed in the Getting Started with DDP Tutorial.

First, a setup and a cleanup function must be declared. The setup function opens a process group that all of the compute processes can communicate through.

Note: for this section of the tutorial, it should be assumed that these are run from Python script files. Later on, a launcher using Accelerate will be discussed that removes this necessity.

import os
import torch.distributed as dist

def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()

The last piece of the puzzle is: how do I send my data and model to another GPU?

This is where the DistributedDataParallel module comes into play. It copies your model onto each GPU, and when loss.backward() is called, backpropagation is performed and the resulting gradients across all of these copies of the model are averaged/reduced. This ensures each device has the same weights after the optimizer step.
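To make the averaging step concrete, here is a minimal pure-Python sketch (no GPUs or PyTorch involved) of what happens conceptually: each replica computes gradients on its own batch, the gradients are averaged, and every replica applies the same update. The helper names average_gradients and sgd_step and all of the numbers are made up for illustration; real DDP performs the averaging with collective communication during loss.backward().

```python
def average_gradients(per_replica_grads):
    """Average each parameter's gradient across replicas (an all-reduce mean)."""
    n = len(per_replica_grads)
    return [sum(grads) / n for grads in zip(*per_replica_grads)]


def sgd_step(weights, grads, lr):
    """Plain SGD update, used here only to keep the illustration short."""
    return [w - lr * g for w, g in zip(weights, grads)]


# Two replicas start from identical weights (DDP broadcasts them at startup).
initial_weights = [0.5, -1.0]
replica_weights = [list(initial_weights) for _ in range(2)]

# Each replica saw a different batch, so its local gradients differ.
local_grads = [[0.2, 0.4], [0.6, -0.4]]

# Every replica receives the same averaged gradient...
avg = average_gradients(local_grads)

# ...so after the optimizer step the replicas still hold identical weights.
replica_weights = [sgd_step(w, avg, lr=0.1) for w in replica_weights]
print(replica_weights[0] == replica_weights[1])  # True
```

Because every copy applies the same averaged gradient, the copies never drift apart, which is the property the paragraph above describes.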

Below is an example of our training setup, refactored as a function, with this capability:

Note: Here rank is the overall rank of the current GPU relative to all the other GPUs available, meaning ranks run from 0 to n-1.

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    
    ddp_model.train()
    for batch_idx, (data, goal) in enumerate(train_loader):
        data, goal = data.to(rank), goal.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, goal)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

The optimizer needs to be declared based on the model on the specific device (so ddp_model and not model) for all of the gradients to be calculated properly.

Lastly, to run the script, PyTorch has a convenient torchrun command line module that can help. Just pass in the number of processes per node and the number of nodes it should use, as well as the script to run, and you are set:

torchrun --nproc_per_node=2 --nnodes=1 example_script.py

The above will run the training script on two GPUs that live on a single machine, and this is the barebones for performing distributed training with PyTorch alone.

Now let’s talk about Accelerate, a library that aims to make this process more seamless and also helps with a few best practices.
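One detail the command above leaves implicit is how each process learns its rank. torchrun exports the RANK, LOCAL_RANK and WORLD_SIZE environment variables to every process it starts, so the script can read them before calling a function like train. A minimal sketch, where read_torchrun_env is a hypothetical helper name and the single-process defaults are an assumption for running without a launcher:

```python
import os


def read_torchrun_env(env=os.environ):
    """Read the process layout that torchrun exports to each worker.

    Falls back to a single-process layout when the variables are absent
    (an assumption made for this sketch, e.g. when run with plain `python`).
    """
    rank = int(env.get("RANK", 0))              # global rank across all nodes
    local_rank = int(env.get("LOCAL_RANK", 0))  # rank within this machine
    world_size = int(env.get("WORLD_SIZE", 1))  # total number of processes
    return rank, local_rank, world_size


if __name__ == "__main__":
    rank, local_rank, world_size = read_torchrun_env()
    print(f"rank={rank} local_rank={local_rank} world_size={world_size}")
```

On a single machine the global and local ranks coincide, so either could be passed as the rank argument of the train function shown earlier.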



🤗 Accelerate

Accelerate is a library designed to let you perform what we just did above without needing to modify your code greatly. On top of this, the data pipeline built into Accelerate can also improve the performance of your code.

First, let’s wrap all of the code above into a single function, to help us visualize the difference:

def train_ddp(rank, world_size):
    setup(rank, world_size)
    
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    
    model = BasicNet().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    
    ddp_model.train()
    for batch_idx, (data, goal) in enumerate(train_loader):
        data, goal = data.to(rank), goal.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, goal)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, goal in test_loader:
            data, goal = data.to(rank), goal.to(rank)
            output = ddp_model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(goal.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

Next let’s talk about how Accelerate can help. There are a few issues with the above code:

  1. It is slightly inefficient, given that n dataloaders are created, one per device, and each is pushed to its device.
  2. This code will only work for multi-GPU, so special care would need to be taken for it to be run on a single node again, or on TPU.

Accelerate helps with this through the Accelerator class. With it, the code remains much the same, apart from three lines, when comparing a single node to multi-node, as shown below:

from accelerate import Accelerator

def train_ddp_accelerate():
    accelerator = Accelerator()
    
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    
    model = BasicNet()

    
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    
    model.train()
    for batch_idx, (data, goal) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, goal)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, goal in test_loader:
            # Batches from the prepared test_loader are already on the right device
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(goal.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

With this, your PyTorch training loop is now set up to be run on any distributed setup thanks to the Accelerator object. This code can still be launched through the torchrun CLI or through Accelerate’s own CLI interface, accelerate launch.

As a result, it is now trivial to perform distributed training with Accelerate while keeping as much of the barebones PyTorch code the same as possible.

Earlier it was mentioned that Accelerate also makes the DataLoaders more efficient. This is done through custom Samplers that can automatically send parts of the batches to different devices during training, so that only a single copy of the data is held at one time, rather than, for example, four copies in memory at once, depending on the configuration. Along with this, there is only a single full copy of the original dataset in memory in total. Subsets of this dataset are split between all of the nodes used for training, allowing much larger datasets to be trained on a single instance without an explosion in memory use.
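The idea of giving each process its own subset can be sketched in plain Python. This is a simplified illustration, not Accelerate's actual implementation: the hypothetical shard_indices helper hands out sample indices in a strided pattern, so no two processes see the same sample and together they cover the whole dataset.

```python
def shard_indices(num_samples, rank, world_size):
    """Return the sample indices that process `rank` is responsible for.

    Strided assignment: rank r takes samples r, r + world_size, r + 2 * world_size, ...
    """
    return list(range(rank, num_samples, world_size))


# Split a toy dataset of 10 samples across 2 processes.
shards = [shard_indices(10, rank, world_size=2) for rank in range(2)]
print(shards)  # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

Real samplers typically also shuffle the indices and handle datasets whose size does not divide evenly by the number of processes; both are left out here to keep the sketch short.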



Using the notebook_launcher

Earlier it was mentioned that you can start distributed code directly from your Jupyter Notebook. This comes from Accelerate’s notebook_launcher utility, which allows for starting multi-GPU training based on code inside a Jupyter Notebook.

Using it is as trivial as importing the launcher:

from accelerate import notebook_launcher

And passing the training function we declared earlier, any arguments to be passed, and the number of processes to use (such as 8 on a TPU, or 2 for two GPUs). Both of the above training functions can be run, but do note that after you start a single launch, the instance needs to be restarted before spawning another.

notebook_launcher(train_ddp, args=(), num_processes=2)

Or:

notebook_launcher(train_ddp_accelerate, args=(), num_processes=2)



Using 🤗 Trainer

Finally, we arrive at the highest level of API: the Hugging Face Trainer.

This wraps as much of the training as possible while still being able to train on distributed systems without the user needing to do anything at all.

First we need to import the Trainer:

from transformers import Trainer

Then we define some TrainingArguments to control all the usual hyper-parameters. The Trainer also works with dictionaries, so a custom collate function needs to be made.

Finally, we subclass the Trainer and write our own compute_loss.

Afterwards, this code will also work on a distributed setup without any training code needing to be written whatsoever!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        goal = inputs["labels"]
        loss = F.nll_loss(outputs, goal)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)
trainer.train()
    ***** Running training *****
      Num examples = 60000
      Num Epochs = 1
      Instantaneous batch size per device = 64
      Total train batch size (w. parallel, distributed & accumulation) = 64
      Gradient Accumulation steps = 1
      Total optimization steps = 938
Epoch Training Loss Validation Loss
1 0.875700 0.282633
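The step count in the log follows from the dataset and batch sizes: 60000 examples at 64 per batch give 937.5, which rounds up to 938. The sketch below reproduces that arithmetic; optimization_steps is a hypothetical helper, and the two-process figure is what this arithmetic predicts for the same settings rather than a logged result.

```python
import math


def optimization_steps(num_examples, per_device_batch_size, num_processes=1, epochs=1):
    """Estimate optimizer steps, assuming no gradient accumulation."""
    # Each step consumes one batch on every process.
    total_batch_size = per_device_batch_size * num_processes
    return math.ceil(num_examples / total_batch_size) * epochs


print(optimization_steps(60000, 64))                   # 938, matching the log above
print(optimization_steps(60000, 64, num_processes=2))  # 469
```

With two processes the total batch size doubles to 128, so one epoch needs roughly half as many optimizer steps.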

Similarly to the above examples with the notebook_launcher, this can be done again here by putting it all into a training function:

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            goal = inputs["labels"]
            loss = F.nll_loss(outputs, goal)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)



Resources

To learn more about PyTorch Distributed Data Parallelism, check out the documentation here

To learn more about 🤗 Accelerate, check out the documentation here

To learn more about 🤗 Transformers, check out the documentation here


