Distributed deep learning with Ray Train is now in Beta

By Matthew Deng, Amog Kamsetty, Richard Liaw and Will Drevo   

Introducing Ray Train, an easy-to-use library for distributed deep learning. In this post, we show how Ray Train improves developer velocity, is production-ready, and comes with batteries included.

Ray Train

Ray is simplifying the APIs of its ML ecosystem as it heads towards Ray 2.0. This blog announces a core feature, distributed deep learning, as part of a broader series of changes to the Ray ML ecosystem.

Today’s distributed deep learning tools suffer from a major problem: there exists a wide gap between prototyping and production model training.

Existing solutions such as Kubeflow and Sagemaker force practitioners to make a tradeoff between developer velocity and scalability.

To address this gap, we built Ray Train, a library that simplifies distributed training. Currently in its Beta release, it offers the following features:

  • Scale to multi-GPU and multi-node training with 0 code changes

  • Runs seamlessly on any cloud (AWS, GCP, Azure, Kubernetes, or on-prem)

  • Supports PyTorch, TensorFlow, and Horovod 

  • Distributed data loading and hyperparameter tuning

  • Built-in loggers for TensorBoard and MLflow

The upcoming roadmap for Ray Train can be found below.


In this post, we will introduce some of the benefits and values that Ray Train provides for distributed deep learning training today. We will showcase examples of using Ray Train with PyTorch, but they can be adapted to work with TensorFlow and Horovod as well.

Background: Why Ray Train?

Existing solutions for distributed training often fall on either side of the wide gap between prototyping and production model training.

The prototyping side is populated with frameworks and libraries that are lightweight, focusing on development velocity and fast iteration, such as Huggingface Transformers and PyTorch Lightning. These frameworks are targeted towards data scientists -- and leave the burden of cluster management and operations to the MLOps practitioner, who has to manage these frameworks at scale. 

On the other side are heavyweight production frameworks like Kubeflow and SageMaker. The Kubeflow Training Operator provides a solution for distributed training on Kubernetes, but it lacks ease of use for development. Debugging a distributed training environment often requires relaunching the entire job and waiting for multiple minutes to see results. 

We wanted to build a framework that could bring the best of both worlds together -- extremely fast iteration while making it really easy to scale on different cluster environments.

We built Ray Train with the following requirements in mind:

  1. Developer velocity: Reduce the friction to go from training on your laptop to training on any distributed cluster.

  2. Production-ready: Run end-to-end distributed model training with first class support for cloud compute and experiment monitoring.

  3. Batteries included: Capable of integrating with third party libraries and comes with powerful built-in integrations for large-scale data loading and distributed hyperparameter optimization. 

Developer Velocity

The number one request we heard among developers and data scientists was to reduce the iterative cycle of development from a local or Jupyter environment to a production environment.

Existing solutions will often make you wait for instances to start up in order to run your training script every time you make a code change or can be very complicated to integrate. 

Ray Train is fast to integrate, easy to scale, and allows you to iterate very quickly.

Ray Train is fast to integrate

Quickly distribute your existing PyTorch, TensorFlow, or Horovod code with five simple steps.

For example, an existing PyTorch training script can be converted to run across num_workers=4 worker processes with the following changes:

1
2
3
4
5
6
7
8
9
10
11
12
def train_func(): # 1. Wrap code in a function.
# Existing model and dataloader setup.
    model = train.torch.prepare_model(model)  # 2. Sets up data parallelism.
    dataloader = train.torch.prepare_data_loader(dataloader) # 2. Samples data.
    for _ in range(num_epochs):
# Existing training loop.

num_workers = 4 # 3. Define your parallelization factor. 
trainer = Trainer(backend="torch", num_workers=num_workers) # 3. Initialize Trainer.
trainer.start() # 3. Setup worker processes.
results = trainer.run(train_func) # 4. Run training function.
trainer.shutdown() # 5. Tear down worker processes.

