Modern Distributed C++ with Ray

By Guyang Song and Yu Qi   

Introduction

With the explosive growth of data, internet businesses have been growing significantly faster than computer hardware can keep up with. As a result, more and more fields and scenarios require distributed systems. As a language that compiles to native code, C++ is widely used in modern distributed systems thanks to its high performance and lightweight runtime. For example, frameworks such as TensorFlow, Caffe, XGBoost, and Redis all chose C/C++ as their main programming language.

Compared with single-machine systems, it is not easy to build a feature-complete, highly available C++ distributed system for production environments. We usually need to address the following issues:

  • Communication: We need to define the communication protocol between components through serialization tools such as protobuf, and then communicate through the RPC framework (or socket). It is also important to consider issues such as service discovery, synchronous/asynchronous IO, and multiplexing.

  • Deployment: Machines need to meet specific resource specifications, and processes from multiple components must be deployed onto them. This may require integrating with different cloud platforms for resource scheduling.

  • Fault tolerance: This includes monitoring failure events on any node or component as well as restarting and recovering system state.

The Ray C++ API was designed to help address these issues so that programmers can focus on the logic of the system itself. 

What is Ray

Ray Ecosystem, from Ion Stoica’s keynote at Ray Summit 2021.

Ray is an open source library for parallel and distributed Python. In the past few years, Ray and its ecosystem (Ray Tune, Ray Serve, RLlib, etc.) have developed rapidly. It is widely used to build AI and big data systems at companies such as Ant Group, Intel, Microsoft, Amazon, and Uber. Unlike existing big data computing systems (Spark, Flink, etc.), Ray is not based on a specific computing paradigm such as DataStream or DataSet. From a system-level perspective, Ray's API is lower-level and more flexible: it takes the familiar concepts of functions and classes and translates them to the distributed setting as tasks and actors. Ray does not limit your application scenario. Whether it is batch processing, stream computing, graph computing, machine learning, or scientific computing, as long as you need multiple machines to cooperate on a task, you can choose Ray to help you build your distributed system.

C++ API

C++ Stack

Originally, Ray only supported the Python API. In mid-2018, Ant Group contributed the Java API to the Ray project, and recently Ant Group contributed the C++ API as well. You may be wondering: why develop a C++ API when two popular languages were already supported? The main reason is that in some high-performance scenarios, Java and Python still cannot meet business needs even after system optimization. In addition, Ray Core and the various components of Ray are pure C++ implementations, so the C++ API seamlessly connects the user layer and the core layer, resulting in a system with no inter-language overhead.

Task

A task in Ray corresponds to a function in single-machine programming. Through Ray's task API, it is easy to submit any C++ function to a distributed cluster for asynchronous execution. This greatly improves efficiency.

The code below defines the function heavy_compute. If we execute it 10,000 times serially in a single machine environment, it can take a very long time:

// Stands in for an expensive computation
int heavy_compute(int value) {
  return value;
}

std::vector<int> results;
for (int i = 0; i < 10000; i++) {
  results.push_back(heavy_compute(i));
}

Using Ray, we can easily transform it into a distributed function:

// Define heavy_compute as a remote function
RAY_REMOTE(heavy_compute);

std::vector<ray::ObjectRef<int>> results;
for (int i = 0; i < 10000; i++) {
  // Use ray::Task to call the remote function; Ray automatically
  // dispatches the tasks to nodes of the cluster for distributed computing
  results.push_back(ray::Task(heavy_compute).Remote(i));
}

// Get the results
for (auto result : ray::Get(results)) {
  std::cout << *result << std::endl;
}

Actor

Ray Tasks are stateless. If you want to implement stateful computing, you need to use an Actor.

An actor corresponds to a class in single-machine programming. With Ray's distributed capabilities, we can transform the class Counter below into a distributed Counter deployed on a remote node.

