AI in Multiple GPUs: Point-to-Point and Collective Operations

-

is an element of a series about distributed AI across multiple GPUs:

  • Part 1: Understanding the Host and Device Paradigm
  • Part 2: Point-to-Point and Collective Operations (this text)
  • Part 3: How GPUs Communicate
  • Part 4: Gradient Accumulation & Distributed Data Parallelism (DDP)
  • Part 5: ZeRO
  • Part 6: Tensor Parallelism 

Introduction

Within the previous post, we established the host-device paradigm and introduced the concept of ranks for multi-GPU workloads. Now, we’ll explore the particular communication patterns provided by PyTorch’s torch.distributed module to coordinate work and exchange data between these ranks. These operations, generally known as collectives, are the constructing blocks of distributed workloads.

Although PyTorch exposes these operations, it ultimately calls a backend framework that truly implements the communication. For NVIDIA GPUs, it’s NCCL (NVIDIA Collective Communications Library), while for AMD it’s RCCL (ROCm Communication Collectives Library).

NCCL implements multi-GPU and multi-node communication primitives optimized for NVIDIA GPUs and networking. It routinely detects the present topology (communication channels like PCIe, NVLink, InfiniBand) and selects probably the most efficient one.

Disclaimer 1: Since NVIDIA GPUs are probably the most common, we’ll give attention to the NCCL backend for this post.

Disclaimer 2: For brevity, the code presented below only provides the important arguments of every method as a substitute of all available arguments.

Disclaimer 3: For simplicity, we’re not showing the memory deallocation of tensors, but operations like scatter is not going to routinely free the memory of the source rank (if you happen to don’t understand what I mean, that’s fantastic, it’ll develop into clear very soon).

Communication: Blocking vs. Non-Blocking

To work together, GPUs must exchange data. The CPU initiates the communication by enqueuing NCCL kernels into CUDA streams (if you happen to don’t know what CUDA Streams are, try the first blog post of this series), however the actual data transfer happens directly between GPUs over the interconnect, bypassing the CPU’s important memory entirely. Ideally, the GPUs are connected with a high-speed interconnect like NVLink or InfiniBand (these interconnects are covered within the third post of this series).

This communication could also be synchronous (blocking) or asynchronous (non-blocking), which we explore below.

Synchronous (Blocking) Communication

  • Behavior: If you call a synchronous communication method, the host process stops and waits until the NCCL kernel is successfully enqueued on the present lively CUDA stream. Once enqueued, the function returns. This is frequently easy and reliable. Note that the host is just not waiting for the transfer to finish, only for the operation to be enqueued. Nonetheless, it blocks that specific stream from moving on to the following operation until the NCCL kernel is executed to completion.

Asynchronous (Non-Blocking) Communication

  • Behavior: If you call an asynchronous communication method, the decision returns immediately, and the enqueuing operation happens within the background. It doesn’t enqueue into the present lively stream, but quite to a dedicated internal NCCL stream per device. This permits your CPU to proceed with other tasks, a way generally known as overlapping computation with communication. The asynchronous API is more complex because it could result in undefined behavior if you happen to don’t properly use .wait() (explained below) and modify data while it’s being transferred. Nonetheless, mastering it is essential to unlocking maximum performance in large-scale distributed training.

Point-to-Point (One-to-One)

These operations are usually not considered collectives, but they’re foundational communication primitives. They facilitate direct data transfer between two specific ranks and are fundamental for tasks where one GPU must send specific information to a different.

  • Synchronous (Blocking): The host process waits for the operation to be enqueued to the CUDA stream before proceeding. The kernel is enqueued into the present lively stream.
    • torch.distributed.send(tensor, dst): Sends a tensor to a specified destination rank.
    • torch.distributed.recv(tensor, src): Receives a tensor from a source rank. The receiving tensor should be pre-allocated with the proper shape and dtype.
  • Asynchronous (Non-Blocking): The host process initiates the enqueue operation and immediately continues with other tasks. The kernel is enqueued right into a dedicated internal NCCL stream per device, which allows for overlapping communication with computation. These operations return a request(technically a Work object) that may be used to trace the enqueuing status.
    • request = torch.distributed.isend(tensor, dst): Initiates an asynchronous send operation.
    • request = torch.distributed.irecv(tensor, src): Initiates an asynchronous receive operation.
    • request.wait(): Blocks the host only until the operation has been successfully enqueued on the CUDA stream. Nonetheless, it does block the currently lively CUDA stream from executing later kernels until this specific asynchronous operation completes.
    • request.wait(timeout): If you happen to provide a timeout argument, the host behavior changes. It’s going to block the CPU thread until the NCCL work completes or times out (raising an exception). In normal cases, users don’t have to set the timeout.
    • request.is_completed(): Returns True if the operation has been successfully enqueued onto a CUDA stream. It could be used for polling. It doesn’t guarantee that the actual data has been transferred.

When PyTorch launches an NCCL kernel, it routinely inserts a dependency (i.e. forces a synchronization) between your current lively stream and the NCCL stream. This implies the NCCL stream won’t start until all previously enqueued work on the lively stream finishes — guaranteeing the tensor being sent already holds the ultimate values.

Similarly, calling req.wait() inserts a dependency in the opposite direction. Any work you enqueue on the present stream after req.wait() won’t execute until the NCCL operation completes, so you possibly can safely use the received tensors.

Major “Gotchas” in NCCL

While send and recv are labeled “synchronous,” their behavior in NCCL may be confusing. A synchronous call on a CUDA tensor blocks the host CPU thread only until the info transfer kernel is enqueued to the stream, not until the info transfer completes. The CPU is then free to enqueue other tasks.

