Building Scalable and Fault-Tolerant NCCL Applications



The NVIDIA Collective Communications Library (NCCL) provides communication APIs for low-latency and high-bandwidth collectives, enabling AI workloads to scale from just a few GPUs on a single host to thousands of GPUs in a data center. This post discusses NCCL features that support run-time rescaling for cost optimization, as well as minimizing service downtime from faults by dynamically removing faulted workers.

Enabling scalable AI with NCCL

NCCL was introduced in 2015 to accelerate AI training by using more than one GPU to train a model together. Over the following decade, training workloads have expanded to thousands of GPUs, and new models continue to increase in size and complexity.

Today, both training and inference workloads rely on multi-GPU collectives that combine data parallelism, tensor parallelism, and expert parallelism to meet latency and throughput goals. NCCL collectives continue to form the communication backbone for these strategies, synchronizing computation across multiple workers (known as ranks) within a communicator.

Typically, a deep learning framework performs a single initialization step at launch time to determine data sharding and assign each GPU its specific tasks across multiple dimensions of parallelism. However, as model sizes and the degree of parallelism in inference engines increase, dynamically reallocating resources at runtime becomes attractive for minimizing operational footprint.

A dynamically scalable inference engine can respond to increased user traffic by allocating additional GPUs and spreading the work across them, or relinquish excess GPUs when traffic is low to optimize cost. These are examples of planned scaling events in which all parts of the system are working as designed. We'll show that this pattern is useful for fault tolerance as well.

The diagram shows four workers with a status bar indicating near 100% compute load. After scaling up to six workers, the status bar shows reduced compute load.
Figure 1. An inference cluster experiences increased traffic, which can impact response latency. The framework allocates two additional workers, which join the communicator to share the load

How NCCL communicators enable dynamic application scaling

NCCL communicators were heavily inspired by MPI communicators. However, NCCL introduced important differences and new concepts to enable dynamic application scaling.

  • NCCL communicators can be created from scratch by the application at any point during execution by passing a uniqueId to ncclCommInit. In contrast, MPI creates a special communicator called MPI_COMM_WORLD during initialization, and all other communicators are subsets created with MPI_Comm_split.
  • NCCL communicators can be configured to be non-blocking so that initialization functions may proceed in the background.
  • In NCCL, the application chooses the assignment of ranks to communicator members, allowing applications to optimize the communicator layout.

Once a communicator is created, the set of members (ranks) is considered immutable. Therefore, an NCCL application performing a scale-up operation executes a sequence much like a second initialization. A new uniqueId is obtained and shared across all ranks, which pass it to ncclCommInit. An optimized application may enable non-blocking mode to let the initialization work proceed in the background while it continues to process requests using the old communicator until the new one is ready.
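The following is a minimal sketch of that scale-up sequence, with error handling elided. Here, shareUniqueId and receiveUniqueId are hypothetical stand-ins for whatever out-of-band channel (a key-value store, for example) the application uses to distribute the id; they are not NCCL APIs.

/* Hypothetical sketch: scale-up as a second initialization. */
ncclComm_t rescaleUp(int newRank, int newRankCount) {
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  config.blocking = 0;              /* initialization proceeds in the background */

  ncclUniqueId uniqueId;
  if (newRank == 0) {
    ncclGetUniqueId(&uniqueId);     /* one rank generates the id... */
    shareUniqueId(uniqueId);        /* ...and publishes it to the others (placeholder) */
  } else {
    receiveUniqueId(&uniqueId);     /* placeholder for the out-of-band channel */
  }

  ncclComm_t newComm;
  ncclCommInitRankConfig(&newComm, newRankCount, uniqueId, newRank, &config);
  /* Keep serving requests on the old communicator; poll
     ncclCommGetAsyncError(newComm, ...) until it is no longer ncclInProgress,
     then destroy the old communicator and switch over. */
  return newComm;
}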

Similarly, a scale-down can be implemented the same way using ncclCommInit, or the application can call ncclCommShrink, which has been optimized to reduce initialization time by reusing rank information from the old communicator. This optimization is especially useful for very large communicators, but it also provides a simplified API at any scale.
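A planned scale-down on a surviving rank might look like the following sketch (error handling omitted; excludeRanks is a placeholder for the ranks being released, and oldComm is the current communicator):

/* Sketch: planned scale-down.  NCCL_SHRINK_DEFAULT assumes outstanding
operations on the old communicator have already completed. */
int excludeRanks[] = {4, 5};        /* hypothetical ranks being released */
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
ncclComm_t newComm;
ncclCommShrink(oldComm, excludeRanks, 2, &newComm, &config, NCCL_SHRINK_DEFAULT);
ncclCommDestroy(oldComm);           /* survivors retire the old communicator */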

Fault-tolerant NCCL applications

Fault detection, attribution, and mitigation are a complex topic that spans the entire stack, from the physical layer up to the application layer. To learn more about faults and checkpoint recovery, see Ensuring Reliable Model Training on NVIDIA DGX Cloud. To learn more about observability and fault-tolerance improvements in Dynamo 0.4, see Dynamo 0.4 Delivers 4x Faster Performance, SLO-Based Autoscaling, and Real-Time Observability.

In addition to traditional checkpointing and load-balancing fault-mitigation techniques, NCCL communicators can be dynamically resized after a fault, allowing recovery within the application without fully restarting the workload.

Popular methods for deploying inference workloads (such as Kubernetes) already provide mechanisms for relaunching replacement workers, but the application must also initiate fault-mitigation steps for the NCCL communicator. Recovering from a fault contained to a subset of ranks is similar to a scale-down procedure in which the faulted ranks are removed from the communicator.

The difference is that even healthy ranks should expect NCCL to either return an error or hang on any collective operation. Typical recovery for the healthy ranks starts with ncclCommAbort on the current communicator, followed by ncclCommInit to form a new communicator among the surviving ranks.
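Sketched on a single healthy rank, that recovery path looks roughly like this (error handling elided; survivorRank and survivorCount are placeholders the application must agree on out of band, and receiveUniqueId is the same hypothetical helper as above):

/* Sketch: abort-then-reinitialize recovery on a healthy rank. */
ncclCommAbort(comm);                /* unblocks any hung collectives, frees the communicator */
ncclUniqueId uniqueId;              /* generated by the new rank 0, as at startup */
receiveUniqueId(&uniqueId);         /* placeholder for the out-of-band channel */
ncclComm_t newComm;
ncclCommInitRank(&newComm, survivorCount, uniqueId, survivorRank);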

The diagram indicates two of six workers have faulted, but recovery has excluded these two from the NCCL communicator so that inference may continue.
Figure 2. Faulted workers prevent inference from completing. Fault mitigation removes the faulted workers and allows the healthy workers to continue accepting requests

NCCL 2.27 introduced ncclCommShrink, which is an optimization and simplification of this recovery process. When passed the NCCL_SHRINK_ABORT flag and a list of ranks to exclude, ncclCommShrink cancels any hung operations and creates a new communicator without the need to call ncclGetUniqueId or ncclCommInit.
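With ncclCommShrink, the same recovery collapses to a single call, sketched below (faultedRanks is a placeholder for whatever list the application's fault detection produced):

/* Sketch: fault recovery via ncclCommShrink.  NCCL_SHRINK_ABORT first
cancels hung operations on the old communicator. */
std::vector<int> faultedRanks = {3};    /* hypothetical output of fault detection */
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
ncclComm_t newComm;
ncclCommShrink(comm, faultedRanks.data(), (int)faultedRanks.size(),
               &newComm, &config, NCCL_SHRINK_ABORT);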

Dynamic-scaling and fault-tolerant application example

Using these concepts, you can build a simple example of an NCCL application that responds to scaling requests from the framework:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string>
#include <vector>
#include <chrono>
#include <stdexcept>

#include "nccl.h"

/* the various types of scaling this example supports: */
enum scalingRequestType { NONE, SCALING_NORMAL, SCALING_ABORT, SHRINK_NORMAL, SHRINK_ABORT };

/* Framework Functions: The specific details are not important, so the
implementation is not included. */
void frameworkGetInferenceWork(void **queries, enum scalingRequestType *scaling);
void frameworkNotifyTimeout();
void frameworkNotifyError();
void frameworkDetermineNewRank(int *rank, int *count);
void frameworkGetUniqueId(ncclUniqueId *uid);
void frameworkPutUniqueId(ncclUniqueId uid);
void frameworkGetExcludedRanks(std::vector<int> *excluded);
void exitAbort();
void exitCleanly();


/* Example placeholder function for the main job of this worker.  Assumes the
need to use a communicator to coordinate work across workers. */
void executePrefillAndDecode(ncclComm_t comm, void *queries);

/* forward declarations of scaleCommunicator and shrinkCommunicator, which are
implemented below. These replace comm with a new, resized communicator. */
void scaleCommunicator(ncclComm_t *comm, int scalingFlag);
void shrinkCommunicator(ncclComm_t *comm, int scalingFlag);

/* In this example, use C++ exception handling to exit from
executePrefillAndDecode so that the framework may react to an error.  Use
multiple exception types to separate the various classes of errors. */
struct AppException :  public std::runtime_error {
  AppException(const std::string& message): std::runtime_error(message) {}
};
struct AppNCCLTimeoutException : public AppException {
    AppNCCLTimeoutException(const std::string& message): AppException(message) {}
};
struct AppNCCLErrorException : public AppException {
    AppNCCLErrorException(const std::string& message): AppException(message) {}
};

/* We use a custom NCCL_CHECK macro that raises a C++ exception unless the
operation returns ncclSuccess or ncclInProgress */
#define NCCL_CHECK(call) do { \
    ncclResult_t result = call; \
    if (result != ncclSuccess && result != ncclInProgress) { \
        printf("NCCL error: %s at %s:%d\n", ncclGetErrorString(result), __FILE__, __LINE__); \
        throw AppNCCLErrorException("NCCL Error"); \
    } \
} while (0)

/* Define a custom NCCL_WAIT macro, which will wait for a fixed amount of
time before assuming something is wrong. */
#define WAIT_TIMEOUT_MS 10000
#define NCCL_WAIT(comm) do { \
    ncclResult_t asyncError; \
    auto start = std::chrono::steady_clock::now(); \
    NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
    while (asyncError == ncclInProgress) { \
        usleep(10); \
        NCCL_CHECK(ncclCommGetAsyncError(comm, &asyncError)); \
        auto now = std::chrono::steady_clock::now(); \
        auto waitingTime = std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count(); \
        if (waitingTime > WAIT_TIMEOUT_MS) { \
            throw AppNCCLTimeoutException("NCCL Timeout"); \
        } \
    } \
    NCCL_CHECK(asyncError); \
} while (0)

/* Use ncclCommInitRankConfig to create a new communicator to replace the old
   one.  Optionally call ncclCommAbort. */
void scaleCommunicator(ncclComm_t *comm, int scalingFlag) {

  int rank, rankCount;
  ncclComm_t oldComm = *comm;
  ncclComm_t newComm = NULL;
  if (scalingFlag == SCALING_ABORT) {
    /* The framework has indicated there was an error.  ncclCommAbort will exit
    any operation currently in progress, and destroy the communicator. */
    NCCL_CHECK(ncclCommAbort(oldComm));
    NCCL_WAIT(oldComm);
  } else {
    /* Normal condition: simply clean up the old communicator before creating a
    new one. */
    NCCL_CHECK(ncclCommDestroy(oldComm));
  }

  /* enable non-blocking NCCL communicator so that we may detect and react to
  timeouts. */
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  ncclUniqueId uniqueId;
  config.blocking = 0;

  /* ask the framework what rank we will be assigned in the new communicator,
  and how many ranks there will be in total. These are required inputs to
  ncclCommInit. */
  frameworkDetermineNewRank(&rank, &rankCount);
  if (rank == 0) {
    /* This worker is special: it will generate the ncclUniqueId and share it
    with the other ranks. */
    ncclGetUniqueId(&uniqueId);
    frameworkPutUniqueId(uniqueId);
  } else if (rank > 0) {
    frameworkGetUniqueId(&uniqueId);
  } else if (rank < 0) {
    /* special value for scale-down: this rank is being removed and should
    exit. */
    exitCleanly();
  }

  /* perform NCCL communicator initialization and, since this is a non-blocking
  communicator, wait until the operation completes. */
  NCCL_CHECK(ncclCommInitRankConfig(&newComm, rankCount, uniqueId, rank, &config));
  NCCL_WAIT(newComm);
  *comm = newComm;
}

/* shrinkCommunicator: Use ncclCommShrink as a simplified and optimized option
when scaling down. */
void shrinkCommunicator(ncclComm_t *comm, int scalingFlag) {

  ncclComm_t oldComm = *comm;

  int ncclShrinkOption;
  bool exiting = false;
  ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
  config.blocking = 0;
  ncclComm_t newComm;
  std::vector<int> excluded;

  /* query the framework for which ranks will be excluded from the new
  communicator. */
  frameworkGetExcludedRanks(&excluded);
  int oldRank;
  NCCL_CHECK(ncclCommUserRank(oldComm, &oldRank));
  for (int i=0; i<(int)excluded.size(); i++) {
    if (oldRank == excluded[i]) {
      exiting = true;
    }
  }

  ncclShrinkOption = scalingFlag == SHRINK_ABORT ? NCCL_SHRINK_ABORT : NCCL_SHRINK_DEFAULT;
  if (!exiting) {
    /* execute the shrink operation.  After executing, wait on the old
    communicator for completion, and finally assign *comm to be the new
    communicator. */
    NCCL_CHECK(ncclCommShrink(oldComm, excluded.data(), excluded.size(), 
      &newComm, &config, ncclShrinkOption));
    NCCL_WAIT(oldComm);
    NCCL_WAIT(newComm);
    *comm = newComm;
  }
  if (ncclShrinkOption == NCCL_SHRINK_ABORT) {
    ncclCommAbort(oldComm);
  } else {
    ncclCommDestroy(oldComm);
  }
  if (exiting) { exitCleanly(); }
}


/* persistent state between mainLoop iterations */
ncclComm_t comm = NULL;
void *queries = NULL;

/* mainLoop: called repeatedly during the lifetime of this worker. */
void mainLoop() {
  enum scalingRequestType scalingFlag;

  /* The framework provides the workers with some work to do (queries) and
  signals any scaling actions that should occur.  The framework will ensure all
  workers observe the same value for scalingFlag during each pass through the
  main loop.
  */
  frameworkGetInferenceWork(&queries, &scalingFlag);

  /* Act on the scalingFlag: */
  if (scalingFlag == SCALING_NORMAL || scalingFlag == SCALING_ABORT) {
    scaleCommunicator(&comm, scalingFlag);
  } else if (scalingFlag == SHRINK_NORMAL || scalingFlag == SHRINK_ABORT) {
    shrinkCommunicator(&comm, scalingFlag);
  }

  /* Perform inference work.  Catch any exceptions raised and communicate any
  problems to the framework. */
  try {
    executePrefillAndDecode(comm, queries);
  } catch (const AppNCCLTimeoutException &e) {
    frameworkNotifyTimeout();
  } catch (const AppNCCLErrorException &e) {
    frameworkNotifyError();
  }
}

This example is modeled on a distributed inference application and demonstrates how a framework can direct workers to perform scale-up or scale-down operations. The core logic is captured in two key functions, scaleCommunicator and shrinkCommunicator, which are invoked by the framework as needed. The primary inference work is handled by executePrefillAndDecode, which uses an active communicator that may be replaced over the worker's lifetime.

The application is built around a central mainLoop that represents the continuous work of an inference worker. On each iteration, the worker gets new tasks from the framework and checks for a scalingFlag that signals whether a resizing operation should occur. The framework ensures that these scaling requests are delivered synchronously to all workers. In the event of a fault, a worker will either time out or receive an error from NCCL. In either scenario, the exception-handling path notifies the framework, prompting fault recovery to begin.

Coordinated actions among workers require a central monitoring component, which we will call an Application Monitor. This component is typically responsible for tracking worker health, traffic load, and request latency. Based on these metrics, the Application Monitor signals the workers when to scale the pool up or down.

To handle increased traffic, for example, the Application Monitor identifies available GPUs, launches new worker processes, and then sets the scaling flag to signal the existing workers to expand the communicator. The scaleCommunicator function manages this process, where workers coordinate to establish the new communicator size and share the required ncclUniqueId.

Conversely, when traffic subsides, the Application Monitor signals a scale-down, identifying which ranks should be removed. For this specific case, the shrinkCommunicator function provides an optimized path using ncclCommShrink, a simplified interface that doesn't require generating and distributing a new ncclUniqueId. Once ranks exit, their underlying GPU resources can be released back to the cluster's allocation system or cloud provider.

Finally, both scaleCommunicator and shrinkCommunicator are equipped to handle fault recovery. Once the Application Monitor identifies a faulted component, it can direct the healthy workers to remove it by invoking the abort path of either function. These paths take extra steps, calling ncclCommAbort or setting the NCCL_SHRINK_ABORT flag, to ensure that the active communicator does not hang while waiting for a peer that has failed.
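The Application Monitor itself sits outside NCCL. As a rough sketch of its decision loop, under the assumption of a simple polling design (every helper function here is a hypothetical part of the deployment's control plane, not an NCCL API):

/* Hypothetical sketch of an Application Monitor decision loop. */
void applicationMonitorLoop() {
  while (true) {
    if (anyWorkerFaulted()) {
      /* Direct healthy workers down the abort path so no collective hangs. */
      signalWorkers(SHRINK_ABORT, faultedWorkerRanks());
    } else if (latencyAboveTarget()) {
      launchNewWorkers();                 /* new processes join via scaleCommunicator */
      signalWorkers(SCALING_NORMAL, {});
    } else if (trafficBelowTarget()) {
      signalWorkers(SHRINK_NORMAL, idleWorkerRanks());
    }
    waitForNextMonitorInterval();
  }
}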

Get started with scalable and fault-tolerant NCCL applications

NCCL support for dynamic communicators provides a powerful tool for building modern, resilient AI infrastructure. By moving beyond a static, launch-time configuration, you can create applications that adapt to changing workloads and can be optimized for efficiency and cost.

In addition, with the ability to call ncclCommAbort or ncclCommShrink, handling unexpected hardware or software faults is possible without a full abort and restart. Build your next multi-GPU application with these dynamic capabilities to create a scalable and fault-tolerant system. Download the latest NCCL release or use a pre-built container, such as the PyTorch NGC Container.


