Pipelining AI/ML Training Workloads with CUDA Streams

-

ninth in our series on performance profiling and optimization in PyTorch aimed toward emphasizing the critical role of performance evaluation and optimization in machine learning development. Throughout the series we’ve reviewed a wide selection of practical tools and techniques for analyzing and boosting the runtime performance of PyTorch-based AI/ML models. Our goal has been twofold:

  1. To emphasise the importance of routine evaluation and optimization of AI/ML workloads.
  2. To show the accessibility of a wide selection tools and techniques for analyzing and optimizing AI/ML runtime performance. You don’t must be a CUDA expert to meaningfully improve your model performance and reduce compute costs.

On this post, we are going to explore using CUDA streams, a strong feature of NVIDIA’s CUDA programming model that provides a classy approach to overlapping GPU operations and running them concurrently. Although we typically associate our AI/ML model training workload with a single monolithic (a.k.a., “unbreakable”) computation graph running on the GPU, there are some scenarios where the graph may be decomposed into two distinct subgraphs  and , where . In such cases CUDA streams enable “pipelining” the computation graph, i.e., programming our training step to run  (on batch input ) in parallel to (on the  output of ). This method is very useful when:

  • Neither subgraph fully utilizes the GPU when run alone, and
  • The 2 subgraphs are of comparable computational cost (i.e., neither dominates runtime).

We are going to explore two common scenarios where “pipelining” is possible:

  1. Partial-model training or finetuning:
    It’s common to freeze a pre-trained model  (e.g., feature extractor or encoder) and train only a model  (e.g., decoder). Because the frozen doesn’t depend on gradients from the , the 2 may be executed concurrently.
  2. Offloading data preprocessing to the GPU:
    A typical method for addressing bottlenecks within the input pipeline (also often known as GPU starvation), data preprocessing may be moved to the GPU. While prepending preprocessing operations to the model graph improves performance, additional gains may be achieved by running preprocessing on a separate CUDA stream in parallel with model execution—assuming preprocessing isn’t trivial in comparison with model compute.

To facilitate our discussion, we are going to define two toy training scripts and measure the training performance under different scenarios. The experiments were run on an Amazon EC2 g5.2xlarge instance (containing an NVIDIA A10G GPU and eight vCPUs) running a PyTorch (2.6) Deep Learning AMI (DLAMI).

Please note: the code snippets that we share are for demonstration purposes only —please don’t depend on their correctness or optimality. The impact of using CUDA streams will vary depending on model architecture and system configuration. We encourage you to conduct your personal profiling and experimentation before integrating CUDA streams (or every other tool technique we check with) into your workflow.

Part 1: Pipelining an Encoder-Decoder Model

The primary use-case we explore involves a CNN-based image segmentation model consisting of a hard and fast (pre-trained) encoder and a trainable decoder. On this scenario, for the reason that encoder weights are frozen and unaffected by backpropagation, the encoder may be executed independently of the decoder’s training. On this section, we assess the impact of pipelining the training process using CUDA streams.

A Toy Image Segmentation Training Experiment

We start by defining a straightforward CNN-based image encoder together with its corresponding decoder.

undefined

Next, we construct an artificial dataset of random images and segmentation maps.

from torch.utils.data import DataLoader
from torchvision.datasets.vision import VisionDataset

# A dataset with random images and per-pixel labels
class FakeDataset(VisionDataset):
    def __init__(self):
        super().__init__(root=None)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                            dtype=torch.uint8)

        # create a random label map
        goal = torch.randint(0, num_classes, (img_size, img_size))

        return img, goal

    def __len__(self):
        return self.size

train_set = FakeDataset()

train_loader = DataLoader(
    dataset=train_set,
    batch_size=8,
    num_workers=8
)

Finally, we define the loss function, optimizer, and training loop. Note, that we freeze the encoder’s weights and train only the decoder.

import time

device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(decoder.parameters())

# Freeze the encoder weights
encoder.requires_grad_(False)
encoder.eval().to(device)

decoder.train().to(device)