class Counter {
  int count;
public:
  Counter(int init) { count = init; }
  int Add(int x) { count += x; return count; }
};

Counter *CreateCounter(int init) {
  return new Counter(init);
}
RAY_REMOTE(CreateCounter, &Counter::Add);

// Create an actor
ray::ActorHandle<Counter> actor = ray::Actor(CreateCounter).Remote(0);

// Call the actor's remote function
auto result = actor.Task(&Counter::Add).Remote(1);
EXPECT_EQ(1, *(ray::Get(result)));

Object

In the Task and Actor examples above, we used "ray::Get" to retrieve the final results. One concept that cannot be avoided here is the Object. Each call to the "Remote" method returns an object reference (ObjectRef), and each ObjectRef points to a unique remote object in the cluster. If you have ever used a Future, Objects should be easy to understand. In Ray, objects are stored in a distributed store based on shared memory. When you call "ray::Get" to get an object, Ray fetches the data from the remote node, deserializes it, and returns the result to your program.

Besides storing the intermediate results of your application, you can also create an Object directly through "ray::Put". In addition, you can wait for a group of objects through the "ray::Wait" interface.

// Put an object into the distributed store
auto obj_ref1 = ray::Put(100);
// Get data from the distributed store
auto res1 = obj_ref1.Get();
// Or call ray::Get(obj_ref1)
EXPECT_EQ(100, *res1);

// Wait for a group of objects
auto obj_ref2 = ray::Put(200);
ray::Wait({obj_ref1, obj_ref2}, /*num_results=*/1, /*timeout_ms=*/1000);

From the basic APIs above, we can see that Ray solves the issues of communication, storage, and data transmission. Ray also has advanced features to address other issues in distributed systems, such as scheduling options, fault tolerance, deployment, and operation. How it does so is introduced in the following example.

Implementation of Distributed Storage System with Ray C++

Let's now go over a practical example to see how to use the Ray C++ API to build a simple key-value (KV) storage system.

Example description

KV Store

In this KV storage system, there is a main server and a backup server. Only the main server provides services. The backup server is used to back up data. The system’s requirement is to have automatic failure recovery ability. That is, after any server dies, data will not be lost and services can continue to be provided.

Note: This is just a demo, and does not focus on the optimization of the storage itself. The purpose is to use the simple code to show how to use Ray to develop a distributed storage system quickly (complete code is available here).

Server implementation

We’ll start by writing a single machine KV store first and then we’ll proceed to use Ray’s distributed deployment and scheduling abilities to transform it into a distributed KV store.

main server

class MainServer {
 public:
  MainServer();

  std::pair<bool, std::string> Get(const std::string &key);

  void Put(const std::string &key, const std::string &val);

 private:
  std::unordered_map<std::string, std::string> data_;
};

std::pair<bool, std::string> MainServer::Get(const std::string &key) {
  auto it = data_.find(key);
  if (it == data_.end()) {
    return std::pair<bool, std::string>{};
  }

  return {true, it->second};
}

void MainServer::Put(const std::string &key, const std::string &val) {
  // First synchronize the data to the backup server
  ...

  // Update the local kv
  data_[key] = val;
}

So far this looks like an ordinary KV store, no different from a local one. We do not need to care about the details of distribution and can focus on the storage logic itself. Note that in the implementation of Put, we must synchronously write the data to the backup server before writing it locally, to ensure data consistency.

backup server

class BackupServer {
 public:
  BackupServer();

  // When the master node restarts, it will call GetAllData for data recovery
  std::unordered_map<std::string, std::string> GetAllData() {
    return data_;
  }

  // When the master node writes data, it will call SyncData to synchronize the data to this backup node first
  void SyncData(const std::string &key, const std::string &val) {
    data_[key] = val;
  }

 private:
  std::unordered_map<std::string, std::string> data_;
};

Deployment

Cluster deployment

Before deploying the application, we need to deploy a Ray cluster. Ray currently supports one-click deployment on popular cloud platforms such as AWS, Azure, GCP, and Aliyun, as well as Kubernetes environments. If you already have a configuration file, you can deploy via the command line after installing Ray:

ray up -y config.yaml

For specific configurations, please refer to the documentation here.

Alternatively, if you already have running machines, you can manually set up a Ray cluster by running the ray start command on each machine:

Choose a machine as the head node:

ray start --head

Join the cluster on other machines:

ray start --address=${HEAD_ADDRESS}

Actor deployment

After the Ray cluster is up, we need to deploy the two actor instances, MainServer and BackupServer, to provide the distributed storage service. We can use Ray's actor-creation API to deploy them easily.

static MainServer *CreateMainServer() { return new MainServer(); }

static BackupServer *CreateBackupServer() { return new BackupServer(); }

// Declare the remote functions
RAY_REMOTE(CreateMainServer, CreateBackupServer);

const std::string MAIN_SERVER_NAME = "main_actor";
const std::string BACKUP_SERVER_NAME = "backup_actor";

// Deploy the actor instances to the Ray cluster with ray::Actor
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .Remote();

  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .Remote();
}

Scheduling

Specify resources

If your actors have special hardware requirements, you can also specify the resources they need, such as CPU and memory.

// Required resources: 1 CPU, 1 GB of memory
const std::unordered_map<std::string, double> RESOURCES{
    {"CPU", 1.0}, {"memory", 1024.0 * 1024.0 * 1024.0}};

// Deploy the actor instances to the Ray cluster with resource requirements
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .SetResources(RESOURCES)  // Set resources
      .Remote();

  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .SetResources(RESOURCES)  // Set resources
      .Remote();
}

Scheduling strategy

We want to schedule the main server and backup server actors to different machines, so that the failure of one machine cannot take down both servers at the same time. In this case, we can use Ray's Placement Group to express this scheduling requirement.

A Placement Group allows you to reserve a group of resources in the cluster ahead of time for tasks and actors.

ray::PlacementGroup CreateSimplePlacementGroup(const std::string &name) {
  // Set Placement Group resources
  std::vector<std::unordered_map<std::string, double>> bundles{RESOURCES, RESOURCES};

  // Create Placement Group and set the scheduling strategy to SPREAD
  ray::PlacementGroupCreationOptions options{
      false, name, bundles, ray::PlacementStrategy::SPREAD};
  return ray::CreatePlacementGroup(options);
}

auto placement_group = CreateSimplePlacementGroup("my_placement_group");
assert(placement_group.Wait(10));

The code above creates a Placement Group with the SPREAD scheduling strategy, which places the bundles (and the actors using them) on different nodes. For more scheduling strategies, please refer to the documentation here.

Next, we can schedule actors to different nodes through the Placement Group.

// Scheduling actor instances to different nodes in the Ray cluster
void StartServer() {
  // Scheduling the main server
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .SetResources(RESOURCES)
      .SetPlacementGroup(placement_group, 0) //Scheduled to one node
      .Remote();

  // Scheduling backup server
  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .SetResources(RESOURCES)
      .SetPlacementGroup(placement_group, 1) //Scheduled to another node
      .Remote();
}

Service discovery and component communication

Now that we have deployed the two actor instances of the main server and the backup server to two nodes in the Ray cluster, we need service discovery for the main server as well as client-server communication.

Ray's named actors make service discovery easy: we set the actor's name when creating it, and later use ray::GetActor(name) to look up the created actor. Ray tasks handle the communication between the client and the server, so we do not need to care about communication details such as the transport protocol.

class Client {
 public:
  Client() {
    // Discover the actor
    main_actor_ = ray::GetActor<MainServer>(MAIN_SERVER_NAME);
  }

  bool Put(const std::string &key, const std::string &val) {
    // Use Ray Task to call the Put function of the remote main server
    (*main_actor_).Task(&MainServer::Put).Remote(key, val).Get();

    return true;
  }

