This blog post highlights two features in the latest Ray 1.2 release: native support for spilling to external storage, and support for libraries from the Python data processing ecosystem, including integrations for PySpark and Dask.
Over the past couple years, we’ve heard from many Ray users that they wish to incorporate parallel data processing more directly into their Python applications. These use cases range from processing input CSVs faster to shuffling hundreds of terabytes of ML input data (distributed ETL). The common desire is to stitch together an application with data processing and ML components in a single Python program, without needing to worry about setting up, maintaining, and gluing together separate clusters or services.
Rather than write another DataFrames library for Ray, we're focusing on supporting the integration of other frameworks so that you can
Break down compute silos: Invoke data processing APIs like Dask DataFrames directly from application code alongside ML libraries like Tune, XGBoost, and Horovod.
Leverage distributed memory: Automatic object spilling enables data library developers and users to take advantage of Ray’s shared-memory object store.
Running training jobs on the results of batch ETL jobs (e.g., distributed training, XGBoost on Ray).
Composing pipelines together (e.g., connecting Spark => PyTorch) from separately written operators (tasks).
However, in many cases the use of workflow orchestrators adds costs, due to the compute silo effect, in terms of system efficiency and operations.
The setup overhead of workflow tasks adds latency and reduces the cost efficiency of the job.
Intermediate data must be materialized to external storage (e.g., HDFS or S3), also adding latency.
There is high operational overhead maintaining separate distributed systems for data processing.
The pipeline itself must be written and configured in a separate configuration language (vs. a single Python program). This also limits expressivity.
Our goal for supporting data processing in Ray is to enable distributed applications with less glue code, increasing expressivity and reducing system overheads.
In the remainder of this blog we’ll cover the new object spilling feature in Ray and library integrations.
We’re happy to announce that Ray now supports automatic object spilling to local disk and Cloud object stores (e.g., S3). This feature is available starting in Ray 1.2 and will be enabled by default in the next release.
Object spilling enables libraries already using Ray’s object store to work with datasets that may not fit in memory. It also allows Ray programs to operate directly on big datasets. For example, you can now write a simple out-of-core distributed shuffle in just a few dozen lines of Python:
Here’s an example of using Ray’s object spilling to enable out-of-core workloads for Dask-on-Ray. Note that this code does a full sort of the dataset, so we will not compare it directly to the simple shuffle above.
Note that standalone systems like Spark already have native support for object spilling. However, this is a significant effort for libraries like Modin and Mars. This is where integration with Ray can reduce burden for library developers and enable new workloads for end users. We’ll cover how this works in the next section.
Note: To reproduce these benchmarks, you will need to run on the nightly wheels. You can also do this by installing Ray v1.2 (
pip install -U ray) and running
ray install-nightly from the command line.
All of the following examples are runnable with Ray v1.2, unless otherwise noted.
With PySpark support in RayDP, you can leverage the power of Spark with just a few lines of code. Once RayDP is initialized, you can use any PySpark API you want:
1$ pip install ray raydp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19import ray import raydp ray.init() @ray.remote class PySparkDriver: def __init__(self): self.spark = raydp.init_spark( app_name='RayDP example', num_executors=2, executor_cores=2, executor_memory='4GB') def foo(self): return self.spark.range(1000).repartition(10).count() driver = PySparkDriver.remote() print(ray.get(driver.foo.remote()))
Under the hood,
raydp.init_spark creates num_executors Ray Java actors that each launch a Spark executor. The actors communicate between each other using Spark’s internal IO layer. In the above example, we use the PySparkDriver actor class to wrap the Spark session so that it is callable from other parts of a Ray application. Note that this integration doesn’t use Ray’s object store, unlike the other integrations we’ll cover below.
If you like to learn more about RayDP, please check out the documentation.
We recently released a lightweight plugin for Dask that lets you use Ray as a backend for executing Dask tasks. This includes Ray-specific optimizations for large scale shuffle performance. Here’s how you can try it out:
1$ pip install ray 'dask[dataframe]' pandas numpy
Note that you do not need to install dask.distributed even if you are using a cluster because Ray will handle the distribution.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16import ray import dask import dask.dataframe as dd import pandas as pd import numpy as np from ray.util.dask import ray_dask_get dask.config.set(scheduler=ray_dask_get) # Sets Ray as the default backend. ray.init() df = pd.DataFrame(np.random.randint(0, 100, size=(2**10, 2**8))) df = dd.from_pandas(df, npartitions=10) print(df.head(10))
Under the hood, Dask dispatches tasks to Ray for scheduling and execution. Task inputs and outputs get stored in Ray’s distributed, shared-memory object store. This means that you can seamlessly mix Dask and other Ray library workloads.
If you like to learn more about Dask on Ray, please check out the documentation.
Modin DataFrames enables transparent scaling of Pandas on Ray. Try it out:
1$ pip install 'modin[ray]' pandas numpy
1 2 3 4 5 6 7 8 9 10import ray ray.init() # Modin defaults to backing Ray's object store with disk. Start Ray before importing modin to use shared memory instead. import modin.pandas as pd import numpy as np frame_data = np.random.randint(0, 100, size=(2**10, 2**8)) df = pd.DataFrame(frame_data) print(df.head(10))
If you like to learn more about Modin, please check out the documentation.
Here’s how you can try it out:
1$ pip install -U pymars ray==1.0
Note that you do not need to install pymars[distributed] even if you are using a cluster because Ray will handle the distribution.
1 2 3 4 5 6 7 8 9from mars.session import new_session ray_session = new_session(backend='ray').as_default() # Set Ray as the default backend. import mars.dataframe as md import mars.tensor as mt t = mt.random.randint(100, size=(2**10, 2**8)) df = md.DataFrame(t) print(df.head(10).execute())
The Mars-on-Ray integration is experimental and under active development, but you can learn more from the documentation.
This is just the beginning to provide support in Ray for 3rd party data processing libraries and frameworks. We’re actively developing features like object spilling and memory management to improve stability and performance. Keep an eye out for our next blog post on a large-scale distributed shuffle to better support these data processing frameworks.