Distributed TensorFlow is a powerful tool to speed up the training of neural networks, but using it isn't always straightforward. Here is a step-by-step guide to using distributed TensorFlow.

When I first got interested in the topic, I was surprised by the scarcity of good resources. There are the TensorFlow documentation and tutorials, but they are surprisingly thin given the complexity of the system, and I often found myself reading the TensorFlow implementation itself.

As much as I recommend it as an exercise, it is not the fastest way to find out how to build your first distributed TensorFlow script. And sometimes it feels like, well, banging your head against the wall.

So I decided to write about it. I will explain how to run Distributed TensorFlow in the abstract, keeping in mind that managing a cluster of machines is a project in itself. As it happens, Clusterone does that for you, so I will have a specific section about how to run distributed TensorFlow on Clusterone. Finally, scaling to dozens of machines requires some tweaking, so I will describe some experimentation I did.

Strategies for Distributing Deep Learning

Model Parallelism

When the model is too big to fit into memory on one machine, one can assign different parts of the graph to different machines. The parameters of each part live on the machine it is assigned to, and their training and update operations happen there.

A basic way to do it is to put the first layers on one machine, the next layers on another machine, and so on. However, this is not optimal: the deeper layers have to wait for the first layers during the forward pass, and the first layers have to wait for the deeper layers during backpropagation. When the model has operations that run in parallel (e.g. GoogLeNet), they can run on different machines without hitting such a bottleneck.


Data Parallelism

In this approach, the entire graph lives on one (potentially replicated) machine called the parameter server, or ps. What I mean by “potentially replicated” is that, to handle lots of I/O, the same entire graph can live on several parameter servers that stay in sync.

Training operations will be executed on multiple machines called workers. Each worker will be reading different data batches, computing gradients, and sending update ops to the parameter servers.


Two main options are possible for data parallelism:

  • Synchronous training: all the workers read the parameters at the same time, compute a training operation, and wait for all the others to be done. Then the gradients are averaged and a single update is sent to the parameter server. So at any point in time, the workers are all aware of the same values for the graph parameters.
  • Asynchronous training: the workers read from the parameter server(s) asynchronously, compute their training operation, and send asynchronous updates. At any point in time, two different workers might be aware of different values for the graph parameters.
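To make the difference concrete, here is a toy simulation in plain Python. It is only a sketch under my own assumptions: a single scalar parameter, a made-up quadratic loss, three hypothetical data shards, and a simplified notion of staleness where every worker reads the parameter at the start of a round and the updates land one after another. It does not use TensorFlow.

```python
def gradient(w, shard):
    """Gradient of 0.5 * mean((w - x)^2) over one worker's data shard."""
    return sum(w - x for x in shard) / len(shard)


def synchronous_round(w, shards, lr):
    # Every worker reads the same w, the gradients are averaged,
    # and a single update is applied.
    grads = [gradient(w, shard) for shard in shards]
    return w - lr * sum(grads) / len(grads)


def asynchronous_round(w, shards, lr):
    # Every worker reads w at the start of the round, but the updates are
    # applied one after another. All updates except the first are therefore
    # computed from parameters that are already stale.
    stale_w = w
    for shard in shards:
        w = w - lr * gradient(stale_w, shard)
    return w


# Hypothetical data: one shard per worker.
shards = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]

w_sync = w_async = 0.0
for _ in range(200):
    w_sync = synchronous_round(w_sync, shards, lr=0.1)
    w_async = asynchronous_round(w_async, shards, lr=0.1)

print(w_sync, w_async)
```

On this toy problem both variants end up at the mean of the data. The point is only the mechanics: one averaged update per round in the synchronous case, versus one update per worker, computed from possibly outdated parameters, in the asynchronous case.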

This article is meant to be accessible, so I will not go too far into the subtleties of synchronous vs asynchronous training, graph initialization, etc., but there are three great articles that do that very well, including the obvious TensorFlow help page (see Resources section).

In this article, I will focus on how to implement asynchronous training with data parallelism.

Build your Code for Data Parallelism

As explained earlier, in that configuration, the system has three kinds of nodes:

  • One or more parameter server(s) host the model.
  • A master worker coordinates the training operations, and takes care of initializing the model, counting the number of executed training steps, saving and restoring model checkpoints, and saving summaries for TensorBoard. The master also takes care of fault tolerance (if a ps or a worker crashes).
  • Workers (including the master worker) compute training steps and send updates to the parameter servers.

So that means the smallest cluster you can deploy here is one master worker and one ps. It can scale up to one master worker, many ps, and many workers.

The reason you might want to have more than one parameter server is to handle a large volume of I/O from the workers. If there are only 2 workers, chances are one ps can handle all the read and update requests. But if you have 10 workers and your model is reasonably large, one ps may not be enough.

One of the potentially confusing things with Distributed TensorFlow is that very often, the same code is sent to all nodes. So your main.py or train.py, or whatever you choose to call your script, is sent to the workers and the ps. Environment variables are then used to execute one code block on the master node, another on the workers, another on the ps, etc.

It might be obvious, but I have seen people confused by that approach so I wanted to clear that out of the way from the beginning.
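As a rough sketch of that pattern, the script below reads its role from the environment and branches on it. The variable names JOB_NAME and TASK_INDEX are hypothetical (use whatever your cluster manager sets), and treating worker 0 as the master is a common convention that I am assuming here, not a requirement.

```python
import os


def read_role(environ=os.environ):
    """Return (job_name, task_index) for this process.

    JOB_NAME and TASK_INDEX are hypothetical variable names.
    """
    job_name = environ.get("JOB_NAME", "worker")
    task_index = int(environ.get("TASK_INDEX", "0"))
    return job_name, task_index


def describe_role(job_name, task_index):
    """Say which code path this process would take."""
    if job_name == "ps":
        return "parameter server: host variables and wait for workers"
    if task_index == 0:
        # Assumed convention: the first worker is the master.
        return "master worker: train, and also initialize, checkpoint, summarize"
    return "worker: compute training steps and send updates"


if __name__ == "__main__":
    job_name, task_index = read_role()
    print(describe_role(job_name, task_index))
```

Every machine runs this same file; only the environment differs from one node to the next.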

Preparing your code for Distributed TensorFlow requires three steps:

  1. Define the tf.train.ClusterSpec and tf.train.Server
  2. Assign your model to the ps and workers
  3. Configure and launch a tf.train.MonitoredTrainingSession

1. Define the tf.train.ClusterSpec and tf.train.Server


The tf.train.ClusterSpec object essentially maps tasks to machines.

It is then passed to the tf.train.Server constructor. You create (at least) one server per machine, and each server can communicate with every other server in the cluster. A server holds a set of devices (the devices available on that particular machine) as well as a tf.Session target, which is used by the tf.train.MonitoredTrainingSession (see later) to run the graph.

There is usually one task per machine, unless you have multi-GPU machines. In that case, you may want to assign one task per GPU.

From TensorFlow’s tutorial:

A tf.train.ClusterSpec represents the set of processes that participate in a distributed TensorFlow computation. Every tf.train.Server is constructed in a particular cluster.

A tf.train.Server instance encapsulates a set of devices and a tf.Session target that can participate in distributed training. A server belongs to a cluster (specified by a tf.train.ClusterSpec), and corresponds to a particular task in a named job. The server can communicate with any other server in the same cluster.
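Here is a minimal sketch of that step. I am assuming the TensorFlow 1.x-style API (reached through tensorflow.compat.v1 so it also imports on newer installs); the host names and ports are hypothetical, and make_cluster_dict and start_server are helper names I made up for illustration.

```python
def make_cluster_dict(ps_hosts, worker_hosts):
    """Map each job name to the "host:port" addresses of its tasks."""
    return {"ps": list(ps_hosts), "worker": list(worker_hosts)}


def start_server(cluster_dict, job_name, task_index):
    """Create the server for one task of the cluster."""
    # Imported here so the helper above can be used without TensorFlow.
    import tensorflow.compat.v1 as tf  # assumption: TF 1.x-style API

    cluster = tf.train.ClusterSpec(cluster_dict)
    # Each process passes its own job name and task index.
    return tf.train.Server(cluster, job_name=job_name, task_index=task_index)


# Hypothetical addresses for a cluster with 1 ps and 2 workers.
cluster_dict = make_cluster_dict(
    ps_hosts=["ps0.example.com:2222"],
    worker_hosts=["worker0.example.com:2222", "worker1.example.com:2222"],
)
```

On a parameter server, the script typically just calls server.join() after creating the server and blocks there, while the workers go on to build the graph and train.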

2. Assign model variables and ops to the ps and workers

Using a with tf.device block, you can now assign nodes (either ops or variables) to a specific task of a specific job.

Nodes defined outside such a block are automatically assigned to a device by TensorFlow.
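A small sketch of manual placement, again assuming the TensorFlow 1.x-style API. The layer shapes (784 inputs, 10 outputs) and the helper names task_device and build_pinned_graph are hypothetical choices of mine.

```python
def task_device(job_name, task_index):
    """Device string for one task of one job, e.g. "/job:ps/task:0"."""
    return "/job:{}/task:{}".format(job_name, task_index)


def build_pinned_graph():
    """Build a tiny graph with variables on a ps and compute on a worker."""
    # Imported here so task_device can be used without TensorFlow.
    import tensorflow.compat.v1 as tf  # assumption: TF 1.x-style API

    graph = tf.Graph()
    with graph.as_default():
        # Variables pinned to the first parameter server.
        with tf.device(task_device("ps", 0)):
            weights = tf.get_variable("weights", shape=[784, 10])
            biases = tf.get_variable(
                "biases", shape=[10], initializer=tf.zeros_initializer())
        # Compute ops pinned to the first worker.
        with tf.device(task_device("worker", 0)):
            inputs = tf.placeholder(tf.float32, shape=[None, 784])
            logits = tf.matmul(inputs, weights) + biases
    return graph, logits
```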

In the data parallelism framework, the variables will be assigned to ps and the operations to workers. Doing this manually would not scale (imagine you have 10 ps: you don’t want to assign variables by hand to each one of them). TensorFlow provides a convenient tf.train.replica_device_setter that takes care of assigning variables and operations to devices automatically.

It takes a tf.train.ClusterSpec object as input and returns a function to be passed to tf.device.

Several options are possible, and I expect many more TensorFlow functions to become available to help with device setting in the future. In our ps-worker approach, variable operations will be placed on ps and training operations will be placed on workers.

The whole graph can then be defined inside a single with tf.device block that uses this function.
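The sketch below shows what this can look like, assuming the TensorFlow 1.x-style API. The model (one dense layer on 784 inputs and 10 classes), the learning rate, and the function names are hypothetical. The first function is a plain-Python illustration of the documented default strategy (round-robin over the ps tasks); it is not TensorFlow's implementation.

```python
def round_robin_placement(variable_names, num_ps):
    """Illustrate round-robin placement of variables over ps tasks."""
    return {
        name: "/job:ps/task:{}".format(index % num_ps)
        for index, name in enumerate(variable_names)
    }


def build_replicated_graph(cluster_dict, task_index):
    """Build one worker's copy of the graph with automatic placement."""
    # Imported here so the illustration above works without TensorFlow.
    import tensorflow.compat.v1 as tf  # assumption: TF 1.x-style API

    device_fn = tf.train.replica_device_setter(
        worker_device="/job:worker/task:{}".format(task_index),
        cluster=tf.train.ClusterSpec(cluster_dict),
    )
    graph = tf.Graph()
    with graph.as_default():
        with tf.device(device_fn):
            # Variables (including the global step) go to the ps tasks;
            # every other op goes to this worker.
            global_step = tf.train.get_or_create_global_step()
            inputs = tf.placeholder(tf.float32, shape=[None, 784])
            labels = tf.placeholder(tf.int64, shape=[None])
            weights = tf.get_variable("weights", shape=[784, 10])
            biases = tf.get_variable(
                "biases", shape=[10], initializer=tf.zeros_initializer())
            logits = tf.matmul(inputs, weights) + biases
            loss = tf.losses.sparse_softmax_cross_entropy(labels, logits)
            train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
                loss, global_step=global_step)
    return graph, train_op
```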

3. Configure and launch a tf.train.MonitoredTrainingSession


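tf.train.MonitoredTrainingSession is essentially the equivalent of tf.Session for distributed training. It takes care of setting up a master worker node that, as described earlier, handles model initialization, saving and restoring checkpoints, saving summaries for TensorBoard, and recovery when a ps or a worker crashes.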


tf.train.MonitoredTrainingSession takes a number of arguments, including the master node, the directory where checkpoints are saved, and the checkpointing and TensorBoard export frequencies.

For is_chief, you need to define somewhere in your code which node is the master. You can get that from your cluster deployment system for example.

Setting up the number of training steps

I am guessing you are used to running some kind of while loop inside the with tf.Session block, with one or several sess.run calls at each iteration.

That is not how a MonitoredTrainingSession works: all the instances need to terminate properly and in sync, and a checkpoint needs to be saved. For that reason, the number of training steps (potentially undefined) is passed directly to the MonitoredTrainingSession via a list of SessionRunHook objects.

You pass a specific hook called tf.train.StopAtStepHook to the MonitoredTrainingSession object. That hook defines the last step of the training, after which the ps and workers will be shut down.

Note: There are several other kinds of hooks, and you can define your own using the tf.train.SessionRunHook class. I will not deal with that here.

The code looks as follows:
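This is a minimal sketch rather than a complete script, assuming the TensorFlow 1.x-style API. It expects a graph that already contains a global step and a train_op, plus a started tf.train.Server. The checkpoint directory, the frequencies, the last step, and the rule "worker 0 is the master" are example values and conventions that I am assuming.

```python
def is_chief(job_name, task_index):
    """Assumed convention: worker task 0 is the master (chief)."""
    return job_name == "worker" and task_index == 0


def train(server, graph, train_op, job_name, task_index,
          last_step=1000, checkpoint_dir="/tmp/train_logs"):
    """Run training steps until the stop hook ends the session."""
    # Imported here so is_chief can be used without TensorFlow.
    import tensorflow.compat.v1 as tf  # assumption: TF 1.x-style API

    # The hook reads the global step, so the graph must define one.
    hooks = [tf.train.StopAtStepHook(last_step=last_step)]

    with graph.as_default():
        with tf.train.MonitoredTrainingSession(
                master=server.target,
                is_chief=is_chief(job_name, task_index),
                checkpoint_dir=checkpoint_dir,   # hypothetical path
                save_checkpoint_secs=600,        # example frequency
                save_summaries_steps=100,        # example frequency
                hooks=hooks) as sess:
            # No step counter here: the hook decides when to stop.
            while not sess.should_stop():
                sess.run(train_op)
```

Note that the loop itself never counts steps. It only asks the session whether it should stop, and the hook makes that decision based on the shared global step.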

Very loosely speaking, here is how it works:
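As a rough mental model, the loop-and-hook mechanism can be mimicked in a few lines of plain Python. The two classes below are toy stand-ins that I made up for illustration. They are not TensorFlow's classes, and they ignore everything that makes the real thing useful (shared global step across workers, checkpoints, recovery).

```python
class StopAtStep:
    """Toy stop hook: requests a stop once last_step is reached."""

    def __init__(self, last_step):
        self.last_step = last_step

    def after_run(self, session):
        if session.global_step >= self.last_step:
            session.request_stop()


class ToyMonitoredSession:
    """Toy session that calls its hooks after every run."""

    def __init__(self, hooks):
        self.hooks = hooks
        self.global_step = 0
        self._stop_requested = False

    def request_stop(self):
        self._stop_requested = True

    def should_stop(self):
        return self._stop_requested

    def run(self, train_step):
        train_step()
        self.global_step += 1
        # Hooks get a chance to react after each step.
        for hook in self.hooks:
            hook.after_run(self)


steps_done = []
session = ToyMonitoredSession(hooks=[StopAtStep(last_step=5)])
while not session.should_stop():
    session.run(lambda: steps_done.append(session.global_step))

print(steps_done)
```

The training loop stays generic, and the stopping condition lives entirely in the hook. That is the idea I want you to take away.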


Data Parallelism on Clusterone

Now that we understand the components of Distributed TensorFlow code, I will provide some higher-level code snippets and the setup to run on Clusterone. That will speed up the code setup, especially on the cluster spec side. We also provide some high level wrappers so that you can run your code locally and remotely without changing it.

The code on Clusterone needs to be structured as follows:

Open Question - Choose the ps / worker ratio

The speedup is massive! But my code is not half as optimized as it could be, so I would expect to go even further, with queuing for example.

An open question, though, is how many ps to have relative to the number of workers. That depends on the workers’ GPU, memory, and network speed, so some benchmarking helps. Running Clusterone on AWS, with p2.xlarge (1 GPU, 4 vCPUs, 61GiB of RAM) as workers and c4.2xlarge (8 vCPUs, 15GiB of memory) as ps, I get the following benchmark.


Adding parameter servers seems to improve performance, and they are less expensive than GPU machines. I haven’t tested the 1:1 worker:ps ratio. Note that the speedup for 64 GPUs vs 1 GPU is more than linear here (70x).
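To put a number on "more than linear", one simple measure is the speedup divided by the number of GPUs. The function name below is my own, and the only inputs are the two figures quoted above.

```python
def scaling_efficiency(speedup, num_gpus):
    """Speedup per GPU; 1.0 means perfectly linear scaling."""
    return speedup / num_gpus


efficiency = scaling_efficiency(speedup=70, num_gpus=64)
print("{:.2f}".format(efficiency))
```

Anything above 1.0 means each GPU contributes more in the cluster than the single GPU did on its own.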

What’s next

  • Playing with the ps / worker ratio
  • Higher-level code: the Experiment class. Although several functions in that class are still experimental and might be removed, it solves painful problems, such as running validation steps once in a while during distributed training. Check out our follow-up tutorial using the Estimator class.
  • Model parallelism
  • Synchronous data parallelism
  • Distributed Keras




On top of that, the infrastructure itself is hard to come by. Yahoo! has an open source repo on running Distributed TensorFlow on Spark, and I found a couple of other repos. But that’s if you want to set up and manage your own cluster, which I don’t expect anybody with a non-pathological use case to want to do.

The other obvious option is Google ML Engine, but I think it is overpriced and lacks transparency in terms of billing. (I had a fun conversation with a Google support guy, trying to understand what kind of hardware I would get for an ML Engine credit. I still don’t have a clear answer, except a link to the pricing page I had originally landed on and subsequently gotten confused by.) And its handling of TensorBoard is far from perfect for comparing multiple jobs.

I am obviously biased, but I think Clusterone solves those kinds of problems (well, not that biased: I chose that job because the product solved a pain point I couldn’t find a solution to in the first place). The codebase and data handling are streamlined and rigorous, absolutely all infrastructure management is automated, and the prices are the lowest in the market (to my knowledge, $0.414 / hour of p2.xlarge GPU machine is the lowest out there). And, yes, there’s that nicely integrated TensorBoard.
