Large-scale distributed training with TorchX and Ray

By Mark Saroufim and Jules S. Damji   
blog-torchx-ray-thumb

We're excited to introduce a new Ray Scheduler for TorchX — a joint effort from the PyTorch and Anyscale teams that allows developers to run scalable and distributed PyTorch workloads without setting up infrastructure or changing training scripts.

Ray, created at RISELab by the founders of Anyscale, provides a rich set of native libraries for ML workloads and a general-purpose core for building distributed applications.

On top of the libraries provided by Ray, there is a rich ecosystem of libraries and integrations that enable PyTorch users to achieve greater scale. Two great examples are PyTorch Distributed and PyTorch Lightning, enabling users to take advantage of the amazing PyTorch and Ray capabilities together.

This blog introduces how TorchX extends functionality to submit PyTorch jobs via a newly developed Ray Scheduler. Using TorchX SDK and the Ray Job Submission SDK, developers can build and deploy PyTorch machine learning applications from R&D to production. TorchX provides the ability to string together built-in components like hyperparameter optimization, model serving, and distributed data-parallel into complex pipelines while leveraging the most popular job schedulers in open source.

A joint engineering effort between the Meta AI PyTorch and Anyscale ML teams, this new Ray Scheduler component allows developers to run scalable and distributed PyTorch workloads without setting up an infrastructure for their choice of the cloud or changing their training scripts — all can be done via the TorchX SDK or CLI.

You can run all of the below live in a Google Colab notebook.

TorchX developer user journey

To comprehend how PyTorch developers use TorchX SDK and convert or deploy their scripts into jobs deployed on a remote Ray cluster via a new Ray Scheduler, let’s examine some use cases and show some code examples.

In 5 steps, you can convert your PyTorch Python script into a TorchX job and submit it for execution on a Ray cluster in your cloud. 

Step 1: Install ray and torchX on your laptop.

pip install ray “torchx[dev]”

Step 2: Create your simple_ray_job.py as you would for any PyTorch training script in your IDE or editor.

Step 3: Launch a Ray cluster (as shown below) using a <cloud>_cluster.yaml, which specifies the kind of Ray cluster: number and kind of nodes, GPU vs. CPUs, etc. Look at the example file in the repo; though comprehensive and expansive, the YAML file can be curtailed to the needs of your specific demand of a TorchX job.

Step 4: Submit a TorchX job to the Ray cluster using the TorchX CLI as shown below.

Step 5: Monitor the job’s progress or final status by fetching the logs as shown below.

Submitting TorchX jobs to Ray Scheduler

blog-submitting-torchx-jobs-to-ray-scheduler

As a user, you only need to write your training script. We’ll use a distributed data parallel training script in the below example. You can also pass in other parameters, like your working directory or requirements.txt. Then just call the TorchX CLI while specifying the Ray Scheduler, which will look for an available Ray cluster, start running your job, and stream back the logs to your local client. This helps decouple your training script from your infrastructure so that you can easily move to large multi-node workloads with multiple GPUs without changing your code.

Submit TorchX jobs to the cloud of your choice

Submitting a job to a cloud of your choice is simple. For instance, if you wish to submit a TorchX job to a Ray cluster on AWS or GCP, assuming you have all your target cloud’s credentials set as required by the specific cloud provider, you can expand on a simple aws_ray_cluster.yaml or gcp_ray_cluster.yaml file to meet your compute node type and needs.

Launching a cluster

ray up -y <cloud>_ray_cluster.yaml

Through TorchX’s Ray Scheduler, you can just as easily pick the cloud of your choice to submit your job to.

Defining a TorchX component

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# component.py
from typing import Dict, List, Optional

import torchx
import torchx.specs as specs
from torchx.specs import Resource, macros, named_resources

def trainer() -> specs.AppDef:
    # cmd = "python -m torch.distributed.run -"
    return specs.AppDef(
        name="compute-ws",
        roles=[
            specs.Role(
                name="worker",
                entrypoint="python",
                args=["-m", "compute_world_size"],
                env={},
                image="",
                resource=specs.Resource(cpu=1, gpu=0, memMB=4000),
                num_replicas=2,
            )
        ],
    )

Writing a distributed PyTorch job

The simplest possible distributed PyTorch job would compute the world size and make sure all nodes agree to that world size.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
%%writefile scripts/compute_world_size.py
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import os

import torch
import torch.nn.functional as F
from torch.distributed import init_process_group, all_reduce, get_rank, get_world_size