warmup = 10
active_batches = 100
total_iters = warmup + active_batches

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True).float()
    labels = data[1].to(device=device, non_blocking=True)
    optimizer.zero_grad()
    with torch.no_grad():
        features = encoder(inputs)
    output = decoder(features)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        # sync the GPU and begin the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()

    if idx == total_iters:
        break

# wait for the GPU to finnish after which stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')

Our baseline training script achieves a median throughput of 83 steps per second, with a median GPU utilization of 85%.

Pipelining the Model Execution With CUDA Streams

Within the revised version of the training loop shown below, we introduce two CUDA streams: one for executing the encoder and one for training the decoder. In each iteration, we perform two operations concurrently:

  1. Train the decoder using the image features and labels from batch .
  2. Execute the encoder on input batch  to generate its image features.
encoder_stream = torch.cuda.Stream()
decoder_stream = torch.cuda.Stream()

# initialize the features to None
features = None

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device, non_blocking=True).float()
    labels_next = data[1].to(device, non_blocking=True)

    if features isn't None:
        with torch.cuda.stream(decoder_stream):
            decoder_stream.wait_stream(encoder_stream)

            optimizer.zero_grad()
            output = decoder(features)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

    with torch.cuda.stream(encoder_stream):
        with torch.no_grad():
            features =  encoder(inputs)
        # Record that features was produced on s1_backbone
        features.record_stream(encoder_stream)

    labels = labels_next

    if idx == warmup:
        # sync the GPU and begin the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

# wait for the GPU to complete after which stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')

This modification yields a median throughput of 91 steps per second, representing a 9.6% speedup. This can be a significant improvement — especially considering that our baseline already had high GPU utilization (85%).

Sensitivity of Pipelining to Workload Properties

The effectiveness of pipelining with CUDA streams is extremely depending on the specifics of the training workload and runtime environment. If the encoder is significantly larger than the decoder (or vice versa), pipelining may offer little profit and even hinder performance. Conversely, when the GPU is underutilized, pipelining tends to yield more substantial gains.

As an example this dependency, we reran the experiment with various batch sizes. The outcomes are summarized below:

Impact of Pipelining With CUDA Streams on Throughput (by Writer)

Because the batch size increases, the good thing about pipelining diminishes. This is probably going because larger batch sizes naturally result in higher (and more efficient) GPU utilization, leaving less room for improvement through concurrent execution.

Part 2: Offloading Augmentations onto the GPU

On this section, we are going to apply using CUDA streams to the acceleration of knowledge augmentation. In previous blog posts (e.g., here and here), we’ve studied the issue of bottlenecks on the info input pipeline from different perspectives and reviewed several techniques for diagnosing and addressing them. A typical causes of those bottlenecks is CPU resource exhaustion, where the CPU cannot meet the computational demands of the preprocessing pipeline. The result’s GPU starvation — a scenario through which the expensive GPU sits idle, waiting for data to reach.

One effective solution is to dump heavy data preprocessing to the GPU. We are going to show this system and take it a step further by executing the augmentations on a dedicated CUDA stream, enabling concurrent execution with the model training.

A Toy Image Classification Training Experiment

We start by defining a straightforward CNN-based image classification model:

import torch
import torch.nn as nn

import torch
import torch.nn as nn

img_size = 256
num_classes = 10
model = nn.Sequential(
    # Start with 256x256 image
    nn.Conv2d(3, 16, kernel_size=1),
    nn.ReLU(inplace=True),
    nn.Conv2d(16, 32, kernel_size=2, stride=2),  # 2x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(32, 64, kernel_size=2, stride=2),  # 4x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(64, 128, kernel_size=2, stride=2),  # 8x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(128, 256, kernel_size=2, stride=2),  # 16x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(256, 512, kernel_size=2, stride=2),  # 32x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(512, 1024, kernel_size=2, stride=2),  # 64x downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(1024, 2048, kernel_size=2, stride=2),  # 128X downsample
    nn.ReLU(inplace=True),
    nn.Conv2d(2048, 4096, kernel_size=2, stride=2),  # 256X
    nn.Flatten(),
    nn.Linear(4096, num_classes)
)

