Ray Distributed Library Patterns

By Eric Liang and Zhe Zhang   

Ray has many library integrations, from machine learning libraries such as Horovod and Hugging Face to data processing frameworks such as Spark, Modin, and Dask. But what does it mean to be "integrated with Ray"? And what benefits does it provide to library developers and users?

This blog post answers these questions by looking at three common Ray library integration patterns. We’ll walk through several examples of Ray libraries at each integration level, show how each pattern or “level” of integration solves different problems for library authors and users, and explain how to decide on the right level of integration.

Three Levels of Ray Integration

Level 1: Scheduling only; communication done outside Ray

[Figure 1]
  • Horovod [code]: Horovod uses Ray actors to schedule Horovod worker processes, which enables dynamic scaling and data pre-processing to be handled within Ray.

  • PyTorch Lightning [code]: Similarly, PyTorch Lightning supports distributed data parallel training in Ray, where Ray is used to launch the worker processes.

  • Spark / RayDP [code]: RayDP enables Spark to run inside Ray by launching Spark executors as Ray Java actors.

At this level, libraries are leveraging Ray as a language-integrated actor scheduler. The actual communication between actors is mostly done out-of-band (outside Ray). For example, Horovod-on-Ray uses NCCL or MPI-based collective communications, and RayDP uses Spark's internal RPC and object manager.

[Figure 2]

At first glance, this integration may seem superficial. However, this level of integration is often the right choice for libraries that have mature, high-performance internal communication stacks but still want additional features such as:

  1. Fine-grained control over worker scheduling and fault tolerance (e.g., Elastic Horovod).

  2. The ability to run subroutines within a larger distributed program (e.g., hyperparameter tuning), or to pass data in-memory to downstream stages (e.g., RayDP => MLDataset => Training).

# Out-of-band communication example.

# Launch the actors.
workers = [MPIActor.remote(...) for _ in range(10)]
for i, w in enumerate(workers):
    w.init_for_rank.remote(i)

# Ray triggers the collective op, but the actual communication
# happens between the workers out-of-band (e.g., via MPI).
ray.get([w.collective_op.remote() for w in workers])
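
The scheduling-control benefit above (item 1 in the list) is usually realized with Ray placement groups, which reserve and co-locate resources for the worker actors before the library's own communication layer takes over. Below is a minimal sketch along those lines; the MPIActor class here is a hypothetical stand-in like the one above, and the .options() placement group arguments have changed across Ray versions (newer versions use a PlacementGroupSchedulingStrategy), so treat the exact API as illustrative.

import ray
from ray.util.placement_group import placement_group

ray.init()

@ray.remote(num_cpus=1)
class MPIActor:
    def init_for_rank(self, rank):
        # A real library would bootstrap MPI/NCCL here, outside of Ray.
        self.rank = rank

# Reserve one CPU per worker, packed onto as few nodes as possible.
pg = placement_group([{"CPU": 1}] * 4, strategy="PACK")
ray.get(pg.ready())

# Pin each worker actor to its own bundle in the placement group.
workers = [
    MPIActor.options(placement_group=pg, placement_group_bundle_index=i).remote()
    for i in range(4)
]
ray.get([w.init_for_rank.remote(i) for i, w in enumerate(workers)])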

Libraries often integrate with Ray in this way because getting the APIs right for composing and scheduling distributed programs is hard. Ray provides the common "distributed glue" here so libraries can focus on their core functionality rather than on operational aspects.

Level 2: Scheduling and communication

[Figure 3]
  • Hugging Face [blog]: Hugging Face uses Ray actor calls for faster distributed document retrieval for fine-tuning.

  • Online resource allocation at Ant [blog]: Ant Group uses actor communication to implement online optimization for ads and recommendations.

  • Scikit-Learn [code]: Ray supports Scikit-Learn through the joblib parallel interface.

  • Seldon Alibi [blog]: Similarly, Alibi uses Ray actors and Ray Serve to scale out batch prediction.

Many apps and libraries use Ray to parallelize their programs, leveraging both Ray's scheduling functionality (the Level 1 benefits) and task and actor method calls for communication. Under the hood, Ray translates task and actor invocations into low-level gRPC calls.

Libraries that are a good fit for this kind of integration require one or more of the following:

  1. Low latency communication and a desire for a simpler programming model than sending raw RPC calls to a pool of compute workers.

  2. Computation that can be parallelized into many individual tasks (e.g., distributed multiprocessing or joblib).

  3. The ability to coordinate a complex topology of actors or tasks (e.g., for implementing reinforcement learning, an online decision system, or a model serving pipeline).

# Task example: evaluate each piece of work in parallel with the latest parameters.
results = [evaluate.remote(arg, latest_params) for arg in work_to_do]
ray.get(results)

# Actor example: push the latest parameters to each worker, then evaluate.
workers = [Actor.remote() for _ in range(5)]
for w in workers:
    w.update.remote(latest_params)
results = [w.evaluate.remote() for w in workers]
ray.get(results)
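
As a concrete instance of item 2, the scikit-learn integration listed above plugs in through joblib's pluggable backend interface. The following is a minimal sketch following that pattern; the dataset and parameter space are arbitrary choices for illustration.

import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

from ray.util.joblib import register_ray

# Register Ray as a joblib backend, then run the hyperparameter search on it.
register_ray()

digits = load_digits()
param_space = {"C": [0.01, 0.1, 1.0, 10.0], "gamma": [0.001, 0.01, 0.1]}
search = RandomizedSearchCV(SVC(), param_space, n_iter=5, n_jobs=-1)

with joblib.parallel_backend("ray"):
    search.fit(digits.data, digits.target)

Under this backend, joblib dispatches the parallel fits to Ray instead of local processes, so the scikit-learn code itself stays unchanged.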

For applications that require specialized communication, we currently still recommend a Level 1 type integration. In the future, Ray Collectives will enable more libraries to be implemented natively in Ray with high performance.

Level 3: Scheduling, communication, and distributed memory

[Figure 4]
  • XGBoost [code]: XGBoost on Ray leverages the object store to hold the distributed data matrix used for training.

  • Modin [code]: Modin uses the object store to store partition blocks in shared memory, and tasks to execute compute over these blocks.

  • Mars [code]: Mars uses actors and the object store to implement distributed execution and data storage.

  • Apache Airflow [code]: The Ray Airflow provider allows Airflow operators to be executed on Ray and to pass intermediate data via the object store.

  • Dask [code]: The Ray Dask scheduler stores intermediate Dask results in the object store and leverages tasks for execution.

Last but not least, libraries may also seek to leverage Ray's distributed object store, which is seamlessly integrated with Ray's task and actor APIs. Technically speaking, any application that is passing large (>100KB in size) objects around in Ray is already leveraging Ray's distributed memory system for performance.

Level 3 libraries further take advantage of these distributed memory features of Ray:

  1. First-class object references, which can be passed and retrieved freely between tasks and actors in Ray, and are automatically garbage collected if they fall out of scope.

  2. Shared-memory support, which means that large objects can be shared by multiple workers on the same machine without any copies, greatly increasing efficiency (see the sketch after this list).

  3. Object spilling, which enables large-scale data processing workloads to be executed by spilling objects to disk or remote storage.
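
To make the shared-memory point concrete, here is a minimal, self-contained sketch: a large NumPy array is put into the object store once, and every task running on the same node reads it zero-copy from shared memory (the array contents and task count are arbitrary).

import numpy as np
import ray

ray.init()

# Put a single large (~80MB) array into the object store once.
array_ref = ray.put(np.zeros(10_000_000))

@ray.remote
def block_sum(arr):
    # Tasks on the same node receive a zero-copy, read-only view of the array.
    return float(arr.sum())

# Eight tasks share the one shared-memory copy; no per-task serialization.
print(ray.get([block_sum.remote(array_ref) for _ in range(8)]))

The broadcast-join example below combines the same ideas: the large relation R stays in the object store as a list of block refs, while the small relation S is passed by reference to every join task.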

# Store a large dataset in the object store.
data_R = [ray.put(block) for block in large_data_blocks]

# Store a small dataset in the object store.
data_S = ray.put(small_data)

# Example of implementing broadcast join between R and S using tasks.
joined_R_S = [join.remote(R_i, data_S) for R_i in data_R]
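
The Dask integration listed above is a good end-to-end illustration of this level: Dask builds the task graph, while Ray executes it and keeps intermediate partitions in the object store (spilling them to disk if memory fills up). Below is a minimal sketch using the Dask-on-Ray scheduler entry point; the toy dataframe is an arbitrary example.

import dask
import dask.dataframe as dd
import pandas as pd
import ray

from ray.util.dask import ray_dask_get

ray.init()

# Route Dask graph execution through Ray's tasks and object store.
dask.config.set(scheduler=ray_dask_get)

pdf = pd.DataFrame({"key": range(10_000), "value": range(10_000)})
ddf = dd.from_pandas(pdf, npartitions=8)
result = ddf.groupby("key").value.mean().compute()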

Ray's built-in libraries such as Tune, RLlib, and Serve are also Level 3 library examples, leveraging the object store to provide best-in-class performance and flexibility.

Deciding on the right level of integration

It can be tempting to think "more integration is better". However, in accordance with the rule of least power, we recommend choosing the minimal level of integration needed for the library in question. Our goal with Ray is to enable users to build complex distributed applications with the simplicity of a single Python file, and even a Level 1 integration preserves that simplicity.

Here are a couple of rules of thumb:

  • Libraries that have very specialized communication requirements (e.g., distributed allreduce) should start with a Level 1 integration (which can be expanded over time).

    • For example, Horovod on Ray mostly uses actors, but is expanding to use objects and tasks as well for data preprocessing.

  • It's OK to pick and choose which Ray features to leverage; they all work together, but most applications will use only a subset.

    • For example, RayDP uses Ray actors only, whereas Dask-on-Ray uses tasks and objects.

The level of effort to integrate also varies depending on the level of integration and the maturity of the library. As a general rule:

  • Level 1 integration can start out at <200 lines of code.

    • This is typically because libraries already have well-separated routines for launching workers.

  • Level 2-3 integration can, depending on the library, range from straightforward to very high effort. For example:

    • The Dask-on-Ray integration took only a few hundred lines of code, because Dask was designed to have pluggable schedulers.

    • Similarly, the scikit-learn integration took ~50 lines of code, leveraging joblib as an integration point.

    • On the other hand, fully integrating a system like Spark with Ray's memory subsystem would be a large engineering project.

Interoperability between Ray libraries

A final question is: how can Ray libraries work together?

Here are a few common patterns:

Finally, an app can simply invoke multiple libraries. A Ray application can naturally leverage libraries in separate tasks and actors without the libraries ever needing to know about each other.
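
The simplest version of that last pattern can be sketched with plain tasks: one task wraps one library, another wraps a second, and the object ref produced by the first is passed straight into the second. The function bodies below are hypothetical stand-ins for real library calls.

import ray

ray.init()

@ray.remote
def preprocess(raw_rows):
    # Stand-in for a data processing library step (e.g., Dask-on-Ray or Modin).
    return [x * 2 for x in raw_rows]

@ray.remote
def train(rows):
    # Stand-in for a training library step (e.g., XGBoost on Ray).
    return sum(rows) / len(rows)

features_ref = preprocess.remote(list(range(100)))
# The object ref from one stage is passed directly to the next; the two
# "libraries" never need to know about each other.
print(ray.get(train.remote(features_ref)))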

Conclusion

We hope this blog makes it easier for future developers to make an informed decision on how to build libraries on top of Ray. If you have any questions, please reach out to us on the Ray forums. If you're interested in working with us to make it easier to leverage Ray, we're hiring!
