In Ray 1.2.0, we’ve added a library for “collective communication primitives” to Ray. These primitives can be used in your Ray program to simplify the exchange of information across many distributed processes at the same time, speeding up certain distributed operations by an order of magnitude.
In many distributed computing applications, processes need to communicate with other processes in order to exchange information or synchronize progress. This type of communication usually relies upon “send” and “receive” operations -- a sender process sends a message to a receiver process, such as the image above shows.
This communication pattern is known as point-to-point communication -- which should be quite familiar to Ray users -- and can be realized using Ray’s “ray.remote()” and “ray.get()” APIs.
However, there are many cases where a sending process may want to communicate with multiple receivers at once. A typical example is in data-parallel distributed deep learning: a training process has to broadcast its gradients to all other peer processes in order to perform a coordinated update of model parameters.
Theoretically, one can use multiple send and receive operations (e.g., in Ray, many “ray.put()” and “ray.get()” among different actors/tasks), such as Figure 2 (left) shows. However, this ends up being programmatically cumbersome and might end up with suboptimal communication performance.
Collective communication primitives allow programs to express such communication patterns between many processes (a “group”). The key advantages of providing specialized APIs and implementations for these communication primitives are:
The ability to implement a collective communication backend at a low level in order to make the best use of the network hardware (Ethernet, InfiniBand, etc.), and provide the greatest communication performance.
The ability to optimize for different types of computing devices, such as GPUs, and avoid many unnecessary overheads.
In this section, we’ll walk through a high-level example of a collective communication primitive. Among the many collective communication primitives, allreduce is the most adopted one in many distributed ML training systems, including Horovod and distributed TensorFlow.
The image below from the NCCL documentation illustrates the AllReduce procedures.
Specifically, it starts with independent arrays (notated with four different colors) “in0”, “in1”, “in2”, “in3” on each of the 4 processes of a collective group. Each process in the group is assigned with a unique integer ID, called “rank”. It then performs communications and reductions on data (e.g. sum) across all ranks and ends with identical arrays “out”, where out[i] = sum(in0[i], in1[i], in2[i], in3[i]) for each rank k.
This maps perfectly with the gradient synchronization procedure in data-parallel distributed deep learning, as shown in Figure 4: in each training iteration, on each training worker, the gradient is calculated during the backpropagation pass, and synchronized via an “allreduce” operation across all training processes. After this, all processes will have the aggregated gradients from all other processes and can safely apply the gradients to update their local parameters -- keeping all processes synchronized!
There are a variety of different ways that “allreduce” can be implemented, and each implementation can have different performance characteristics -- if you are interested in learning more, you can check out this survey paper.
This blogpost introduces a set of native Ray collective communication primitives for distributed CPUs or GPUs. Let’s walk through the usage of these collective communication APIs.
Before starting, make sure you have installed the latest ray>=1.2.0 wheel. You can import the collective API using the code below:
1import ray.util.collective as col
Under the namespace col, we have provided a series of collective primitives that can be used in Ray task and actor implementations to specify collective communication. Next, let’s walk through an example.
Suppose we want to launch a distributed ML training task on 16 GPUs spread across a 16-node cluster, each with one GPU. The ray actor API allows you to define a GPU actor in the following way:
1 2 3 4 5 6 7import ray import cupy @ray.remote(num_gpus=1) class GPUWorker: def __init__(self): self.gradients = cupy.ones((10,), dtype=cupy.float32)
The next code snippet spawns 16 of these actors. Ray will automatically create them and assign each of them a GPU:
1 2num_workers = 16 workers = [GPUWorker.remote() for i in range(num_workers)]
Note that in each one of the GPU actors, we have created a CuPy array self.gradients on its designated GPU at initialization. For walkthrough purposes, think of this array self.gradients as the gradients of model parameters generated at each iteration of the training, that need to be repeatedly synchronized across all GPU actors.
With the standard Ray API, communicating the gradients between GPU workers would require a series of ray.get calls, and passing around the ObjectRefs across different actors. Below we provide a snippet of example code to achieve this using the original ray.get and ray.put APIs.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39import ray import cupy @ray.remote(num_gpus=1) class GPUWorker: def __init__(self): self.gradients = cupy.ones((10,), dtype=cupy.float32) def put_gradients(self): return ray.put(self.gradients) def reduce_gradients(self, grad_id_refs): grad_ids = ray.get(grad_id_refs) reduced_result = cupy.ones((10,), dtype=float32) for grad_id in grad_ids: array = ray.get(grad_id) reduced_result += array result_id = ray.put(reduced_result) return result_id def get_reduced_gradient(self, reduced_gradient_id_ref): reduced_gradient_id = ray.get(reduced_gradient_id_ref) reduced_gradient = ray.get(reduced_gradient_id) # do whatever with the reduced gradients return True # Allreduce the gradients using Ray APIs # Let all workers put their gradients into the Ray object store. gradient_ids = [worker.put_gradients.remote() for worker in workers] ray.wait(object_ids, num_returns=len(object_ids, timeout=None)) # Let worker 0 reduce the gradients reduced_id_ref = workers.reduce_gradients.remote(gradient_ids) # All others workers get the reduced gradients results =  for i, worker in enumerate(workers): results.append(worker.get_reduced_gradient.remote([reduced_id_ref])) ray.get(results)
While the ray.put and ray.get are simple yet powerful APIs for Ray users to implement various distributed code, communicating between workers will require going through the Ray object store, introducing small overheads caused by object serialization and deserialization, or by object movement between CPU RAM and GPU memory, such as in the above case. These overheads might be amplified when the same communication patterns happen often and repetitively -- such as in distributed ML training on GPUs.
However, with collective communication primitives we can use a single ray.util.collectve.allreduce() call to simplify the code above and boost the performance significantly.
Like most collective communication libraries, we first establish a collective group for this group of 16 GPU worker actors:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20import cupy import ray import ray.util.collective as col @ray.remote(num_gpus=1) class GPUWorker: def __init__(self): self.gradients = cupy.ones((10,), dtype=cupy.float32) def setup(self, world_size, rank): col.init_collective_group( world_size=world_size, rank=rank, backend="nccl") def allreduce(self): col.allreduce(self.gradients) return self.gradients setup_rets = ray.get([w.setup(16, i) for i, w in enumerate(workers)])
Once the collective group is created, we can perform collective communication using the primitives. In this case, we want to "allreduce’" the gradients, so:
1results = ray.get([w.allreduce.remote() for w in workers])
Using a simple collective allreduce call, we have managed to reduce the gradients across all 16 workers and store them in the self.gradients buffer in-place.
In the above example, we choose the NCCL backend as the collective allreduce backend, which is extremely advantageous to communicate contents between distributed GPU compared to Ray’s gRPC based implementations, since NCCL is optimized to achieve high bandwidth and low latency over PCIe and NVLink high-speed interconnects within a node and over NVIDIA Mellanox Network across nodes.
See the two microbenchmarks below comparing the performance of AllReduce using Ray on two setups with and without the NCCL backend (ray.util.collective).
Figure 5 shows a node with 2 GPUs, each worker is spawned on 1 GPU with NVLink enabled. Note the values corresponding to the Y-axis are in log-scale.
Figure 6 shows a cluster with 7 nodes, each node with 2 GPUs; each worker is spawned on 1 GPU (hence 14 workers in total). Note the values corresponding to the Y-axis are in log-scale.
In short, these graphs show ray.col.collective.allreduce can be 10 - 1000x faster than the assembled allreduce function via ray.get and ray.put.
Besides collective communication and NCCL backends, the ray primitives APIs also support fast point-to-point communication between distributed GPUs, as well as the GLOO backend, optimized for collective communication between distributed CPUs. You can check out this documentation for a full description of the collective primitives in Ray.
On top of these collective primitives, we are building Ray-native distributed ML training systems, such as parameter servers. A key advantage is that they can be used to distribute very arbitrary Python-based ML code beyond TensorFlow and PyTorch, such as Spacy/Thinc, JAX, or even numpy code.
As a side product, we have also generated a Python version of the Facebook GLOO library, and continuously maintain it under ray-projects/pygloo. You might find it useful for your application! If you have any questions or thoughts about Ray, please feel free to join the Ray Discourse or Slack. Finally, if you’re interested in helping to improve Ray and its user experience, Anyscale is hiring!
Thanks to the following Ray team members and open source contributors: Dacheng Li, Lianmin Zheng, Xiwen Zhang, and Ion Stoica.