What’s happening with these code changes? 

  1. Wrap your code in a function: Ray Train packages and sends this function to run on each worker process. 

  2. Use Ray Train “prepare” utility functions: These set up data parallelism when run on multiple workers (under the hood, this uses Distributed Data Parallelism).

  3. Instantiate Ray Train trainer: Define your training backend and number of workers. Worker processes are started and the backend communication framework (e.g., Torch process group) is set up.

  4. Run training function: The training function is executed on each worker and results are communicated back to the Trainer.

  5. Tear down work processes: Clean up processes and release resources.

To see this in action, see here for a simple example you can run directly on your laptop.

Ray Train is easy to scale

Ray Train provides simple solutions for three common scaling patterns: distributing across multiple machines, training on GPUs, and running on a remote cluster.

Scale to multiple machines 

With Ray, scaling Ray Train from your laptop to a multi-node setup is handled entirely by setting up your Ray cluster.

The same Ray Train script running locally can be run on a Ray cluster with multiple nodes without any additional modifications, just as if it were running on a single machine with more resources. You can further increase num_workers to increase your training parallelism and utilize your cluster resources.

Training with multiple GPUs

GPU training can be toggled via the Trainer. If your Ray cluster has GPUs, you can turn on GPU training by enabling the use_gpu flag when initializing the Trainer. Each worker process will be associated with a single GPU (which may be on the same or different machines), and by default it will use NCCL for communication. 

Tying this together, to scale your workload across 100 GPUs, you would update your Trainer:

1
trainer = Trainer(backend=”torch”, num_workers=100, use_gpu=True)

Run on a remote cluster

Submit Ray Train jobs to a remote cluster from your laptop, another server, or a Jupyter notebook easily using  Ray Client (for interactive runs) or with Ray Jobs (for production-ready runs).

Ray Train is quick to iterate 

While distributed training infrastructure solutions such as SageMaker and Kubeflow offer scaling, what really makes Ray Train stand out is its focus on developer productivity.

When it comes to iterative development, it is invaluable to run code immediately after writing it.  Take a look at how long it takes to re-run a distributed training script after making a code change on two 8-GPU nodes. 

Iteration time
Image: Ray Train has minimal overhead for iterative development.

In this example, we chose to use a familiar MNIST training script which has a training loop that executes in one minute. The difference in iteration time occurs during the startup period between writing code and running code -  with Ray Train there is zero additional overhead in starting up instances and minimal time spent setting up the distributed processes, so you can start your training function within seconds

The reason for the difference in startup time is that in Ray, you are able to reuse a cluster when iterating. While there still exists a one-time cost of starting up the cluster, subsequent runs are much faster. So not only can you run the same code on your laptop as your cluster, but you can also iteratively test your code directly on a distributed cluster!

Production Ready

When it comes to moving a training job to production, there are many additional aspects to consider such as the cost of long-running jobs, the ability to run on specific clouds or Kubernetes, and having the right monitoring and experiment tracking functionality. Ray Train was built with these production-level requirements in mind. 

Save compute costs with spot instances and fault tolerance 

When training large or long running jobs, the cost of compute instances can easily grow to significant amounts. Ray Train solves this by providing built-in fault tolerance which allows you to run distributed training workers on spot instances, reducing the cost of your workflow by up to 90%. When a spot instance is interrupted, Ray Train will restore the worker on a new instance and resume training.

timeline of fault tolerance
Image: Timeline of fault tolerance handling logic.

Fault tolerance can be enabled by implementing logic to save and load checkpoints

Deploy anywhere: Multi-cloud and Kubernetes-ready

Spending tons of time and developer resources to build out a ML training system that is tightly coupled to a proprietary cloud provider can limit flexibility in the future and/or lead to high cost of compute. 

Developing on Ray with Ray Train allows teams to avoid cloud lock-in, and deploy their training or ML platforms with or without Kubernetes anywhere, including on-prem. 

Monitor with integrated tools

Ray Train has a TrainingCallback interface that can be used to process intermediate results (e.g., at the end of a training epoch). A few out-of-the-box callbacks are available, some of which integrate with your favorite monitoring tools.

Callback
Image: Using callbacks to integrate with different monitoring tools.

In the near future, we plan to add additional callbacks for other integrations. In the meantime, the user can also extend the TrainingCallback API to define their own custom callback logic!

Batteries Included

