Ray has many library integrations, from machine learning libraries such as Horovod and Hugging Face to data processing frameworks such as Spark, Modin, and Dask. But what does it mean to be "integrated with Ray"? And what benefits does it provide to library developers and users?
This blog post answers these questions by looking at three common Ray library integration patterns. To do this, we’ll look at several examples of Ray libraries at each integration level, show how each pattern or “level” of integration solves different problems for library authors and users, and explain how to decide on the right level of integration.
Level 1: Scheduling only; Communication done outside Ray
PyTorch Lightning [code]: PyTorch Lightning supports distributed data parallel training on Ray, where Ray is used to launch processes.
Spark / RayDP [code]: RayDP enables Spark to run inside Ray by launching Spark executors as Ray Java actors.
At this level, libraries are leveraging Ray as a language-integrated actor scheduler. The actual communication between actors is mostly done out-of-band (outside Ray). For example, Horovod-on-Ray uses NCCL or MPI-based collective communications, and RayDP uses Spark's internal RPC and object manager.
At first glance, this integration may seem superficial. However, this level of integration is often the right choice for libraries that have mature, high-performance internal communication stacks but still want additional features such as:
Fine-grained control over worker scheduling and fault tolerance (e.g., Elastic Horovod).
The ability to run subroutines within a larger distributed program (e.g., tuning), or pass data in-memory to downstream stages (e.g., RayDP => MLDataset => Training).
```python
# Out-of-band communication example.

# Launch the actors.
workers = [MPIActor.remote(...) for _ in range(10)]
for i, w in enumerate(workers):
    w.init_for_rank.remote(i)

# Ray triggers the collective op, but the actual communication
# happens between the workers out-of-band (e.g., via MPI).
ray.get([w.collective_op.remote() for w in workers])
```
We often see libraries integrate with Ray in this way because getting the APIs right for composing and scheduling distributed programs is hard. Ray provides the common "distributed glue" here so libraries can focus on their core functionality rather than operational aspects.
Level 2: Scheduling and communication
Hugging Face [blog]: Hugging Face uses Ray actor calls for faster distributed document retrieval for fine-tuning.
Online resource allocation at Ant [blog]: Ant Group uses actor communication to implement online optimization for ads and recommendations.
Scikit-Learn [code]: Ray supports Scikit-Learn through the joblib parallel interface.
Many apps and libraries use Ray to parallelize their programs, leveraging Ray's scheduling functionality (Level 1 benefits) and also using task and actor method calls for communication. Under the hood, Ray translates task and actor invocations into low-level gRPC calls.
Libraries that are a good fit for this kind of integration require one or more of the following:
Low latency communication and a desire for a simpler programming model than sending raw RPC calls to a pool of compute workers.
```python
# Task example.
results = [evaluate.remote(latest_params) for _ in work_to_do]
ray.get(results)

# Actor example.
workers = [Actor.remote() for _ in range(5)]
for w in workers:
    # Actor methods must be invoked through .remote().
    w.update.remote(latest_params)
results = [w.evaluate.remote() for w in workers]
ray.get(results)
```
For applications that require specialized communication, we currently still recommend a Level 1 type integration. In the future, Ray Collectives will enable more libraries to be implemented natively in Ray with high performance.
Level 3: Scheduling, communication, and distributed memory
XGBoost [code]: XGBoost on Ray leverages the object store to hold the distributed data matrix used for training.
Modin [code]: Modin uses the object store to store partition blocks in shared memory, and tasks to execute compute over these blocks.
Mars [code]: Mars uses actors and the object store to implement distributed execution and data storage.
Apache Airflow [code]: The Ray Airflow provider allows Airflow operators to be executed in Ray and pass intermediate data using the object store.
Dask [code]: The Ray Dask scheduler stores intermediate Dask results in the object store and leverages tasks for execution.
Last but not least, libraries may also seek to leverage Ray's distributed object store, which is seamlessly integrated with Ray's task and actor APIs. Technically speaking, any application that passes large objects (>100 KB in size) around in Ray is already leveraging Ray's distributed memory system for performance.
Level 3 libraries further take advantage of these distributed memory features of Ray:
First-class object references, which can be passed and retrieved freely between tasks and actors in Ray, and are automatically garbage collected if they fall out of scope.
Shared-memory support, which means that large objects can be shared by multiple workers on the same machine without any copies, greatly increasing efficiency.
Object spilling, which enables large-scale data processing workloads to be executed by spilling objects to disk or remote storage.
```python
# Store a large dataset in the object store.
data_R = [ray.put(block) for block in large_data_blocks]

# Store a small dataset in the object store.
data_S = ray.put(small_data)

# Example of implementing broadcast join between R and S using tasks.
joined_R_S = [join.remote(R_i, data_S) for R_i in data_R]
```
It can be tempting to think "more integration is better". However, in accordance with the rule of least power, we recommend choosing the minimal level of integration needed for the library in question. Our goal with Ray is to enable users to build complex distributed applications with the simplicity of a single Python file, which is enabled even with Level 1 integration.
Here are a couple of rules of thumb:
Libraries that have very specialized communication requirements (e.g., distributed allreduce) should start with Level 1 integration (which can be expanded over time).
For example, Horovod on Ray uses mostly actors, but is expanding into using objects and tasks as well for data preprocessing.
It's OK to pick and choose which Ray features to leverage. They all work together, but most applications will use only a subset.
For example, RayDP uses Ray actors only, whereas Dask-on-Ray uses tasks and objects.
The level of effort to integrate also varies depending on the level of integration and the maturity of the library. As a general rule:
Level 1 integration can start out at <200 lines of code.
This is typically because libraries already have well-separated routines for launching workers.
Level 2-3 integration can, depending on the library, range from straightforward to very high effort. For example:
The Dask-on-Ray integration took only a few hundred lines of code, because Dask was designed to have pluggable schedulers.
Similarly, scikit-learn integration took ~50 lines of code, leveraging joblib as an integration point.
On the other hand, fully integrating a system such as Spark with Ray's memory subsystem would be a large engineering project.
A final question is: how can Ray libraries work together?
Here are a few common patterns:
Library as a subroutine: in this pattern, one distributed library is run as a subroutine of another. This can be either:
Passing data between libraries:
In Airflow-on-Ray, data is passed between workflow steps using Ray's in-memory object store.
RayDP writes the output of a Spark job into the Ray object store, allowing it to be easily accessible from downstream application logic via the MLDataset API.
Finally, an app can simply invoke multiple libraries. A Ray application can naturally leverage libraries in separate tasks and actors without the libraries ever needing to know about each other.
We hope this blog makes it easier for future developers to make an informed decision on how to build libraries on top of Ray. If you have any questions, please reach out to us on the Ray forums. If you're interested in working with us to make it easier to leverage Ray, we're hiring!