Message passing interface (MPI) is an efficient communication standard that is used in many machine learning frameworks (PyTorch, Horovod, Chainer, Xgboost, etc.) for distributed training. Clusterone provides several docker images with OpenMPI, a popular open source implementation of MPI.
In previous tutorials, we went over how to run distributed training with Tensorflow. Although it's not trivial, Tensorflow fortunately had many things built into the library. There's no other library you need to install to run distributed training, it uses gRPC built into the Tensorflow library. This is not true for almost all other machine learning frameworks. But do not fret, that is what this tutorial is for!
In this tutorial, we will cover two machine learning examples: XGBoost and Horovod. Other examples, such as PyTorch and Chainer, will be very similar to Horovod.
Throughout this tutorial, I am going to assume you know about XGBoost and Horovod. Although you do not need to know about them to follow along with this tutorial, it'll be easier to understand if you know them.
Message passing interface (MPI)
I am definitely not an expert in MPI, but I can share the little knowledge I have that is relevant.
In the early 90s, as parallel processing protocols were being developed, MPI became the leading standard and still is today. MPI is highly performant, scalable, and portable.
At the core of MPI is a collection of functions that define the communication protocol between different nodes in a cluster. You'll see things like
all_gather, etc. These communications can be to/from another node (point-to-point communication) or to/from many nodes (collective communication). Typically, collective communication involves all the nodes. PyTorch documentation has a nice visualization of what these communications look like.
Now that we have a sense of what MPI is, let's briefly talk about what's actually being communicated between the nodes. Exact implementations may be different, but I'll describe a simplified common scenario here. In deep learning training, what we learn is a set of parameter tensors. There's typically a "master copy" of the values of these tensors stored on one master node. This master node
sends these values to all the workers so that they calculate the loss function and the gradients. The calculated gradients are communicated back to the master and averaged (
reduce) and added to the parameter tensors. Then the process repeats. So in short, what's communicated between the nodes either the parameter values or the changes in the parameter values.
This sounds complicated to implement. Fortunately, most modern deep learning frameworks have these wrapped in a high level APIs, so you won't deal with these low level implementation.
One thing you'll need to know about MPI is that the MPI command is executed only on the master worker, then the master worker connects to the other workers to spin up processes. In order for this to work, the master worker requires password-less ssh access to all the workers. There are many resources that describe how to set this up; a simple Google search will show you pages like this. This is not difficult to do, but it still takes some time to setup. Fortunately, on Clusterone, this will be all setup for you. All you'll need to do is run an MPI command. Continue reading to learn how!
*Note: technically, node really refers to a process. This distinction is important when you have a case with many GPUs on each computer. In this case, each GPU will own one process, and each process is a node in this cluster.
Now that we have a good foundation of how distributed training and inter-node communication works, let's look at two examples. To keep this tutorial simple, I chose two examples (XGBoost and Horovod) with relatively simple code, but these examples (especially Horovod) should give you a good idea on how to run any MPI jobs on Clusterone.
See example code here.
XGBoost is one of the most popular machine learning tools these days. However, resources on distributed XGBoost is very scarce. There was an official tutorial using YARN on AWS, but it's "under construction" at the time of writing this tutorial. When I was first researching this field, I found this repo helpful.
Maintainers of XGBoost, DMLC, has built a set of distributed training tools for general maching learning, namely Rabit and DMLC-core. For our purposes, you can think of Rabit as a wrapper for MPI functions, and XGBoost has Rabit built in. DMLC-Core's DMLC Tracker is a job tracker, which sets up the distributed training job through its
dmlc-submit function. This
dmlc-submit function starts the MPI process as you can see in the source code.
On the XGBoost docker image available on Clusterone platform, you can find the submit function at
/dmlc-core/tracker/dmlc-submit. You can follow the instructions on GitHub to run the actual command. Here is a sample command:
That is it, we're done!
Just kidding. You're probably wondering about what goes in this Python script. You might have trained an XGBoost model on your laptop before, and it's not too difficult if you follow the documentation and examples. Making these codes compatible with distributed training is not difficult. Remember
rabit? You simply need to initialize and finalize it.
Yup. You simply sandwich your main train function between
xgb.rabit.finalize(). It's as simple as that. The functions
xgb.rabit.get_rank() can be useful to control the behaviors of the individual nodes. For example, in the above snippet, only the rank 0 node (master) saves the trained model, so that there is no conflict in saving files to the same destination.
Note: You can also run this with a configuration file instead of a Python script, see an example here.
One more thing before finishing this section. For distributed XGBoost, you need a dataset in LibSVM format, a sparse matrix format. Here is an example:
The first column represents the label. In this case, it looks like a binary classification problem (label of 0 or 1). It stores all the feature data in
index:value format, and it does not record 0 values. So for example, in the first row, index 4 is missing, which means the value at index 4 is 0. For more information, see here.
Finally, that is it! The tutorial repo has both a script and sample train/test data. You can test this locally as single node.
Or you can run distributed training on Clusterone! Navigate to the Clusterone Matrix (sign up for free if you haven't yet), follow the steps below:
- Click on
ADD NEW PROJECTthen
Use existing GitHub repository
clusterone/clusterone-tutorialsand add project
Create Job for this project, enter a job name, then click next until
- Enter information like the screenshot below
- Choose the same worker and ps type (any t2, m5, or c5 instances will do), and set 2 workers and 1 ps
- Create & Run!
If you want more workers, just increase the number of workers, and change
--num-workers 3 the appropriate number. (Don't forget to +1 for master node.)
Then wait for your code to start up. If the instances are available, your job should start pretty quickly.
Once everything is up and running, the training should be finished within 30 seconds (doh! I couldn't fit any larger dataset on GitHub). You should see
log.txt in your output tab with some results like shown below. If you're up for the challenge, you can run it a larger dataset that is publicly available on our platforms. Simply add
--data_dir /public/xgboost-benchmark-dataset at the end of your Python command when you're setting up a job on Clusterone. This should take at least several minutes, depending on what instance you run it on. Also, I encourage you to take a peek inside of
main.py to see what other parameters are available to you. For example, you can run more than 10 iterations by passing something like
Note: Instructions on how to run this tutorial using CLI is available on the GitHub repo.
See example code here.
Horovod is a traditional Russian folk dance. It's also the name of a distributed training library released by Uber just over a year ago.
In their origin release post, Uber notes two main issues with distributed TensorFlow. First, the learning curve is pretty steep. Although it has gotten easier with latest versions, especially using the Estimator class, it's still not easy. Trying to use multiple GPUs in a single computer? You need to use towers; what does that even mean? Second, it becomes inefficient at large scale. Uber showed that the native distributed TensorFlow only uses about half of the GPU resources, even after a lot of optimizations. Maybe it's network bottleneck, or saturated network interconnects, or who knows what...
Uber decided to take it upon themselves to do this right. It needed to be simple to implement and efficient. This was birth of Horovod. It's extemely easy to implement as you'll see soon. It's also very efficient. Mainly, Horovod uses MPI, ring allreduce, and tensor fusion. It's shown to reach GPU efficiency above 90% in some cases!
Okay, enough talk. Let's implement this. As I mentioned above, Horovod is very easy to implement. I'll show an example for Tensorflow here, but you can easily do the same with Keras or PyTorch. You need to add 3 things to your code:
- Initialize Horovod:
- Wrap your optimizer:
optimizer = hvd.DistributedOptimizer(optimzer)
- Add broacast hook:
If you're using multiple GPUs in each computer, you'll also need to add
config.gpu_options.visible_device_list = str(hvd.local_rank()) to your
tf.ConfigProto(). Here is what it would look like put together:
To see an entire working code, see here.
In order to run this on Clusterone, follow the same instructions as XGBoost above with the below
ENVIRONMENT tab information.
mpirun --allow-run-as-root -np 3 --hostfile /kube-openmpi/generated/hostfile -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib python openmpi/horovod/main.py
One important difference here is that you need to run the MPI command yourself. Fortunately, you can find the command you need to run on the Horovod repo. The repo also has good explanations of the
Because we're running inside of a Docker image as root, you need to pass the param
--allow-run-as-root. Here, if you want to add more workers, change
-np 3 to the match the number of workers you set on the
RESOURCES page (+1 for master node).
Another important thing about this command:
--hostfile /kube-openmpi/generated/hostfile. The master process needs to know about the other servers in the cluster and how many processes to start in each of them. This information is usually written out in a
hostfile. In the simplest sense, it's just a list of addresses so that the each node knows where all the other nodes are. For this purpose,
/kube-openmpi/generated/hostfile is generated at the startup of all OpenMPI jobs on Clusterone. In case you forget where that hostfile is, it is stored in environment variable
HOSTFILE. So your command could've also been:
mpirun --allow-run-as-root -np 3 --hostfile $HOSTFILE -bind-to none -map-by slot -mca pml ob1 -mca btl ^openib python openmpi/horovod/main.py
Soon, you should see something like this in your
Note: Instructions on how to run this tutorial using CLI is available on the GitHub repo.
I covered a lot of material in this tutorial. Here are some bullet-points that may be important:
- What is MPI?
- MPI defines how different processes communicate to each other
- OpenMPI is a popular implementation of MPI
- It is one of the most popular form of communication protocol in distributed machine learning
- How to run distributed XGBoost
- XGBoost is one of the most popular machine learning tools
- Distributed XGBoost is easy to run with the help of DMLC-Core
- Submit jobs through
dmlc-submitfunction, located at
/dmlc-core/tracker/dmlc-submiton Clusterone jobs
- You must provide data in LibSVM format
- Official DMLC tutorial and code
- How to run distributed Horovod
- Horovod is simple to implement and achieves high GPU scaling efficiency
- Supports TensorFlow, Keras, and PyTorch
- Find the command you need to run here
- Official repo has good examples
- General notes about running MPI jobs on Clusterone
- You need to add
- You need to pass list of hostfiles, located at
/kube-openmpi/generated/hostfile(this path is saved as environment variable
- Configuration for master node will be taken from parameter server (there is no parameter server in MPI jobs, it's a naming convention from TensorFlow). However, number of replicas will be ignored, as there can only be one master node. Usually, you want the same Docker image and the the instance type for both the parameter server (master) and the workers.
- You should be able to run any MPI jobs in similar manner
We, at Clusterone, are very excited to support OpenMPI, as this opens doors to many distributed machine learning frameworks. If you have questions about this tutorial, about Clusterone platform, or anything related, please feel free to ask us on Slack!
Lastly, Clusterone recently announced public beta version of its SaaS platform. You can sign up and start using Clusterone now: