Ray Train is a scalable machine learning library for distributed training and fine-tuning.
Ray Train allows you to scale model training code from a single machine to a cluster of machines in the cloud, and abstracts away the complexities of distributed computing. Whether you have large models or large datasets, Ray Train is the simplest solution for distributed training.
With Ray Train, you can:
Scale to multi-GPU and multi-node training with zero code changes
Run on any cloud (AWS, GCP, Azure), on Kubernetes, or on-prem
Use any ML framework including PyTorch, TensorFlow, Horovod, and many more
Easily run distributed data loading and hyperparameter tuning
Use built-in loggers for TensorBoard and MLflow
Existing solutions for distributed training often fall on either side of the wide gap between prototyping and production model training.
The prototyping side is populated with lightweight frameworks and libraries that focus on development velocity and fast iteration, such as Hugging Face Transformers and PyTorch Lightning. These frameworks are targeted at data scientists and leave the burden of cluster management and operations to the MLOps practitioner, who has to run them at scale.
On the other side are heavyweight production frameworks like Kubeflow and SageMaker. The Kubeflow Training Operator provides a solution for distributed training on Kubernetes, but it lacks ease of use for development. Debugging a distributed training environment often requires relaunching the entire job and waiting several minutes to see results.
We wanted to build a framework that could bring the best of both worlds together—extremely fast iteration while making it really easy to scale on different cluster environments.
Ray Train makes it easy to distribute training across any accelerator, including GPUs, CPUs, or heterogeneous clusters with both. With Ray Train, you can maximize efficiency while reducing costs, and ensure you’re training on the most cost-effective accelerators for your model.
If the GPU training is bottlenecked on expensive CPU preprocessing and the preprocessed Dataset is too large to fit in object store memory, then materializing the dataset doesn’t work. In this case, Ray’s native support for heterogeneous resources enables you to simply add more CPU-only nodes to your cluster, and Ray Data automatically scales out CPU-only preprocessing tasks to CPU-only nodes, making GPUs more saturated.
In general, adding CPU-only nodes can help in two ways:
Adding more CPU cores helps further parallelize preprocessing. This approach is helpful when CPU compute time is the bottleneck.
Adding more object store memory 1) allows Ray Data to buffer more data between the preprocessing and training stages, and 2) makes it possible to cache the preprocessed dataset. This approach is helpful when memory is the bottleneck.
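As a back-of-the-envelope sketch of the first case, the number of preprocessing cores needed to keep the GPUs busy can be estimated from throughput figures. The function name and every number below are hypothetical and chosen only for illustration. Real throughput has to be measured, and the estimate assumes preprocessing scales linearly with core count.

```python
import math


def cpu_cores_needed(gpu_samples_per_sec: float, core_samples_per_sec: float) -> int:
    """Smallest number of CPU cores whose combined preprocessing throughput
    matches what the GPUs consume (assumes linear scaling across cores)."""
    if gpu_samples_per_sec <= 0 or core_samples_per_sec <= 0:
        raise ValueError("throughputs must be positive")
    return math.ceil(gpu_samples_per_sec / core_samples_per_sec)


# Hypothetical figures: 8 GPUs consuming 1,500 samples/s each,
# and one CPU core preprocessing 120 samples/s.
gpu_demand = 8 * 1500
cores = cpu_cores_needed(gpu_demand, 120)
print(cores)  # 100
```

If the cluster has fewer preprocessing cores than this estimate, the GPUs wait on data, which is the situation where adding CPU-only nodes helps.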
When it comes to iterative development, it is invaluable to run code immediately after writing it. Unfortunately, scaling development from a local or Jupyter environment to a production environment can be complicated.
At Anyscale, we’re on a mission to make scalable computing effortless, which is why the Anyscale Platform supports VS Code and Jupyter notebooks directly in the API. That way, developers can program on their laptop and scale training to the cloud without the hassle of manually setting up distributed computing. This sets Ray Train apart from existing solutions, which are often complicated to integrate or make you wait for instances to start up every time you change your code and rerun your training script.
Ultimately, Ray Train is fast to integrate, easy to scale, and allows you to iterate very quickly.
When it comes to moving a training job to production, there are many aspects to consider such as the cost of long-running jobs, the ability to run on specific clouds or Kubernetes, and the availability of the right monitoring and experiment tracking functionalities. Ray Train was built with these production-level requirements in mind, so you can easily develop and scale to the cloud.
Ray Train provides simple solutions for three common scaling patterns: distributing across multiple machines, training on GPUs, and running on a remote cluster.
Scale to Multiple Machines: With Ray, scaling Ray Train from your laptop to a multi-node setup is handled entirely by setting up your Ray cluster. The same Ray Train script running locally can be run on a Ray cluster with multiple nodes without any additional modifications, just as if it were running on a single machine with more resources. You can further increase num_workers to increase your training parallelism and utilize your cluster resources.
Train on GPUs: With Ray Train, you can configure your Ray cluster to run on any accelerator, whether that’s CPUs, GPUs, or heterogeneous clusters with both CPUs and GPUs.
Run on a Remote Cluster: Ray Train is the easiest way to deploy your AI/ML model to a remote cluster using distributed computing.
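The following is a minimal sketch of how the first two patterns might look in code. It assumes a recent Ray 2.x release in which ScalingConfig is importable from ray.train and TorchTrainer from ray.train.torch; other versions may differ. The training loop body, the config values, and the build_trainer helper are placeholders invented for illustration.

```python
def train_loop_per_worker(config: dict) -> None:
    # Placeholder: the user's existing per-worker training code goes here.
    # The same function is used unchanged on a laptop and on a cluster.
    pass


def build_trainer(num_workers: int, use_gpu: bool):
    # Imports are deferred so that Ray (with its PyTorch integration) is only
    # required at the point a trainer is actually built.
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    return TorchTrainer(
        train_loop_per_worker,
        train_loop_config={"lr": 1e-3, "epochs": 2},  # hypothetical values
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )


# On a laptop:            build_trainer(num_workers=2, use_gpu=False).fit()
# On a multi-node cluster: build_trainer(num_workers=16, use_gpu=True).fit()
```

Only the scaling arguments change between the two calls. Where the workers actually run is decided by the Ray cluster the script is connected to.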
We know how important it is to be able to trust your model training process. Ray Train supports a variety of features to improve reliability including:
Fault Tolerance: Handle node failures in a long-running training job on a cluster of preemptible machines/pods.
Spot Instance Support: When training large or long-running jobs, the cost of compute instances can easily grow to significant amounts. Ray Train solves this by providing built-in fault tolerance, which allows you to run distributed training workers on spot instances, reducing the cost of your workflow by up to 90%. When a spot instance is interrupted, Ray Train will restore the worker on a new instance and resume training.
Figure 1. Ray Train autoscaling spot instance support.
Persistent Storage: Save your model to persistent storage, and use it for downstream serving/inference.
Checkpointing: Ray Train provides a way to snapshot training progress with Checkpoints. Saving checkpoints to a persistent storage location allows you to resume training from the last checkpoint in case of a node failure. See Saving and Loading Checkpoints for a detailed guide on how to set up checkpointing.
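The resume-from-last-checkpoint idea can be sketched in plain Python. This is not Ray Train's API. It is a toy loop in which a temporary directory stands in for persistent storage, and the train function, the checkpoint file name, and the simulated failure are all hypothetical.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Optional


def train(storage_dir: Path, total_epochs: int, fail_at: Optional[int] = None) -> List[int]:
    """Run (or resume) a toy training loop; return the epochs executed in this call."""
    ckpt = storage_dir / "checkpoint.json"
    # Resume after the last checkpointed epoch if a checkpoint exists.
    start = json.loads(ckpt.read_text())["epoch"] + 1 if ckpt.exists() else 0
    executed = []
    for epoch in range(start, total_epochs):
        if epoch == fail_at:
            raise RuntimeError("simulated node failure")
        executed.append(epoch)  # stand-in for one epoch of real training
        ckpt.write_text(json.dumps({"epoch": epoch}))  # snapshot progress
    return executed


with tempfile.TemporaryDirectory() as d:
    storage = Path(d)
    try:
        train(storage, total_epochs=5, fail_at=3)
    except RuntimeError:
        pass  # epochs 0-2 finished and were checkpointed before the failure
    resumed = train(storage, total_epochs=5)
    print(resumed)  # [3, 4]
```

Because the checkpoint lives outside the failed process, the second run skips the completed epochs instead of starting over.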
Ray Train also supports the ability to save checkpoints from multiple workers—also known as distributed checkpointing. When doing model-parallel training, Ray Train checkpointing provides an easy way to upload model shards from each worker in parallel, without needing to gather the full model to a single node.
This matters for model-parallel training strategies, where each worker holds only a shard of the full model: each worker can save and report its own checkpoint shard in parallel.
Figure 2. Distributed checkpointing in Ray Train. Each worker uploads its own checkpoint shard to persistent storage independently.
Distributed checkpointing is the best practice for saving checkpoints when doing model-parallel training (e.g., DeepSpeed, FSDP, Megatron-LM).
There are two major benefits:
It is faster, resulting in less idle time. Faster checkpointing incentivizes more frequent checkpointing!
Each worker can upload its checkpoint shard in parallel, maximizing the network bandwidth of the cluster. Instead of a single node uploading the full model of size M, the cluster distributes the load across N nodes, each uploading a shard of size M / N.
Distributed checkpointing avoids needing to gather the full model onto a single worker’s CPU memory.
This gather operation puts a large CPU memory requirement on the worker that performs checkpointing and is a common source of OOM errors.
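The M / N argument can be made concrete with a small calculation. The figures below are hypothetical, and the sketch assumes each node is limited only by its own network link, so the storage backend is not the bottleneck.

```python
def upload_seconds(model_gb: float, uploading_nodes: int, gbit_per_sec_per_node: float) -> float:
    """Time to upload a checkpoint split evenly across nodes that upload
    in parallel, each limited by its own network link."""
    shard_gb = model_gb / uploading_nodes  # M / N per node
    return shard_gb * 8 / gbit_per_sec_per_node  # GB -> Gbit, then divide by link speed


# Hypothetical figures: a 140 GB checkpoint and a 10 Gbit/s link per node.
single = upload_seconds(140, uploading_nodes=1, gbit_per_sec_per_node=10)
sharded = upload_seconds(140, uploading_nodes=8, gbit_per_sec_per_node=10)
print(single, sharded)  # 112.0 14.0
```

The same division applies to memory: under these assumptions, no worker ever needs to hold more than its own 17.5 GB shard, instead of one worker gathering all 140 GB.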
It’s important that a distributed training framework doesn’t box you in or limit your flexibility. Ray is and always will be open-source, and will work with anything in the Python ecosystem.
Loading data with Dask? No problem! Run Dask with the performance of Ray with Dask on Ray.
ETLs with Apache Spark? Again, this is common. Run Spark with the speed of Ray with Spark on Ray, or simply call out to a Ray cluster from within your PySpark notebook with the interactive Ray Client or run a Ray Job.
Ultimately, we feel the ML ecosystem is so broad and multifaceted that the best solution for distributed training will be both open-source and flexible. That’s why Ray Train works with any framework, including:
PyTorch
Hugging Face
DeepSpeed
TensorFlow
XGBoost
Keras
Horovod
LightGBM
And more!
Spending large amounts of time and developer resources to build an ML training system that is tightly coupled to a proprietary cloud provider can limit future flexibility and lead to high compute costs.
Developing on Ray with Ray Train allows teams to avoid cloud lock-in, and deploy their training or ML platforms with or without Kubernetes anywhere, including on-prem.
Ray Train is built on top of Ray Core, and it integrates with other Ray libraries like Ray Data and Ray Tune. This enables advanced capabilities for a range of ML workloads like:
Data loading and preprocessing
Large-scale training jobs simply cannot be cost-effective without efficient data loading. To solve this problem, Ray Train integrates with Ray Data to offer a performant and scalable streaming solution for loading and preprocessing large datasets. Key advantages include:
Streaming data loading and preprocessing, scalable to petabyte-scale data.
Scaling out heavy data preprocessing to CPU nodes, to avoid bottlenecking GPU training.
Automatic and fast failure recovery.
Automatic on-the-fly data splitting across distributed training workers.
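To illustrate what splitting data across training workers means, here is a minimal plain-Python sketch. It does not show Ray Data's actual splitting strategy. Round-robin assignment and the split_round_robin helper are assumptions made for illustration, and integers stand in for blocks of data.

```python
from typing import Iterable, List


def split_round_robin(blocks: Iterable[int], world_size: int) -> List[List[int]]:
    """Assign each incoming block to exactly one worker, in round-robin order."""
    shards: List[List[int]] = [[] for _ in range(world_size)]
    for i, block in enumerate(blocks):
        shards[i % world_size].append(block)
    return shards


shards = split_round_robin(range(10), world_size=4)
print(shards)  # [[0, 4, 8], [1, 5, 9], [2, 6], [3, 7]]
```

Each worker sees only its own shard, so no single worker has to read or hold the full dataset.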
Figure 3. Passing a Ray Dataset allows data to be sharded across the workers.
By integrating these two Ray libraries, we can unlock a number of features including:
Sharded Datasets: Only the fraction of the data that is needed by each worker will be transferred to and read by the worker, enabling you to train on large datasets that do not fit into a single node’s memory.
Windowed Datasets: Only apply the chain of Dataset operations to a subset of the data at a time, limiting the working set size. This allows you to efficiently train on large datasets that do not fit into your cluster’s memory at the same time.
Pipelined Execution: When training on one batch of data, the next batch will be processed/loaded in parallel. This can reduce GPU idle time and decrease overall job training time.
Global Shuffling: Shuffling of the entire dataset between epochs to optimize training performance over no shuffling or local shuffling.
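Pipelined execution can be sketched with a background thread that loads upcoming batches while the consumer works on the current one. This is a simplified plain-Python illustration, not how Ray Data is implemented. The prefetch helper and its depth parameter are hypothetical, and error propagation from the loader is omitted for brevity.

```python
import queue
import threading
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
_DONE = object()  # sentinel marking the end of the stream


def prefetch(items: Iterable[T], load: Callable[[T], U], depth: int = 2) -> Iterator[U]:
    """Yield load(item) for each item, loading up to `depth` results ahead
    in a background thread while the caller consumes earlier ones."""
    buf: "queue.Queue" = queue.Queue(maxsize=depth)

    def producer() -> None:
        try:
            for item in items:
                buf.put(load(item))  # blocks while the buffer is full
        finally:
            buf.put(_DONE)  # always signal completion so the consumer stops

    threading.Thread(target=producer, daemon=True).start()
    while True:
        out = buf.get()
        if out is _DONE:
            return
        yield out


# Stand-in loader: turns an index into a tiny "batch".
batches = list(prefetch(range(5), load=lambda i: [i, i * i]))
print(batches)  # [[0, 0], [1, 1], [2, 4], [3, 9], [4, 16]]
```

With a real loader, the time spent in load overlaps with the training step on the previous batch, which is where the reduction in GPU idle time comes from.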
Figure 4. Ray Datasets provides an 8x improvement over other solutions.
By pairing distributed training with distributed data, we see large improvements in the size, quality, and speed of large-scale ML data ingestion. For a more comprehensive walkthrough, see Deep Dive: Data Ingest in a Third Generation ML Architecture.
Hyperparameter tuning with Ray Tune is natively supported with Ray Train.
This native integration provides a few conveniences:
Minimal Changes: Training function utilities (train.report(), train.checkpoint()) are directly translated to their Tune equivalents (tune.report(), tune.checkpoint()).
Resource Management: Resource requirements are defined when initializing the Trainer, so you do not need to explicitly define any resource requests in your Tune experiment.