There’s an exception: the very first call to torch.distributed.recv() in a process is truly blocking and waits for the transfer to complete, likely as a result of internal NCCL warm-up procedures. Subsequent calls will only block until the operation is enqueued.

Consider this instance where rank 1 hangs since the CPU tries to access a tensor that the GPU has not yet received:

rank = torch.distributed.get_rank()
if rank == 0:
   t = torch.tensor([1,2,3], dtype=torch.float32, device=device)
   # torch.distributed.send(t, dst=1) # No send operation is performed
else: # rank == 1 (assuming only 2 ranks)
   t = torch.empty(3, dtype=torch.float32, device=device)
   torch.distributed.recv(t, src=0) # Blocks only until enqueued (after first run)
   print("This WILL print if NCCL is warmed-up")
   print(t) # CPU needs data from GPU, causing a block
   print("This can NOT print")

The CPU process at rank 1 gets stuck on print(t) since it triggers a host-device synchronization to access the tensor’s data, which never arrives.

If you happen to run this code multiple times, notice that This WILL print if NCCL is warmed-up is not going to get printed within the later executions, for the reason that CPU continues to be stuck at print(t).

Collectives

Every collective operation function supports each sync and async operations through the async_op argument. It defaults to False, meaning synchronous operations.

One-to-All Collectives

These operations involve one rank sending data to all other ranks within the group.

Broadcast

  • torch.distributed.broadcast(tensor, src): Copies a tensor from a single source rank (src) to all other ranks. Every process finally ends up with a similar copy of the tensor. The tensor parameter serves two purposes: (1) when the rank of the method matches the src, the tensor is the info being sent; (2) otherwise, tensor is used to avoid wasting the received data.
rank = torch.distributed.get_rank()
if rank == 0: # source rank
  tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device)
else: # destination ranks
  tensor = torch.empty(3, dtype=torch.int64, device=device)
torch.distributed.broadcast(tensor, src=0)
Image by writer: Broadcast visual animation

Scatter

  • torch.distributed.scatter(tensor, scatter_list, src): Distributes chunks of knowledge from a source rank across all ranks. The scatter_list on the source rank incorporates multiple tensors, and every rank (including the source) receives one tensor from this list into its tensor variable. The destination ranks just pass None for the scatter_list.
# The scatter_list should be None for all non-source ranks.
scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)]
tensor = torch.empty(2, dtype=torch.int64).to(device)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} received: {tensor}')
Image by writer: Scatter visual animation

All-to-One Collectives

These operations gather data from all ranks and consolidate it onto a single destination rank.

Reduce

  • torch.distributed.reduce(tensor, dst, op): Takes a tensor from each rank, applies a discount operation (like SUMMAXMIN), and stores the on the destination rank (dst) only.
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], device=device)
torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)
Image by writer: Reduce visual animation

Gather

  • torch.distributed.gather(tensor, gather_list, dst): Gathers a tensor from every rank into an inventory of tensors on the destination rank. The gather_list should be an inventory of tensors (appropriately sized and typed) on the destination and None in all places else.
# The gather_list should be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)]
t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device)
torch.distributed.gather(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')

The variable world_size is the overall variety of ranks. It could be obtained with torch.distributed.get_world_size(). But don’t worry about implementation details for now, crucial thing is to know the concepts.

Image by writer: Gather visual animation

All-to-All Collectives

In these operations, every rank each sends and receives data from all other ranks.

All Reduce

  • torch.distributed.all_reduce(tensor, op): Same as reduce, however the result’s stored on rank as a substitute of only one destination.
# Example for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")
Image by writer: All Reduce visual animation

All Gather

  • torch.distributed.all_gather(tensor_list, tensor): Same as gather, however the gathered list of tensors is accessible on  rank.
# Example for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor([rank], dtype=torch.float32, device=device)
tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)]
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}")
Image by writer: All Gather visual animation

Reduce Scatter

  • torch.distributed.reduce_scatter(output, input_list): Equivalent of performing a reduce operation on an inventory of tensors after which scattering the outcomes. Each rank receives a unique a part of the reduced output.
# Example for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)]
output = torch.empty(1, dtype=torch.float32, device=device)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} received reduced value: {output.item()}")
Image by writer: Reduce Scatter visual animation

Synchronization

The 2 most often used operations are request.wait() and torch.cuda.synchronize(). It’s crucial to grasp the difference between these two:

  • request.wait(): That is used for asynchronous operations. It synchronizes the currently lively CUDA stream for that operation, ensuring the stream waits for the communication to finish before proceeding. In other words, it blocks the currently lively CUDA stream until the info transfer finishes. On the host side, it only causes the host to attend until the kernel is enqueued; the host does not wait for the info transfer to finish.
  • torch.cuda.synchronize(): It is a more forceful command that pauses the host CPU thread until all previously enqueued tasks on the GPU have finished. It guarantees that the GPU is totally idle before the CPU moves on, but it could create performance bottlenecks if used improperly. Each time you’ll want to perform benchmark measurements, you must use this to make sure you capture the precise moment the GPUs are done.

Conclusion

Congratulations on making it to the tip! On this post, you learned about:

  • Point-to-Point Operations
  • Sync and Async in NCCL
  • Collective operations
  • Synchronization methods

Within the next blog post we’ll dive into PCIe, NVLink, and other mechanisms that enable communication in a distributed setting!

References

ASK ANA

What are your thoughts on this topic?
Let us know in the comments below.

0 0 votes
Article Rating
guest
0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments

Share this article

Recent posts

0
Would love your thoughts, please comment.x
()
x