
Ray Data: Scalable Data Processing for AI Workloads

By Alexey Kudinkin, Balaji Veeramani, Praveen Gorthy and Richard Liaw   |   November 4, 2025

Last year, we announced Ray Data GA at Ray Summit. 

Since then, Ray Data has seen rapid growth and community adoption. We’ve also seen the needs around Data + AI change rapidly:

  • There’s a rapid increase in users doing multimodal data processing. These workloads often involve high-dimensional data (videos, images, embeddings) stored in purpose-built formats (Lance, MCAP, etc.) and require interacting with larger models in both inference and training contexts.

  • We’ve seen a continued influx of requirements around structured data operations as well, including feature preprocessing, scalable and efficient wide operations, and DataFrame APIs for easily manipulating structured data.

  • We’ve seen users push scale and reliability in different ways: Ray Data is being used on larger and larger datasets; pipelines are integrated with larger models that require specialized inference engines like SGLang and vLLM; and as a result, the amount of compute needed to handle these workloads is growing exponentially.

To meet the growing needs of data and AI workloads, the Ray Data team at Anyscale and the community at large have invested in three areas of development: multimodal data processing features, structured data support, and reliability at scale.

All of the features below are available in Ray 2.51.

Multimodal Data Processing

A rapidly growing number of Ray Data users are processing large amounts of multimodal data at scale. We’ve worked with the community to address the challenges of these emerging workloads.

Reducing block size explosion with download expression

Image batch inference pipelines often look something like this:

[Image: image batch inference pipeline diagram]

One problem we’ve seen with these pipelines is that inference can get bottlenecked on reads. In previous versions of Ray Data, some users needed to download data with a custom map call. However, this often performed poorly because users had to manually repartition their dataset and implement their own download logic.

One major new feature is the download expression, which:

  • Efficiently downloads files in a multi-threaded fashion, and

  • Intelligently partitions URIs to maximize throughput

Here is an API example:

Before

ds = (
    ray.data.read_parquet(INPUT_PATH)
    .repartition(NUM_PARTITIONS)
    .map(fn=download_image)
    .map(fn=deserialize_image)
    .map(fn=transform_image)
    .map_batches(
        fn=ResNetActor,
        batch_size=BATCH_SIZE,
        num_gpus=1,
        concurrency=NUM_GPU_NODES,
    )
    .write_parquet(OUTPUT_PATH)
)

After

from ray.data.expressions import download

ds = (
    ray.data.read_parquet(INPUT_PATH)
    .with_column("bytes", download("image_url"))
    .map(fn=deserialize_image)
    .map(fn=transform_image)
    .map_batches(
        fn=ResNetActor,
        batch_size=BATCH_SIZE,
        num_gpus=1.0,
        concurrency=NUM_GPU_NODES,
    )
    .materialize()
)

This helps optimize batch inference pipelines that work on large multimodal data formats. 

Supporting large models

For multimodal data processing, another emerging requirement is leveraging an inference engine to turn unstructured data (like images, video, or audio files) into structured data. 

In Ray 2.44, we announced Ray Data LLM, which provides a native integration with vLLM to easily enable users to scale out batch inference on their own datasets. Soon after, in Ray 2.45, we integrated SGLang so that users could pick and choose their favorite inference engine to use.

Over the past couple of releases, the community has also added significant features, including:

  • Cross-node model parallelism support to take advantage of the best and largest open source models like Kimi K2 and gpt-oss 120b

  • Support for arbitrary accelerators so that you can run vLLM and Ray Data on TPUs and other non-NVIDIA accelerators; 

  • Ability to share inference engine pools across different steps of your pipeline to effectively handle agentic inference chains. 

All of this is available in Ray 2.51. Check out the documentation for more.

Here is an example of using Ray Data for data parallelism and vLLM for cross-node model parallelism:

from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "pipeline_parallel_size": 4,
        "tensor_parallel_size": 4,
        "distributed_executor_backend": "ray",
    },
    batch_size=32,
    concurrency=4,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: {
        "messages": [{"role": "user", "content": row["prompt"]}],
        "sampling_params": {"temperature": 0.7, "max_tokens": 100},
    },
    postprocess=lambda row: {
        "prompt": row["prompt"],
        "response": row["generated_text"],
    },
)
ds = processor(ds)

Improved tensor type support

One of the core challenges for Ray Data users has been working with tensors, a bread-and-butter data format for multimodal workloads. Ray Data allows users to operate on data with NumPy, pandas, and PyArrow. However, the tabular data ecosystem (PyArrow, pandas) has historically had weaker support for tensors, which come in different forms and shapes.

To provide interoperability, Ray Data has developed its own custom tensor extension types so that users can seamlessly use the libraries they prefer.

In the past couple of releases, we’ve substantially revamped our tensor support, with better type handling and inference, block unification, and concatenation, resulting in better reliability and performance when handling tensors in Ray Data.
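For illustration, here is a minimal sketch, using a made-up toy dataset and normalization step, of how tensor columns flow through a Ray Data pipeline:

import numpy as np
import ray

# Toy dataset: each row carries a fixed-shape image tensor. Ray Data stores
# the "image" column using its tensor extension type, so NumPy, pandas, and
# PyArrow views of a batch stay interoperable.
ds = ray.data.from_items([
    {"id": i, "image": np.random.randint(0, 255, size=(32, 32, 3), dtype=np.uint8)}
    for i in range(8)
])

def normalize(batch):
    # With batch_format="numpy", the tensor column arrives as an ndarray
    # of shape (batch_size, 32, 32, 3).
    batch["image"] = batch["image"].astype("float32") / 255.0
    return batch

ds = ds.map_batches(normalize, batch_format="numpy")
print(ds.schema())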

MCAP Support

Many robotics and autonomy companies use the MCAP data format to record sensor logs; it is optimized for high-performance writes and interoperates nicely with visualization tools like Foxglove.

These same companies are using Ray Data for multimodal data processing, and we’ve heard from multiple users who want to read and decode MCAP files directly in order to simplify their pipelines.

In Ray 2.51, we’ve added a new MCAP data source, which allows direct reads from MCAP logs and leverages key MCAP features like time-range filtering and topic selection.

ds = ray.data.read_mcap(
    "s3://bucket/mcap-data/",
    topics={"/camera/image_raw", "/lidar/points"},
    time_range=(1000000000, 5000000000),
    message_types={"sensor_msgs/Image", "sensor_msgs/PointCloud2"},
)

Read more in the documentation.

Structured Data Support

Another area of heavy investment has been extending Ray Data to provide higher-level, DataFrame-style APIs.

Columnar Expressions API

Historically, Ray Data APIs were predominantly UDF-based: map, map_batches, filter, and so on. While UDFs are very expressive and convenient for the user, they severely limit the effectiveness of the optimization engine and the query planner:

  • UDFs are black boxes: a UDF handles the entire block or row, and the Ray Data query engine simply doesn’t know what is going on inside it.

  • Optimizations like predicate-pushdown, projection-pushdown, limit-pushdown, and others rely on operation reordering based on the analysis of the pipeline’s logical plan, where “reducing” operations (like filter or projection) are moved closer to the data source.

Using UDFs in a pipeline often renders such optimizations ineffective, since the engine cannot safely push them through an opaque UDF.

To address this problem, we’ve introduced expression-based APIs. Instead of UDFs, these APIs accept columnar expressions: vectorized transformations applied to other columns and/or literals.

Columnar expressions can easily be analyzed by Ray Data, since they define all of the columns they depend on by explicitly referencing them in the expression tree. This allows a rich set of optimizations to be applied either directly (like duplicate computation elimination) or pushed through them (like predicate/projection/limit pushdowns), which isn’t feasible with UDFs.

Consider the following two examples. First, we add a new column called “doubles” using map_batches:

import ray

# Add a column doubling the values of the "id" column
def _add_column_doubles(block):
    block["doubles"] = block["id"] * 2
    return block

ds = (
    ray.data.range(1_000_000_000)
    .map_batches(_add_column_doubles)
    # This predicate could save a lot of
    # compute time if pushed into the source!
    #
    # However, it can't be *safely* pushed
    # through, since the engine can't know exactly
    # how the UDF is using the column "id"
    .filter(lambda row: row["id"] < 3)
    .materialize()
)

Second, we do the same with with_column and the corresponding columnar expression:

import ray
from ray.data.expressions import col

ds = (
    ray.data.range(1_000_000_000)
    # Add a "doubles" column doubling the values of the "id" column
    .with_column("doubles", col("id") * 2)
    # This predicate can now be pushed closer to the
    # data source, substantially reducing the amount of computation
    .filter(lambda row: row["id"] < 3)
    .materialize()
)

New Optimization Rules

In addition to expression-based APIs, we’ve added the following optimization rules:

  • Projection pushdown: pushes down requested projection (list of columnar expressions) closer to the data source

  • Predicate pushdown: pushes down requested filters (as a list of columnar expressions) closer to the data source

Both of these rules can lead to a substantial reduction in IO and computation by preventing unused data from being pointlessly propagated between operations in the pipeline.
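As a rough sketch (the bucket path and column names are made up, and this assumes the string-based expr filter available in recent Ray versions), a pipeline like the following gives the planner everything it needs to push both the predicate and the column selection into the Parquet scan, so unused columns and filtered-out rows are never materialized:

import ray

# Hypothetical events dataset; downstream steps only need two columns
# and a small fraction of the rows.
ds = (
    ray.data.read_parquet("s3://my-bucket/events/")
    # Expression-based filter: a candidate for predicate pushdown into the scan.
    .filter(expr="event_type == 'click'")
    # Explicit column selection: a candidate for projection pushdown into the scan.
    .select_columns(["user_id", "event_ts"])
)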

Joins, Shuffles, Aggregations

Common DataFrame operations include wide operations like joins and shuffles. In Ray 2.46, we announced a new hash-shuffle backend and support for joins in Ray Data.

Since then:

  1. Hash-shuffle has become the default shuffling algorithm in Ray Data (since 2.50), substantially outperforming the previous range-based solution. 

  2. We’ve improved performance of range-based shuffle and sorting operations.

  3. We’ve expanded the set of joins that Ray Data supports to include anti- and semi-joins (see the join sketch after this list).

  4. We’ve added support for joining tables that contain variable-width types (lists, etc.) as well as extension types (like Ray Data tensors) that aren’t natively supported by Arrow.

  5. We’ve also added a variety of prebuilt aggregations, like ApproximateTopK and ApproximateQuantile. These make calculating quantiles and top-k values much more efficient, improving performance for various feature preprocessing steps.
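As a rough sketch of a join (the datasets here are made up, and the parameter names follow the Ray 2.46 join announcement; check the Dataset.join documentation for the exact signature):

import ray

# Toy datasets: user profiles and purchase events keyed by user_id.
users = ray.data.from_items(
    [{"user_id": i, "segment": "a" if i % 2 == 0 else "b"} for i in range(1_000)]
)
purchases = ray.data.from_items(
    [{"user_id": i % 1_000, "amount": float(i)} for i in range(10_000)]
)

# Hash-based inner join on user_id; num_partitions controls the hash-shuffle fan-out.
joined = users.join(
    purchases,
    join_type="inner",
    num_partitions=16,
    on=("user_id",),
)
print(joined.count())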

Parquet Reading Performance

The Parquet file format is one of the cornerstones of large-scale data processing workloads. We have optimized Ray Data’s Parquet data source specifically to improve its performance on large datasets.

As a result, we saw up to 3x better performance when reading large datasets with more than 10k files, primarily from optimizations in dataset metadata collection.

Catalog support

Ray Data now connects to Unity Catalog, letting you read UC-governed tables and volumes in-region with policies fully enforced. With built-in support for Delta Lake, Apache Iceberg, structured, and unstructured data, you can now take advantage of Ray Data’s powerful data processing primitives right on your governed data. 

import ray

ds = ray.data.read_unity_catalog(
    table="main.sales.transactions",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    region="us-west-2",
)
ds.show(3)

Write support to Unity Catalog and Delta Lake is also on the way, allowing for end-to-end, governable pipelines powered by Ray Data.

Performance and stability

To showcase some of the optimizations called out above, we ran a series of internal benchmarks, which we share below.

Benchmarking Shuffle Implementations

Setup: 8 worker nodes, each with 8 vCPUs and 64 GB of memory

import time

import ray

path = "s3://ray-benchmark-data/tpch/parquet/sf100/lineitem"

start = time.perf_counter()

ds = (
    ray.data.read_parquet(path)
    .rename_columns({
        "column05": "l_extendedprice",
        "column08": "l_returnflag",
        "column13": "l_shipinstruct",
        "column14": "l_shipmode",
    })
    # Compute the average price of parcels per return status, shipping instruction, and shipping mode
    .groupby(["l_returnflag", "l_shipinstruct", "l_shipmode"])  # ~84 groups
    .mean("l_extendedprice")
)

ds.materialize()
print(f"Took {time.perf_counter() - start:.1f} s")

Results:


Shuffle implementation        2.46       2.51
Range-based shuffle           n/a        582 s (1.0x)
Hash-based shuffle            233 s      83 s (7.0x)

Quantile Calculation

We first compare the built-in Quantile aggregation in 2.46 with the new ApproximateQuantile in 2.51, using 4 m7i.4xlarge worker nodes, each with 16 vCPUs and 64 GB of memory.

Note that this is not an apples-to-apples comparison, but this aims to show that ApproximateQuantile is a scalable and efficient implementation when you can tolerate lower precision.

import ray.data
from ray.data.aggregate import ApproximateQuantile, Quantile
from ray.data.context import DataContext, ShuffleStrategy

# Needed for 2.46
ctx = DataContext.get_current()
ctx.shuffle_strategy = ShuffleStrategy.HASH_SHUFFLE

ds = ray.data.read_parquet("s3://ray-benchmark-data/tpch/parquet/sf100/orders")
ds = ds.rename_columns({
    "column2": "ORDERSTATUS",
    "column3": "TOTALPRICE",
    "column5": "ORDER-PRIORITY",
})

aggs = [
    ApproximateQuantile(on="TOTALPRICE", quantiles=[0.25, 0.5, 0.75], alias_name="L_Q"),
]
# If on 2.46:
# aggs = [
#     Quantile(on="TOTALPRICE", q=0.25, alias_name="L_Q"),
#     Quantile(on="TOTALPRICE", q=0.5, alias_name="M_Q"),
#     Quantile(on="TOTALPRICE", q=0.75, alias_name="H_Q"),
# ]

ds = ds.groupby(key=["ORDERSTATUS", "ORDER-PRIORITY"]).aggregate(*aggs).materialize()
ds.show()

Comparing the two, we see that the calculation is 4.8x faster using Ray Data 2.51’s new ApproximateQuantile aggregation on the same hardware:


                                    2.46        2.51
Quantile vs. ApproximateQuantile    311.78 s    65.40 s

Image Batch Inference

We also compare image batch inference pipelines on Ray, running on 8 g6.xlarge worker nodes. This pipeline looks roughly like the following:

[Image: image batch inference pipeline diagram]

Comparing this pipeline from 2.46 to 2.51, we see that runtime improves by 2x.


                           2.46      2.51
Image batch inference      1160 s    549 s

You can find the code for this pipeline on GitHub.

To conclude

We are incredibly excited about the progress Ray Data has made in the past year, driven by the needs of the community and the rapidly evolving AI landscape. 

The advancements in multimodal data processing, structured data support, and improvements in performance and stability show how Ray Data is becoming a powerful and versatile data processing library for AI workloads. 

Check out the Ray Data documentation and GitHub.

Ready to try Anyscale?

Access Anyscale today to see how companies using Anyscale and Ray benefit from rapid time-to-market and faster iterations across the entire AI lifecycle.