Data Processing Support in Ray

By Sang Cho, Alex Wu, Clark Zinzow, Eric Liang and Stephanie Wang   

This blog post highlights two features in the latest Ray 1.2 release: native support for spilling to external storage, and support for libraries from the Python data processing ecosystem, including integrations for PySpark and Dask.

Over the past couple of years, we’ve heard from many Ray users that they want to incorporate parallel data processing more directly into their Python applications. These use cases range from processing input CSVs faster to shuffling hundreds of terabytes of ML input data (distributed ETL). The common desire is to stitch together an application with data processing and ML components in a single Python program, without needing to worry about setting up, maintaining, and gluing together separate clusters or services.

Rather than write another DataFrames library for Ray, we're focusing on supporting integrations with other frameworks so that you can:

  1. Break down compute silos: Invoke data processing APIs like Dask DataFrames directly from application code alongside ML libraries like Tune, XGBoost, and Horovod.

  2. Leverage distributed memory: Automatic object spilling enables data library developers and users to take advantage of Ray’s shared-memory object store.

A single substrate for distributed data processing and machine learning

The status quo for many teams today is to use workflow orchestrators such as Airflow or Kubeflow to stitch distributed programs together. This enables use cases such as:

  • Running training jobs on the results of batch ETL jobs (e.g., distributed training, XGBoost on Ray).

  • Composing pipelines together (e.g., connecting Spark => PyTorch) from separately written operators (tasks).

However, in many cases workflow orchestrators add costs, both in system efficiency and in operations, due to the compute silo effect.

System efficiency:

  1. The setup overhead of workflow tasks adds latency and reduces the cost efficiency of the job.

  2. Intermediate data must be materialized to external storage (e.g., HDFS or S3), also adding latency.

Operationally:

  1. There is high operational overhead maintaining separate distributed systems for data processing.

  2. The pipeline itself must be written and configured in a separate configuration language (vs. a single Python program). This also limits expressivity.

Our goal for supporting data processing in Ray is to enable distributed applications with less glue code, increasing expressivity and reducing system overheads.

In the remainder of this blog we’ll cover the new object spilling feature in Ray and library integrations.

Object Spilling

We’re happy to announce that Ray now supports automatic object spilling to local disk and cloud object stores (e.g., S3). This feature is available starting in Ray 1.2 and will be enabled by default in the next release.
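In Ray 1.2, spilling is opt-in. A minimal sketch of enabling it with a local spill directory looks something like the following; the _system_config keys are internal and may change between releases, so treat this as illustrative and check the object spilling documentation for your version:

import json
import ray

# Illustrative only: ask Ray to spill objects to a local directory when the
# object store fills up. Configuration keys may differ across Ray versions.
ray.init(
    _system_config={
        "object_spilling_config": json.dumps(
            {"type": "filesystem",
             "params": {"directory_path": "/tmp/ray_spill"}})
    })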

Object spilling enables libraries already using Ray’s object store to work with datasets that may not fit in memory. It also allows Ray programs to operate directly on big datasets. For example, you can now write a simple out-of-core distributed shuffle in just a few dozen lines of Python:
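Here is a hedged sketch of that pattern (not the exact ray.experimental.shuffle code; the data sizes and helper names are illustrative). A map/reduce-style shuffle can be written with plain Ray tasks, with every shard stored in, and spilled from, the object store:

import numpy as np
import ray

ray.init()

NUM_MAPPERS = 4
NUM_REDUCERS = 4

@ray.remote
def map_block(block_id):
    # Each mapper emits one shard per reducer. Shards are separate objects
    # in the object store, so they can be spilled independently.
    data = np.random.rand(1_000_000)
    return [data[i::NUM_REDUCERS] for i in range(NUM_REDUCERS)]

@ray.remote
def reduce_shards(*shards):
    # Each reducer combines its shard from every mapper.
    return np.concatenate(shards).sum()

map_out = [map_block.options(num_returns=NUM_REDUCERS).remote(i)
           for i in range(NUM_MAPPERS)]
results = [reduce_shards.remote(*[map_out[m][r] for m in range(NUM_MAPPERS)])
           for r in range(NUM_REDUCERS)]
print(ray.get(results))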

[Figure: Simple shuffle example written in Ray with different object store memory limits on a single 32-core machine (AWS i3.8xlarge). The maximum working set size for this shuffle is 2x the input data size, so any memory limit smaller than that requires spilling. You can try this yourself on Ray master with python -m ray.experimental.shuffle --help.]

Here’s an example of using Ray’s object spilling to enable out-of-core workloads for Dask-on-Ray. Note that this code does a full sort of the dataset, so we will not compare it directly to the simple shuffle above.
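The benchmark boils down to a Dask DataFrame sort running on the Ray scheduler, along the lines of this sketch (the synthetic dataset and sort column here are illustrative, not the benchmark's exact parameters):

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import ray
from ray.util.dask import ray_dask_get

ray.init()
dask.config.set(scheduler=ray_dask_get)  # Run the Dask graph on Ray.

# set_index performs a full sort/shuffle of the DataFrame; intermediate
# shuffle blocks can spill once the object store memory limit is reached.
pdf = pd.DataFrame({"key": np.random.rand(10**6),
                    "value": np.random.rand(10**6)})
ddf = dd.from_pandas(pdf, npartitions=32)
print(ddf.set_index("key").head())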

[Figure: Dask-on-Ray sort with Ray's object spilling disabled vs. enabled, run on a 32-core machine (AWS i3.8xlarge) with a 30GB memory limit for Ray's object store. With object spilling disabled, the application hits an out-of-memory error at 100GB.]

Note that standalone systems like Spark already have native support for spilling. However, implementing it is a significant effort for libraries like Modin and Mars. This is where integration with Ray can reduce the burden on library developers and enable new workloads for end users. We’ll cover how this works in the next section.

Note: To reproduce these benchmarks, you will need to run on the nightly wheels. You can do this by installing Ray v1.2 (pip install -U ray) and then running ray install-nightly from the command line.

Library Integrations

All of the following examples are runnable with Ray v1.2, unless otherwise noted.

PySpark on Ray

With PySpark support in RayDP, you can leverage the power of Spark with just a few lines of code. Once RayDP is initialized, you can use any PySpark API you want:

Installation:

$ pip install ray raydp

Example:

import ray
import raydp

ray.init()

@ray.remote
class PySparkDriver:
  def __init__(self):
    self.spark = raydp.init_spark(
      app_name='RayDP example',
      num_executors=2,
      executor_cores=2,
      executor_memory='4GB')

  def foo(self):
    return self.spark.range(1000).repartition(10).count()

driver = PySparkDriver.remote()
print(ray.get(driver.foo.remote()))

Under the hood, raydp.init_spark creates num_executors Ray Java actors that each launch a Spark executor. The actors communicate with each other using Spark’s internal I/O layer. In the above example, we use the PySparkDriver actor class to wrap the Spark session so that it is callable from other parts of a Ray application. Note that this integration doesn’t use Ray’s object store, unlike the other integrations we’ll cover below.

If you’d like to learn more about RayDP, please check out the documentation.

LinkDask on Ray

We recently released a lightweight plugin for Dask that lets you use Ray as a backend for executing Dask tasks. This includes Ray-specific optimizations for large-scale shuffle performance. Here’s how you can try it out:

Installation:

$ pip install ray 'dask[dataframe]' pandas numpy

Note that you do not need to install dask.distributed even if you are using a cluster because Ray will handle the distribution.

Example:

import ray
import dask
import dask.dataframe as dd
import pandas as pd
import numpy as np
from ray.util.dask import ray_dask_get

dask.config.set(scheduler=ray_dask_get)  # Sets Ray as the default backend.

ray.init()

df = pd.DataFrame(np.random.randint(0, 100, size=(2**10, 2**8)))
df = dd.from_pandas(df, npartitions=10)
print(df.head(10))

Under the hood, Dask dispatches tasks to Ray for scheduling and execution. Task inputs and outputs get stored in Ray’s distributed, shared-memory object store. This means that you can seamlessly mix Dask and other Ray library workloads.
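For example, the materialized result of a Dask computation can be handed straight to an ordinary Ray task (the summarize task below is just an illustration):

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import ray
from ray.util.dask import ray_dask_get

dask.config.set(scheduler=ray_dask_get)
ray.init()

@ray.remote
def summarize(pandas_df):
    # A plain Ray task consuming the result of the Dask computation.
    return pandas_df.describe()

ddf = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1000, 4))), npartitions=4)
pdf = (ddf * 2).compute()  # The Dask graph executes as Ray tasks.
print(ray.get(summarize.remote(pdf)))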

If you’d like to learn more about Dask on Ray, please check out the documentation.

Modin DataFrames

Modin enables transparent scaling of pandas DataFrames on Ray. Try it out:

Installation:

$ pip install 'modin[ray]' pandas numpy

Example:

import ray
ray.init() # Modin defaults to backing Ray's object store with disk. Start Ray before importing modin to use shared memory instead.

import modin.pandas as pd
import numpy as np

frame_data = np.random.randint(0, 100, size=(2**10, 2**8))
df = pd.DataFrame(frame_data)

print(df.head(10))

If you’d like to learn more about Modin, please check out the documentation.

Mars on Ray

Mars is a tensor-based framework that can scale NumPy, pandas, and scikit-learn applications. It recently added Ray as a backend for distributed execution.

Here’s how you can try it out:

Installation:

$ pip install -U pymars ray==1.0

Note that you do not need to install pymars[distributed] even if you are using a cluster because Ray will handle the distribution.

Example:

from mars.session import new_session
ray_session = new_session(backend='ray').as_default() # Set Ray as the default backend.

import mars.dataframe as md
import mars.tensor as mt

t = mt.random.randint(100, size=(2**10, 2**8))
df = md.DataFrame(t)
print(df.head(10).execute())

The Mars-on-Ray integration is experimental and under active development, but you can learn more from the documentation.

Conclusion

This is just the beginning of our effort to support third-party data processing libraries and frameworks in Ray. We’re actively developing features like object spilling and memory management to improve stability and performance. Keep an eye out for our next blog post on a large-scale distributed shuffle to better support these data processing frameworks.

In the meantime, if you’re interested in learning more, check out the whitepaper or join us on the Ray Discourse.