Next, we create an artificial dataset with an augmentation pipeline intentionally designed to cause a severe performance bottleneck:

import random
from torch.utils.data import DataLoader
import torchvision.transforms.v2 as T
from torchvision.datasets.vision import VisionDataset
import torchvision.transforms.v2.functional as F
import torchvision.ops as ops

# A dataset with random images and labels
class FakeDataset(VisionDataset):
    def __init__(self, transform = None):
        super().__init__(root=None, transform=transform)
        self.size = 1000000

    def __getitem__(self, index):
        # create a random image
        img = torch.randint(0, 256, (3, img_size, img_size),
                           dtype=torch.uint8)
        # create a random label
        goal = torch.randint(0, num_classes, (1, ))

        if self.transform:
            # Apply tranformations
            img = self.transform(img)

        return img, goal

    def __len__(self):
        return self.size

augmentations = T.Compose([
    T.ToDtype(torch.float32),
    T.RandomCrop(img_size//2),
    T.Resize(img_size),
    T.RandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
])

train_set = FakeDataset(transform=augmentations)

train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)

Finally, we define the loss function, optimizer, and training loop:

import time

device = torch.device("cuda")
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters())

model.train().to(device)

warmup = 10
active_batches = 100
total_iters = warmup + active_batches

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        # sync the GPU and begin the timer
        torch.cuda.synchronize()
        t0 = time.perf_counter()

    if idx == total_iters:
        break

# wait for the GPU to finnish after which stop the timer
torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')

Running this baseline script ends in a median throughput of 20.41 steps per second and a GPU utilization of only 42%. The heavy data augmentations are choking the CPU resulting in GPU starvation. See our previous post for more information on detecting bottlenecks on the info input pipeline.

Offloading Data Augmentations to the GPU

To handle the performance bottleneck on the info input pipeline, we move the augmentations onto the GPU.

Step one is to define custom data transforms that apply random rotations and crops per sample in a batch. This is significant since the built-in torchvision transforms apply the identical augmentation across your entire batch — losing the per-sample randomness seen on the CPU.

We implement the  transform using the roi_align operator.

class BatchRandomCrop(T.Transform):
    def __init__(self, output_size):
        super().__init__()
        self.output_size = output_size

    def transform(self, img: torch.Tensor, params: dict):
        batch_size, _, original_height, original_width = img.shape
        device = img.device
        max_top = original_height - self.output_size
        max_left = original_width - self.output_size

        # Generate random top and left coords for every image within the batch
        random_top = torch.randint(0, max_top + 1, (batch_size,),
                                   device=device, dtype=torch.float32)
        random_left = torch.randint(0, max_left + 1, (batch_size,),
                                    device=device, dtype=torch.float32)

        image_indices = torch.arange(batch_size, device=device,
                                     dtype=torch.float32)

        boxes = torch.stack([
            image_indices,
            random_left,
            random_top,
            random_left + self.output_size,
            random_top + self.output_size
        ], dim=1)

        cropped_batch = ops.roi_align(
            img,
            boxes,
            output_size=self.output_size
        )
        return cropped_batch 

We implement the transfrom by iterating over all of the pictures within the batch and applying a random rotation to each. Note that this version isn’t vectorized; a completely vectorized implementation could be more would require greater effort.

class BatchRandomRotation(T.Transform):
    def __init__(self, degrees):
        super().__init__()
        self .degrees = degrees

    def transform(self, inpt: torch.Tensor, params: dict):
        # split the batch into a listing of individual images
        images = list(torch.unbind(inpt, dim=0))

        augmented_images = []
        for img_tensor in images:
            # generate a random angle
            angle = random.uniform(-self.degrees, self.degrees)

            # apply the rotation to the one image
            transformed_img = F.rotate(
                img_tensor,
                angle=angle
            )
            augmented_images.append(transformed_img)

        # stack the transformed images
        return torch.stack(augmented_images, dim=0)

