Distributed Training using Apache MXNet

In deep learning, distributed training is a method to train deep neural networks using multiple machines connected in a network. In this tutorial, we will learn how to do distributed training using Apache MXNet.

We will use MXNet's imperative interface (Gluon), which is more intuitive than the symbolic interface.

We will use data parallelism to distribute the training. Data parallelism is the most common way to do distributed training. However, if you have a model that is too big to fit in GPU memory, you might want to use model parallelism instead. Please check this example.

Let's start with a single GPU:

We will take a simple example (shown below) that trains a LeNet network using MNIST data and extend it to do distributed training.
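The single-GPU baseline is sketched below. This is a minimal, illustrative version: the LeNet-style network, batch size, learning rate, and number of epochs are reasonable defaults rather than values fixed by this tutorial.

import mxnet as mx
from mxnet import autograd, gluon, nd

batch_size = 64
epochs = 5
ctx = mx.gpu()  # train on a single GPU

# Normalize pixel values to [0, 1] and move the channel axis first
def transform(data, label):
    return nd.transpose(data.astype('float32'), (2, 0, 1)) / 255, label.astype('float32')

train_data = gluon.data.DataLoader(
    gluon.data.vision.MNIST(train=True, transform=transform),
    batch_size, shuffle=True)

# A LeNet-style convolutional network
net = gluon.nn.Sequential()
with net.name_scope():
    net.add(gluon.nn.Conv2D(channels=20, kernel_size=5, activation='relu'))
    net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
    net.add(gluon.nn.Conv2D(channels=50, kernel_size=5, activation='relu'))
    net.add(gluon.nn.MaxPool2D(pool_size=2, strides=2))
    net.add(gluon.nn.Flatten())
    net.add(gluon.nn.Dense(512, activation='relu'))
    net.add(gluon.nn.Dense(10))

net.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
loss = gluon.loss.SoftmaxCrossEntropyLoss()
trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': .1})

for epoch in range(epochs):
    for data, label in train_data:
        # Copy the batch to the GPU
        data = data.as_in_context(ctx)
        label = label.as_in_context(ctx)
        # Forward pass recorded for autograd, then backward pass and update
        with autograd.record():
            output = net(data)
            l = loss(output, label)
        l.backward()
        trainer.step(data.shape[0])
    print("Epoch %d complete" % epoch)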

Distributed training architecture

Running distributed training using Apache MXNet involves running three different types of processes - worker, parameter server and scheduler. Consider the example of running distributed training using two machines each having four GPUs:

Worker:

Worker is responsible for fetching a batch of data, running forward and backward pass, computing the parameter updates and sending those updates to the parameter server. Each worker does not train using the entire dataset. If there are N workers, the dataset is partitioned into N parts and each worker trains using one of those parts.

One worker is created for each machine in the cluster.

Parameter Server:

The parameter server is a key-value store that holds the model's parameters during training. The parameters are distributed across the parameter servers; no single parameter server has all of them.

One parameter server is created for each machine in the cluster.

Scheduler:

The scheduler is responsible for setting up the cluster: it waits for all workers and parameter servers to come up and lets them discover one another. There is only one scheduler in the entire cluster.

Moving to distributed training

Here is the distributed version of the MNIST sample shown above. In the rest of this section we will walk through the parts of the code that make the training distributed.

Step 1: Create the distributed key-value store:

MXNet uses a distributed key-value store to store the parameters (weights, biases, etc.) of the neural network being trained. Creating a new distributed kv store is just one line of code:

store = mxnet.kv.create('dist')

Step 2: Configure the trainer to use the distributed key-value store:

It is the job of the trainer to take the gradients computed in the backward pass and update the parameters of the model. We'll tell the trainer to store and update the parameters in the distributed kv store we just created, instead of doing it in GPU or CPU memory.

trainer = gluon.Trainer(net.collect_params(),
                        'sgd', {'learning_rate': .1},
                        kvstore=store)

Step 3: Split the training data:

In distributed training using data parallelism, the training data is split into equal parts across all workers, and each worker uses its own subset of the training data. For example, if we had two machines, each running one worker and each worker managing four GPUs, we would split the data as shown below. Note that we don't split the data by the number of GPUs, but by the number of workers.

Each worker can find out the total number of workers in the cluster and its own rank, which is an integer between 0 and N-1, where N is the number of workers.

store = kv.create('dist')
print("Total number of workers: %d" % store.num_workers)
print("This worker's rank: %d" % store.rank)

Knowing the number of workers and a particular worker's rank, it is easy to split the dataset into partitions and pick one partition to train on depending on the rank of the worker. You can provide a sampler as an argument to DataLoader, and the sampler can pick data from the dataset however it chooses to. Here is a sampler that splits the data into partitions and picks a partition depending on the rank of the worker:
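A minimal sketch of such a SplitSampler (assuming random and mxnet.gluon are imported) partitions the index range evenly and shuffles within the worker's own partition:

import random
from mxnet import gluon

class SplitSampler(gluon.data.sampler.Sampler):
    """Split the dataset into `num_parts` parts and sample from the part with index `part_index`."""
    def __init__(self, length, num_parts=1, part_index=0):
        # Length of each partition
        self.part_len = length // num_parts
        # Start and end index of this worker's partition
        self.start = self.part_len * part_index
        self.end = self.start + self.part_len

    def __iter__(self):
        # Extract the examples between `start` and `end`, shuffle and return them
        indices = list(range(self.start, self.end))
        random.shuffle(indices)
        return iter(indices)

    def __len__(self):
        return self.part_len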

We can then create a DataLoader using the SplitSampler as shown below:

# Load the training data
train_data = gluon.data.DataLoader(
               gluon.data.vision.MNIST(train=True, transform=transform),
               batch_size, 
               sampler=SplitSampler(60000, store.num_workers, store.rank))

Step 4: Training with multiple GPUs

Note that we didn't split the dataset by the number of GPUs. We split it by the number of workers, which usually translates to the number of machines. It is the worker's responsibility to split its partition across the GPUs it has and to run the training in parallel across those GPUs.

First we need to specify the list of GPUs we want to use for training:

ctx = [mx.gpu(i) for i in range(gpus_per_machine)]

Here is the snippet that trains an epoch.

for batch in train_data:
    # Train the batch using multiple GPUs
    train_batch(batch, ctx, net, trainer)

As you can see, all the distributed training happens inside 'train_batch', which is shown below:
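A minimal sketch of train_batch, assuming the forward_backward helper defined in the next snippet: it splits the batch across the GPUs using Gluon's split_and_load, runs the forward and backward pass, and asks the trainer to apply the updates.

def train_batch(batch, ctx, net, trainer):
    # Split the data and labels evenly across the available GPUs
    data = gluon.utils.split_and_load(batch[0], ctx)
    label = gluon.utils.split_and_load(batch[1], ctx)
    # Run the forward and backward pass (defined below)
    forward_backward(net, data, label)
    # Push gradients to the kv store and pull back updated parameters
    trainer.step(batch[0].shape[0])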

Here is the code that runs the forward (computing loss) and backward (computing gradients) pass on multiple GPUs:
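A minimal sketch, assuming a softmax cross-entropy loss and MXNet's autograd:

from mxnet import autograd

loss = gluon.loss.SoftmaxCrossEntropyLoss()

def forward_backward(net, data, label):
    # Record the forward pass so autograd can compute gradients
    with autograd.record():
        losses = [loss(net(X), Y) for X, Y in zip(data, label)]
    # Run the backward pass (compute gradients) on each GPU
    for l in losses:
        l.backward()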

Final Step: Launching the distributed training

Note that there are several processes that need to be launched on multiple machines to do distributed training. One worker and one parameter server need to be launched on each machine, and the scheduler needs to be launched on one of the machines. While this can be done manually, MXNet provides the launch.py tool to do it easily.

For example, the following command launches distributed training on two machines:

python ~/mxnet/tools/launch.py -n 2 -s 2 -H hosts \
    --sync-dst-dir /home/ubuntu/dist \
    --launcher ssh \
    "python /home/ubuntu/dist/dist.py"
  • -n 2 specifies the number of workers that must be launched.
  • -s 2 specifies the number of parameter servers that must be launched.
  • --sync-dst-dir specifies a destination location to which the contents of the current directory will be rsync'd.
  • --launcher ssh tells launch.py to use ssh to log in to each machine in the cluster and launch processes.
  • "python /home/ubuntu/dist/dist.py" is the command that will be executed in each of the launched processes.
  • Finally, -H hosts specifies the list of hosts in the cluster to be used for distributed training.

Let's take a look at the hosts file.

~/dist$ cat hosts 
d1
d2

'd1' and 'd2' are the hostnames of the machines I want to use for distributed training. 'launch.py' should be able to ssh into these machines by providing just the hostname on the command line. For example:

~/dist$ ssh d1
Welcome to Ubuntu 16.04.3 LTS (GNU/Linux 4.4.0-1049-aws x86_64)

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

  Get cloud support with Ubuntu Advantage Cloud Guest:
    http://www.ubuntu.com/business/services/cloud

0 packages can be updated.
0 updates are security updates.


Last login: Wed Jan 31 18:06:45 2018 from 72.21.198.67

Note that I did not have to provide any kind of authentication to log in to the machine. This can be done through multiple methods. One easy way is to specify the SSH identity file for each host in ~/.ssh/config. Example:

~$ cat ~/.ssh/config 
Host d1
    HostName ec2-34-201-108-233.compute-1.amazonaws.com
    port 22
    user ubuntu
    IdentityFile /home/ubuntu/test.pem
    IdentitiesOnly yes

Host d2
    HostName ec2-34-238-232-97.compute-1.amazonaws.com
    port 22
    user ubuntu
    IdentityFile /home/ubuntu/test.pem
    IdentitiesOnly yes

A better way is to use ssh agent forwarding. Check this article for more details.