How ByteDance Scales Offline Inference with multi-modal LLMs to 200 TB Data

By Amog Kamsetty, Hao Chen and Liguang Xie   

The original article was written in Chinese by Wanxing Wang, YuanZhe Hu and Xiang Li, senior engineers in the ByteDance Infrastructure Group. It was edited and translated into English by Amog Kamsetty, Hao Chen, and Liguang Xie from the Anyscale team.

At ByteDance, the company behind TikTok, we leverage multi-modal models to enable many applications, such as text-based image retrieval and object detection.

One of the key steps to power these applications is offline batch inference for embedding generation. We have a large dataset containing both image and text data, and we pass this through many replicas of a multi-modal model to generate embeddings of both the images and text in the same vector space.


An overview of the batch inference pipeline. Individual batches from a large dataset are processed in parallel through a trained model to generate embeddings.
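Because images and text are embedded in the same vector space, a text query can be compared directly against image embeddings. The sketch below illustrates that idea with cosine similarity in plain Python. The embedding values, file names, and the `rank_images` helper are made up for illustration and are not taken from our production system.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def rank_images(text_embedding, image_embeddings):
    """Return image ids sorted from most to least similar to the text query."""
    scores = {
        image_id: cosine_similarity(text_embedding, embedding)
        for image_id, embedding in image_embeddings.items()
    }
    return sorted(scores, key=scores.get, reverse=True)


# Toy 3-dimensional embeddings (hypothetical values).
image_embeddings = {
    "cat.jpg": [0.9, 0.1, 0.0],
    "car.jpg": [0.0, 0.2, 0.9],
    "dog.jpg": [0.7, 0.6, 0.1],
}
query = [1.0, 0.2, 0.0]  # stand-in for the embedding of a text query

ranking = rank_images(query, image_embeddings)
print(ranking)
```

In a real retrieval system the embeddings would have hundreds of dimensions and the search would typically go through a vector index rather than a full scan.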

The key characteristic of our workload is its scale, both in the data size and the model size. Not only do we need to embed ~200 TB of data in each run, but our model also has more than 10 billion parameters, exceeding the memory of a single GPU device.

Building a highly performant system to handle both our data and model scale poses many technical challenges.

To overcome these challenges, we leverage Ray as our computing framework. We found that Ray, and in particular Ray Data, proved to be more flexible and scalable for large-scale model parallel inference compared to alternatives such as Spark.

Workload Model


The model architecture. We use a twin-tower Albert+Vision Transformer model to embed both text and images. The model is pipeline sharded layer-wise across 3 GPUs.

To embed both images and text, we use a multimodal Vision Transformer + Albert twin-tower model. Since the joint model is too large to fit on a single GPU, we split both towers across GPU devices at the same time: on each GPU, one portion of the memory holds Albert layers, while the other portion holds Vision Transformer layers.

The overall job consists of three stages, with image and text tokens passed between stages. The computing resources required for each stage differ, highlighting the need for flexible allocation of computing power.

Challenges with Offline Inference with LLMs

Model Parallel Inference

With the rapid explosion in ML model size, state-of-the-art models, such as our twin-tower model, may exceed the memory capacity of a single GPU. This necessitates model sharding.

There are two common ways to shard a model for inference:

  • Pipeline Parallelism: This method splits the model by layer and distributes the layer groups across multiple GPUs. For instance, in the diagram below, the first GPU stores L0-L1, while the second stores L2-L3. Since layer sizes may vary, the distribution is not necessarily even: a single large layer may occupy one GPU, while several smaller layers reside together on another.


    Pipeline parallelism splits up a model layer-wise across multiple GPUs.

  • Tensor Parallelism: This technique splits the weights of a single layer across different GPUs. For example, in the lower-left diagram, one part of L0's weights, A0, is placed on GPU 0, while the remaining part, A1, is assigned to GPU 1. During inference, each GPU computes a partial result, and the partial results are combined through matrix operations to obtain the final output.


    Tensor parallelism splits the weights of each layer across multiple GPUs.

In addition, more complex parallelization strategies also exist, including hybrid methods that combine both pipeline and tensor parallel approaches.
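To make the two sharding strategies concrete, here is a minimal sketch in plain Python. It uses a toy four-layer model of 2x2 weight matrices (all values made up) and treats "GPU 0" and "GPU 1" as ordinary variables, so it only demonstrates that both ways of splitting the work reproduce the unsharded result. It is not how our model is actually implemented.

```python
def matvec(weights, x):
    """Multiply a matrix (given as a list of rows) by a vector."""
    return [sum(w * v for w, v in zip(row, x)) for row in weights]


def run_layers(layer_group, x):
    """Apply a group of layers one after another."""
    for weights in layer_group:
        x = matvec(weights, x)
    return x


# Toy model: four layers L0..L3 (hypothetical weights).
layers = [
    [[1, 2], [0, 1]],  # L0
    [[2, 0], [1, 1]],  # L1
    [[1, 1], [1, 0]],  # L2
    [[0, 1], [3, 1]],  # L3
]
x = [1, 2]
reference = run_layers(layers, x)  # unsharded result

# Pipeline parallelism: "GPU 0" holds L0-L1, "GPU 1" holds L2-L3.
gpu0_layers, gpu1_layers = layers[:2], layers[2:]
activations = run_layers(gpu0_layers, x)              # computed on GPU 0
pipeline_out = run_layers(gpu1_layers, activations)   # computed on GPU 1

# Tensor parallelism for layer L0: split its weight rows into A0 and A1.
a0, a1 = layers[0][:1], layers[0][1:]
partial_gpu0 = matvec(a0, x)  # computed on GPU 0
partial_gpu1 = matvec(a1, x)  # computed on GPU 1
tensor_out = partial_gpu0 + partial_gpu1  # concatenate the partial outputs

print(reference, pipeline_out, tensor_out)
```

With pipeline parallelism only the activations cross the device boundary, once per stage. With tensor parallelism every sharded layer needs a step that combines the partial outputs, which in this sketch is a simple concatenation.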

Model parallelization offers several advantages:

  1. Support for larger models. We can run inference on models that do not fit on a single GPU device.

  2. Cost reduction. Sharding the model enables inference on GPUs with relatively smaller memory capacities, thus reducing cost. Higher-end GPUs can then be allocated for training, which typically requires more resources.

  3. Spatial multiplexing. Spatial multiplexing techniques, such as NVIDIA's Multi-Process Service (MPS), let multiple processes share a GPU by partitioning its memory among them, which improves GPU utilization. Without sharding, one process might greedily occupy all the GPU memory. With sharding, each process is guaranteed a portion of GPU memory.
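A rough back-of-the-envelope calculation shows why sharding allows smaller GPUs to be used. The sketch below estimates the minimum number of equal shards needed for the model weights alone. The bytes per parameter, GPU memory sizes, and the `usable_fraction` headroom are assumptions chosen for illustration, not measurements from our cluster, and activation memory is ignored.

```python
import math


def min_shards(num_params, bytes_per_param, gpu_memory_gib, usable_fraction=0.8):
    """Smallest number of equal shards whose weights fit in usable GPU memory.

    usable_fraction is an assumed headroom factor that leaves room for
    activations and runtime overhead.
    """
    weight_bytes = num_params * bytes_per_param
    usable_bytes = gpu_memory_gib * 1024**3 * usable_fraction
    return math.ceil(weight_bytes / usable_bytes)


# Assumed: 10 billion parameters stored in fp16 (2 bytes each).
shards_on_16gib = min_shards(10e9, 2, gpu_memory_gib=16)
shards_on_80gib = min_shards(10e9, 2, gpu_memory_gib=80)
print(shards_on_16gib, shards_on_80gib)
```

Under these assumptions the weights need two 16 GiB GPUs but fit on a single 80 GiB GPU, which is the trade-off between sharding across cheaper devices and reserving larger devices for training.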

Dynamic & Heterogeneous Scheduling


The figure on the left shows heterogeneous scheduling, where each stage in the pipeline needs to be scaled independently. The figure on the right shows elastic scheduling, where the parallelism for each stage may need to be dynamically adjusted during runtime.

The second challenge pertains to distributed scheduling, with two key requirements:

  1. Supporting heterogeneous resources. Our batch inference pipeline encompasses simultaneous data processing and inference. Ideally, the costlier GPUs should be reserved for inference, while the CPUs are used for data pre-processing.

  2. Elastic resource scheduling. Upon sharding the model, each shard (or stage in the pipeline) may have varying computational requirements and throughput. Estimating the computing power requirements in advance is challenging and requires continuous parameter adjustments to achieve optimal execution efficiency. Therefore, an ideal computing framework should dynamically scale each stage independently based on observed computing efficiency during runtime. As a result, more GPU resources can be allocated to slower stages, thus maximizing the overall throughput of the job.
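The idea of giving more GPUs to slower stages can be sketched as a simple greedy allocation: start with one replica per stage and hand each remaining GPU to whichever stage is currently the bottleneck. This is an offline illustration only, not the algorithm any particular framework uses, and the stage names and throughput numbers are hypothetical.

```python
def allocate_replicas(per_replica_throughput, total_gpus):
    """Greedily give each extra GPU to the current bottleneck stage."""
    stages = list(per_replica_throughput)
    if total_gpus < len(stages):
        raise ValueError("need at least one GPU per stage")
    replicas = {stage: 1 for stage in stages}
    for _ in range(total_gpus - len(stages)):
        # The bottleneck is the stage with the lowest aggregate throughput.
        bottleneck = min(
            stages, key=lambda s: replicas[s] * per_replica_throughput[s]
        )
        replicas[bottleneck] += 1
    return replicas


def pipeline_throughput(per_replica_throughput, replicas):
    """A pipeline runs only as fast as its slowest stage."""
    return min(replicas[s] * per_replica_throughput[s] for s in replicas)


# Observed batches per second for one replica of each stage (made-up numbers).
observed = {"shard_1": 100.0, "shard_2": 50.0, "shard_3": 25.0}

plan = allocate_replicas(observed, total_gpus=7)
print(plan, pipeline_throughput(observed, plan))
```

With an even split of resources the slowest shard would cap the whole job, whereas the greedy plan balances all three stages at the same aggregate throughput. An elastic scheduler performs a similar adjustment continuously, based on throughput observed at runtime.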

Data I/O & Transfer

Lastly, for any offline computing job, avoiding any overheads with data I/O and transfer between stages is essential.

  1. Ideally, data can be transferred between inference stages purely in-memory, instead of serializing and deserializing from disk.

  2. On the inference side, minimizing data I/O waiting time is vital for maximizing GPU utilization, so that the GPU is not idle for long periods of time.
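Both points can be illustrated with a small prefetching sketch: a background thread reads batches ahead of time into a bounded in-memory queue, so the consumer does not wait on I/O and nothing is written to disk between the two steps. The `load_batches` and `infer` functions are hypothetical stand-ins for storage reads and GPU inference.

```python
import queue
import threading

_SENTINEL = object()  # marks the end of the stream


def prefetch(batches, buffer_size=2):
    """Yield items from `batches` while a background thread reads ahead."""
    buffer = queue.Queue(maxsize=buffer_size)  # bounded, so memory stays capped

    def producer():
        for batch in batches:
            buffer.put(batch)  # blocks when the buffer is full
        buffer.put(_SENTINEL)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        batch = buffer.get()
        if batch is _SENTINEL:
            return
        yield batch


def load_batches(n):
    """Stand-in for slow reads from storage."""
    for i in range(n):
        yield [i, i + 1]


def infer(batch):
    """Stand-in for GPU inference."""
    return sum(batch)


results = [infer(batch) for batch in prefetch(load_batches(4))]
print(results)
```

The bounded queue is the important design choice: it lets the reader run ahead of the consumer without letting it load the whole dataset into memory. This sketch does not handle exceptions raised in the producer thread, which a production version would need to propagate.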

Current mainstream computing frameworks like Flink and Spark struggle to meet these requirements. They are typically bound to relatively fixed batch or stream computing paradigms and lack the scheduling flexibility needed to easily shard large models and maximize throughput for hybrid CPU+GPU batch inference workloads.

Scalable Offline Batch Inference with Ray Data


An overview of Ray Data, its features, and where it sits on the Ray stack.

To address these challenges, we chose Ray as the computing framework for our batch inference job, and Ray Data in particular. Ray Data supports a wide variety of data sources, including unstructured data, and provides a suite of commonly used data processing operators for general parallel computing. Notably, it natively supports scheduling with heterogeneous resources, which our batch inference workload requires.


Ray Data execution. Data is streamed from the datasource through multiple stages. Intermediate data can be buffered in queues in the Ray object store. Each stage can also elastically scale for optimal parallelism.

A key feature is its streaming execution paradigm, enabling multiple stages of the pipeline to run concurrently. This capability significantly enhances the overall execution speed of parallel computing tasks by simultaneously running CPU pre-processing with GPU inference, while preventing OOM issues when working with large datasets. In essence, Ray Data proves to be an immensely practical data processing tool that can help us build large-scale model inference frameworks with high efficiency.

In addition, Ray Data supports elastic resource scheduling for each stage, allowing resources to be distributed across all stages in the inference pipeline to maximize throughput.
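The memory benefit of streaming execution can be seen in a small single-threaded sketch built from Python generators. It counts how many intermediate batches exist at once when a stage is fully materialized before the next one starts, versus when batches flow through both stages one at a time. The `BatchTracker` class and the stage functions are hypothetical, and the sketch shows only the memory behavior, not the concurrency that Ray Data provides.

```python
class BatchTracker:
    """Counts how many intermediate batches are alive at the same time."""

    def __init__(self):
        self.live = 0
        self.peak = 0

    def acquire(self):
        self.live += 1
        self.peak = max(self.peak, self.live)

    def release(self):
        self.live -= 1


def preprocess(batch_ids, tracker):
    """Stage 1: produce an intermediate batch for each input id."""
    for i in batch_ids:
        tracker.acquire()  # the intermediate batch now occupies memory
        yield [i] * 3


def infer(batches, tracker):
    """Stage 2: consume intermediate batches and emit results."""
    for batch in batches:
        result = sum(batch)
        tracker.release()  # the intermediate batch has been consumed
        yield result


def run_bulk(n):
    """Finish stage 1 for every batch before stage 2 starts."""
    tracker = BatchTracker()
    intermediate = list(preprocess(range(n), tracker))
    return list(infer(intermediate, tracker)), tracker.peak


def run_streaming(n):
    """Pass each batch to stage 2 as soon as stage 1 produces it."""
    tracker = BatchTracker()
    return list(infer(preprocess(range(n), tracker), tracker)), tracker.peak


bulk_results, bulk_peak = run_bulk(5)
stream_results, stream_peak = run_streaming(5)
print(bulk_peak, stream_peak)
```

Both runs produce identical results, but the bulk run holds every intermediate batch at once while the streaming run holds only one. At 200 TB scale, that difference determines whether intermediate data fits in memory at all.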

Building the inference application

With Ray Data, a scalable offline inference application for large models can be expressed in a single Python script.

We define 3 model classes, one to load each model shard. Then, we construct the Ray Dataset pipeline, stitching together the stages of the pipeline:

  1. Reading data from the datasource

  2. Inference through model shard 1

  3. Inference through model shard 2

  4. Inference through model shard 3

  5. Writing the prediction results

The basic pseudocode looks like this:

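import ray


class ModelLayers1:
    def __init__(self):
        # Load the first group of layers once per actor (loading code elided).
        self.layers_1 = ...

    def __call__(self, batch):
        return self.layers_1.forward(batch)


class ModelLayers2:
    def __init__(self):
        # Load the second group of layers (loading code elided).
        self.layers_2 = ...

    def __call__(self, batch):
        return self.layers_2.forward(batch)


class ModelLayers3:
    def __init__(self):
        # Load the third group of layers (loading code elided).
        self.layers_3 = ...

    def __call__(self, batch):
        return self.layers_3.forward(batch)


# `records` is the list of input rows.
(
    ray.data.from_items(records)
    # Each stage runs in its own pool of actors, so every shard is loaded
    # once per actor and reused across batches.
    .map_batches(ModelLayers1, compute=ray.data.ActorPoolStrategy(), num_cpus=1)
    .map_batches(ModelLayers2, compute=ray.data.ActorPoolStrategy(), num_gpus=1)
    .map_batches(ModelLayers3, compute=ray.data.ActorPoolStrategy(), num_gpus=1)
    .write_json("/tmp/results")
)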

Running this script on a Ray cluster will automatically parallelize each of the stages, using all the CPUs and GPUs in the cluster. Ray Data dynamically scales each stage at runtime, even if we don’t optimally shard the model upfront, leading to good performance.

Ray Comparison to Spark


An overview of Spark execution vs. Ray execution. With Spark, stages are executed in a bulk-synchronous parallel fashion, while with Ray Data, the stages are executed in a pipelined fashion.

In comparison to Spark, Ray significantly enhances execution efficiency, with the advantages becoming increasingly pronounced as the job scale expands. To illustrate, consider a simple example involving two GPUs, where the model is divided into two shards and the objective is to process three data samples. 

As seen in the top figure, with Spark, two Executors must be started to load the parameters of the first model shard, process the three data samples, and then write the processed data to external storage. Following this, the next two Executors load the parameters of the second model shard and process the samples again, repeating the same steps before finally writing the results to external storage. This process is inefficient, and Spark's support for heterogeneous resources is not particularly user-friendly.

By contrast, as seen in the bottom diagram, Ray only needs to start two Actors, equivalent to the two Executors in Spark. Each Actor loads the parameters of its own model shard, enabling pipelined execution between them: the data samples stream through the two Actors sequentially. An additional Actor can also be scheduled on the CPU to read or store data. The framework uses Ray's shared-memory object store to hold intermediate results, which avoids serialization overhead and significantly improves execution efficiency.

Deploying the offline inference job on the cloud with KubeRay

Once we have our Ray application, we also need a way to deploy it on a Ray cluster. We opted to use KubeRay, a comprehensive solution provided by the open source community. A Ray cluster consists of a head node and worker nodes, which collectively form the computing resources. These resources can be physical machines, Docker containers, or, in the case of Kubernetes (K8s), pods. To start a Ray cluster, we use the KubeRay operator, which handles the entire lifecycle of the cluster, including cluster creation and termination. KubeRay continues to see active development with contributions from and usage by various companies, including ByteDance, Microsoft, and Ant.


An overview of how we leverage KubeRay to manage Ray clusters in our platform. 

Within ByteDance, our users can access Ray through our internal platform to submit jobs or engage in interactive programming using Notebooks. The platform operates through the YAML and RESTful API provided by KubeRay. Notably, KubeRay also supports auto-scaling and horizontal scaling. Based on the Ray cluster's internal load metrics, users can make informed decisions about resource scaling, triggering KubeRay to provision additional pods or remove idle ones as needed.
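As a rough illustration of what such a cluster description contains, the sketch below builds a RayCluster custom resource as a Python dictionary and prints it as JSON, which Kubernetes accepts alongside YAML. The cluster name, image, group name, and replica counts are placeholders rather than our actual configuration. The `ray.io/v1` API version is an assumption that applies to recent KubeRay releases; older releases use `ray.io/v1alpha1`.

```python
import json


def ray_cluster_manifest(name, image, min_workers, max_workers):
    """Build a RayCluster custom resource as a plain dict (illustrative only)."""

    def pod_template(container_name, gpus=0):
        limits = {"nvidia.com/gpu": gpus} if gpus else {}
        return {
            "spec": {
                "containers": [
                    {
                        "name": container_name,
                        "image": image,
                        "resources": {"limits": limits},
                    }
                ]
            }
        }

    return {
        "apiVersion": "ray.io/v1",  # assumption: recent KubeRay release
        "kind": "RayCluster",
        "metadata": {"name": name},
        "spec": {
            # Lets the Ray autoscaler add or remove worker pods.
            "enableInTreeAutoscaling": True,
            "headGroupSpec": {
                "rayStartParams": {},
                "template": pod_template("ray-head"),
            },
            "workerGroupSpecs": [
                {
                    "groupName": "gpu-workers",  # placeholder group name
                    "replicas": min_workers,
                    "minReplicas": min_workers,
                    "maxReplicas": max_workers,
                    "rayStartParams": {},
                    "template": pod_template("ray-worker", gpus=1),
                }
            ],
        },
    }


manifest = ray_cluster_manifest(
    name="offline-inference",              # placeholder
    image="example.com/ray-inference:tag",  # placeholder image
    min_workers=1,
    max_workers=8,
)
print(json.dumps(manifest, indent=2))
```

The `minReplicas` and `maxReplicas` fields bound how far the worker group can scale, which is how the autoscaling described above is constrained in practice.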

Conclusion

In conclusion, today we discussed large model offline inference and the key challenges, and introduced how to use Ray to build a large model inference framework. Looking ahead, we will continue to strengthen cooperation with the community, optimize our platform, and explore more application scenarios on Ray.

