Ray Datasets for large-scale machine learning ingest and scoring

By Clark Zinzow, Alex Wu, Jiajun Yao, Eric Liang and Chen Shen   

We're happy to introduce Ray Datasets, a data loading and preprocessing library built on Ray. Datasets leverages Ray’s task, actor, and object APIs to enable large-scale machine learning (ML) ingest, training, and inference, all within a single Python application.


In a nutshell, Datasets:

  • Is the standard way to load distributed data into Ray, supporting popular storage backends and file formats.

  • Supports common ML preprocessing operations including basic parallel data transformations such as map, batched map, and filter, and global operations such as sort, shuffle, groupby, and stats aggregations.

  • Supports operations requiring stateful setup and GPU acceleration.

  • Works seamlessly with Ray-integrated data processing libraries (Spark, Pandas, NumPy, Dask, Mars) and ML frameworks (TensorFlow, Torch, Horovod).

In this blog post, we will survey the current state of distributed training and model scoring pipelines and give an overview of Ray Datasets and how it solves problems present in the status quo. If that leaves you wanting more, be sure to register for our upcoming webinar, where you can get a first-hand look at Datasets in action. With that, let’s dive in!

Current ML training and inference pipelines

The status quo pipeline

Today, users often stitch together their training pipeline using various distributed compute frameworks. While this approach has its advantages in re-using existing systems, there are several drawbacks, which we covered in our blog post on data ingest in third-gen ML architectures.

Training pipeline status quo: workflow orchestration frameworks are required to orchestrate this multi-language, multi-job pipeline with intermediate data persistence.

The vision

Ray enables simple Python scripts to replace these pipelines, avoiding their tradeoffs while also improving performance. Ray Datasets is a key part of this vision, acting as the "distributed Arrow" format for exchanging data between distributed steps in Ray.

Introduction to Ray Datasets

Datasets in a nutshell

Datasets is fundamentally a distributed dataset abstraction, where the underlying data blocks (partitions) are distributed across a Ray cluster, sitting in distributed memory.

A Dataset holds references to one or more in-memory data blocks, distributed across a Ray cluster.

This distributed representation allows the Dataset to be built by distributed parallel tasks, each pulling a block’s worth of data from a source (e.g., S3) and putting the block into the node’s local object store, with the client-side Dataset object holding references to the distributed blocks. Operations on the client-side Dataset object then result in parallel operations on those blocks.
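To make the block-reference model concrete, here is a toy, single-process sketch of it. It is an illustration under stated assumptions, not Ray's implementation: Python threads stand in for Ray tasks, `concurrent.futures.Future` objects stand in for Ray object references, and `read_block`, `ToyDataset`, and `read_toy_dataset` are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List


def read_block(block_index: int, rows_per_block: int) -> List[int]:
    # Hypothetical "read task": builds one block of rows instead of fetching from S3.
    start = block_index * rows_per_block
    return list(range(start, start + rows_per_block))


class ToyDataset:
    """Client-side handle: holds references (futures) to blocks, never the rows themselves."""

    def __init__(self, block_refs: List[Future], executor: ThreadPoolExecutor) -> None:
        self.block_refs = block_refs
        self._executor = executor

    def map(self, fn: Callable[[int], int]) -> "ToyDataset":
        # Submit one transform task per block; the client only collects new references.
        def transform(ref: Future) -> List[int]:
            return [fn(row) for row in ref.result()]

        new_refs = [self._executor.submit(transform, ref) for ref in self.block_refs]
        return ToyDataset(new_refs, self._executor)

    def take_all(self) -> List[int]:
        # Only here does the client pull block contents back to itself.
        return [row for ref in self.block_refs for row in ref.result()]


def read_toy_dataset(num_blocks: int, rows_per_block: int, executor: ThreadPoolExecutor) -> ToyDataset:
    # One parallel read task per block.
    refs = [executor.submit(read_block, i, rows_per_block) for i in range(num_blocks)]
    return ToyDataset(refs, executor)


with ThreadPoolExecutor(max_workers=4) as pool:
    doubled = read_toy_dataset(num_blocks=3, rows_per_block=4, executor=pool).map(lambda x: x * 2)
    print(len(doubled.block_refs), doubled.take_all()[:5])  # 3 [0, 2, 4, 6, 8]
```

In this sketch, `map` returns immediately with new references, and rows only travel back to the client in `take_all`, much as the client-side Dataset holds references to blocks rather than the data itself.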

A Dataset’s blocks can contain data of any modality, including text, arbitrary binary bytes (e.g., images), and numerical data; however, the full power of Datasets is unlocked when used with tabular data. In this case, each block consists of a partition of a distributed table, and these row-based partitions are represented as Arrow Tables under the hood, yielding a distributed Arrow dataset.

Visualization of a Dataset that has three Arrow table blocks, with each block holding 1000 rows.
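The layout in the caption above (three blocks of 1,000 rows each) can be sketched in plain Python. This only illustrates row-based partitioning with a columnar layout inside each block; dicts of lists stand in for Arrow tables, and `rows_to_block` and `partition_rows` are hypothetical helpers.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]
Block = Dict[str, List[Any]]  # column name -> column values (loosely mimics an Arrow table)


def rows_to_block(rows: List[Row]) -> Block:
    # Transpose a list of rows into one list per column (columnar layout).
    columns = list(rows[0].keys()) if rows else []
    return {col: [row[col] for row in rows] for col in columns}


def partition_rows(rows: List[Row], rows_per_block: int) -> List[Block]:
    # Row-based partitioning: each block holds a contiguous slice of the table's rows.
    return [rows_to_block(rows[i:i + rows_per_block]) for i in range(0, len(rows), rows_per_block)]


table = [{"id": i, "fare": i * 0.5} for i in range(3000)]
blocks = partition_rows(table, rows_per_block=1000)
print(len(blocks), [len(b["id"]) for b in blocks])  # 3 [1000, 1000, 1000]
```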

How does Datasets fit into my training pipeline?

Datasets is not intended as a replacement for generic data processing systems like Spark. Datasets is meant to be the last-mile bridge between ETL pipelines and distributed applications running on Ray.

Ray Datasets is the last-mile data bridge to a Ray cluster.

This bridge becomes especially powerful when using Ray-integrated DataFrame libraries for your data processing stage, as this allows you to run a full data-to-ML pipeline on top of Ray, eliminating the need to materialize data to external storage as an intermediate step. Ray serves as the universal compute substrate for your ML pipeline, with Datasets forming the distributed data bridge between pipeline stages.

When the entire pipeline runs on Ray, distributed data can seamlessly pass from relational data processing to model training without touching a disk or centralized data broker.

Check out our blog post on ingest in third-generation ML architectures for more on how this works under the hood.

Basic features

Scalable parallel I/O

Datasets aims to be a universal parallel data loader, data writer, and exchange format, providing a narrow data waist for Ray applications and libraries to interface with.


This is accomplished by heavily leveraging Arrow’s I/O layer, using Ray’s high-throughput task execution to parallelize Arrow’s high-performance single-threaded I/O. The Datasets I/O layer has scaled to multi-petabyte data ingest jobs in production at Amazon.

Datasets’ scalable I/O is all available behind a dead-simple API, expressible via a single call: ray.data.read_<format>().
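The parallelization scheme described above (many single-threaded readers running side by side, one read task per file) can be sketched with the standard library. This is a hedged, single-machine illustration: threads stand in for Ray tasks, Python's `csv` module stands in for Arrow's readers, and `read_csv_file` and `parallel_read_csv` are hypothetical names.

```python
import csv
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def read_csv_file(path: str) -> List[Dict[str, str]]:
    # Each task runs an ordinary single-threaded reader on exactly one file.
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def parallel_read_csv(paths: List[str], max_workers: int = 8) -> List[List[Dict[str, str]]]:
    # Fan out one read task per file; each result becomes one block (order is preserved).
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(read_csv_file, paths))


# Usage: write three small CSV files, then read them back in parallel.
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i in range(3):
        path = os.path.join(tmp, f"part-{i}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "value"])
            writer.writerows([[i * 10 + j, j * j] for j in range(10)])
        paths.append(path)
    blocks = parallel_read_csv(paths)

print(len(blocks), [len(b) for b in blocks])  # 3 [10, 10, 10]
```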

Data format compatibility

With Arrow’s I/O layer comes support for many of your favorite tabular file formats (JSON, CSV, Parquet) and storage backends (local disk, S3, GCS, Azure Blob Storage, HDFS). Beyond tabular data, we’ve added support for parallel reads and writes of NumPy, text, and binary files. This comprehensive support for reading many formats from many external sources, coupled with the extremely scalable parallelization scheme, makes Datasets the preferred way to ingest large amounts of data into a Ray cluster.

import ray

# Paths and list arguments below are placeholders.

# Read structured data from disk, cloud storage, etc.
ray.data.read_parquet("s3://path/to/parquet")
ray.data.read_json("...")
ray.data.read_csv("...")
ray.data.read_text("...")

# Read tensor / image / file data.
ray.data.read_numpy("...")
ray.data.read_binary_files("...")

# Create from in-memory objects.
ray.data.from_items([list, of, python, objects])
ray.data.from_pandas([list, of, pandas, dfs])
ray.data.from_numpy([list, of, numpy, arrays])
ray.data.from_arrow([list, of, arrow, tables])

Data framework compatibility

In addition to storage I/O, Datasets also allows for bidirectional in-memory data exchange with many popular distributed frameworks when they are run on Ray, such as Spark, Dask, Modin, and Mars, as well as Pandas and NumPy for small local in-memory data. For convenient ingestion of data into model trainers, Datasets provides an exchange API for both PyTorch and TensorFlow, yielding the familiar framework-specific datasets, torch.utils.data.IterableDataset and tf.data.Dataset.

import ray

# Placeholders: spark_df, dask_df, and modin_df are existing DataFrames, and
# `dataset` is an existing ray.data.Dataset.

# Convert from existing DataFrames.
ray.data.from_spark(spark_df)
ray.data.from_dask(dask_df)
ray.data.from_modin(modin_df)

# Convert to DataFrames and ML datasets.
dataset.to_spark()
dataset.to_dask()
dataset.to_modin()
dataset.to_torch()
dataset.to_tf()

# Convert to objects in the shared memory object store.
dataset.to_numpy_refs()
dataset.to_arrow_refs()
dataset.to_pandas_refs()

Last-mile preprocessing

Datasets offers convenient data preprocessing functionality for common last-mile transformations that you wish to perform right before training your model or doing batch inference. “Last-mile preprocessing” covers transformations that differ across models or that involve per-run or per-epoch randomness. Datasets allows you to do these operations in parallel while keeping everything in (distributed) memory with .map_batches(fn), with no need to persist the results back to storage before starting to train your model or do batch inference.

# Simple transforms.
dataset.map(fn)
dataset.flat_map(fn)
dataset.map_batches(fn)
dataset.filter(fn)

# Aggregate operations.
dataset.repartition()
dataset.groupby()
dataset.aggregate()
dataset.sort()

# ML Training utilities.
dataset.random_shuffle()
dataset.split()
dataset.iter_batches()
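A single-machine sketch of the per-epoch flow that these training utilities support: shuffle with a fresh seed each epoch, split into per-worker shards, then iterate in batches and apply a batch-level transform. The helpers `shuffle_rows`, `split_shards`, `batches`, and `scale_batch` are hypothetical plain-Python stand-ins for illustration, not the Ray methods listed above.

```python
import random
from typing import Iterator, List, Sequence


def shuffle_rows(rows: Sequence[int], seed: int) -> List[int]:
    # Global shuffle: any row can move anywhere; a new seed per epoch gives a new order.
    out = list(rows)
    random.Random(seed).shuffle(out)
    return out


def split_shards(rows: Sequence[int], n: int) -> List[List[int]]:
    # Near-equal shards, e.g. one per data-parallel training worker.
    size, extra = divmod(len(rows), n)
    shards, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < extra else 0)
        shards.append(list(rows[start:end]))
        start = end
    return shards


def batches(rows: Sequence[int], batch_size: int) -> Iterator[List[int]]:
    # Fixed-size batches; the last one may be smaller.
    for i in range(0, len(rows), batch_size):
        yield list(rows[i:i + batch_size])


def scale_batch(batch: List[int]) -> List[float]:
    # A last-mile transform applied to a whole batch at once (cf. map_batches).
    return [x / 100.0 for x in batch]


data = list(range(10))
for epoch in range(2):
    shards = split_shards(shuffle_rows(data, seed=epoch), n=2)
    seen = [x for shard in shards for b in batches(shard, 4) for x in scale_batch(b)]
    # Each epoch sees every row exactly once, in an epoch-specific order.
    assert sorted(seen) == [x / 100.0 for x in data]
```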

Stateful GPU tasks

To enable batch inference on large datasets, Datasets supports running stateful computations on GPUs. This is quite simple: instead of calling .map_batches(fn) with a stateless function, call .map_batches(callable_cls, compute="actors"). The callable class will be instantiated on a Ray actor, and re-used multiple times to transform input batches for inference:

import pandas as pd
import ray

# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    # Placeholder: decode, resize, or normalize the image here.
    return image

class BatchInferModel:
    def __init__(self):
        # Expensive setup (e.g., loading model weights) runs once per actor.
        # ImageNetModel is a placeholder for your own model class.
        self.model = ImageNetModel()
    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
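Why use a callable class with actors rather than a plain function? Expensive setup such as loading model weights is paid once per actor and then amortized over many batches. The sequential sketch below assumes a round-robin assignment of batches to a fixed pool; `CountingModel` and `map_batches_with_actors` are hypothetical, and real Ray actors are separate processes that can each be assigned a GPU.

```python
import itertools
from typing import Callable, List


class CountingModel:
    """Hypothetical stand-in for an expensive model; counts constructions and calls."""

    instances = 0
    batches_seen = 0

    def __init__(self) -> None:
        CountingModel.instances += 1
        self.scale = 2  # pretend these are loaded model weights

    def __call__(self, batch: List[int]) -> List[int]:
        CountingModel.batches_seen += 1
        return [x * self.scale for x in batch]


def map_batches_with_actors(data: List[int], callable_cls: Callable[[], CountingModel],
                            num_actors: int, batch_size: int) -> List[int]:
    # Construct each "actor" once, then reuse it across many batches (round-robin).
    actors = [callable_cls() for _ in range(num_actors)]
    chunks = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    out: List[int] = []
    for actor, chunk in zip(itertools.cycle(actors), chunks):
        out.extend(actor(chunk))
    return out


result = map_batches_with_actors(list(range(100)), CountingModel, num_actors=4, batch_size=8)
print(CountingModel.instances, CountingModel.batches_seen)  # 4 13
```

Here 13 batches are processed but the model is constructed only 4 times; with a stateless function, any per-call setup would instead be repeated for every batch.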

Pipelined compute with Datasets

Reading into, transforming, and consuming/writing out of your Dataset creates a series of execution stages. By default, these stages are eagerly executed via blocking calls, which provides an easy-to-understand bulk synchronous parallel execution model and maximal parallelism for each stage.


However, this doesn’t allow you to overlap computation across stages: when the first data block is done loading, we can’t start transforming it until all other data blocks are done being loaded as well. If different stages require different resources, this lock-step execution may over-saturate the current stage’s resources while leaving all other stages’ resources idle. Pipelining solves this problem by overlapping stages, so that each block can move on to the next stage as soon as it is ready.


Pipelining is natively supported in the Datasets API: simply call .window() or .repeat() to generate a DatasetPipeline that can be read, transformed, and written just like a normal Dataset. This makes it easy to process data incrementally or stream it for ML training and inference. Read more about it in our pipelining docs.
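The difference between the two execution models can be illustrated with a toy two-stage run that records the order of events. This is not how DatasetPipeline is implemented; it assumes one loader thread and one transformer thread connected by a bounded `queue.Queue`, and `run_bulk_synchronous`, `run_pipelined`, and `first_transform_before_last_load` are hypothetical names.

```python
import queue
import threading
from typing import List, Tuple

Event = Tuple[str, int]


def run_bulk_synchronous(num_blocks: int) -> List[Event]:
    # Stage 1 finishes for every block before stage 2 starts on any block.
    events: List[Event] = [("load", i) for i in range(num_blocks)]
    events += [("transform", i) for i in range(num_blocks)]
    return events


def run_pipelined(num_blocks: int) -> List[Event]:
    events: List[Event] = []
    lock = threading.Lock()
    q: "queue.Queue[object]" = queue.Queue(maxsize=1)  # bounds how far loading runs ahead
    done = object()  # sentinel marking the end of the stream

    def loader() -> None:
        for i in range(num_blocks):
            with lock:
                events.append(("load", i))
            q.put(i)  # blocks while the buffer is full
        q.put(done)

    def transformer() -> None:
        while True:
            item = q.get()
            if item is done:
                return
            with lock:
                events.append(("transform", item))

    threads = [threading.Thread(target=loader), threading.Thread(target=transformer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events


def first_transform_before_last_load(events: List[Event], num_blocks: int) -> bool:
    return events.index(("transform", 0)) < events.index(("load", num_blocks - 1))


print(first_transform_before_last_load(run_bulk_synchronous(8), 8))  # False
print(first_transform_before_last_load(run_pipelined(8), 8))  # True
```

The bounded queue is a design choice of the sketch: it lets block 0 be transformed while later blocks are still loading, while capping how far loading can run ahead of transformation.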

Why build Datasets on top of Ray, anyway?

Ray has a robust distributed dataplane, combining decentralized scheduling with a best-in-class distributed object layer, featuring:

  • Efficient zero-copy reads via shared memory for workers on the same node, obviating the need for serialization when sharing data across worker processes.

  • Locality-aware scheduling, where data-intensive tasks are scheduled onto the node that has the most of the task’s required data already local.

  • Resilient object transfer protocols, with a memory manager that ensures prefetching and forward progress while bounding the amount of memory usage.

  • A fast data transport implementation, transferring data chunks in parallel in order to maximize transfer throughput.
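The locality-aware scheduling idea in the second bullet can be sketched as a simple heuristic: pick the node that already holds the most bytes of a task's inputs. This is a simplified illustration with hypothetical names (`pick_node`, node and block IDs); a real scheduler must also weigh factors such as resource availability.

```python
from typing import Dict, List


def pick_node(task_inputs: List[str], object_locations: Dict[str, str],
              object_sizes: Dict[str, int], nodes: List[str]) -> str:
    # Sum, per node, the bytes of this task's inputs that already live there.
    local_bytes = {node: 0 for node in nodes}
    for obj in task_inputs:
        node = object_locations.get(obj)
        if node in local_bytes:
            local_bytes[node] += object_sizes[obj]
    # Prefer the node with the most input bytes already local (ties go to the first listed node).
    return max(nodes, key=lambda n: local_bytes[n])


nodes = ["node-a", "node-b", "node-c"]
locations = {"block-1": "node-a", "block-2": "node-b", "block-3": "node-b"}
sizes = {"block-1": 512, "block-2": 256, "block-3": 384}
print(pick_node(["block-1", "block-2", "block-3"], locations, sizes, nodes))  # node-b
```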

Given Ray's distributed dataplane, building a library like Datasets becomes comparatively simple. Datasets delegates most of the heavy lifting to the Ray dataplane, focusing on providing higher-level features such as convenient APIs, data format support, and stage pipelining.

How is the community using Datasets?

The Ray Datasets project is still in its early stages. Post-beta, we plan to add a number of additional features, including:

  • Support for more data formats and integrations

  • Reducing object store memory overhead in large-scale pipelines

  • Improved performance and scalability of shuffle (scaling to 100+TB)

That said, users are already finding that Datasets provides distinct advantages today.

Case study 1: ML training and inference vs Petastorm/Pandas

One organization using Ray for their ML infrastructure has found Datasets to be effective at speeding up their training and inference workloads at small scales:

At training, Ray Datasets was 8x faster than Pandas + S3 + Petastorm when used from a single GPU instance, reflecting reduced serialization overhead:

Benchmark: NYC Taxi dataset (5 GB subset), single g4dn.4xlarge instance.

At inference, Dask-on-Ray + Datasets + Torch was 5x faster than Pandas + Torch, again on a single machine:

Benchmark: NYC Taxi dataset (5 GB subset), single r5d.4xlarge instance

Case study 2: ML ingest at large scale

Another ML platform group evaluating a larger-scale S3 → Datasets → Horovod data pipeline found significant gains when scaling their ingest pipeline to a cluster of machines. In this use case, Datasets provided not only higher throughput but also better shuffle quality, since it supports a true distributed shuffle.

System      Aggregate throughput
Petastorm   2.16 GB/s
Datasets    8.18 GB/s

Benchmark: 1.5 TB synthetic tabular dataset, 16 nodes (40 vCPUs, 180 GB RAM).
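For a sense of scale, the two aggregate-throughput figures above work out to roughly a 3.8x gain, or about 0.51 GB/s per node for Datasets if throughput is averaged evenly over the 16 nodes:

```latex
\[
\frac{8.18\ \text{GB/s}}{2.16\ \text{GB/s}} \approx 3.79,
\qquad
\frac{8.18\ \text{GB/s}}{16\ \text{nodes}} \approx 0.51\ \text{GB/s per node}
\]
```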

Conclusion

To summarize, Datasets simplifies ML pipelines by providing a flexible and scalable API for working with data within Ray. We're just getting started with Datasets, but users are already finding it effective for training and inference at a variety of scales. Check out the documentation or register for our upcoming webinar to get a first-hand look at Datasets in action.

This post is based on Alex Wu and Clark Zinzow's talk, “Unifying Data preprocessing and training with Ray Datasets,” from PyData Global 2021.
