############################ Distributed XGBoost with Ray ############################ `Ray `_ is a general purpose distributed execution framework. Ray can be used to scale computations from a single node to a cluster of hundreds of nodes without changing any code. The Python bindings of Ray come with a collection of well maintained machine learning libraries for hyperparameter optimization and model serving. The `XGBoost-Ray `_ project provides an interface to run XGBoost training and prediction jobs on a Ray cluster. It allows to utilize distributed data representations, such as `Modin `_ dataframes, as well as distributed loading from cloud storage (e.g. Parquet files). XGBoost-Ray integrates well with hyperparameter optimization library Ray Tune, and implements advanced fault tolerance handling mechanisms. With Ray you can scale your training jobs to hundreds of nodes just by adding new nodes to a cluster. You can also use Ray to leverage multi GPU XGBoost training. Installing and starting Ray =========================== Ray can be installed from PyPI like this: .. code-block:: bash pip install ray If you're using Ray on a single machine, you don't need to do anything else - XGBoost-Ray will automatically start a local Ray cluster when used. If you want to use Ray on a cluster, you can use the `Ray cluster launcher `_. Installing XGBoost-Ray ====================== XGBoost-Ray is also available via PyPI: .. code-block:: bash pip install xgboost_ray This will install all dependencies needed to run XGBoost on Ray, including Ray itself if it hasn't been installed before. Using XGBoost-Ray for training and prediction ============================================= XGBoost-Ray uses the same API as core XGBoost. There are only two differences: 1. Instead of using a ``xgboost.DMatrix``, you'll use a ``xgboost_ray.RayDMatrix`` object 2. There is an additional :class:`ray_params ` parameter that you can use to configure distributed training. Simple training example ----------------------- To run this simple example, you'll need to install `scikit-learn `_ (with ``pip install sklearn``). In this example, we will load the `breast cancer dataset `_ and train a binary classifier using two actors. .. code-block:: python from xgboost_ray import RayDMatrix, RayParams, train from sklearn.datasets import load_breast_cancer train_x, train_y = load_breast_cancer(return_X_y=True) train_set = RayDMatrix(train_x, train_y) evals_result = {} bst = train( { "objective": "binary:logistic", "eval_metric": ["logloss", "error"], }, train_set, evals_result=evals_result, evals=[(train_set, "train")], verbose_eval=False, ray_params=RayParams(num_actors=2, cpus_per_actor=1)) bst.save_model("model.xgb") print("Final training error: {:.4f}".format( evals_result["train"]["error"][-1])) The only differences compared to the non-distributed API are the import statement (``xgboost_ray`` instead of ``xgboost``), using the ``RayDMatrix`` instead of the ``DMatrix``, and passing a :class:`RayParams ` object. The return object is a regular ``xgboost.Booster`` instance. Simple prediction example ------------------------- .. code-block:: python from xgboost_ray import RayDMatrix, RayParams, predict from sklearn.datasets import load_breast_cancer import xgboost as xgb data, labels = load_breast_cancer(return_X_y=True) dpred = RayDMatrix(data, labels) bst = xgb.Booster(model_file="model.xgb") pred_ray = predict(bst, dpred, ray_params=RayParams(num_actors=2)) print(pred_ray) In this example, the data will be split across two actors. The result array will integrate this data in the correct order. The RayParams object ======================== The ``RayParams`` object is used to configure various settings relating to the distributed training. .. autoclass:: xgboost_ray.RayParams Multi GPU training ================== Ray automatically detects GPUs on cluster nodes. In order to start training on multiple GPUs, all you have to do is to set the ``gpus_per_actor`` parameter of the ``RayParams`` object, as well as the ``num_actors`` parameter for multiple GPUs: .. code-block:: python ray_params = RayParams( num_actors=4, gpus_per_actor=1, ) This will train on four GPUs in parallel. Note that it usually does not make sense to allocate more than one GPU per actor, as XGBoost relies on distributed libraries such as Dask or Ray to utilize multi GPU taining. Setting the number of CPUs per actor ==================================== XGBoost natively utilizes multi threading to speed up computations. Thus if your are training on CPUs only, there is likely no benefit in using more than one actor per node. In that case, assuming you have a cluster of homogeneous nodes, set the number of CPUs per actor to the number of CPUs available on each node, and the number of actors to the number of nodes. If you are using multi GPU training on a single node, divide the number of available CPUs evenly across all actors. For instance, if you have 16 CPUs and 4 GPUs available, each actor should access 1 GPU and 4 CPUs. If you are using a cluster of heterogeneous nodes (with different amounts of CPUs), you might just want to use the `greatest common divisor `_ for the number of CPUs per actor. E.g. if you have a cluster of three nodes with 4, 8, and 12 CPUs, respectively, you'd start 6 actors with 4 CPUs each for maximum CPU utilization. Fault tolerance =============== XGBoost-Ray supports two fault tolerance modes. In **non-elastic training**, whenever a training actor dies (e.g. because the node goes down), the training job will stop, XGBoost-Ray will wait for the actor (or its resources) to become available again (this might be on a different node), and then continue training once all actors are back. In **elastic-training**, whenever a training actor dies, the rest of the actors continue training without the dead actor. If the actor comes back, it will be re-integrated into training again. Please note that in elastic-training this means that you will train on fewer data for some time. The benefit is that you can continue training even if a node goes away for the remainder of the training run, and don't have to wait until it is back up again. In practice this usually leads to a very minor decrease in accuracy but a much shorter training time compared to non-elastic training. Both training modes can be configured using the respective :class:`RayParams ` parameters. Hyperparameter optimization =========================== XGBoost-Ray integrates well with `hyperparameter optimization framework Ray Tune `_. Ray Tune uses Ray to start multiple distributed trials with different hyperparameter configurations. If used with XGBoost-Ray, these trials will then start their own distributed training jobs. XGBoost-Ray automatically reports evaluation results back to Ray Tune. There's only a few things you need to do: 1. Put your XGBoost-Ray training call into a function accepting parameter configurations (``train_model`` in the example below). 2. Create a :class:`RayParams ` object (``ray_params`` in the example below). 3. Define the parameter search space (``config`` dict in the example below). 4. Call ``tune.run()``: * The ``metric`` parameter should contain the metric you'd like to optimize. Usually this consists of the prefix passed to the ``evals`` argument of ``xgboost_ray.train()``, and an ``eval_metric`` passed in the XGBoost parameters (``train-error`` in the example below). * The ``mode`` should either be ``min`` or ``max``, depending on whether you'd like to minimize or maximize the metric * The ``resources_per_actor`` should be set using ``ray_params.get_tune_resources()``. This will make sure that each trial has the necessary resources available to start their distributed training jobs. .. code-block:: python from xgboost_ray import RayDMatrix, RayParams, train from sklearn.datasets import load_breast_cancer num_actors = 4 num_cpus_per_actor = 1 ray_params = RayParams( num_actors=num_actors, cpus_per_actor=num_cpus_per_actor) def train_model(config): train_x, train_y = load_breast_cancer(return_X_y=True) train_set = RayDMatrix(train_x, train_y) evals_result = {} bst = train( params=config, dtrain=train_set, evals_result=evals_result, evals=[(train_set, "train")], verbose_eval=False, ray_params=ray_params) bst.save_model("model.xgb") from ray import tune # Specify the hyperparameter search space. config = { "tree_method": "approx", "objective": "binary:logistic", "eval_metric": ["logloss", "error"], "eta": tune.loguniform(1e-4, 1e-1), "subsample": tune.uniform(0.5, 1.0), "max_depth": tune.randint(1, 9) } # Make sure to use the `get_tune_resources` method to set the `resources_per_trial` analysis = tune.run( train_model, config=config, metric="train-error", mode="min", num_samples=4, resources_per_trial=ray_params.get_tune_resources()) print("Best hyperparameters", analysis.best_config) Ray Tune supports various `search algorithms and libraries (e.g. BayesOpt, Tree-Parzen estimators) `_, `smart schedulers like successive halving `_, and other features. Please refer to the `Ray Tune documentation `_ for more information. Additional resources ==================== * `XGBoost-Ray repository `_ * `XGBoost-Ray documentation `_ * `Ray core documentation `_ * `Ray Tune documentation `_