A distributed training framework should allow developers to be flexible in incorporating tools and utilities, but also should not require them to code vital functionality from scratch. We wanted to let developers leverage as much of the open-source data ecosystem as possible.

Distributed Data Loading with Ray Datasets

Large-scale training jobs simply cannot be cost-effective without efficient data loading. To solve this problem, Ray Train integrates with Ray Datasets to perform distributed data loading.

Ray dataset
Image: Passing a Ray Dataset allows data to be sharded across the workers.

This integration unlocks a number of features.

  1. Sharded datasets: Only the fraction of the data that is needed by each worker will be transferred to and read by the worker, enabling you to train on large datasets that do not fit into a single node’s memory.

  2. Windowed datasets: Only apply the chain of Dataset operations to a subset of the data at a time, limiting the working set size. This allows you to efficiently train on large datasets that do not fit into your cluster’s memory at the same time.

  3. Pipelined execution: When training on one batch of data, the next batch will be processed/loaded in parallel. This can reduce GPU idle time and decrease overall job training time.

  4. Global shuffling: Shuffling of the entire dataset between epochs to optimize training performance over no shuffling or local shuffling.

Ray dataset 2
Image: Ray Datasets provides an 8x improvement over other solutions.

By pairing distributed training with distributed data, we see large improvements in the size, quality, and speed of large-scale ML data ingestion. For a more comprehensive walkthrough, see Deep Dive: Data Ingest in a Third Generation ML Architecture.

Hyperparameter Optimization with Ray Tune

Ray Train provides an integration with Ray Tune that allows you to perform hyperparameter optimization in just a few lines of code.

1
2
trainable = trainer.to_tune_trainable(train_func)
analysis = tune.run(trainable, config=...)

Tune will create one Trial per hyperparameter configuration. In each Trial, a new Trainer will be initialized  and run the training function with its generated configuration.

trainer trainable
Image: Using Ray Tune to conduct a distributed hyperparameter search.

This native integration provides a few conveniences: 

  1. Minimal changes: Training function utilities (train.report(), train.checkpoint()) are directly translated to their Tune equivalents (tune.report(), tune.checkpoint()).

  2. Resource management: Resource requirements are defined when initializing the Trainer, so you do not need to explicitly define any resource requests in your Tune experiment.

For examples, see Ray Train Examples - Ray Tune Integration Examples

Flexibility: use anything in the Python ecosystem 

It’s important that a distributed training framework doesn’t box you in or limit your flexibility. 

Ray is and always will be open-source, and will work with anything in the Python ecosystem.

Loading data with Dask? No problem! Run Dask with the performance of Ray with Dask on Ray.

ETLs with Apache Spark? Again, this is common. Run Spark with the speed of Ray with Spark on Ray, or simply call out to a Ray cluster from within your PySpark notebook with the interactive Ray Client or run a Ray Job.

Ultimately, we feel the ML ecosystem is so broad and multifaceted that the best solution for distributed training will be both open-source and flexible.

Next Steps

As of today, Deep Learning on Ray Train is officially in Beta. 

As development continues, Ray Train will be extended with a focus on integrating with both Ray and third party libraries. This includes:

  • Dataset preprocessing and feature transformations to operate on Ray Datasets

  • Streamlined model exporting to Ray Serve for model serving

  • Expanding beyond deep learning to integrate with XGBoost-Ray, LightGBM-Ray, and HuggingFace Transformers

  • Elastic training to support dynamically scaling the number of training workers for improved speed and performance

  • Support for advanced deep learning models such as Graph Neural Networks and Embeddings

  • Support for alternative distributed training strategies such as asynchronous parameter servers

  • Numerous performance optimizations such as FP16 compression

  • Improved profiling and monitoring with tools such as PyTorch Profiler

  • Integrations with more of your favorite experimentation tracking tools

To learn more about Ray Train, you can visit the documentation. If you are already using Ray Train, we’d love to hear your feedback through the User Survey. Lastly, If you have any questions about Ray Train, you can reach out to the Ray community on Discourse and Slack.

Contributions: We are actively seeking development partners and open-source committers, so please drop a Github issue or get in contact if you’re interested!

Sharing

Tags

Ray Train

Sign up for product updates