def compute_world_size() -> int:

   rank = int(os.getenv("RANK"))  # pyre-ignore[6]
   world_size = int(os.getenv("WORLD_SIZE"))  # pyre-ignore[6]
   master_port = int(os.getenv("MASTER_PORT"))  # pyre-ignore[6]
   master_addr = os.getenv("MASTER_ADDR")
   backend = "gloo"

   print(f"initializing `{backend}` process group")
   init_process_group(  # pyre-ignore[16]
       backend=backend,
       init_method=f"tcp://{master_addr}:{master_port}",
       rank=rank,
       world_size=world_size,
   )
   print("successfully initialized process group")

   rank = get_rank()  # pyre-ignore[16]
   world_size = get_world_size()  # pyre-ignore[16]

   t = F.one_hot(torch.tensor(rank), num_classes=world_size)
   all_reduce(t)  # pyre-ignore[16]
   computed_world_size = int(torch.sum(t).item())
   print(
       f"rank: {rank}, actual world_size: {world_size}, computed world_size: {computed_world_size}"
   )
   return computed_world_size

def main() -> None:
   compute_world_size()

if __name__ == "__main__":
   main()

Submitting a job to a cluster

Once your cluster is up and running, you can now submit your TorchX job with the TorchX CLI after the above command is successfully finished. Through TorchX’s Ray Scheduler, your job will be submitted to the Ray cluster launched above in the cloud of your choice. For example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
! LOGLEVEL=INFO torchx run -s ray -cfg dashboard_address=54.214.124.247:20002,working_dir=scripts ./component.py:trainer

OUTPUT:
38:2:238:76:44mtorchx 2022-02-17 23:33:28 INFO    Uploading package gcs://_ray_pkg_755775e16a6c3645.zip.
38:2:238:76:44mtorchx 2022-02-17 23:33:28 INFO    Creating a file package for local directory '/tmp/tmppc9csbps'.
ray://torchx/54.214.124.247:20002-raysubmit_ntquG1dDV6CtFUC5
38:2:238:76:44mtorchx 2022-02-17 23:33:29 INFO    Launched app: ray://torchx/54.214.124.247:20002-raysubmit_ntquG1dDV6CtFUC5
38:2:238:76:44mtorchx 2022-02-17 23:33:30 INFO    AppStatus:
  msg: <NONE>
  num_restarts: -1
  roles:
  - replicas:
    - hostname: ''
      id: 0
      role: ray
      state: !!python/object/apply:torchx.specs.api.AppState
      - 2
      structured_error_msg: <NONE>
    role: ray
  state: PENDING (2)
  structured_error_msg: <NONE>
  ui_url: null

38:2:238:76:44mtorchx 2022-02-17 23:33:30 INFO    Job URL: None

Use TorchX SDK and Ray Jobs API

A common way to check the status of your submission is by using TorchX SDK and Ray Jobs API. The sections below show the commands and results from common tasks such as examining the job status and monitoring a job’s progress, which are common queries for developers. 

Examine job status

After submitting a job, it’s queued and in a pending state, and after that the job can either be successful, fail because of some application error, or be interrupted by you.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# Get a job status
# PENDING, FAILED, INTERRUPTED ETC..
! torchx describe ray://torchx/54.214.124.247:20002-raysubmit_ntquG1dDV6CtFUC5

Output
{ 'metadata': {},
  'name': '54.214.124.247:20002-raysubmit_ntquG1dDV6CtFUC5',
  'roles': [ { 'args': [],
               'base_image': None,
               'entrypoint': '<MISSING>',
               'env': {},
               'image': '',
               'max_retries': 0,
               'metadata': {},
               'name': 'ray',
               'num_replicas': 1,
               'port_map': {},
               'resource': { 'capabilities': {},
                             'cpu': -1,
                             'gpu': -1,
                             'memMB': -1},
               'retry_policy': <RetryPolicy.APPLICATION: 'APPLICATION'>}]}

Monitor a job’s progress

Collecting logs works in much the same way as getting the job description works. The key here is that logs are actually distributed over multiple machines, yet you get them all streamed back to your console without having to worry about which machine has which logs.

1
2
3
4
5
6
7
8
9
10
# Get job logs
# Aggregate logs from all machines in the same place
! torchx log ray://torchx/54.214.124.247:20002-raysubmit_ntquG1dDV6CtFUC5

Output:
ray/0 2022-02-17 15:33:32,983	INFO worker.py:853 -- Connecting to existing Ray cluster at address: 10.10.12.9:6379
(CommandActor pid=3575) successfully initialized process group
(CommandActor pid=3575) rank: 1, actual world_size: 2, computed world_size: 2
(CommandActor pid=3574) successfully initialized process group
(CommandActor pid=3574) rank: 0, actual world_size: 2, computed world_size: 2

What’s next

Any existing TorchX component can now run on top of Ray. Over time, we’re looking to add more reference projects to TorchX to let anyone bootstrap an end-to-end ML infrastructure and seamlessly run it and scale it on top of Ray.

Acknowlegements

We want to acknowledge the joint engineering efforts by the Meta AI and Anyscale teams for this endeavor. 

Meta AI Team: Mark Saroufim, Aliaksandr Ivanou, Can Balioglu, Tristan Rice, Kiuk Chung, Geeta Chauhan

Anyscale Team: Amog Kamsetty, Jiao Dong, Jules S. Damji, Richard Liaw

Sharing

Sign up for product updates