Distributed Machine Learning at Instacart

How Instacart uses distributed Machine Learning to efficiently train thousands of models in production

Han Li
tech-at-instacart

At Instacart, we take pride in offering a diverse range of machine learning (ML) products that empower every aspect of our marketplace, including customers, shoppers, retailers, and brands. Along with the growth of business and ML applications, we have encountered an increasing number of use cases that require distributed ML techniques to effectively scale our ML products.

To support these emerging use cases, we designed a distributed ML system around the following key considerations:

  • Scalability: We must be able to scale a large number of distributed ML workloads on both CPU and GPU instances with a robust system that maintains high performance and reliability.
  • Resource Efficiency: We aim to fully utilize distributed computation resources to maximize system throughput and achieve the fastest execution at optimal cost.
  • Diversity: The selected computation framework should be as extensible as possible to support diverse distributed ML paradigms. In addition, we should be able to handle diverse environments that are specific to different ML workloads.

Our team is currently working on a variety of ML applications that utilize the power of distributed computing to efficiently solve complex problems. Some examples include:

  • Parallelizing thousands of small to mid-sized training models across distributed hosts with efficient resource utilization and reasonable queuing time.
  • Using data parallelism to enable deep learning models to work with large datasets more effectively.
  • Developing scalable batch inference capabilities with a large number of parallel jobs processing over a vast amount of data.

To meet these requirements, we have chosen Ray as the foundational computation framework for distributed ML. In this article, we will walk through the system architecture, analyze a case study, compare it with legacy systems, and demonstrate the value of Ray-based distributed ML.

System Architecture

We recognize that a system with a simple user experience is essential for increasing developer productivity and maximizing the impact of our distributed ML applications. Therefore, in addition to the design principles of scalability, resource efficiency, and diversity, we prioritize the following objectives to simplify the user experience:

  • Development Simplicity: We aim to simplify the development process for ML engineers by providing them with tools to test their distributed code seamlessly in a local environment and distributed environments.
  • Maintenance Simplicity: Our objective is to offer a serverless development model that empowers our engineers to develop and run workloads without the burden of managing any services. We believe that removing service management from their plate effectively improves ML engineers' productivity.
  • Seamless Integration: Our goal is to seamlessly integrate distributed ML workloads with Instacart ML Platform Griffin, without creating a redundant set of MLOps tools or platforms. Rather than reinventing the wheel, we aim to enhance our existing infrastructure.

We have extended our workflow orchestration tools on Griffin to provide a unified experience for running both prototyping and production workloads on Kubernetes-based distributed ML environments powered by Ray. Fig 1 illustrates the end-to-end user experience.

Fig 1: Distributed ML Application in Development & Production on Ray Cluster

Here is a breakdown of each component that interacts with the Ray distributed environment:

  • During the development stage, users can package prototyping code snippets in their development environment and launch them on remote AWS EKS hosts through an internal launcher API that abstracts Ray Cluster access behind the Ray Job API (see Fig 1a).
Fig 1a: Connect development environment with Ray Cluster
  • When users are ready to automate their code in production pipelines (e.g., Airflow), they can use the same set of APIs to launch their containerized application on production Ray Clusters (see Fig 1b).
Fig 1b: Automated containerized application running on Ray Cluster
  • Each Ray Cluster is independently configured with its own Python installation and environment variables. This effectively separates the workspace environments between clusters and avoids the overhead of maintaining a monolithic Python environment, which is discussed in detail in the next section (see Fig 1c).
Fig 1c: Isolated Python environments between different Ray Clusters
  • At the application level, users build ML applications with Ray APIs, such as Ray Core, Ray AIR, and Ray Serve, to achieve a wide variety of distributed computation patterns.
  • On the controller side, we use KubeRay to provision, scale, and delete Ray Cluster resources on Kubernetes.
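The launch flow described above can be sketched roughly as follows. This is a minimal illustration, not Instacart's internal launcher API: the helper names `build_job_request` and `launch`, the entrypoint command, the `owner` metadata key, and the cluster address are all hypothetical. The only real library call is Ray's `JobSubmissionClient.submit_job`, which accepts an `entrypoint`, an optional `runtime_env`, and optional string `metadata`.

```python
def build_job_request(entrypoint, working_dir, env_vars=None, owner="unknown"):
    """Assemble keyword arguments for Ray's JobSubmissionClient.submit_job.

    Hypothetical helper: the field values are illustrative only.
    """
    return {
        # Shell command executed on the Ray Cluster.
        "entrypoint": entrypoint,
        "runtime_env": {
            # Local directory uploaded to the cluster along with the job.
            "working_dir": working_dir,
            # Environment variables visible to the job's processes.
            "env_vars": dict(env_vars or {}),
        },
        # Free-form string metadata attached to the job.
        "metadata": {"owner": owner},
    }


def launch(cluster_address, request):
    """Submit the request to a Ray Cluster and return the submission id."""
    # Imported lazily so the request can be built and inspected without Ray.
    from ray.job_submission import JobSubmissionClient

    client = JobSubmissionClient(cluster_address)
    return client.submit_job(**request)


request = build_job_request(
    entrypoint="python train.py",  # hypothetical script name
    working_dir="./my_project",    # hypothetical project directory
    env_vars={"STAGE": "dev"},
    owner="fulfillment-ml",
)
# launch("http://127.0.0.1:8265", request)  # needs a reachable Ray Cluster
```

Because the same request shape works for any cluster address, a development laptop and a production pipeline task could in principle share one code path and differ only in the address and image they target.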

Next, we will dive deep into a case study of Instacart Fulfillment ML, the very first use case of Ray-based distributed ML at Instacart, in which thousands of models need to be trained in parallel efficiently.

Case Study: Parallel Fulfillment ML Jobs

At Instacart, ML is applied throughout the lifecycle of fulfilling every order placed on the Instacart App (Fig 2). Examples include batching multiple orders together to improve efficiency, routing to determine optimal delivery routes, predicting order arrival time (ETA), and forecasting supply and demand to prevent lost deliveries when there are not enough available shoppers. Many of these models are trained on data collected in a specific neighborhood, and some are also sensitive to the time frame in which the data was collected. It is therefore common practice to partition the entire national dataset by one or more space/time attributes and then launch one training job per data partition, preferably in parallel.

Fig 2: Some common scenarios of Fulfillment ML in Instacart

The most common “attribute” is the geographical location of orders, called “zones.” At Instacart, we manage thousands of unique zones to partition our national dataset. For each Fulfillment ML application, it’s typical to launch as many as thousands of parallel training jobs to cover all zones in a single model experiment.
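As a rough illustration of this partitioning step, the sketch below groups records by one or more attributes so that each group can feed a separate training job. The record fields (`zone`, `week`, `eta_minutes`) and the helper name `partition_by` are made up for the example; the real datasets and schema are not described in this article.

```python
from collections import defaultdict


def partition_by(rows, keys):
    """Group rows into partitions keyed by the values of the given attributes."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[tuple(row[k] for k in keys)].append(row)
    return dict(partitions)


# Hypothetical order records; field names are illustrative only.
orders = [
    {"zone": "zone_1", "week": "2023-W01", "eta_minutes": 42},
    {"zone": "zone_1", "week": "2023-W02", "eta_minutes": 38},
    {"zone": "zone_2", "week": "2023-W01", "eta_minutes": 55},
]

by_zone = partition_by(orders, ["zone"])               # space only
by_zone_week = partition_by(orders, ["zone", "week"])  # space and time

# One training job would then be launched per key in the chosen partitioning.
for key, rows in by_zone.items():
    print(key, len(rows))
```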

Previous Solutions & Limitations

Fig 3: Our previous system of parallel zone-level model training

Our legacy solution implemented a distributed task queue service using Celery; Fig 3 illustrates the architecture. For each unique model application (represented as Model in the diagram) that requires parallel training across zones 1 to n, the training jobs for all zones are published as tasks to the same task queue on the Message Broker. A group of Celery workers subscribes to each task queue and executes the tasks inside it. The tasks are executed asynchronously: whenever a Celery worker is free, the task at the front of the queue is removed and assigned to that worker. Once the worker finishes, it updates the task status in the Message Backend and fetches the next task in the queue to execute.

As the number of ML applications requiring zone-level parallel training increased over time, this type of monolithic service became hard to scale further, for the following reasons:

  • Inefficient resource usage: Many ML applications shared the same task queue and Celery distributed worker services, and the distributed workers could not be fully utilized:
    - Worker nodes had to be over-provisioned with enough resource headroom to fit all types of models. However, most Fulfillment ML models are quite lightweight, such as regression and classification models. Therefore, when a small model was running, the worker was poorly utilized. For example, in the left-hand graph of Fig 4, a lightweight model was running on a 16-CPU instance with CPU utilization of only around 10% to 15%.
    - The long-running service could leave the system idle if the load was unbalanced throughout the day. For example, in the right-hand graph of Fig 4, the queue was not running any tasks for over 60% of the day, leading to inefficient resource usage.
Fig 4: Examples of under-utilized service containers (left) and idle task queues (right)
  • Long queuing time: It was very hard to achieve faster execution without upscaling the task queue services:
    - Celery worker concurrency was kept low on each task queue host, because it was not feasible to simply raise concurrency to a level that fits all models on a monolithic service.
    - When a task queue was assigned too many tasks, as in Fig 5, where 300 to 1k+ tasks sat in the queue around the clock, the queuing time of each task increased significantly, slowing down the end-to-end execution of each ML application.
    - Therefore, the only way to speed up a busy queue was to add more physical hosts, which kept adding poorly utilized resources to the system.
Fig 5: Example of a very busy task queue hosting too many tasks
  • Difficult dependency management: It's difficult to manage Python dependencies for all model applications running on the same queue. Upgrading a specific Python package version is challenging.
  • Maintenance burden: This system is complex to automate, and does not provide a simple approach to replicate the system in a local environment for testing purposes.
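To make the queuing-time limitation concrete, here is a toy simulation of a first-in, first-out task queue served by a fixed number of workers. It is only a sketch under simplifying assumptions that are not from the production system: all tasks are enqueued at time zero, every task takes exactly one minute, and the task and worker counts are arbitrary. The function name `simulate_fifo` is hypothetical.

```python
import heapq


def simulate_fifo(durations, num_workers):
    """Return each task's queue wait time, with all tasks enqueued at t=0.

    Workers pull tasks in FIFO order; a task starts as soon as any worker
    becomes free.
    """
    free_at = [0.0] * num_workers  # time at which each worker becomes free
    heapq.heapify(free_at)
    waits = []
    for duration in durations:
        start = heapq.heappop(free_at)  # earliest available worker
        waits.append(start)             # wait time = start time - 0
        heapq.heappush(free_at, start + duration)
    return waits


durations = [1.0] * 1000  # 1,000 one-minute tasks (illustrative numbers)
for workers in (10, 20, 40):
    waits = simulate_fifo(durations, workers)
    print(f"{workers} workers: longest wait {max(waits):.0f} min")
```

Under these assumptions the longest wait shrinks only in proportion to the number of workers, which is why a queue with low per-host concurrency can only be sped up by adding hosts.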

Improvements by New System

When we started onboarding Fulfillment ML workloads onto our new distributed ML system, the most significant change was the migration from long-running monolithic services to Ray-based serverless applications. Instead of all models running in the same environment, each Fulfillment ML application is launched as an independent Ray job associated with a dedicated Ray Cluster that handles all of its zone-level training jobs, as illustrated in Fig 6. This new design offers several advantages over our legacy solution:

Fig 6: Architecture Overview of distributed Fulfillment ML workflows hosted on Ray Cluster
  • With our new system, each model now has its own isolated workspace environment, enabling independent management of Python dependencies and resource usage on a per-model basis (Fig 7).
Fig 7: A diagram to illustrate the idea of workspace isolation between different models
  • Long-running task queue services are now deprecated. This reduces costs on computation resources, and most importantly, it makes the infrastructure much easier to scale to host upcoming new applications.
  • Serverless applications offer users the ability to provision resources per Ray worker more accurately, resulting in much more efficient resource utilization. Fig 8 provides a real-world example of this improvement by comparing the before/after CPU utilization of a Fulfillment production model running 1.5k unique training tasks on the same group of hosts:
    - In the legacy system described earlier, only a few Celery workers were configured to run on each host, resulting in CPU utilization of around 10–15%.
    - With our new Ray-based architecture, we can more accurately customize resource requirements. Through benchmarking, we found that allocating just 2 CPUs per zonal training job is sufficient. As a result, we are able to create more concurrent Ray workers on the same instance, significantly increasing CPU utilization per host to up to 80%.
Fig 8: Before (left) and after (right) CPU utilization of the same model training the same zones.
  • Improved resource utilization contributed to faster end-to-end execution. With the same production model mentioned above running on ten 16-CPU instances, the legacy service had only 10 concurrent Celery workers in total, while the Ray-based application can launch 70+ Ray workers. As a result, end-to-end completion time dropped from ~4 hours to 20 minutes.
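The concurrency figures quoted above can be sanity-checked with some back-of-the-envelope arithmetic. The sketch below uses only numbers from this section (10 hosts, 16 CPUs per host, 2 CPUs per zonal job, 1.5k tasks, 10 versus 70 workers). The helper names are hypothetical, and the "waves" count assumes every task takes the same time, which is a simplification rather than a measured property of the workload.

```python
import math


def max_concurrent_jobs(hosts, cpus_per_host, cpus_per_job):
    """Upper bound on jobs that fit at once, ignoring CPUs reserved for overhead."""
    return hosts * (cpus_per_host // cpus_per_job)


def waves(tasks, workers):
    """Rounds of execution needed if every task takes the same time."""
    return math.ceil(tasks / workers)


upper_bound = max_concurrent_jobs(hosts=10, cpus_per_host=16, cpus_per_job=2)
legacy_waves = waves(tasks=1500, workers=10)
ray_waves = waves(tasks=1500, workers=70)

print(upper_bound)   # 80
print(legacy_waves)  # 150
print(ray_waves)     # 22
```

The upper bound of 80 slots is consistent with the 70+ Ray workers reported above. The equal-duration model captures only part of the observed improvement, so it should be read as an intuition aid, not an explanation of the full 4-hour-to-20-minute change.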

We have also found that converting existing training functions into Ray distributed function calls is a relatively straightforward process. As an example, we have refactored an existing project implemented in the legacy solution to use Ray (see Fig 9). The conversion required only a few changes to existing functions or class objects to make them Ray remote executable. Additionally, we needed only a few lines of code to orchestrate the remote calls asynchronously with rate limit handling, if necessary.

Fig 9: Code example to convert an existing Forecast class object into a Ray Actor object
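For readers without access to the figure, the following is a minimal sketch of what such a conversion might look like. It is not the code from Fig 9: the `ZoneForecast` class, its trivial mean-based "model", the `train_all_zones` helper, and the default `max_in_flight` value are all hypothetical. The Ray calls used (`ray.init`, `ray.remote`, `ray.wait`, `ray.get`) are standard Ray Core APIs, and the 2-CPU request mirrors the per-job allocation mentioned earlier.

```python
class ZoneForecast:
    """Plain Python class; nothing Ray-specific is required in its body."""

    def __init__(self, zone):
        self.zone = zone

    def fit(self, values):
        # Stand-in for real model training: just average the observations.
        return {"zone": self.zone, "mean": sum(values) / len(values), "n": len(values)}


def train_all_zones(data_by_zone, max_in_flight=70):
    """Train one actor per zone on Ray, capping the number of in-flight jobs."""
    import ray  # imported lazily so ZoneForecast stays usable without Ray

    ray.init(ignore_reinit_error=True)
    # Wrap the existing class as a Ray actor that requests 2 CPUs per instance.
    RemoteForecast = ray.remote(num_cpus=2)(ZoneForecast)

    in_flight = {}  # ObjectRef -> (zone, actor handle kept alive until done)
    results = {}
    for zone, values in data_by_zone.items():
        if len(in_flight) >= max_in_flight:
            # Simple rate limit: block until at least one running job finishes.
            done, _ = ray.wait(list(in_flight), num_returns=1)
            for ref in done:
                finished_zone, _actor = in_flight.pop(ref)
                results[finished_zone] = ray.get(ref)
        actor = RemoteForecast.remote(zone)
        in_flight[actor.fit.remote(values)] = (zone, actor)

    # Collect whatever is still running.
    for ref, (zone, _actor) in in_flight.items():
        results[zone] = ray.get(ref)
    return results


# Local check without Ray:
print(ZoneForecast("zone_1").fit([1.0, 2.0, 3.0]))
# On a Ray Cluster: train_all_zones({"zone_1": [1.0, 2.0], "zone_2": [3.0]})
```

The class body is unchanged by the conversion; only the wrapping with `ray.remote` and the `.remote(...)` call sites differ from a purely local version.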

Learnings & Future Work

We’ve gained valuable insights from our experience building distributed ML solutions and migrating early use cases away from legacy systems. We’ve discovered that hosting a monolithic service as the computation backend for all distributed ML applications has limitations in scalability, efficiency, and diversity, given the rapidly evolving and highly diversified nature of ML applications. For instance, by transitioning from monolithic services to standalone serverless applications for Fulfillment zone-level training, we were able to significantly improve execution time, resource utilization, and development simplicity.

Although the topology of each application can be highly customized by the application owner in distributed ML, it’s essential to have unified build and launch tools on the ML platform layer. This approach enables us to support more types of distributed ML workloads in the future while ensuring our platform remains extensible.

Looking ahead, we aim to foster broader adoption of distributed ML applications across Instacart on multiple ML product lines. Our goal for 2023 is to further mature our Ray-based distributed ML solution as the unified solution running as part of Griffin. As we embark on this journey, we’re excited to see what distributed ML can help us achieve next at Instacart.