We now define that mimics the CPU-based augmentation pipeline defined above:

batch_transform = T.Compose([
    T.ToDtype(torch.float32),
    BatchRandomCrop(img_size//2),
    T.Resize(img_size),
    BatchRandomRotation(degrees=45.0),
    T.GaussianBlur(kernel_size=7),
    T.Normalize(mean=[0, 0, 0], std=[1, 1, 1])
]) 

Finally, we reset the dataset and update the training loop to use the brand new :

train_set = FakeDataset(transform=None)

train_loader = DataLoader(
    dataset=train_set,
    batch_size=32,
    num_workers=8
)

for idx, data in enumerate(train_loader):
    inputs = data[0].to(device=device, non_blocking=True)
    labels = data[1].to(device=device, non_blocking=True).squeeze()
    
    # apply augmentations
    inputs = batch_transform(inputs)
    
    optimizer.zero_grad()
    output = model(inputs)
    loss = criterion(output, labels)
    loss.backward()
    optimizer.step()

    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()

    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')

This updated training script improves throughput to 35.22 steps per second — a 72.57% speedup over the baseline result.

Pipelining Augmentations With CUDA Streams

Next, we pipeline the augmentation and training steps using two separate CUDA streams: one for running the info transform one for training the model. In each iteration of the loop we perform two concurrent operations:

  1. We train the model on the augmented batch .
  2. Perform GPU-based data augmentations on batch 
transform_stream = torch.cuda.Stream()
model_stream = torch.cuda.Stream()

# initialize the transformed value to None
transformed = None

for idx, data in enumerate(train_loader):
    inputs = data[0]
    labels_next = data[1]

    if transformed isn't None:
        with torch.cuda.stream(model_stream):
            labels = labels.to(device, non_blocking=True).squeeze()
            model_stream.wait_stream(transform_stream)
            optimizer.zero_grad()
            output = model(transformed)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()

    with torch.cuda.stream(transform_stream):
        inputs = inputs.to(device, non_blocking=True)
        transformed = batch_transform(inputs)
        # Record that the tensor was produced on transform_stream
        transformed.record_stream(transform_stream)

    labels = labels_next

    if idx == warmup:
        torch.cuda.synchronize()
        t0 = time.perf_counter()
    if idx == total_iters:
        break

torch.cuda.synchronize()
total_time = time.perf_counter() - t0
print(f'throughput: {active_batches / total_time}')

This further improves the throughput to 38.82 steps per second — a ten.2% increase over the serialized solution, and 90.20% faster than the unique baseline

Sensitivity of Pipelining to Workload Properties

As we saw in Part 1, the good thing about pipelining using CUDA streams varies based on the main points of the workload. Within the table below, we capture the outcomes for several different batch sizes:

Impact of Pipelining With CUDA Streams on Throughput (by Writer)

Because the batch size increases, GPU offloading becomes simpler, significantly boosting performance. At the identical time, the gains from pipelining decrease. This is probably going do to the very fact larger batch sizes increase the GPU efficiency, reducing the opportunities for overlap.

Summary

Relating to running AI/ML workloads, every millisecond counts. On this post we explored the impact of pipelining an AI/ML training step using CUDA stream in two common scenarios: partial model training and offloading data augmentations to the GPU. In each cases, the pipelined solution outperformed the serialized implementation — though the extent of the development varied significantly based on the worth of the batch size.

As we’ve emphasized throughout the post, the expected impact of using CUDA streams can vary greatly based on the AI/ML workload. For instance, in cases where the GPU is already being efficiently utilized, the overhead of using CUDA streams may very well result in a degradation in runtime performance. We strongly recommend testing this system on your personal workloads before adopting this approach.

We hope you’ll discover the technique described on this post useful. For more tip, tricks, and techniques for profiling and optimizing AI/ML workflows, take a look at the opposite posts on this series.

ASK ANA

What are your thoughts on this topic?
Let us know in the comments below.

0 0 votes
Article Rating
guest
0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments

Share this article

Recent posts

0
Would love your thoughts, please comment.x
()
x