Introducing Ray Lightning: Multi-node PyTorch Lightning training made easy

By Amog Kamsetty, Richard Liaw and Will Drevo   

TL;DR: Use PyTorch Lightning with Ray to enable multi-node training and automatic cluster configuration with minimal code changes.


PyTorch Lightning is a library that provides a high-level interface for PyTorch, and helps you organize your code and reduce boilerplate. By abstracting away engineering code, it makes deep learning experiments easier to reproduce and improves developer productivity.

PyTorch Lightning also includes plugins that make it easy to parallelize your training across multiple GPUs, which you can read more about in this blog post. This parallel training, however, depends on a critical assumption: that you already have your GPU(s) set up and networked together in an efficient way for training.

While you may have a managed cluster like SLURM for multi-node training on the cloud, setting up the cluster and its configuration is no easy task. As described in this blog post, configuring the cluster involves:

  • Making sure all the nodes in the cluster can communicate with each other

  • Making the code accessible to each node

  • Setting up the proper PyTorch environment variables on each node

  • Running the training script individually on each node.
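To make the environment-variable step concrete, here is a minimal sketch of the per-process environment you would otherwise set by hand. It assumes one process per GPU. `MASTER_ADDR`, `MASTER_PORT`, `WORLD_SIZE` and `RANK` are the variables read by PyTorch's default `env://` initialization, `LOCAL_RANK` is set by PyTorch's launch utilities, and `NODE_RANK` is read by PyTorch Lightning. The helper name, hostnames and port are hypothetical.

```python
from typing import Dict, List


def ddp_env_for_cluster(
    hostnames: List[str], gpus_per_node: int, master_port: int = 29500
) -> List[Dict[str, str]]:
    """Build the environment for every process in a manual multi-node DDP job.

    Assumes one process per GPU and uses the first host as the rendezvous
    (master) node. Global ranks are assigned node by node:
    RANK = NODE_RANK * gpus_per_node + LOCAL_RANK.
    """
    world_size = len(hostnames) * gpus_per_node
    envs = []
    for node_rank in range(len(hostnames)):
        for local_rank in range(gpus_per_node):
            envs.append({
                "MASTER_ADDR": hostnames[0],
                "MASTER_PORT": str(master_port),
                "WORLD_SIZE": str(world_size),
                "NODE_RANK": str(node_rank),
                "LOCAL_RANK": str(local_rank),
                "RANK": str(node_rank * gpus_per_node + local_rank),
            })
    return envs


if __name__ == "__main__":
    # Hypothetical two-node cluster with 4 GPUs per node.
    for env in ddp_env_for_cluster(["10.0.0.1", "10.0.0.2"], gpus_per_node=4):
        print(env)
```

Every one of these dictionaries would have to be exported on the right machine before launching the matching process, which is the bookkeeping Ray Lightning takes over.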

Multi-node training with PyTorch Lightning has a couple of other limitations as well:

  • Setting up a multi-node cluster on any cloud provider (AWS, Azure, GCP) or on Kubernetes requires a significant amount of expertise

  • Multi-node training is not possible if you want to use a Jupyter notebook

  • Automatically scaling your GPUs up or down to reduce costs requires a lot of infrastructure and custom tooling.

Ray Lightning vs. PyTorch Native comparison

Wouldn’t it be great to be able to leverage multi-node training without needing extensive infrastructure expertise?

And wouldn’t it be even better if you could do so with no code changes?

Introducing Ray Lightning

Ray Lightning is a simple plugin for PyTorch Lightning to scale out your training. Here are the main benefits of Ray Lightning:

  • Simple setup. No changes to existing training code.

  • Easily scale up. You can write the same code for 1 GPU, and change 1 parameter to scale to a large cluster.

  • Works with Jupyter Notebook. Simply launch a Jupyter Notebook from the head node and access all the resources on your cluster.

  • Seamlessly create multi-node clusters on AWS/Azure/GCP via the Ray Cluster Launcher.

  • Integration with Ray Tune for large-scale distributed hyperparameter search and state-of-the-art (SOTA) search algorithms.

And best of all, it is fully open source and free to use!

Under the hood, Ray Lightning leverages Ray, a simple library for distributed computing in Python.

With Ray Lightning, scaling up your PyTorch Lightning training becomes much easier and much more flexible!

How does Ray Lightning work?

Ray Lightning uses the PyTorch Lightning “plugin” interface to offer a RayPlugin that you can add to your Trainer. It works similarly to PyTorch Lightning's built-in DDPSpawnPlugin, but instead of spawning new processes for training, the RayPlugin creates new Ray Actors. These actors are just Python processes, except they can be scheduled anywhere on the Ray cluster, allowing you to do multi-node programming without leaving your Python script.

Ray Lightning architecture: communication between Ray actors on a multi-node cluster

Each Ray actor contains a copy of your LightningModule, and the actors automatically set the proper environment variables and create the PyTorch communication group together. This means that under the hood, Ray is just running standard PyTorch DistributedDataParallel, giving you the same performance, but with Ray you can run your training job programmatically and automatically scale instances up and down as you train.
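Because every actor runs standard DistributedDataParallel, the replicas stay in sync by averaging gradients with an all-reduce before each optimizer step. The toy simulation below uses plain Python lists instead of tensors and makes no Ray or PyTorch calls (all names are hypothetical); it only illustrates why every replica ends each step with identical weights.

```python
from typing import List


def allreduce_mean(grads_per_worker: List[List[float]]) -> List[float]:
    """Element-wise mean of the gradients from all workers (all-reduce, then divide)."""
    num_workers = len(grads_per_worker)
    return [sum(g) / num_workers for g in zip(*grads_per_worker)]


def ddp_step(
    params_per_worker: List[List[float]],
    grads_per_worker: List[List[float]],
    lr: float,
) -> List[List[float]]:
    """Apply one synchronized SGD step: every replica uses the same averaged gradient."""
    avg = allreduce_mean(grads_per_worker)
    return [[p - lr * g for p, g in zip(params, avg)] for params in params_per_worker]


if __name__ == "__main__":
    # All replicas start from the same weights (DDP broadcasts them at startup).
    params = [[1.0, -2.0] for _ in range(4)]
    # Each worker computes different gradients on its own shard of the data.
    grads = [[0.1, 0.4], [0.3, 0.0], [0.5, 0.2], [0.3, 0.2]]
    params = ddp_step(params, grads, lr=0.5)
    print(params)
```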

Managing the Cluster

Typically, managing clusters can be a pain, especially if you don’t have an infrastructure or ML platform team. With Ray, however, this becomes much easier: you can start a Ray cluster with the Ray cluster launcher.

Ray’s cluster launcher supports all the major cloud providers (AWS, GCP, Azure) and also has a Kubernetes operator. So you can run your Ray program wherever you need. And once your code can run on a Ray cluster, migrating or changing clouds is easy.

To launch a Ray cluster on AWS, for example, you need a cluster YAML file specifying configuration details like the one below:

cluster_name: ml

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a,us-west-2b

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu

head_node:
    InstanceType: p3.8xlarge
    ImageId: latest_dlami

    # You can provision additional disk space with a conf as follows
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 100
worker_nodes:
    InstanceType: p3.2xlarge
    ImageId: latest_dlami

file_mounts: {
    "/path1/on/remote/machine": "/path1/on/local/machine",
}

# List of shell commands to run to set up nodes.
setup_commands:
    - pip install -U ray-lightning

The information you put in file_mounts will be synced to all nodes in the cluster, so this is where you can put your training script. Any additional dependencies you need to install (e.g., pip install foobar) can be specified in setup_commands. They will be installed on all nodes in the cluster.
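Because the mapping direction in file_mounts is easy to get backwards, here is a small sketch that builds the file_mounts and setup_commands sections from a list of local files. It follows the convention in the example above (remote path as key, local path as value). The helper names, the /home/ubuntu target directory and the foobar package are hypothetical. JSON objects are also valid YAML flow mappings, like the `file_mounts: {...}` block above, so the printed sections can be copied into cluster.yaml.

```python
import json
import os
from typing import Dict, List


def make_file_mounts(local_paths: List[str], remote_dir: str) -> Dict[str, str]:
    """Map each local file to <remote_dir>/<basename> (remote path is the key)."""
    remote_dir = remote_dir.rstrip("/")
    return {f"{remote_dir}/{os.path.basename(p)}": p for p in local_paths}


def partial_cluster_config(
    local_paths: List[str], remote_dir: str, extra_packages: List[str]
) -> Dict[str, object]:
    """Return only the file_mounts and setup_commands sections of a cluster config."""
    return {
        "file_mounts": make_file_mounts(local_paths, remote_dir),
        "setup_commands": ["pip install -U ray-lightning"]
        + [f"pip install {pkg}" for pkg in extra_packages],
    }


if __name__ == "__main__":
    # Hypothetical training script and extra dependency.
    cfg = partial_cluster_config(["./train.py"], "/home/ubuntu", ["foobar"])
    print(json.dumps(cfg, indent=4))
```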

Once you have your YAML file, you can simply run ray up cluster.yaml to launch the nodes and create a Ray cluster.

Then you can run ray attach cluster.yaml to SSH into the head node of your Ray cluster.

The great thing about the cluster launcher is that it automatically adds new nodes when more resources are requested than the cluster currently has available. Also, if there are idle nodes, Ray will automatically terminate them.
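As a rough mental model of that behavior, here is a toy sketch of the two decisions involved: how many worker nodes to add for a pending GPU request, and which nodes have been idle long enough to remove. This is not Ray's autoscaler code. The function names are hypothetical, and the timeout only mirrors the `idle_timeout_minutes` setting that Ray's cluster YAML accepts.

```python
import math
from typing import Dict, List


def nodes_to_add(requested_gpus: int, available_gpus: int, gpus_per_worker_node: int) -> int:
    """Worker nodes needed to cover a GPU request the current cluster cannot satisfy."""
    shortfall = max(0, requested_gpus - available_gpus)
    return math.ceil(shortfall / gpus_per_worker_node)


def idle_nodes_to_terminate(idle_minutes: Dict[str, float], idle_timeout_minutes: float) -> List[str]:
    """Nodes whose idle time has reached the timeout, in sorted order."""
    return sorted(node for node, idle in idle_minutes.items() if idle >= idle_timeout_minutes)


if __name__ == "__main__":
    # Head node p3.8xlarge has 4 GPUs; each p3.2xlarge worker has 1 GPU.
    print(nodes_to_add(requested_gpus=8, available_gpus=4, gpus_per_worker_node=1))
    print(idle_nodes_to_terminate({"worker-1": 12.0, "worker-2": 1.5}, idle_timeout_minutes=5))
```

With the example cluster.yaml, a request for 8 GPUs against the 4 GPUs of the p3.8xlarge head node would need 4 more single-GPU p3.2xlarge workers.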

Putting it all together

Now let’s see how we can put everything together and easily train a simple MNIST classifier on the cloud.

Package Installation

First, let’s install Ray Lightning:

pip install ray-lightning

This will also install PyTorch Lightning and Ray for us.

Vanilla PyTorch Lightning

The first step is to get our PyTorch Lightning code ready. We first need to create our classifier model, which is an instance of LightningModule. Here is an example of a simple MNIST classifier adapted from the PyTorch Lightning guide:

import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from torch.utils.data import random_split, DataLoader
from torchvision.datasets import MNIST
from torchvision import transforms

class LightningMNISTClassifier(pl.LightningModule):
    def __init__(self, config, data_dir=None):
        super(LightningMNISTClassifier, self).__init__()

        self.data_dir = data_dir
        self.lr = config["lr"]
        layer_1, layer_2 = config["layer_1"], config["layer_2"]
        self.batch_size = config["batch_size"]

        # mnist images are (1, 28, 28) (channels, width, height)
        self.layer_1 = torch.nn.Linear(28 * 28, layer_1)
        self.layer_2 = torch.nn.Linear(layer_1, layer_2)
        self.layer_3 = torch.nn.Linear(layer_2, 10)
        self.accuracy = pl.metrics.Accuracy()

    def forward(self, x):
        batch_size, channels, width, height = x.size()
        x = x.view(batch_size, -1)
        x = self.layer_1(x)
        x = torch.relu(x)
        x = self.layer_2(x)
        x = torch.relu(x)
        x = self.layer_3(x)
        # Return log-probabilities, which is what F.nll_loss expects.
        x = F.log_softmax(x, dim=1)
        return x

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

    def training_step(self, train_batch, batch_idx):
        x, y = train_batch
        logits = self.forward(x)
        loss = F.nll_loss(logits, y)
        # Compare predicted class indices against the labels.
        acc = self.accuracy(logits.argmax(dim=1), y)
        self.log("ptl/train_loss", loss)
        self.log("ptl/train_accuracy", acc)
        return loss

    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch
        logits = self.forward(x)
        loss = F.nll_loss(logits, y)
        acc = self.accuracy(logits.argmax(dim=1), y)
        return {"val_loss": loss, "val_accuracy": acc}

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x["val_loss"] for x in outputs]).mean()
        avg_acc = torch.stack([x["val_accuracy"] for x in outputs]).mean()
        self.log("ptl/val_loss", avg_loss)
        self.log("ptl/val_accuracy", avg_acc)

    def prepare_data(self):
        # Download only. Lightning calls prepare_data on a single process per
        # node, so state assigned here would not reach the other workers.
        MNIST(self.data_dir, train=True, download=True)

    def setup(self, stage=None):
        # Called on every process, so each worker gets its own dataset.
        self.dataset = MNIST(
            self.data_dir,
            train=True,
            download=False,
            transform=transforms.ToTensor())

    def train_dataloader(self):
        dataset = self.dataset
        train_length = len(dataset)
        dataset_train, _ = random_split(
            dataset, [train_length - 5000, 5000],
            generator=torch.Generator().manual_seed(0))
        loader = DataLoader(
            dataset_train,
            batch_size=self.batch_size,
            num_workers=1,
            drop_last=True,
            pin_memory=True,
        )
        return loader

    def val_dataloader(self):
        dataset = self.dataset
        train_length = len(dataset)
        _, dataset_val = random_split(
            dataset, [train_length - 5000, 5000],
            generator=torch.Generator().manual_seed(0))
        loader = DataLoader(
            dataset_val,
            batch_size=self.batch_size,
            num_workers=1,
            drop_last=True,
            pin_memory=True,
        )
        return loader
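Both `train_dataloader` and `val_dataloader` call `random_split` with a generator seeded to 0, so both calls produce the same permutation and the training and validation subsets are complementary. A minimal stdlib illustration of that idea follows. It swaps `torch.Generator` for `random.Random`, so the exact indices differ from PyTorch's, and the helper name is hypothetical.

```python
import random
from typing import List, Tuple


def seeded_split(n: int, val_size: int, seed: int = 0) -> Tuple[List[int], List[int]]:
    """Shuffle indices 0..n-1 with a fixed seed, then split off the last val_size."""
    indices = list(range(n))
    random.Random(seed).shuffle(indices)  # same seed -> same permutation every call
    return indices[: n - val_size], indices[n - val_size:]


if __name__ == "__main__":
    # Two independent calls, as in train_dataloader and val_dataloader.
    train_idx, _ = seeded_split(60000, 5000, seed=0)
    _, val_idx = seeded_split(60000, 5000, seed=0)
    print(len(train_idx), len(val_idx), len(set(train_idx) & set(val_idx)))
```

Reusing the seed is what keeps the 5,000 validation images out of the training set; splitting with a different seed in one of the two methods would leak validation samples into training.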

Then we need to instantiate this model, create our Trainer and start training.

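# Example hyperparameter values (illustrative only; tune for your setup).
config = {"lr": 1e-3, "layer_1": 128, "layer_2": 256, "batch_size": 64}

model = LightningMNISTClassifier(config, data_dir="./")

trainer = pl.Trainer(max_epochs=10)
trainer.fit(model)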

And that’s it for single-process execution: you can now train your classifier on your laptop. Next, let’s parallelize training with the Ray Lightning plugin, first across the cores of a laptop and then across a GPU cluster.

Parallelizing on a Laptop with Ray Lightning

To use Ray Lightning, we simply need to add the RayPlugin to our Trainer.

Let’s first see how we can parallelize training across the cores of our laptop by adding the RayPlugin. For now, we will disable GPUs. To go straight to parallel training on a cluster with GPUs, head on over to the next section.

import pytorch_lightning as pl
import ray
from ray_lightning import RayPlugin

# Define LightningMNISTClassifier and config exactly as in the previous section.

# variables for Ray around parallelism and hardware
num_workers = 8
use_gpu = False

# Initialize Ray. With no address, this starts a local Ray instance on this machine.
ray.init()

model = LightningMNISTClassifier(config, data_dir="./")

trainer = pl.Trainer(
    max_epochs=10,
    plugins=[RayPlugin(num_workers=num_workers, use_gpu=use_gpu)])
trainer.fit(model)

With just those small changes, we can run the script again, this time with training parallelized across 8 workers (i.e., 8 processes).
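With 8 workers, each process trains on its own slice of the data. Assuming RayPlugin behaves like the DDP plugins it is modeled on, Lightning swaps in a `torch.utils.data.DistributedSampler`, so `batch_size` in the DataLoader is per worker and the effective batch size grows with `num_workers`. The stdlib sketch below mirrors the sampler's interleaved sharding (without shuffling) to make that arithmetic explicit. The helper names and the batch size of 64 are hypothetical.

```python
import math
from typing import List


def shard_indices(dataset_len: int, num_replicas: int, rank: int) -> List[int]:
    """Indices seen by one worker, interleaved like DistributedSampler(shuffle=False).

    Assumes dataset_len >= num_replicas. The index list is padded by wrapping
    around so that every worker receives the same number of samples.
    """
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    indices += indices[: total_size - dataset_len]
    return indices[rank:total_size:num_replicas]


def batches_per_epoch(dataset_len: int, num_replicas: int, batch_size: int) -> int:
    """Full batches per worker per epoch when the DataLoader uses drop_last=True."""
    return math.ceil(dataset_len / num_replicas) // batch_size


def effective_batch_size(per_worker_batch_size: int, num_workers: int) -> int:
    """Samples contributing to each synchronized optimizer step across all workers."""
    return per_worker_batch_size * num_workers


if __name__ == "__main__":
    train_len = 60000 - 5000  # MNIST training split used by train_dataloader
    num_workers, batch_size = 8, 64  # batch_size is a hypothetical config value
    print(len(shard_indices(train_len, num_workers, rank=0)))
    print(batches_per_epoch(train_len, num_workers, batch_size))
    print(effective_batch_size(batch_size, num_workers))
```

With 55,000 training images and 8 workers, each worker gets 6,875 samples, which at a batch size of 64 is 107 full batches per epoch, and each synchronized step averages gradients over 512 samples.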

To scale beyond a single machine, though, you will need a Ray cluster. Let’s see how to set one up.

Multi-node Training on a Ray Cluster

To leverage multiple GPUs, and possibly multiple nodes, for training, we just have to use the Ray cluster launcher with the RayPlugin.

First, we start up the Ray cluster by following the instructions above:

ray up cluster.yaml

For a full step-by-step guide with all the possible configurations you can add to your YAML file, check out the instructions here.

Make sure to add your training script to the file_mounts section and any pip dependencies as part of the setup_commands.

Once your cluster has started, you can SSH into the head node via:

ray attach cluster.yaml

You should see your training script synced on this head node, since you added it to the file_mounts of your cluster.yaml.

Now just take the same code from the previous section and make two changes:

  • ray.init() -> ray.init("auto"), so Ray knows to connect to the existing cluster instead of just starting a local instance

  • In the code snippet, set use_gpu to True and num_workers to the total number of processes/GPUs you want to use for training.
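When moving to the cluster, num_workers is usually the total GPU count. For the example cluster.yaml, a p3.8xlarge head node has 4 GPUs and each p3.2xlarge worker has 1. The hypothetical helper below bundles both changes into keyword arguments for `ray.init` and `RayPlugin`, assuming the head node's GPUs are also used for training and that you pick the number of worker nodes yourself.

```python
from typing import Dict

# GPUs per instance for the two EC2 instance types in the example cluster.yaml.
GPUS_PER_INSTANCE = {"p3.2xlarge": 1, "p3.8xlarge": 4}


def cluster_training_settings(head_type: str, worker_type: str, num_worker_nodes: int) -> Dict[str, Dict]:
    """Keyword arguments for ray.init and RayPlugin, one training process per GPU."""
    total_gpus = GPUS_PER_INSTANCE[head_type] + num_worker_nodes * GPUS_PER_INSTANCE[worker_type]
    return {
        # Connect to the running cluster instead of starting a local Ray instance.
        "init_kwargs": {"address": "auto"},
        "plugin_kwargs": {"num_workers": total_gpus, "use_gpu": True},
    }


if __name__ == "__main__":
    # Hypothetical cluster: one p3.8xlarge head plus 4 p3.2xlarge workers.
    print(cluster_training_settings("p3.8xlarge", "p3.2xlarge", num_worker_nodes=4))
```

In the training script you would then call `ray.init(**settings["init_kwargs"])` and pass `RayPlugin(**settings["plugin_kwargs"])` to the Trainer.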

The final step is to run your Python script:

python train.py

And that’s it! You should see the GPUs in your cluster being used for training.

You’ve now successfully run a multi-node, multi-GPU distributed training job with very few code changes and no extensive cluster configuration!

Next steps

You’re now up and running with multi-GPU training on your cloud of choice. 

But Ray Lightning comes with many more features:

  • If standard PyTorch DDP is not your cup of tea, try out these alternatives instead:

  • Ray Lightning also integrates with Ray Tune, allowing you to run distributed hyperparameter tuning experiments in which each training run is itself parallelized. Check out the full Ray + PyTorch Lightning E2E guide for more details.

  • Use Ray Client to do training on the cloud without ever having to leave your laptop. More details here.

And if you’re curious to learn more about what the entire Ray ecosystem can offer you can check out these guides:

Happy training, and may your model's error always be low! :)
