Distributed TensorFlow is a powerful tool to speed up training for neural networks. But using it isn't always a straightforward process. Here's a step-by-step guide on how to use distributed TensorFlow.
When I first got interested in the topic, I was surprised by the scarcity of good resources available. There are the TensorFlow documentation and tutorials, but they are surprisingly thin given the complexity of the system, and I often found myself reading the TensorFlow implementation itself.
As much as I recommend it as an exercise, it is not the fastest way to find out how to build your first distributed TensorFlow script. And sometimes it feels like, well, banging your head against the wall.
So I decided to write about it. I will explain how to run Distributed TensorFlow in the abstract, keeping in mind that managing a cluster of machines is a project in itself. As it happens, Clusterone does that for you, so I will include a specific section about how to run distributed TensorFlow on Clusterone. Finally, scaling to dozens of machines requires some tweaking, so I will describe some experimentation I did.
Strategies for Distributing Deep Learning
A first approach is model parallelism: when the model is too big to fit into memory on one machine, you can assign different parts of the graph to different machines. The parameters will live on that machine, and their training and update operations will happen there.
A basic way to do this is to put the first layers on one machine, the next layers on another, and so on. However, this is not optimal: the deeper layers have to wait for the first layers during the forward pass, and the first layers have to wait for the deeper layers during backpropagation. When the model has operations that can happen in parallel (as in GoogLeNet), those can run on different machines without hitting such a bottleneck.
The second approach is data parallelism: the entire graph lives on one (potentially replicated) machine called the parameter server, or ps. What I mean by "potentially replicated" is that, to handle lots of I/O, the same entire graph can live on several parameter servers that stay in sync.
Training operations are executed on multiple machines called workers. Each worker reads different data batches, computes gradients, and sends update ops to the parameter servers.
Two main options are possible for data parallelism:
- Synchronous training: all the workers read the parameters at the same time, compute a training operation, and wait for all the others to be done. The gradients are then averaged and a single update is sent to the parameter server, so at any point in time the workers are all aware of the same values for the graph parameters.
- Asynchronous training: the workers read from the parameter server(s) asynchronously, compute their training operation, and send asynchronous updates. At any point in time, two different workers might be aware of different values for the graph parameters.
This article is meant to be accessible, so I will not go too far into the subtleties of synchronous vs. asynchronous training, graph initialization, and so on. Three great articles do that very well, including the obvious TensorFlow help page (see the Resources section).
In this article, I will focus on how to implement asynchronous training with data parallelism.
Build your Code for Data Parallelism
As explained earlier, in that configuration the system has three kinds of nodes:
- One or more parameter server(s), or ps, host the model's variables.
- A master worker coordinates the training operations and takes care of initializing the model, counting the number of executed training steps, saving and restoring model checkpoints, and saving summaries for TensorBoard. The master also takes care of fault tolerance if a worker or a ps crashes.
- Workers (including the master worker) compute training steps and send updates to the parameter servers.
So the smallest cluster you can deploy here is one master worker and one ps. It can scale up to one master worker, many ps nodes, and many workers.
The reason you might want to have more than one parameter server is to handle a large volume of I/O from the workers. If there are only 2 workers, chances are one ps can handle all the read and update requests. But if you have 10 workers and your model is reasonably large, one ps may not be enough.
One of the potentially confusing things with Distributed TensorFlow is that, very often, the same code is sent to all nodes. So your train.py (or whatever you choose to call your script) is sent to the workers and the ps alike. Environment variables are then used to execute one code block on the master node, another on the workers, and another on the ps.
It might be obvious, but I have seen people confused by that approach, so I wanted to clear it up from the beginning.
Prepping your code for Distributed TensorFlow requires three steps:
- Define the tf.train.ClusterSpec and tf.train.Server
- Assign your model variables and operations to the ps and workers
- Configure and launch a tf.train.MonitoredTrainingSession
1. Define the tf.train.ClusterSpec and tf.train.Server
The tf.train.ClusterSpec object essentially maps tasks to machines. It is then passed to create a tf.train.Server, which creates (at least) one server per machine and makes sure every machine in the cluster is aware of what the others are doing. The server contains a set of devices (the devices available on that particular machine) as well as a tf.Session object that will be used by the tf.train.MonitoredTrainingSession (see later) to run the graph.
There is usually one task per machine, unless you have multi-GPU machines. In that case, you may want to assign one task per GPU.
From TensorFlow’s tutorial:
A tf.train.ClusterSpec represents the set of processes that participate in a distributed TensorFlow computation. Every tf.train.Server is constructed in a particular cluster. A tf.train.Server instance encapsulates a set of devices and a tf.Session target that can participate in distributed training. A server belongs to a cluster (specified by a tf.train.ClusterSpec), and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.
2. Assign model variables and ops to a worker
Using the with tf.device command, you can now assign nodes (either ops or variables) to a specific task of a specific job. For example:
Nodes left out of this block are automatically assigned to a device by TensorFlow.
In the data parallelism framework, the variables are assigned to the ps and the operations to the workers. Doing this manually would not be scalable at all (imagine you have 10 ps nodes; you don't want to assign variables manually to each one of them). TensorFlow provides the convenient tf.train.replica_device_setter, which automatically takes care of assigning operations to devices.
It takes as input a tf.train.ClusterSpec object and returns a function to be passed to tf.device. Several options are possible, and I expect a lot more TensorFlow functions to become available to help with device placement in the future. In our ps-worker approach, variable operations are placed on the ps and training operations on the workers.
The whole graph can be defined within that device scope, so the code block above becomes:
3. Configure and launch a tf.train.MonitoredTrainingSession
tf.train.MonitoredTrainingSession is essentially the equivalent of tf.Session for distributed training. It takes care of setting up the master worker node, which will handle:
- Initializing the graph
- Model checkpointing (the best brief explanation about checkpointing I have found, by Derek Murray)
- Exporting TensorBoard summaries
- Starting / stopping the session
tf.train.MonitoredTrainingSession takes a number of arguments, including the master node's session target, where to save checkpoints, and the checkpointing and TensorBoard export frequencies. Through the is_chief argument, you need to define somewhere in your code which node is the master; you can get that from your cluster deployment system, for example.
Setting up the number of training steps
I am guessing you are used to running some kind of while loop inside the with tf.Session block, executing one or several sess.run instructions at each iteration.
That is not the way a MonitoredTrainingSession works: all the instances need to terminate properly and stay in sync, and a checkpoint needs to be saved. For that reason, the number of training steps (potentially undefined) is passed directly to the MonitoredTrainingSession via a list of hooks.
You pass a specific hook called tf.train.StopAtStepHook to the MonitoredTrainingSession object. That hook defines the last step of the training, after which the workers will be shut down.
Note: There are several other kinds of hooks, and you can define your own using the tf.train.SessionRunHook class. I will not deal with that here.
The code looks as follows:
Very loosely speaking, here is how it works:
Data Parallelism on Clusterone
Now that we understand the components of Distributed TensorFlow code, I will provide some higher-level code snippets and the setup to run on Clusterone. That speeds up the code setup, especially on the cluster spec side. Clusterone also provides some high-level wrappers so that you can run your code locally and remotely without changing it.
The code on Clusterone needs to be structured as follows:
Open Question - Choose the ps / worker ratio
The speedup is massive! But my code is not half as optimized as it could be, so I would expect to go even further with queuing, for example.
An open question, though, is how many ps nodes to have relative to the number of workers. That depends on the workers' GPU, memory, and network speed, so some benchmarking helps. Running Clusterone on AWS, with p2.xlarge instances (1 GPU, 4 vCPUs, 61 GiB of RAM) as workers and c4.2xlarge instances (8 vCPUs, 15 GiB of RAM) as ps nodes, I get the following benchmark. Increasing the number of parameter servers seems to improve performance, and they are less expensive than GPU machines. I haven't tested the 1:1 worker:ps ratio. Note that the speedup for 64 GPUs vs. 1 GPU is more than linear here (70x).
Some things to explore next:
- Playing with the ps / worker ratio
- Higher-level code: the Experiment class. Although several functions in that class are still experimental and might be removed, it solves painful problems, such as running validation steps once in a while during distributed training. Check out our follow-up tutorial using the Estimator class.
- Model parallelism
- Synchronous data parallelism
- Distributed Keras
On top of that, the infrastructure itself is hard to come by. Yahoo! has an open-source repo for running Distributed TensorFlow on Spark, and I found a couple of other repos. But that's if you want to set up and manage your own cluster, which I don't expect anybody with a non-pathological use case to want to do.
The other obvious option is Google ML Engine, but I think it is overpriced and lacks transparency in terms of billing (I had a fun conversation with a Google support guy trying to understand what kind of hardware I would get for an ML Engine credit. I still don’t have a clear answer except a link to the pricing page I had originally landed on — and subsequently gotten confused by). And its handling of TensorBoard is far from perfect for comparing multiple jobs.
I am obviously biased, but I think Clusterone solves those kinds of problems (well, not that biased: I chose this job because the product solved a pain point I couldn't find a solution to in the first place). The codebase and data handling are streamlined and rigorous, absolutely all infrastructure management is automated, and the prices are the lowest on the market (to my knowledge, $0.414 / hour for a p2.xlarge GPU machine is the lowest out there). And, yes, there's that nicely integrated TensorBoard.
You can sign up and start using Clusterone now: