Introducing Distributed XGBoost Training with Ray

By Kai Fricke, Richard Liaw and Amog Kamsetty   

XGBoost-Ray is a novel backend for distributed XGBoost training. It features multi-node and multi-GPU training, distributed data loading, advanced fault tolerance such as elastic training, and seamless integration with the hyperparameter optimization framework Ray Tune. XGBoost-Ray is fully compatible with the core XGBoost API. Distributing a training run across a cluster is as easy as changing three lines of code.

We are proud to announce XGBoost-Ray, the latest addition to our ecosystem of distributed machine learning libraries built on top of Ray. XGBoost-Ray leverages Ray to scale XGBoost training from single machines to clusters with hundreds of nodes - with minimal code changes. It remains fully compatible with the core XGBoost API.

In short, XGBoost-Ray

  • enables multi-node and multi-GPU training

  • comes with advanced fault tolerance handling mechanisms

  • supports distributed dataframes and distributed data loading

  • integrates seamlessly with Ray Tune, a distributed hyperparameter optimization library

  • interoperates easily with other Ray-based ML and data processing components

Multi-node and Multi-GPU Training

For training, XGBoost-Ray instantiates a configurable number of training actors on the cluster. Each actor trains on a separate part of the data (data-parallel training). Actors communicate their gradients via tree-based allreduce. For multi-GPU training, XGBoost leverages NCCL2 for efficient cross-device communication.

Figure: XGBoost-Ray supports multi-node/multi-GPU training. On a machine, GPUs communicate gradients via NCCL2. Between nodes, they use Rabit instead.
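
To make the data-parallel idea more concrete, here is a minimal pure-Python sketch. It is not the XGBoost-Ray or Rabit implementation: actors are simulated as list entries, the gradient is assumed to be the plain logistic-loss gradient, and the helper names (shard, local_gradient_sum, tree_allreduce) are hypothetical.

```python
import math


def shard(rows, num_actors):
    """Split rows into num_actors disjoint, roughly equal shards (round-robin)."""
    return [rows[i::num_actors] for i in range(num_actors)]


def local_gradient_sum(shard_rows, raw_score=0.0):
    """Sum of logistic-loss gradients (p - y) over one actor's shard."""
    p = 1.0 / (1.0 + math.exp(-raw_score))
    return sum(p - y for _x, y in shard_rows)


def tree_allreduce(values):
    """Simulated tree allreduce: pairwise sums up a binary tree, then broadcast."""
    level = list(values)
    while len(level) > 1:
        nxt = [level[i] + level[i + 1] for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:  # an odd element is carried up unchanged
            nxt.append(level[-1])
        level = nxt
    total = level[0]
    # Every actor ends up with the same global value.
    return [total] * len(values)


# Toy dataset of (feature, label) pairs; the values are made up for illustration.
rows = [(float(i), 1 if i % 3 == 0 else 0) for i in range(10)]
shards = shard(rows, num_actors=4)
local = [local_gradient_sum(s) for s in shards]
reduced = tree_allreduce(local)
```

After the allreduce, each simulated actor holds the same gradient sum it would have computed on the full dataset, which is why every actor can build an identical tree from its own share of the data.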

Distributed Hyperparameter Search

XGBoost-Ray seamlessly integrates with the hyperparameter optimization library Ray Tune. It automatically creates callbacks to report training status to Ray Tune, saves checkpoints, and takes care of allocating the right amount of resources to each trial depending on the distributed training configuration.

With this integration, users can concurrently run multiple distributed training jobs, which themselves start distributed training actors across the Ray cluster.

Figure: The Ray Tune integration makes sure to schedule distributed workers of a single trial on as few machines as possible to mitigate the impacts of node failure. Here, each color shade represents a different machine in the cluster.
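
The packing idea can be illustrated with a small greedy sketch. The actual placement is handled by Ray's scheduler; the function pack_trial and the node names below are hypothetical and only show why filling the largest nodes first keeps a trial on few machines.

```python
def pack_trial(free_cpus, num_actors, cpus_per_actor):
    """Greedily place one trial's actors on as few nodes as possible.

    free_cpus maps node name -> free CPU count. Returns node name -> number
    of actors placed there, or None if the whole trial does not fit.
    """
    placement = {}
    remaining = num_actors
    # Visit the nodes with the most free capacity first.
    for node, cpus in sorted(free_cpus.items(), key=lambda kv: -kv[1]):
        fit = min(remaining, cpus // cpus_per_actor)
        if fit > 0:
            placement[node] = fit
            remaining -= fit
        if remaining == 0:
            return placement
    return None


# Hypothetical cluster with three nodes.
cluster = {"node-a": 8, "node-b": 4, "node-c": 4}
placement = pack_trial(cluster, num_actors=4, cpus_per_actor=2)
```

In this toy cluster, a trial with four 2-CPU actors fits entirely on node-a, so the loss of node-b or node-c would not affect it.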

Fault Tolerance and Elastic Training

XGBoost-Ray comes with two modes for failure handling. In non-elastic training, whenever a training actor dies (e.g. because the node went down), XGBoost-Ray waits until the cluster has enough resources to restart the failed actor. After restarting the actor and loading its share of the data, training is resumed from the last checkpoint. Notably, actors that were not originally affected by the failure retain their state and do not have to incur the data shuffling and data loading costs again.

In elastic training, XGBoost-Ray will continue training with fewer actors (and on fewer data) when an actor fails. When the resources for the actor become available again, data loading for this actor is triggered. The active actors continue training and only re-integrate the restarted actor back into the training run after it has loaded its data and is ready for training. 

In practice, this means that for some amount of time, training will continue on fewer data. While this can slightly decrease training accuracy, we found that the impact is often negligible given a high number of actors and sufficiently large datasets, and it is usually offset by the time saved by continuing training right away.

Figure: Overview of the lifetime of a training run when one node dies. In practice, elastic training can often dramatically reduce total training time with minimal accuracy impairments.
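
The difference between the two modes can be sketched with a back-of-the-envelope model. This is a simplification under stated assumptions, not a description of XGBoost-Ray internals: every boosting round is assumed to take the same time regardless of how many actors are alive, checkpoint replay is ignored, and all numbers are made up for illustration.

```python
import math


def simulate(num_rounds, round_time, fail_round, restart_delay, elastic):
    """Return (total seconds, rounds trained on reduced data) for one failure.

    Non-elastic: training pauses for restart_delay seconds, then resumes.
    Elastic: training continues without the failed actor, which rejoins
    once restart_delay seconds have passed.
    """
    healthy_time = num_rounds * round_time
    if not elastic:
        # The whole run stalls while the failed actor restarts and reloads.
        return healthy_time + restart_delay, 0
    rounds_left = num_rounds - fail_round
    # Rounds that run while the failed actor is still missing.
    degraded = min(rounds_left, math.ceil(restart_delay / round_time))
    return healthy_time, degraded


# Illustrative numbers only: 300 rounds, 4 s per round, failure at round 150,
# and 600 s until the failed actor is back with its data loaded.
non_elastic = simulate(300, 4.0, 150, 600.0, elastic=False)
elastic = simulate(300, 4.0, 150, 600.0, elastic=True)
```

In this toy model the non-elastic run pays the full restart delay as idle time, while the elastic run pays nothing in wall-clock time and instead trains a number of rounds on reduced data.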

Support for Distributed Dataframes and Data Loading

XGBoost-Ray integrates with Modin and the Ray-native MLDataset abstraction. With locality-aware actor scheduling and data sharding, XGBoost-Ray minimizes the network communication overhead while maintaining an even data distribution across actors.

Figure: Locality-aware data loading first assigns existing partitions to distributed workers on the same node. This minimizes the amount of data transfer across nodes.
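
A two-pass assignment along these lines might look like the sketch below. It is an illustration under assumptions, not the scheduling code used by XGBoost-Ray: the function assign_partitions, the per-actor quota rule, and the partition, actor, and node names are all hypothetical.

```python
def assign_partitions(partition_nodes, actor_nodes):
    """Assign partitions to actors, preferring actors on the partition's node.

    partition_nodes: partition id -> node that holds the partition.
    actor_nodes: actor id -> node the actor runs on.
    Each actor receives at most ceil(P / A) partitions to keep the load even.
    """
    quota = -(-len(partition_nodes) // len(actor_nodes))  # ceiling division
    assignment = {actor: [] for actor in actor_nodes}
    leftovers = []
    # Pass 1: give each partition to a co-located actor that has spare quota.
    for part, node in partition_nodes.items():
        local = [a for a, n in actor_nodes.items()
                 if n == node and len(assignment[a]) < quota]
        if local:
            target = min(local, key=lambda a: len(assignment[a]))
            assignment[target].append(part)
        else:
            leftovers.append(part)
    # Pass 2: remaining partitions go to the least-loaded actor (remote read).
    for part in leftovers:
        target = min(actor_nodes, key=lambda a: len(assignment[a]))
        assignment[target].append(part)
    return assignment


# Five partitions live on node-1 and one on node-2.
partitions = {"p0": "node-1", "p1": "node-1", "p2": "node-1",
              "p3": "node-1", "p4": "node-1", "p5": "node-2"}
actors = {"a0": "node-1", "a1": "node-1", "a2": "node-2"}
result = assign_partitions(partitions, actors)
remote = sum(1 for a, parts in result.items()
             for p in parts if partitions[p] != actors[a])
```

Here every actor ends up with two partitions, and only one partition (p4) has to be transferred to another node.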

XGBoost-Ray also supports distributed data loading from sources such as Parquet (e.g. using Petastorm) or CSV, both from disk and cloud storage providers. The table below shows the currently supported data sources.

| Type | Centralized loading | Distributed loading |
| --- | --- | --- |
| Numpy array | Yes | No |
| Pandas dataframe | Yes | No |
| Single CSV | Yes | No |
| Multi CSV | Yes | Yes |
| Single Parquet | Yes | No |
| Multi Parquet | Yes | Yes |
| Petastorm | Yes | Yes |
| Ray MLDataset | Yes | Yes |
| Dask dataframe | Yes | Yes |
| Modin dataframe | Yes | Yes |
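
One plausible way to read the table is that distributed loading needs a source that already comes in several pieces, so that each actor can read its own subset. The sketch below illustrates this with the standard library only; it does not use the XGBoost-Ray loaders, and the helpers files_for_actor and load_shard as well as the file names are hypothetical.

```python
import csv
import os
import tempfile


def files_for_actor(paths, rank, num_actors):
    """Round-robin share of the input files for the actor with this rank."""
    return sorted(paths)[rank::num_actors]


def load_shard(paths):
    """Read all rows of the given CSV files into a list of dicts."""
    rows = []
    for path in paths:
        with open(path, newline="") as f:
            rows.extend(csv.DictReader(f))
    return rows


with tempfile.TemporaryDirectory() as tmp:
    # Write four small CSV files that stand in for a multi-file dataset.
    paths = []
    for i in range(4):
        path = os.path.join(tmp, f"part-{i}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["feature", "label"])
            writer.writerows([[i * 10 + j, j % 2] for j in range(3)])
        paths.append(path)
    # Each of the two simulated actors loads only its own files.
    shards = [load_shard(files_for_actor(paths, rank, 2)) for rank in range(2)]

# With a single input file, the second actor would have nothing to read.
single_file_share = files_for_actor(["only.csv"], rank=1, num_actors=2)
```

With four files and two actors, each actor reads two files independently. With a single file, file-level sharding leaves one actor empty, which matches the "No" entries for single-file sources in the table.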

Benchmarks

We ran a couple of benchmarks to examine if we incur any overhead compared to other distributed XGBoost backends. Please note that while these benchmarks are not very extensive, they should still be suitable to show that XGBoost-Ray performs on par with the examined distributed backends.

Comparison with XGBoost-Dask

TLDR: On a single node/multi worker benchmark, performance of XGBoost-Ray and XGBoost-Dask is very similar.

To compare with XGBoost-Dask, we ran benchmarks with the NYC taxi dataset on a single AWS m5.4xlarge instance with 16 cores and 64 GB memory. One file/partition of the NYC dataset contains the collected data of a whole month. For January 2009, this was about 1.5 million rows and 18 features.

We trained on several dataset sizes (~1.5M to ~12M rows) across different numbers of workers (1 to 8). As you can see in the plots below, Dask and Ray usually achieve the same performance. There are some trials where Dask took more time to train (e.g. for 3M rows/2 input files with 8 workers). There is no inherent explanation for this overhead, so we attribute this to measurement error.

Figure: Training times for single node benchmarks (lower is better). XGBoost-Ray and XGBoost-Dask achieve similar performance.

We ran the same benchmarks for fewer training rounds, but the results look the same. XGBoost-Ray never took longer to train on the data than XGBoost-Dask. Generally we can conclude that single node performance is on par between Ray and Dask.

Comparison with XGBoost-Spark

TLDR: Performance of XGBoost-Ray and XGBoost-Spark is very similar on XGBoost 0.90. Bumping to XGBoost 1.3 gives great performance improvements.

For comparison with XGBoost-Spark, we ran the benchmarks using Uber’s production deployment of XGBoost-Spark against a collection of synthetic datasets derived from production data. Since Uber's XGBoost-Spark is deployed with XGBoost 0.90, we added compatibility with XGBoost 0.90 to XGBoost-Ray for the purposes of these benchmarks.

We trained on several synthetic dataset sizes (~400k to ~2B rows) that cover a collection of deployment size datasets used within Uber across different numbers of workers (5 to 150). Each distributed training worker runs on a separate machine.

Figure: Training times for multi-node benchmarks (lower is better). XGBoost-Ray and XGBoost-Spark achieve similar performance.

Generally, we can see that there is no significant performance difference between XGBoost-Spark and XGBoost-Ray with XGBoost 0.90. When bumping the library version to XGBoost 1.3, we see speedups of up to 4x on CPUs, but we attribute these to improvements in the XGBoost library backend, and not in the distributed orchestration.

Note that it is much easier to upgrade the XGBoost version within a Python-based deployment such as XGBoost-Ray. Upgrading to a higher version of XGBoost and XGBoost-Ray only requires updating the pinned version within a Dockerfile. Updating XGBoost-Spark, in contrast, often requires upgrading to a newer, compatible version of Spark across the infrastructure and ensuring backward compatibility of Spark Transformers, as they are not strictly contained within a Docker installation.

Overall, XGBoost-Ray is able to achieve similar performance as XGBoost-Spark and XGBoost-Dask in terms of speed, scalability, and peak memory utilization. Thus, the decision on which backend to use should be guided by other factors such as ease of use, the need for specific features (such as multi-GPU training, fault tolerance, and support for distributed hyperparameter optimization), and maintainability.

Fault Tolerance Benchmark

TLDR: Elastic training reaches virtually the same accuracy as a run without failures, and its training time stays close to that of a run in which no worker failed.

In these benchmarks, we examine what happens when training actors fail and XGBoost-Ray's fault tolerance mechanisms kick in. 

First we establish a baseline by training on the full dataset. We then remove up to three actors (fewer_workers) to see the accuracy impact when training on fewer data.

We then test three different failure handling modes. In non_elastic mode, when an actor dies (after 50% of the training run), we wait until it comes back before proceeding with training. 

In elastic_no_comeback mode, dying actors never come back. Instead we proceed to train with fewer actors on fewer data.

In elastic_comeback, we first continue training on fewer data. However, after about 75% of the boosting rounds, the dead actors come back and are re-integrated into training again.

The evaluation dataset is kept constant across all runs.

Training properties

Number of rows: 30M

Number of features: 500

Number of boost rounds: 300

Result overview

The table below shows a comparison of fault tolerance modes. With elastic training, total training time stays very close to that of runs without any failures, without significantly sacrificing accuracy.

| condition | affected_workers | eval-error | eval-logloss | train-error | time_total_s |
| --- | --- | --- | --- | --- | --- |
| fewer_workers | 0 | 0.133326 | 0.405359 | 0.132591 | 1441.44 |
| fewer_workers | 1 | 0.134000 | 0.406041 | 0.133103 | 1227.45 |
| fewer_workers | 2 | 0.133977 | 0.405873 | 0.132885 | 1249.45 |
| fewer_workers | 3 | 0.133333 | 0.405081 | 0.132205 | 1291.54 |
| non_elastic | 1 | 0.133552 | 0.405618 | 0.132754 | 2205.95 |
| non_elastic | 2 | 0.133211 | 0.405076 | 0.132403 | 2226.96 |
| non_elastic | 3 | 0.133552 | 0.405618 | 0.132754 | 2033.94 |
| elastic_no_comeback | 1 | 0.133763 | 0.40556 | 0.1327 | 1272.45 |
| elastic_no_comeback | 2 | 0.133285 | 0.405306 | 0.132288 | 1218.19 |
| elastic_no_comeback | 3 | 0.133335 | 0.405607 | 0.13244 | 1230.8 |
| elastic_comeback | 1 | 0.133763 | 0.40556 | 0.1327 | 1231.58 |
| elastic_comeback | 2 | 0.133771 | 0.405885 | 0.132655 | 1197.55 |
| elastic_comeback | 3 | 0.133704 | 0.405487 | 0.132481 | 1259.37 |

Discussion

The evaluation error is very similar for all runs. In fact, for some configurations we get a better evaluation performance even when training on fewer data - this can be attributed to chance and non-deterministic training behavior, especially in a distributed setting. In any case, the performance differences are negligible. This is a direct effect of good data shuffling - which is best practice for large training jobs.

What is more important here is the difference in training time. With non-elastic training, we wait until the dead actors are back again and have loaded their data. The rest of the machines remain idle during that time. Elastic training is able to vastly reduce the total runtime - it remains in the same region as training without any dying actors.

In summary, elastic training reduces training time with seemingly few drawbacks when training on large shuffled datasets for a relatively large number of boosting rounds.
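
To put numbers on the training time comparison, the following short script averages the time_total_s column of the result table per condition and relates it to the run with zero affected workers. The values are copied from the table; the variable names are only for illustration.

```python
from statistics import mean

# (condition, affected_workers, time_total_s), copied from the result table.
runs = [
    ("fewer_workers", 0, 1441.44),
    ("fewer_workers", 1, 1227.45),
    ("fewer_workers", 2, 1249.45),
    ("fewer_workers", 3, 1291.54),
    ("non_elastic", 1, 2205.95),
    ("non_elastic", 2, 2226.96),
    ("non_elastic", 3, 2033.94),
    ("elastic_no_comeback", 1, 1272.45),
    ("elastic_no_comeback", 2, 1218.19),
    ("elastic_no_comeback", 3, 1230.8),
    ("elastic_comeback", 1, 1231.58),
    ("elastic_comeback", 2, 1197.55),
    ("elastic_comeback", 3, 1259.37),
]

# The run with no affected workers serves as the baseline.
baseline = next(t for _cond, affected, t in runs if affected == 0)

# Group the remaining runs by condition.
times = {}
for cond, affected, t in runs:
    if affected > 0:
        times.setdefault(cond, []).append(t)

mean_time = {cond: mean(ts) for cond, ts in times.items()}
for cond, avg in mean_time.items():
    print(f"{cond:20s} mean={avg:8.2f}s  relative to baseline={avg / baseline:.2f}x")
```

The averages make the pattern in the table easier to see: the non-elastic runs are the only group whose mean training time clearly exceeds the baseline.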

Example

In order to run all the code in this section, scikit-learn and xgboost_ray need to be installed. This can be done with pip install scikit-learn xgboost_ray.

Training

Let's start by creating a simple single-node, non-distributed setup with core XGBoost.

from xgboost import DMatrix, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = DMatrix(train_x, train_y)

evals_result = {}
bst = train(
   {
       "objective": "binary:logistic",
       "eval_metric": ["logloss", "error"],
   },
   train_set,
   evals_result=evals_result,
   evals=[(train_set, "train")],
   verbose_eval=False)

bst.save_model("model.xgb")

Now we can turn this into a distributed training run by changing only three lines of code: the import, the matrix initialization (RayDMatrix instead of DMatrix), and the train call, which now receives a RayParams object.

from xgboost_ray import RayDMatrix, RayParams, train
from sklearn.datasets import load_breast_cancer

train_x, train_y = load_breast_cancer(return_X_y=True)
train_set = RayDMatrix(train_x, train_y)

evals_result = {}
bst = train(
   {
       "objective": "binary:logistic",
       "eval_metric": ["logloss", "error"],
   },
   train_set,
   evals_result=evals_result,
   evals=[(train_set, "train")],
   verbose_eval=False,
   ray_params=RayParams(num_actors=2, cpus_per_actor=1))

bst.save_model("model.xgb")

Prediction

from xgboost import DMatrix
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = DMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
# Core XGBoost exposes prediction as a method of the Booster object.
predictions = bst.predict(dpred)

print(predictions)

Similarly, running distributed inference requires changing only three lines of code: the import, the matrix initialization, and the predict call.

from xgboost_ray import RayDMatrix, RayParams, predict
from sklearn.datasets import load_breast_cancer
import xgboost as xgb

data, labels = load_breast_cancer(return_X_y=True)

dpred = RayDMatrix(data, labels)

bst = xgb.Booster(model_file="model.xgb")
predictions = predict(bst, dpred, ray_params=RayParams(num_actors=2))

print(predictions)

Scikit-learn API

XGBoost-Ray can also act as a drop-in replacement for sklearn-style models, such as XGBRegressor or XGBClassifier.

from xgboost_ray import RayXGBClassifier, RayParams
from sklearn.datasets import load_breast_cancer

X, y = load_breast_cancer(return_X_y=True)

clf = RayXGBClassifier(
    n_jobs=4,  # Number of distributed actors
)

clf.fit(X, y)

Learn more about the Scikit-learn API here.

Conclusion

XGBoost-Ray is a feature-rich backend for distributed XGBoost. It achieves performance similar to XGBoost-Spark and XGBoost-Dask in terms of speed, scalability, and peak memory utilization. Some advantages of XGBoost-Ray include full GPU support, advanced fault tolerance handling, and seamless integration with the hyperparameter optimization library Ray Tune.

For more information, please see our GitHub repository. Feel free to reach out to us on Discourse or Slack - we'd love to hear about your use cases, and we'll be happy to help make you a successful user of XGBoost-Ray!

Acknowledgement

XGBoost-Ray is developed in collaboration with Uber. We thank Michael Mui, Di Yu, Rich Porter and Xu Ning for their contribution, feedback and advice on this project.
