Home Artificial Intelligence Distributed Machine Learning at Instacart System Architecture Case Study: Parallel Achievement ML Jobs Learnings & Future Work

Distributed Machine Learning at Instacart System Architecture Case Study: Parallel Achievement ML Jobs Learnings & Future Work

Distributed Machine Learning at Instacart
System Architecture
Case Study: Parallel Achievement ML Jobs
Learnings & Future Work

Writer: Han Li

At Instacart, we take pride in offering a various range of machine learning (ML) products that empower every aspect of our marketplace, including customers, shoppers, retailers, and types. Together with the expansion of business and ML applications, we’ve got encountered an increasing variety of use cases that require distributed ML techniques to effectively scale our ML products.

To support the emerging requests, we’ve got designed the distributed ML system with the next key design considerations:

  • We must find a way to scale a lot of distributed ML workloads on each CPU and GPU instances with a strong system that maintains high performance and reliability.
  • : We aim to totally utilize distributed computation resources to maximise system throughput and achieve the fastest execution at optimal cost.
  • : The chosen computation framework needs to be as extensible as possible to support diverse distributed ML paradigms. As well as, we should always find a way to handle diverse environments which are specific to different ML workloads.

Our team is currently working on a wide range of ML applications that utilize the facility of distributed computing to efficiently solve complex problems. Some examples include:

  • across distributed hosts with efficient resource utilization and reasonable queuing time.
  • Using to enable deep learning models to work with large datasets more effectively.
  • Developing capabilities with a lot of parallel jobs processing over an enormous amount of knowledge.

To fulfill these requirements, we’ve got chosen Ray because the foundational computation framework for distributed ML. In this text, we are going to walk through the system architecture, analyze a case study, compare it with legacy systems, and display the worth of Ray-based distributed ML.

We recognize that a system with a straightforward user experience is crucial for increasing developer productivity and maximizing the impact of our distributed ML applications. Subsequently, along with the design principles of , and , we prioritize the next objectives to simplify the user experience:

  • : We aim to simplify the event process for ML engineers by providing them with tools to check their distributed code seamlessly in an area environment and distributed environments.
  • : Our objective is to supply a serverless development model that empowers our engineers to develop and run workloads without the burden of managing any services. We consider that by eliminating the necessity to handle service management, we are able to effectively improve MLE’s productivity.
  • : Our goal is to seamlessly integrate distributed ML workloads with Instacart ML Platform Griffin, without making a redundant set of MLOps tools or platforms. Moderately than reinventing the wheel, we aim to boost our existing infrastructure.

Now we have prolonged our workflow orchestration tools on Griffin to offer a unified experience for running each prototyping and production workloads on Kubernetes-based distributed ML environments powered by Ray. Fig 1 illustrates the end-to-end user experience.

Fig 1: Distributed ML Application in Development & Production on Ray Cluster
Fig 1: Distributed ML Application in Development & Production on Ray Cluster

Listed here are breakdowns of every component interacting with Ray distributed environment:

  • Through the development stage, users can package prototyping code snippets on their development environment and launch them on distant AWS EKS hosts through an internal launcher API that abstracts Ray Cluster access through Ray Job API (see Fig 1a).
Fig 1a: Connect development environment with Ray Cluster
  • When users are able to automate their code in production pipelines (i.e. Airflow), they’ll use the identical set of APIs to launch their containerized application to production Ray Clusters (see Fig 1b).
Fig 1b: Automated containerized application running on Ray Cluster
  • Each Ray Cluster is independently configured with Python installations and environment variables. This effectively separates the workspace environments between clusters, avoiding overhead of maintaining a monolithic python environment, which might be discussed intimately in the subsequent section (see Fig 1c).
Fig 1c: Isolated Python environments between different Ray Clusters
  • At the applying level, users need to construct ML applications using Ray APIs to attain a wide range of distributed computation patterns, reminiscent of Ray Core APIs, Ray AIR and Ray Serve.
  • On the controller side, we use KubeRay because the controller of provisioning, scaling, and deleting Ray Cluster resources on Kubernetes.

Next, we are going to dive deep right into a case study of Instacart Achievement ML, the very first use case of Ray-based distributed ML at Instacart, by which 1000’s of models must be trained in parallel efficiently.

At Instacart, ML is applied throughout the lifecycle of fulfilling every order placed on the Instacart App (Fig 2). This includes multiple orders together to enhance efficiency, to find out optimal delivery routes, on order arrival time, and to stop lost deliveries when there will not be enough available shoppers, and lots of more. A big variety of such models are trained with data collected in a selected neighborhood, and a few of those models are also sensitive to the information collected inside a selected timeframe, due to this fact it’s common practice to partition the complete national dataset based on one or multiple space/time attributes, after which launch one training job per data partition, preferably in parallel.

Fig 2: Some common scenarios of Achievement ML in Instacart

Probably the most common “attribute” is the geographical location of orders, called “zones.” At Instacart, we manage 1000’s of unique zones to partition our national dataset. For every Achievement ML application, it’s typical to launch as many as to cover all zones in a single model experiment.

Previous Solutions & Limitations

Fig 3: Our previous system of parallel zone-level model training