  std::pair<bool, std::string> Get(const std::string &key) {
    // Use Ray Task to call the Get function of the remote main server
    return *(*main_actor_).Task(&MainServer::Get).Remote(key).Get();
  }

 private:
  boost::optional<ray::ActorHandle<MainServer>> main_actor_;
};

Fault tolerance

Process fault tolerance

Ray provides process fault tolerance: if an actor process dies, Ray will automatically re-create the actor instance. We only need to set the actor's maximum number of restarts.

// Schedule the actor instances and set the maximum number of restarts
void StartServer() {
  ray::Actor(CreateMainServer)
      .SetName(MAIN_SERVER_NAME)
      .SetResources(RESOURCES)
      .SetPlacementGroup(placement_group, 0)
      .SetMaxRestarts(1)  // Set the maximum number of restarts for automatic fault recovery
      .Remote();

  ray::Actor(CreateBackupServer)
      .SetName(BACKUP_SERVER_NAME)
      .SetResources(RESOURCES)
      .SetPlacementGroup(placement_group, 1)
      .SetMaxRestarts(1)  // Set the maximum number of restarts for automatic fault recovery
      .Remote();
}

State fault tolerance

Although Ray can handle process failures and re-create the actor instances, we still need to recover the actor's state after the process restarts. For example, after the main actor restarts, the data in memory is lost and must be recovered from the backup server.

MainServer::MainServer() {
  // If the current instance is restarted, do failover handling
  if (ray::WasCurrentActorRestarted()) {
    HandleFailover();
  }
}

void MainServer::HandleFailover() {
  backup_actor_ = *ray::GetActor<BackupServer>(BACKUP_SERVER_NAME);
  // Pull all data from the backup server
  data_ = *backup_actor_.Task(&BackupServer::GetAllData).Remote().Get();
}

Note: The main server's fault-tolerance handling is done in its constructor. Ray provides an API to determine whether the current actor instance was restarted; if so, the state is restored by pulling all data from the backup server. The fault-tolerance handling of the backup server is similar.

Operation and monitoring

Ray provides a dashboard for operation and monitoring, so we can view the state of the Ray cluster and our applications in real time.

In the simple KV store example above, we can view information such as the actor list, the node list, and process logs, and catch abnormal events.

Actor list

Node list

Logs

Events

Quick start

Ray is multi-language, so running C++ applications requires both a Python environment and a C++ environment. As a result, we packaged the C++ library into a wheel that is installed and managed by pip, just like the Python library. You can quickly generate a Ray C++ project template as follows.

Note: Environment requirements: Linux or macOS, Python 3.6-3.9, a C++14 toolchain, and Bazel 3.4 or above (optional; the template is based on Bazel).

Install the latest version of Ray with C++ support:

pip install -U ray[cpp]

Generate a C++ project template through Ray command line:

mkdir ray-template && ray cpp --generate-bazel-project-template-to ray-template

Change directory to the project template, compile and run:

cd ray-template && sh run.sh

By default, run.sh launches a local Ray cluster before running the example and automatically shuts it down afterwards. If you want to connect to an existing Ray cluster instead, you can start one as follows:

ray start --head

RAY_ADDRESS=127.0.0.1:6379 sh run.sh

After testing, you can shut down the Ray cluster with:

ray stop

Now, you can start to develop your own C++ distributed system based on the project template!

Summary

This blog introduced the Ray C++ API and showed a practical example of using it to build a distributed key-value storage system. In around 200 lines of code, it showed how to deal with the issues of deployment, scheduling, communication, fault tolerance, and operation and monitoring. To learn more about this new API, please check out the Ray documentation. If you would like to contribute to the continued growth of this project, you can submit a pull request or an issue on GitHub. If you would like to get in touch with us, you can contact us through Ray's Slack channel or through the official WeChat Chinese account by searching "Ray中文社区".
