Horovod is a distributed training framework for TensorFlow. The goal of Horovod is to make distributed Deep Learning fast and easy to use.
The primary motivation for this project is to make it easy to take a single-GPU TensorFlow program and successfully train it on many GPUs faster. This has two aspects:
- How many modifications does one have to make to a program to make it distributed, and how easy is it to run?
- How much faster would it run in distributed mode?
Internally at Uber we found the MPI model to be much more straightforward and to require far fewer code changes than Distributed TensorFlow with parameter servers. See the Usage section for more details.
In addition to being easy to use, Horovod is fast. Below is a chart representing the benchmark that was done on 32 servers with 4 Pascal GPUs each, connected by a RoCE-capable 25 Gbit/s network:
Horovod achieves 90% scaling efficiency for both Inception V3 and ResNet-101, and 79% scaling efficiency for VGG-16. See the Benchmarks page to find out how to reproduce these numbers.
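To put these percentages in perspective, the arithmetic can be sketched as follows. This assumes the common definition of scaling efficiency (throughput on N GPUs divided by N times the single-GPU throughput), which is not spelled out here; effective_speedup is an illustrative name and not part of Horovod.

```python
def effective_speedup(num_gpus, scaling_efficiency):
    """Speedup over a single GPU implied by a given scaling efficiency.

    Assumption: efficiency = throughput(N GPUs) / (N * throughput(1 GPU)).
    """
    return num_gpus * scaling_efficiency


# Benchmark setup from the text: 32 servers with 4 GPUs each.
num_gpus = 32 * 4

# Efficiency figures quoted above for each model.
reported = [("Inception V3", 0.90), ("ResNet-101", 0.90), ("VGG-16", 0.79)]

for model, efficiency in reported:
    speedup = effective_speedup(num_gpus, efficiency)
    print(f"{model}: roughly {speedup:.0f}x the throughput of one GPU")
```

Under that assumption, 90% efficiency on 128 GPUs corresponds to about 115 times the single-GPU throughput, and 79% to about 101 times.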
Installing MPI and NCCL may seem like an extra hassle, but it only needs to be done once by the team dealing with infrastructure; everyone else in the company who builds models can then enjoy the simplicity of training them at scale.
To install Horovod:
- Install Open MPI or another MPI implementation.
Steps to install Open MPI are listed here.
- Install the horovod pip package.
$ pip install horovod
This basic installation is good for laptops and for getting to know Horovod. If you're installing Horovod on a server with GPUs, read the Horovod on GPU page. If you want to use Docker, read the Horovod in Docker page.
Horovod core principles are based on MPI concepts such as size, rank, local rank, allreduce, allgather and broadcast. See here for more details.
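As a rough illustration of these concepts, the sketch below simulates them in a single Python process, with one list entry standing in for each rank. It does not use MPI or Horovod; the function names are hypothetical, the rank layout assumes processes fill each host's slots in order, and allreduce is shown as an average (real MPI allreduce supports other reductions such as sum).

```python
def layout(hosts):
    """Assign a global rank and a per-host local rank to every process.

    hosts is a list of (hostname, slots) pairs; slots are filled in order.
    """
    processes = []
    rank = 0
    for host, slots in hosts:
        for local_rank in range(slots):
            processes.append({"rank": rank, "local_rank": local_rank, "host": host})
            rank += 1
    return processes


def allreduce(values):
    """Each rank contributes one number; every rank receives the average."""
    average = sum(values) / len(values)
    return [average for _ in values]


def allgather(values):
    """Each rank contributes a list; every rank receives all lists concatenated."""
    gathered = [item for value in values for item in value]
    return [list(gathered) for _ in values]


def broadcast(values, root_rank):
    """Every rank receives the value held by root_rank."""
    return [values[root_rank] for _ in values]


procs = layout([("server1", 2), ("server2", 2)])
size = len(procs)  # total number of processes
print(size)
print([(p["rank"], p["local_rank"]) for p in procs])
print(allreduce([1.0, 2.0, 3.0, 6.0]))  # [3.0, 3.0, 3.0, 3.0]
print(allgather([[0], [1], [2, 2]]))
print(broadcast(["a", "b", "c"], root_rank=0))
```

Here size is the number of processes, rank identifies a process globally, and local rank identifies it within its own server.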
To use Horovod, make the following additions to your program:
- Run hvd.init().
- Pin a server GPU to be used by this process using config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to the local rank. In that case, the first process on the server will be allocated the first GPU, the second process will be allocated the second GPU, and so forth.
- Scale the learning rate by the number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.
- Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients.
- Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you're not using MonitoredTrainingSession, you can simply execute the hvd.broadcast_global_variables op after global variables have been initialized.
- Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0.
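The per-process decisions in these steps (GPU pinning, learning rate scaling, checkpointing on worker 0 only) can be sketched without TensorFlow or Horovod installed. worker_settings is a hypothetical helper written for illustration; in a real program the rank, local rank, and size would come from hvd.rank(), hvd.local_rank(), and hvd.size().

```python
def worker_settings(rank, local_rank, size, base_learning_rate, checkpoint_dir):
    """Return the values one process would use, given its place in the job.

    Hypothetical helper: it only mirrors the decisions described in the steps
    and performs no training.
    """
    return {
        # One GPU per process: pin the GPU whose index equals the local rank.
        "visible_device_list": str(local_rank),
        # The effective batch size grows with the number of workers,
        # so the learning rate is scaled by the same factor.
        "learning_rate": base_learning_rate * size,
        # Only worker 0 writes checkpoints; all other workers pass None.
        "checkpoint_dir": checkpoint_dir if rank == 0 else None,
    }


# Two of the 16 processes in a job on 4 servers with 4 GPUs each.
first = worker_settings(rank=0, local_rank=0, size=16,
                        base_learning_rate=0.01, checkpoint_dir="/tmp/train_logs")
sixth = worker_settings(rank=5, local_rank=1, size=16,
                        base_learning_rate=0.01, checkpoint_dir="/tmp/train_logs")
print(first)
print(sixth)
```

Every process computes the same scaled learning rate, while the pinned GPU and the checkpoint directory differ per process.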
Example (see the examples directory for full training examples):
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())
# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)
# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
# Make training operation
train_op = opt.minimize(loss)
# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)
The example commands below show how to run distributed training. See the Running Horovod page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs. See the Horovod in Docker page for details about running Horovod in Docker.
- To run on a machine with 4 GPUs:
$ mpirun -np 4 \
    -H localhost:4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py
- To run on 4 machines with 4 GPUs each:
$ mpirun -np 16 \
    -H server1:4,server2:4,server3:4,server4:4 \
    -bind-to none -map-by slot \
    -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH \
    -mca pml ob1 -mca btl ^openib \
    python train.py
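Both commands follow the same pattern: -np is the total number of processes, and -H lists each host with its slot count. A small helper can derive both from a host list. This is a sketch only: build_mpirun_command is a hypothetical name, and the remaining flags are copied verbatim from the commands above rather than validated against a particular MPI version.

```python
import shlex


def build_mpirun_command(hosts, script="train.py"):
    """Build the mpirun argument list for a list of (hostname, gpu_count) pairs.

    Hypothetical helper: one process is launched per GPU.
    """
    num_processes = sum(slots for _, slots in hosts)
    host_list = ",".join(f"{name}:{slots}" for name, slots in hosts)
    return [
        "mpirun", "-np", str(num_processes),
        "-H", host_list,
        "-bind-to", "none", "-map-by", "slot",
        "-x", "NCCL_DEBUG=INFO", "-x", "LD_LIBRARY_PATH",
        "-mca", "pml", "ob1", "-mca", "btl", "^openib",
        "python", script,
    ]


command = build_mpirun_command(
    [("server1", 4), ("server2", 4), ("server3", 4), ("server4", 4)]
)
# Quote each argument so the line can be pasted into a shell.
print(" ".join(shlex.quote(part) for part in command))
```

The returned list can also be passed directly to subprocess.run, which avoids shell quoting altogether.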
Horovod supports Keras and regular TensorFlow in similar ways.
See the simple and advanced full training examples.
Note: Keras 2.0.9 has a known issue that makes each worker allocate all GPUs on the server, instead of the GPU assigned by the local rank. If you have multiple GPUs per server, upgrade to Keras 2.1.2, or downgrade to Keras 2.0.8.
Horovod supports Estimator API and regular TensorFlow in similar ways.
See a full training example.
Horovod supports mixing and matching Horovod collectives with other MPI libraries, such as mpi4py, provided that the MPI was built with multi-threading support.
You can check for MPI multi-threading support by calling the hvd.mpi_threads_supported() function.
Note: Make sure that the other MPI library will NOT re-initialize MPI. For example:
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Verify that MPI multi-threading is supported.
assert hvd.mpi_threads_supported()
# Make sure MPI is not re-initialized.
import mpi4py.rc
mpi4py.rc.initialize = False
from mpi4py import MPI
assert hvd.size() == MPI.COMM_WORLD.Get_size()
Learn how to optimize your model for inference and remove Horovod operations from the graph here.
One of the unique things about Horovod is its ability to interleave communication and computation coupled with the ability to batch small allreduce operations, which results in improved performance. We call this batching feature Tensor Fusion.
See here for full details and tweaking instructions.
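The batching idea can be illustrated with a simplified, single-process sketch. It is not Horovod's implementation: the function names are hypothetical, the buffer capacity is counted in elements purely for illustration, and workers are simulated as nested lists. Small tensors are copied into a shared buffer, one averaging allreduce is performed per buffer, and the results are copied back out.

```python
def plan_fusion(tensor_sizes, buffer_capacity):
    """Group consecutive tensors into batches that fit in the fusion buffer.

    A tensor larger than the buffer ends up in a batch of its own.
    """
    batches, current, used = [], [], 0
    for index, size in enumerate(tensor_sizes):
        if current and used + size > buffer_capacity:
            batches.append(current)
            current, used = [], 0
        current.append(index)
        used += size
    if current:
        batches.append(current)
    return batches


def fused_allreduce(per_worker_tensors, buffer_capacity):
    """Average tensors across simulated workers, one allreduce per batch.

    per_worker_tensors[w][t] is tensor t (a list of floats) on worker w.
    Returns the averaged tensors and the number of allreduce calls made.
    """
    num_workers = len(per_worker_tensors)
    sizes = [len(tensor) for tensor in per_worker_tensors[0]]
    averaged = [None] * len(sizes)
    batches = plan_fusion(sizes, buffer_capacity)
    for batch in batches:
        # Copy the batch's tensors into one flat buffer per worker.
        buffers = [[x for t in batch for x in tensors[t]]
                   for tensors in per_worker_tensors]
        # A single element-wise average stands in for one allreduce call.
        reduced = [sum(column) / num_workers for column in zip(*buffers)]
        # Copy the reduced values back into tensors of the original sizes.
        offset = 0
        for t in batch:
            averaged[t] = reduced[offset:offset + sizes[t]]
            offset += sizes[t]
    return averaged, len(batches)


print(plan_fusion([10, 20, 30, 100, 5, 5], buffer_capacity=64))  # [[0, 1, 2], [3], [4, 5]]

workers = [
    [[1.0, 2.0], [3.0], [4.0, 5.0, 6.0]],
    [[3.0, 4.0], [5.0], [6.0, 7.0, 8.0]],
]
result, calls = fused_allreduce(workers, buffer_capacity=3)
print(result, calls)
```

In the first example six tensors are reduced with three allreduce calls instead of six, which is where the saving comes from when there are many small tensors.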
Horovod has the ability to record the timeline of its activity, called Horovod Timeline.
See here for full details and usage instructions.
- Run distributed training in Microsoft Azure using Batch AI and Horovod.
See the Troubleshooting page, and please submit a ticket if you can't find an answer.
Please cite Horovod in your publications if it helps your research:
@article{sergeev2018horovod,
Author = {Alexander Sergeev and Mike Del Balso},
Journal = {arXiv preprint arXiv:1802.05799},
Title = {Horovod: fast and easy distributed deep learning in {TensorFlow}},
Year = {2018}
}
- Sergeev, A., Del Balso, M. (2017) Meet Horovod: Uber’s Open Source Distributed Deep Learning Framework for TensorFlow. Retrieved from https://eng.uber.com/horovod/
- Sergeev, A. (2017) Horovod - Distributed TensorFlow Made Easy. Retrieved from https://www.slideshare.net/AlexanderSergeev4/horovod-distributed-tensorflow-made-easy
- Sergeev, A., Del Balso, M. (2018) Horovod: fast and easy distributed deep learning in TensorFlow. arXiv:1802.05799
The Horovod source code was based off the Baidu tensorflow-allreduce repository written by Andrew Gibiansky and Joel Hestness. Their original work is described in the article Bringing HPC Techniques to Deep Learning.