Our legacy solution implemented a distributed task queue service using Celery; Fig 3 illustrates the architectural diagram. For every unique model application (represented as Model within the diagram) that requires parallel training across zone 1~n, training jobs for all zones are published as tasks to the identical task queue on Message Broker. Subsequently, a gaggle of Celery staff subscribes to every task queue and executes the tasks inside it. The tasks are executed asynchronously, and every time a Celery employee is free, the duty on the front of the queue is removed and assigned to the free employee. Once the employee finishes, it updates the duty status within the Message Backend and fetches the subsequent task within the queue to execute.

Because the variety of ML applications requiring zone level parallel training increased over time, this kind of monolithic service began to get hard to further scale as a result of the next reasons:

  • Many ML applications shared the identical task queue and Celery distributed employee services. It turned out that distributed staff couldn’t get fully utilized very efficiently:
    – Employee node needed to be over-provisioned with enough resource headroom to suit all sorts of models. Nevertheless, in Achievement ML, most models are quite lightweight, reminiscent of regression, classification, etc. Subsequently, when a small size model was running, the employee was poorly utilized. For instance, within the left-hand side graph of Fig 4, a light-weight model was running on a 16-CPU instance, with CPU utilization only around 10% to fifteen%.
    – The long-running service could leave the system idle if the load was unbalanced throughout the day. For instance, within the right-hand side of Fig 4, the queue was not running any tasks for over 60% of the day, resulting in inefficient resource usage.
Fig 4: Examples of low utilized service containers (left) and idle task queues (right)
  • : It’s very hard to attain faster execution time without upscaling the duty queue services:
    – Low Celery employee concurrency is configured per host of task queue, since it’s not feasible to easily increase concurrency to suit all models on a monolithic service.
    – When a task queue gets assigned with too many tasks, reminiscent of Fig 5 by which 300 to 1k+ tasks are in queue all day round, the queuing time of every task to be executed will significantly increase, slowing down the end-to-end execution time of every ML application.
    – Subsequently, the one strategy to speed up a busy queue is to do upscaling so as to add more physical hosts, which continues adding poorly utilized resources to the system.
Fig 5: Examples of a really busy task queue hosting too many tasks
  • : It’s difficult to administer Python dependencies for all model applications running on the identical queue. Upgrading a selected python package version is difficult.
  • : This technique is complex to automate, and doesn’t provide a straightforward approach to duplicate the system in an area environment for testing purposes.

Improvements by Latest System

After we began adopting Achievement ML workloads for our recent distributed ML systems, essentially the most significant change was the migration from long-running monolithic services to Ray-based serverless applications. As a substitute of all models running on the identical environment, each Achievement ML application is launched as an independent Ray job related to a dedicated Ray Cluster to handle all its zone level training jobs, as illustrated in Fig 6. This recent design offers several benefits in comparison with our legacy solution:

Fig 6: Architecture Overview of distributed Achievement ML workflows hosted on Ray Cluster
  • With our recent system, each model now has its own environment, enabling independent management of Python dependencies and resource usage on a per-model basis (Fig 7).
Fig 7: A diagram as an instance the concept of workspace isolation between different models
  • . This reduces costs on computation resources, and most significantly, it makes the infrastructure much easier to scale to host upcoming recent applications.
  • Serverless applications offer users the power to provision resources per Ray employee more accurately, leading to way more . Fig 8 provides a real-world example of this improvement by comparing the before/after CPU utilization of a Achievement production model running 1.5k unique training tasks on the identical group of hosts:
    – Within the legacy system described earlier, only just a few Celery staff were configured to run on each host, leading to CPU utilization of around .
    – With our recent Ray-based architecture, we are able to more accurately customize resource requirements. Through benchmarking, we found that allocating just 2 CPUs per zonal training job is sufficient. Because of this, we’re capable of create more concurrent Ray staff on the identical instance, significantly increasing CPU utilization per host to as much as .
Fig 8: Before(left) and after(right) CPU utilization of the identical model training the identical zones.
  • Improved resource utilization contributed to . With the identical production model mentioned above running on 10 16-CPU instances, there have been only 10 Celery concurrent staff in total on the legacy service while a Ray based application can launch as many as 70+ Ray staff. !

Now we have also found that converting existing training functions into Ray distributed function calls is a comparatively straightforward process. For example, we’ve got refactored an existing project implemented within the legacy solution to make use of Ray (see Fig 9). The conversion required only just a few changes to existing functions or class objects to make them Ray distant executable. Moreover, we would have liked only just a few lines of code to orchestrate the distant calls asynchronously with rate limit handling, if obligatory.

Fig 9: Code example to convert an existing Forecast class object right into a Ray Actor object

We’ve gained beneficial insights from our experience constructing distributed ML solutions and migrating early use cases away from legacy systems. We’ve discovered that hosting a monolithic service because the computation backend for all distributed ML applications has limitations in scalability, efficiency, and variety, given the rapidly evolving and highly diversified nature of ML applications. As an example, by transitioning from monolithic services to standalone serverless applications for Achievement zone level training, we were capable of significantly boost execution time, resource utilization, and development simplicity.

Although the topology of every application might be highly customized by the applying owner in distributed ML, it’s essential to have unified construct and launch tools on the ML platform layer. This approach enables us to support more forms of distributed ML workloads in the longer term while ensuring our platform stays extensible.

Looking ahead, we aim to foster broader adoption of distributed ML applications across Instacart on multiple ML product lines. Our goal for 2023 is to further mature our Ray-based distributed ML solution because the unified solution running as a part of Griffin. As we embark on this journey, we’re excited to see what distributed ML may also help us achieve next at Instacart.



Please enter your comment!
Please enter your name here