This tutorial assumes you have a basic understanding of PyTorch and how to train a simple model. It will showcase training on multiple GPUs through a process called Distributed Data Parallelism (DDP) through three different levels of increasing abstraction:
- Native PyTorch DDP through the torch.distributed module
- Utilizing 🤗 Accelerate's light wrapper around torch.distributed that also helps ensure the code can be run on a single GPU or on TPUs with zero to minimal changes to the original code
- Utilizing 🤗 Transformers' high-level Trainer API which abstracts all the boilerplate code and supports various devices and distributed scenarios
What’s “Distributed” training and why does it matter?
Take some very basic PyTorch training code below, which sets up and trains a model on MNIST based on the official MNIST example
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output
We define the training device (cuda):
device = "cuda"
Construct some PyTorch DataLoaders:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])
train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)
Move the model to the CUDA device:
model = BasicNet().to(device)
Construct a PyTorch optimizer:
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
Before finally creating a simple training and evaluation loop that performs one full iteration over the dataset and calculates the test accuracy:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
Typically from here, one could either throw all of this into a Python script or run it in a Jupyter Notebook.
However, how would you then get this script to run on, say, two GPUs or on multiple machines if these resources are available, which could improve training speed through distributed training? Just doing python myscript.py will only ever run the script using a single GPU. This is where torch.distributed comes into play.
PyTorch Distributed Data Parallelism
As the name implies, torch.distributed is meant to work on distributed setups. This can include multi-node, where you have several machines each with a single GPU, or multi-GPU, where a single system has multiple GPUs, or some combination of both.
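For intuition, the total number of processes taking part in training (often called the world size) is just the number of machines multiplied by the number of GPUs per machine. A tiny illustrative sketch, with hypothetical numbers:
num_machines = 2        # hypothetical: two nodes
gpus_per_machine = 4    # hypothetical: four GPUs on each node
world_size = num_machines * gpus_per_machine  # 8 processes in total, with ranks 0 through 7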
To convert our above code to work within a distributed setup, a few setup configurations must first be defined, detailed in the Getting Started with DDP Tutorial.
First a setup and a cleanup function must be declared. This will open up a process group that all of the compute processes can communicate through.
Note: for this section of the tutorial it should be assumed these are sent in Python script files. Later on a launcher using Accelerate will be discussed that removes this necessity.
import os
import torch.distributed as dist

def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # Initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()
The last piece of the puzzle is how do I send my data and model to a different GPU?
This is where the DistributedDataParallel module comes into play. It will copy your model onto each GPU, and when loss.backward() is called the backpropagation is performed and the resulting gradients across all of these copies of the model will be averaged/reduced. This ensures each device has the same weights after the optimizer step.
Below is an example of our training setup, refactored as a function, with this capability:
Note: Here rank is the overall rank of the current GPU compared to all of the other GPUs available, meaning they have a rank of 0 -> n-1
from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch, running the forward pass through the DDP wrapper
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()
The optimizer needs to be declared based on the model on the specific device (so ddp_model and not model) for all of the gradients to be properly calculated.
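For completeness, such a train() function can also be launched manually with torch.multiprocessing rather than a dedicated launcher. Below is a minimal sketch; the _spawn_train wrapper is hypothetical and only exists to match train()'s (model, rank, world_size) signature:
import torch.multiprocessing as mp

def _spawn_train(rank, world_size):
    # mp.spawn passes the process rank as the first argument
    model = BasicNet()
    train(model, rank, world_size)

if __name__ == "__main__":
    world_size = 2  # e.g. two GPUs on a single machine
    mp.spawn(_spawn_train, args=(world_size,), nprocs=world_size)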
Lastly, to run the script PyTorch has a convenient torchrun command line module that can help. Just pass in the number of processes per node (here, GPUs) it should use as well as the script to run and you are set:
torchrun --nproc_per_node=2 --nnodes=1 example_script.py
The above will run the training script on two GPUs that live on a single machine, and this is the barebones for performing distributed training with just PyTorch.
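For reference, scaling out to several machines uses the same command line module. A hedged sketch of a two-machine launch might look like the following, where the rendezvous endpoint is a placeholder for a host reachable by both machines:
torchrun --nnodes=2 --nproc_per_node=2 --rdzv_backend=c10d --rdzv_endpoint=<host>:29400 example_script.py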
Now let's talk about Accelerate, a library aimed at making this process more seamless and also helping with a few best practices.
🤗 Accelerate
Accelerate is a library designed to allow you to perform what we just did above, without needing to modify your code greatly. On top of this, the data pipeline innate to Accelerate can also improve the performance of your code.
First, let's wrap all of the above code we just performed into a single function, to help us visualize the difference:
def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model and wrap it in DDP
    model = BasicNet().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # Build optimizer
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # Train for a single epoch
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(rank), target.to(rank)
            output = ddp_model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
Next let's talk about how Accelerate can help. There are a few issues with the above code:
- This is slightly inefficient, given that n dataloaders are made based on each device and pushed.
- This code will only work for multi-GPU, so special care would need to be taken for it to be run on a single node again, or on TPU.
Accelerate helps with this through the Accelerator class. Through it, the code remains much the same except for three lines of code when comparing a single node to multinode, as shown below:
from accelerate import Accelerator

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model and optimizer
    model = BasicNet()
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # Send everything through `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()

    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            # Batches from the prepared test_loader are already on the right device
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')
With this, your PyTorch training loop is now set up to be run on any distributed setup thanks to the Accelerator object. This code can then still be launched through the torchrun CLI or through Accelerate's own CLI interface, accelerate launch.
As a result, it is now trivial to perform distributed training with Accelerate while keeping as much of the barebones PyTorch code the same as possible.
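As a sketch, launching through Accelerate's CLI typically looks like the following: accelerate config walks you through describing your hardware once, and accelerate launch then starts the script with that saved configuration (flags such as --num_processes can also be passed explicitly):
accelerate config
accelerate launch example_script.py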
Earlier it was mentioned that Accelerate also makes the DataLoaders more efficient. This is through custom Samplers that can automatically send parts of the batches to different devices during training, allowing for a single copy of the data to be in memory at one time, rather than four at once depending on the configuration. Along with this, there is only a single full copy of the original dataset in memory total. Subsets of this dataset are split between all of the nodes that are utilized for training, allowing for much larger datasets to be trained on a single instance without an explosion in memory utilized.
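To illustrate the idea of sharding (this is not Accelerate's internal code, just a sketch using PyTorch's own DistributedSampler), each process can be handed a distinct subset of indices rather than a full copy of the dataset:
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# Pretend we are process 0 of 2: this sampler only yields roughly half of the indices
sampler = DistributedSampler(train_dset, num_replicas=2, rank=0, shuffle=True)
sharded_loader = DataLoader(train_dset, batch_size=64, sampler=sampler)
print(len(sampler), len(train_dset))  # ~30000 vs 60000 for MNIST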
Using the notebook_launcher
Earlier it was mentioned that you can launch distributed code directly from your Jupyter Notebook. This comes from Accelerate's notebook_launcher utility, which allows for starting multi-GPU training based on code inside of a Jupyter Notebook.
Using it is as trivial as importing the launcher:
from accelerate import notebook_launcher
And passing in the training function we declared earlier, any arguments to be passed, and the number of processes to use (such as 8 on a TPU, or 2 for two GPUs). Both of the above training functions can be run, but do note that after you start a single launch, the instance needs to be restarted before spawning another.
notebook_launcher(train_ddp, args=(), num_processes=2)
Or:
notebook_launcher(train_ddp_accelerate, args=(), num_processes=2)
Using 🤗 Trainer
Finally, we arrive at the highest level of API, the Hugging Face Trainer.
This wraps as much training as possible while still being able to train on distributed systems, without the user needing to do anything at all.
First we need to import the Trainer:
from transformers import Trainer
Then we define some TrainingArguments to control all the usual hyper-parameters. The Trainer also works through dictionaries, so a custom collate function needs to be made.
Finally, we subclass the trainer and write our own compute_loss.
Afterwards, this code will also work on a distributed setup without any training code needing to be written at all!
from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x": pixel_values, "labels": labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)

trainer.train()
***** Running training *****
Num examples = 60000
Num Epochs = 1
Instantaneous batch size per device = 64
Total train batch size (w. parallel, distributed & accumulation) = 64
Gradient Accumulation steps = 1
Total optimization steps = 938
| Epoch | Training Loss | Validation Loss |
|---|---|---|
| 1 | 0.875700 | 0.282633 |
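If you want a standalone evaluation pass once training has finished (an optional extra, not part of the original example), the Trainer also exposes an evaluate() method that returns a dictionary of metrics:
metrics = trainer.evaluate()
print(metrics)  # includes the evaluation loss, runtime, and samples per second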
Similarly to the above examples with the notebook_launcher, this can be done again here by throwing it all into a training function:
def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x": pixel_values, "labels": labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()
notebook_launcher(train_trainer_ddp, args=(), num_processes=2)
Resources
To learn more about PyTorch Distributed Data Parallelism, check out the documentation here
To learn more about 🤗 Accelerate, check out the documentation here
To learn more about 🤗 Transformers, check out the documentation here
