We're happy to introduce Ray Datasets, a data loading and preprocessing library built on Ray. Datasets leverages Ray’s task, actor, and object APIs to enable large-scale machine learning (ML) ingest, training, and inference, all within a single Python application.
In a nutshell, Datasets:
Is the standard way to load distributed data into Ray, supporting popular storage backends and file formats.
Supports common ML preprocessing operations including basic parallel data transformations such as map, batched map, and filter, and global operations such as sort, shuffle, groupby, and stats aggregations.
Supports operations requiring stateful setup and GPU acceleration.
Works seamlessly with Ray-integrated data processing libraries (Spark, Pandas, NumPy, Dask, Mars) and ML frameworks (TensorFlow, Torch, Horovod).
In this blog post, we will survey the current state of distributed training and model scoring pipelines and give an overview of Ray Datasets and how it solves problems present in the status quo. If that leaves you wanting more, be sure to register for our upcoming webinar, where you can get a first-hand look at Datasets in action. With that, let’s dive in!
Today, users often stitch together their training pipeline using various distributed compute frameworks. While this approach has its advantages in re-using existing systems, there are several drawbacks, which we covered in our blog post on data ingest in third-gen ML architectures.
Ray is enabling simple Python scripts to replace these pipelines, avoiding their tradeoffs and also improving performance. Ray Datasets are a key part of this vision, acting as the "distributed Arrow" format for exchanging data between distributed steps in Ray.
Datasets is fundamentally a distributed dataset abstraction, where the underlying data blocks (partitions) are distributed across a Ray cluster, sitting in distributed memory.
This distributed representation allows for the Dataset to be built by distributed parallel tasks, each pulling a block’s worth of data from a source (e.g., S3) and putting the block into the node’s local object store, with the client-side Dataset object holding references to the distributed blocks. Operations on the client-side Dataset object then result in parallel operations on those blocks.
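Here is a minimal single-process sketch of the block-and-reference model. It is not Ray code. A thread pool stands in for Ray's distributed tasks, `concurrent.futures.Future` objects stand in for object references, and the in-memory `SOURCE` dict stands in for storage such as S3. The names `ToyDataset`, `read_block`, and `SOURCE` are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict, List

# Hypothetical stand-in for external storage (e.g. S3): one "file" per block.
SOURCE: Dict[str, List[int]] = {
    f"part-{i}": list(range(i * 4, i * 4 + 4)) for i in range(3)
}


def read_block(path: str) -> List[int]:
    # One read task pulls one block's worth of data from the source.
    return list(SOURCE[path])


class ToyDataset:
    """Client-side handle that holds only references (futures) to its blocks."""

    def __init__(self, pool: ThreadPoolExecutor, block_refs: List[Future]) -> None:
        self._pool = pool
        self.block_refs = block_refs

    def map(self, fn: Callable[[int], int]) -> "ToyDataset":
        # One task per block. Each task is submitted after the task producing its
        # input, so with a FIFO pool it never waits on work that hasn't started.
        def apply(ref: Future) -> List[int]:
            return [fn(row) for row in ref.result()]

        return ToyDataset(self._pool, [self._pool.submit(apply, r) for r in self.block_refs])

    def take_all(self) -> List[int]:
        # Only here does the client actually fetch block contents.
        return [row for ref in self.block_refs for row in ref.result()]


with ThreadPoolExecutor(max_workers=4) as pool:
    ds = ToyDataset(pool, [pool.submit(read_block, path) for path in SOURCE])
    doubled = ds.map(lambda x: x * 2)
    print(len(doubled.block_refs), doubled.take_all())
```

Calling `map` on the client only submits one task per block and returns new references. Block contents are not fetched until `take_all`.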
A Dataset’s blocks can contain data of any modality, including text, arbitrary binary bytes (e.g., images), and numerical data; however, the full power of Datasets is unlocked when used with tabular data. In this case, each block consists of a partition of a distributed table, and these row-based partitions are represented as Arrow Tables under the hood, yielding a distributed Arrow dataset.
Datasets is not intended as a replacement for generic data processing systems like Spark. Datasets is meant to be the last-mile bridge between ETL pipelines and distributed applications running on Ray.
This bridge becomes extra powerful when using Ray-integrated DataFrame libraries for your data processing stage, as this allows you to run a full data-to-ML pipeline on top of Ray, eliminating the need to materialize data to external storage as an intermediate step. Ray serves as the universal compute substrate for your ML pipeline, with Datasets forming the distributed data bridge between pipeline stages.
Check out our blog post on ingest in third-generation ML architectures for more on how this works under the hood.
Datasets aims to be a universal parallel data loader, data writer, and exchange format, providing a narrow data waist for Ray applications and libraries to interface with.
This is accomplished by heavily leveraging Arrow’s I/O layer, using Ray’s high-throughput task execution to parallelize Arrow’s high-performance single-threaded I/O. The Datasets I/O layer has scaled to multi-petabyte data ingest jobs in production at Amazon.
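You can sketch the idea of scaling out single-threaded readers with the standard library. Each file is parsed by one single-threaded reader, and throughput comes from running many such reads at the same time. This is only an illustration under stated assumptions. Python's `csv` module stands in for Arrow's readers, and a thread pool stands in for Ray tasks. All helper names are hypothetical.

```python
import csv
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def read_csv_file(path: str) -> List[Dict[str, str]]:
    # A single-threaded reader: parses exactly one file into one block of rows.
    with open(path, newline="") as f:
        return [dict(row) for row in csv.DictReader(f)]


def parallel_read(paths: List[str], max_workers: int = 4) -> List[List[Dict[str, str]]]:
    # Parallelism comes from running many single-threaded reads at once,
    # one per file; pool.map returns blocks in the same order as `paths`.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(read_csv_file, paths))


def write_sample_files(directory: str, num_files: int, rows_per_file: int) -> List[str]:
    # Hypothetical test data: num_files small CSVs with sequential ids.
    paths = []
    for i in range(num_files):
        path = os.path.join(directory, f"part-{i}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "value"])
            for j in range(rows_per_file):
                row_id = i * rows_per_file + j
                writer.writerow([row_id, f"v{row_id}"])
        paths.append(path)
    return paths


with tempfile.TemporaryDirectory() as tmpdir:
    paths = write_sample_files(tmpdir, num_files=3, rows_per_file=2)
    blocks = parallel_read(paths)

print([len(block) for block in blocks])  # one block per file
```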
Datasets’ scalable I/O is all available behind a dead-simple API: reading a dataset takes a single call, such as ray.data.read_parquet("s3://path/to/parquet").
With Arrow’s I/O layer comes support for many of your favorite tabular file formats (JSON, CSV, Parquet) and storage backends (local disk, S3, GCS, Azure Blob Storage, HDFS). Beyond tabular data, we’ve added support for parallel reads and writes of NumPy, text, and binary files. This comprehensive support for reading many formats from many external sources, coupled with the extremely scalable parallelization scheme, makes Datasets the preferred way to ingest large amounts of data into a Ray cluster.
```python
import ray

# Read structured data from disk, cloud storage, etc.
ray.data.read_parquet("s3://path/to/parquet")
ray.data.read_json("...")
ray.data.read_csv("...")
ray.data.read_text("...")

# Read tensor / image / file data.
ray.data.read_numpy("...")
ray.data.read_binary_files("...")

# Create from in-memory objects.
ray.data.from_objects([list, of, python, objects])
ray.data.from_pandas([list, of, pandas, dfs])
ray.data.from_numpy([list, of, numpy, arrays])
ray.data.from_arrow([list, of, arrow, tables])
```
In addition to storage I/O, Datasets also allows for bidirectional in-memory data exchange with many popular distributed frameworks when they are run on Ray, such as Spark, Dask, Modin, and Mars, as well as Pandas and NumPy for small local in-memory data. For convenient ingestion of data into model trainers, Datasets provides an exchange API for both PyTorch and TensorFlow, yielding the familiar framework-specific datasets, torch.utils.data.IterableDataset and tf.data.Dataset.
```python
import ray

# Assumes spark_df, dask_df, modin_df, and `dataset` (a Ray Dataset) already exist.
# Convert from existing DataFrames.
ray.data.from_spark(spark_df)
ray.data.from_dask(dask_df)
ray.data.from_modin(modin_df)

# Convert to DataFrames and ML datasets.
dataset.to_spark()
dataset.to_dask()
dataset.to_modin()
dataset.to_torch()
dataset.to_tf()

# Convert to objects in the shared memory object store.
dataset.to_numpy_refs()
dataset.to_arrow_refs()
dataset.to_pandas_refs()
```
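Framework adapters such as `to_torch()` and `to_tf()` ultimately give a trainer fixed-size batches, while a Dataset's blocks can have arbitrary sizes. The sketch below shows one way to re-batch rows across block boundaries. It is a hypothetical helper written for illustration, not Ray's `iter_batches` implementation. One choice it makes is to keep the final partial batch rather than drop it.

```python
from typing import Iterable, Iterator, List


def iter_batches(blocks: Iterable[List[int]], batch_size: int) -> Iterator[List[int]]:
    """Yield fixed-size batches of rows, stitching rows across block boundaries."""
    buffer: List[int] = []
    for block in blocks:
        buffer.extend(block)
        # Emit as many full batches as the buffered rows allow.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:
        yield buffer  # final partial batch


blocks = [[0, 1, 2], [3, 4], [5, 6, 7, 8]]  # unevenly sized blocks
for batch in iter_batches(blocks, batch_size=4):
    print(batch)
```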
Datasets offers convenient data preprocessing functionality for common last-mile transformations that you wish to perform right before training your model or doing batch inference. “Last-mile preprocessing” covers transformations that differ across models or that involve per-run or per-epoch randomness. Datasets allows you to do these operations in parallel while keeping everything in (distributed) memory with .map_batches(fn), with no need to persist the results back to storage before starting to train your model or do batch inference.
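Below is a minimal sketch of the `map_batches` idea applied to last-mile preprocessing with per-epoch randomness. Blocks are plain Python lists and everything runs in one process. `map_batches` and `make_augment` here are hypothetical helpers, not Ray's implementation, and the rescale-plus-noise transform is an arbitrary example.

```python
import random
from typing import Callable, List

Batch = List[float]


def map_batches(blocks: List[Batch], fn: Callable[[Batch], Batch], batch_size: int) -> List[Batch]:
    """Apply fn batch by batch within each block; results stay as in-memory blocks."""
    out_blocks = []
    for block in blocks:
        out: Batch = []
        for start in range(0, len(block), batch_size):
            out.extend(fn(block[start:start + batch_size]))
        out_blocks.append(out)
    return out_blocks


def make_augment(epoch: int) -> Callable[[Batch], Batch]:
    # Hypothetical last-mile transform: a fixed rescaling plus small noise
    # whose random draws differ from epoch to epoch (seeded by the epoch).
    rng = random.Random(epoch)

    def augment(batch: Batch) -> Batch:
        return [x * 0.5 + rng.uniform(-0.01, 0.01) for x in batch]

    return augment


blocks = [[1.0, 2.0, 3.0], [4.0, 5.0]]
epoch0 = map_batches(blocks, make_augment(epoch=0), batch_size=2)
epoch1 = map_batches(blocks, make_augment(epoch=1), batch_size=2)
```

The transformed blocks are recomputed for each epoch and never written back to storage, which is the property the paragraph above describes.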
```python
# Simple transforms.
dataset.map(fn)
dataset.flat_map(fn)
dataset.map_batches(fn)
dataset.filter(fn)

# Aggregate operations.
dataset.repartition()
dataset.groupby()
dataset.aggregate()
dataset.sort()

# ML Training utilities.
dataset.random_shuffle()
dataset.split()
dataset.iter_batches()
```
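A common way to implement a global random shuffle over blocks is a two-phase map/reduce exchange. Every input block scatters its rows to random output partitions, and every output block gathers its share and shuffles it locally. The sketch below shows that technique in a single process, along with a simple block-wise split into one shard per trainer. It illustrates the general technique and is not a description of Ray's internals. Both function names are hypothetical, and splitting round-robin by block is a simplification that can give shards unequal row counts.

```python
import random
from typing import List


def random_shuffle(blocks: List[List[int]], num_outputs: int, seed: int = 0) -> List[List[int]]:
    """Two-phase (map/reduce) shuffle over a list of blocks."""
    rng = random.Random(seed)
    # Map phase: each input block scatters its rows to random output partitions.
    parts = [[[] for _ in range(num_outputs)] for _ in blocks]
    for i, block in enumerate(blocks):
        for row in block:
            parts[i][rng.randrange(num_outputs)].append(row)
    # Reduce phase: each output block gathers its partition from every input
    # block, then shuffles the merged rows locally.
    out = []
    for j in range(num_outputs):
        merged = [row for i in range(len(blocks)) for row in parts[i][j]]
        rng.shuffle(merged)
        out.append(merged)
    return out


def split(blocks: List[List[int]], n: int) -> List[List[List[int]]]:
    """Assign blocks round-robin to n shards, e.g. one shard per training worker."""
    return [blocks[k::n] for k in range(n)]


blocks = [list(range(i * 5, i * 5 + 5)) for i in range(4)]
shuffled = random_shuffle(blocks, num_outputs=4, seed=42)
shards = split(shuffled, 2)
```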
To enable batch inference on large datasets, Datasets supports running stateful computations on GPUs. This is quite simple: instead of calling .map_batches(fn) with a stateless function, call .map_batches(callable_cls, compute="actors"). The callable class will be instantiated on a Ray actor, and re-used multiple times to transform input batches for inference:
```python
import pandas as pd
import ray

# Example of GPU batch inference on an ImageNet model.
def preprocess(image: bytes) -> bytes:
    # Placeholder: decode/resize the image here.
    return image

class BatchInferModel:
    def __init__(self):
        # ImageNetModel is a placeholder for your own model class.
        self.model = ImageNetModel()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        return self.model(batch)

ds = ray.data.read_binary_files("s3://bucket/image-dir")

# Preprocess the data.
ds = ds.map(preprocess)
# -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s]

# Apply GPU batch inference with actors, and assign each actor a GPU using
# ``num_gpus=1`` (any Ray remote decorator argument can be used here).
ds = ds.map_batches(BatchInferModel, compute="actors", batch_size=256, num_gpus=1)
# -> Map Progress (16 actors 4 pending): 100%|██████| 200/200 [00:07, 27.60it/s]

# Save the results.
ds.repartition(1).write_json("s3://bucket/inference-results")
```
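Why use actors here? The class's `__init__` (for example, loading model weights onto a GPU) runs once per actor, and each instance is then reused for many batches. The single-process sketch below counts setups to contrast that with redoing setup for every batch. `ToyInferModel`, `run_per_batch`, and `run_actor_pool` are hypothetical names, and the round-robin batch assignment is a simplification.

```python
from typing import List


class ToyInferModel:
    """Hypothetical stand-in for BatchInferModel; counts expensive setups."""

    setups = 0

    def __init__(self) -> None:
        # Stands in for expensive setup such as loading weights onto a GPU.
        ToyInferModel.setups += 1
        self.scale = 2

    def __call__(self, batch: List[int]) -> List[int]:
        return [x * self.scale for x in batch]


def run_per_batch(cls, batches: List[List[int]]) -> List[List[int]]:
    # Stateless style: setup is repeated for every batch.
    return [cls()(batch) for batch in batches]


def run_actor_pool(cls, batches: List[List[int]], num_actors: int) -> List[List[int]]:
    # Actor style: setup runs once per actor, and instances are reused across batches.
    actors = [cls() for _ in range(num_actors)]
    return [actors[i % num_actors](batch) for i, batch in enumerate(batches)]


batches = [[i, i + 1] for i in range(0, 20, 2)]  # 10 batches of 2 rows

ToyInferModel.setups = 0
per_batch = run_per_batch(ToyInferModel, batches)
per_batch_setups = ToyInferModel.setups

ToyInferModel.setups = 0
pooled = run_actor_pool(ToyInferModel, batches, num_actors=2)
pooled_setups = ToyInferModel.setups

print(per_batch_setups, pooled_setups)
```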
Reading into, transforming, and consuming/writing out of your Dataset creates a series of execution stages. By default, these stages are eagerly executed via blocking calls, which provides an easy-to-understand bulk synchronous parallel execution model and maximal parallelism for each stage.
However, this doesn’t allow you to overlap computation across stages: when the first data block is done loading, we can’t start transforming it until all other data blocks are done being loaded as well. If different stages require different resources, this lock-step execution may over-saturate the current stage’s resources while leaving all other stages’ resources idle. Pipelining solves this problem by overlapping stages, so that early blocks can be transformed while later blocks are still being loaded.
Pipelining is natively supported in the Datasets API: simply call .window() or .repeat() to generate a DatasetPipeline that can be read, transformed, and written just like a normal Dataset. This means you can easily incrementally process or stream data for ML training and inference. Read more about it in our pipelining docs.
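A small sequential sketch that logs the order of events can illustrate how the two execution modes schedule work. In bulk synchronous mode, every block is loaded before any block is transformed. In the windowed mode, the first window is transformed before later blocks are loaded. This sketch only models ordering. Real pipelining gets its speedup by running stages concurrently on different resources, which this single-threaded sketch does not do. All names are hypothetical.

```python
from typing import Iterator, List


def load(i: int, log: List[str]) -> int:
    log.append(f"load {i}")
    return i


def transform(x: int, log: List[str]) -> int:
    log.append(f"transform {x}")
    return x * 10


def bulk_synchronous(n: int, log: List[str]) -> List[int]:
    # Each stage finishes on every block before the next stage starts.
    loaded = [load(i, log) for i in range(n)]
    return [transform(x, log) for x in loaded]


def pipelined(n: int, window: int, log: List[str]) -> Iterator[int]:
    # Process one window of blocks at a time, so early windows are transformed
    # (and can be consumed) before later windows have been loaded.
    for start in range(0, n, window):
        loaded = [load(i, log) for i in range(start, min(start + window, n))]
        for x in loaded:
            yield transform(x, log)


bulk_log: List[str] = []
bulk = bulk_synchronous(4, bulk_log)

pipe_log: List[str] = []
pipe = list(pipelined(4, window=2, log=pipe_log))

print(bulk_log)
print(pipe_log)
```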
Ray has a robust distributed dataplane, combining decentralized scheduling with a best-in-class distributed object layer, featuring:
Efficient zero-copy reads via shared memory for workers on the same node, obviating the need for serialization when sharing data across worker processes.
Locality-aware scheduling, where data-intensive tasks are scheduled onto the node that has the most of the task’s required data already local.
Resilient object transfer protocols, with a memory manager that ensures prefetching and forward progress while bounding the amount of memory usage.
A fast data transport implementation, transferring data chunks in parallel in order to maximize transfer throughput.
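Locality-aware scheduling, as described above, comes down to choosing the node that already holds the most of a task's input data. The sketch below encodes that rule using hypothetical names and a simplified location table that maps object IDs to per-node byte counts. A real scheduler must also respect resource availability, which this sketch leaves out.

```python
from typing import Dict, List, Optional

# object id -> {node id: bytes of that object stored on the node}
Locations = Dict[str, Dict[str, int]]


def pick_node(task_args: List[str], locations: Locations) -> Optional[str]:
    """Pick the node that already holds the most bytes of the task's inputs."""
    local_bytes: Dict[str, int] = {}
    for obj in task_args:
        for node, size in locations.get(obj, {}).items():
            local_bytes[node] = local_bytes.get(node, 0) + size
    if not local_bytes:
        return None  # no locality information: any node is as good as another
    # Iterate in sorted order so ties go to the smallest node id (deterministic).
    return max(sorted(local_bytes), key=lambda node: local_bytes[node])


locations: Locations = {
    "block-a": {"node-1": 100},
    "block-b": {"node-2": 300},
    "block-c": {"node-1": 150, "node-2": 50},
}
print(pick_node(["block-a", "block-c"], locations))  # node-1 holds 250 of 300 bytes
```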
Given Ray's distributed dataplane, building a library like Datasets becomes comparatively simple. Datasets delegates most of the heavy lifting to the Ray dataplane, focusing on providing higher-level features such as convenient APIs, data format support, and stage pipelining.
The Ray Datasets project is still in its early stages. Post-beta, we plan to add a number of additional features, including:
Support for more data formats and integrations
Reducing object store memory overhead in large-scale pipelines
Improved performance and scalability of shuffle (scaling to 100+TB)
That said, users are already finding that Datasets provides distinct advantages today.
One organization utilizing Ray for their ML infra has found Datasets to be effective at speeding up their training and inference workloads at small scales:
At training, Ray Datasets was 8x faster than Pandas + S3 + Petastorm when used from a single GPU instance, reflecting reduced serialization overhead.
At inference, Dask-on-Ray + Datasets + Torch was 5x faster than Pandas + Torch, again even when evaluating on a single machine.
Another ML platform group evaluating a larger-scale S3 → Datasets → Horovod data pipeline found significant gains when scaling their ingest pipeline to a cluster of machines. In this use case, Datasets provided not only higher throughput but also better shuffle quality, since it supports a true distributed shuffle.
Benchmark: 1.5 TB synthetic tabular dataset, 16 nodes (40 vCPUs, 180 GB RAM).
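Why does a true distributed shuffle improve shuffle quality? A common alternative in streaming loaders is a fixed-size shuffle buffer; tf.data's `Dataset.shuffle(buffer_size)` works this way. The post does not say which shuffle the baseline pipeline used, so treat this only as an illustration. The sketch shows that with a buffer of size k, no item can move more than k - 1 positions earlier than its input position, whereas a global shuffle can place any item anywhere. The helper names are hypothetical.

```python
import random
from typing import List


def buffer_shuffle(items: List[int], buffer_size: int, seed: int = 0) -> List[int]:
    """Streaming shuffle that only ever holds buffer_size items in memory."""
    rng = random.Random(seed)
    buffer: List[int] = []
    out: List[int] = []
    for item in items:
        buffer.append(item)
        if len(buffer) == buffer_size:
            # Emit a random buffered item to make room for the next input.
            out.append(buffer.pop(rng.randrange(len(buffer))))
    rng.shuffle(buffer)  # drain whatever is left at the end
    out.extend(buffer)
    return out


def max_earlier_shift(shuffled: List[int]) -> int:
    # Largest distance any item moved toward the front (items are 0..n-1 in input order).
    return max(item - pos for pos, item in enumerate(shuffled))


items = list(range(1000))
local_order = buffer_shuffle(items, buffer_size=10, seed=0)
global_order = list(items)
random.Random(0).shuffle(global_order)  # full shuffle: any item can land anywhere

print(max_earlier_shift(local_order), max_earlier_shift(global_order))
```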
To summarize, Datasets simplifies ML pipelines by providing a flexible and scalable API for working with data within Ray. We're just getting started with Datasets, but users are already finding it effective for training and inference at a variety of scales. Check out the documentation or register for our upcoming webinar to get a first-hand look at Datasets in action.
This post is based on Alex Wu and Clark Zinzow's talk, “Unifying Data preprocessing and training with Ray Datasets,” from PyData Global 2021.