Training deep learning models faster using PyTorch, Slurm and AWS
Deep neural networks have become increasingly complex and large in recent years, and as a result, training these models can be very time-consuming and computationally expensive. This is where parallelism comes in. Parallelism involves breaking down the computation into smaller parts that can be executed simultaneously on different computing resources.
There are several types of parallelism that can be used for deep learning training. The two most common are data parallelism and model parallelism.
Data parallelism involves replicating the model across multiple devices (usually multiple GPUs) and then dividing the training data into subsets, with each device processing a different subset. During the forward and backward passes, each device calculates the gradients independently and then communicates them to the other devices. The gradients are then averaged, and the model is updated accordingly. This method is useful to speed up training, as we can process more data in parallel.
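To make the gradient-averaging step concrete, here is a minimal sketch of what conceptually happens under the hood. Note that DistributedDataParallel (used later in this tutorial) performs this communication automatically and more efficiently; the helper below is only an illustration:
# Illustrative only: manual gradient averaging across processes.
# DistributedDataParallel does this for you during loss.backward().
import torch.distributed as dist

def average_gradients(model, world_size):
    for param in model.parameters():
        if param.grad is not None:
            # Sum the gradients from all processes, then divide to get the mean
            dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
            param.grad /= world_size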
Model parallelism involves dividing the model across multiple devices, with each device responsible for computing a different part of the model. This method is useful when the model architecture is too large to fit onto a single device.
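For contrast, here is a minimal sketch of model parallelism (we will not use it in this tutorial): different parts of the network are placed on different GPUs, and the activations are moved between devices during the forward pass. The layer sizes below are arbitrary.
# Illustrative only: a toy model split across two GPUs (model parallelism)
import torch
import torch.nn as nn

class TwoGPUModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.part1 = nn.Linear(1024, 1024).to("cuda:0")  # first half on GPU 0
        self.part2 = nn.Linear(1024, 10).to("cuda:1")    # second half on GPU 1

    def forward(self, x):
        x = self.part1(x.to("cuda:0"))
        return self.part2(x.to("cuda:1"))  # move activations to GPU 1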
In this tutorial we will use data parallelism.
When training deep learning models on multiple devices, such as multiple GPUs or multiple machines, it is important to understand some key concepts that define the distributed training environment.
- Node: A node refers to a single machine in the distributed training environment, and it can have multiple GPUs. When multiple machines are used for training, it is called multi-node training.
- Master node: The master node is the node that coordinates the distributed training process. It is responsible for initializing the parameters of the model, distributing the data and the model across all nodes, and collecting and aggregating gradients from all nodes.
- Worker node: A worker node is any node in the distributed training environment that is not the master node. Worker nodes are responsible for computing gradients on a subset of the data and sending the gradients to the master node for aggregation and updating the model parameters.
When training a deep learning model in a distributed environment, the training script will create multiple processes. For example, in data parallel training with 2 nodes of 8 GPUs, assuming one process per GPU, you would have 16 processes. Each process holds a replica of the model and runs on its respective GPU. To work together, each process needs to know the number of processes running the same job (the world size). Processes also have identifiers called local rank and rank: the local rank identifies the process order within the node it is running on, and the rank identifies the process order among all processes from all nodes. These concepts will be further explained in practice throughout this article.
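As a concrete illustration of how these identifiers relate, the snippet below enumerates them for a hypothetical 2-node, 8-GPU-per-node job (in practice, the launcher sets these values for you through environment variables, as we will see later):
nnodes = 2
gpus_per_node = 8
world_size = nnodes * gpus_per_node  # 16 processes in total

for node_rank in range(nnodes):
    for local_rank in range(gpus_per_node):
        rank = node_rank * gpus_per_node + local_rank  # global rank: 0..15
        print(f"node {node_rank}, GPU {local_rank} -> rank {rank}")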
Now that you know these basic concepts, let us put them into practice. First, we will set up a machine on AWS.
1. First, go to the AWS website and create an account.
2. Then, after logging in, search for EC2 and open it.
3. In the EC2 panel, go to Instances → Launch instances.
This will open a configuration page to launch a virtual machine instance.
In our example we will use the following machine image (AMI).
Also, we will set the instance type to g4dn.metal, which corresponds to a machine with 8 GPUs, each with 16 GiB of memory. You can view the instance types that are best suited for your project at https://aws.amazon.com/ec2/instance-types/.
Select an SSH key (or create a new one). When you create a new key pair, a .pem file will be downloaded. This key file will be used to connect to the instance.
Then we click on Launch instance.
After the initialization process is done, you can select the instance and click on Connect.
Then you can follow these steps to connect to the instance from a terminal:
chmod 400 keyfilename.pem
ssh -i keyfilename.pem ec2-user@ec2-12-345-678-912.us-east-2.compute.amazonaws.com
After that you are connected to the instance.
You can copy files using the scp command in another terminal as follows:
scp -i keyfilename.pem file_to_copy.txt ec2-user@ec2-12-345-678-912.us-east-2.compute.amazonaws.com:/home/ec2-user
Then you can install packages on the instance. In our example we used:
pip3 install torch torchvision torchaudio tqdm
We will start with single GPU training as the base code, which will be adapted to multi-GPU data parallel training in the next section. In this script, we have a simple training loop running on GPU 0.
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader
from tqdm.auto import tqdm

# Define the hyperparameters and data loaders
batch_size = 64
learning_rate = 0.001
num_epochs = 10
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
train_dataset = CIFAR10(root='./data', train=True, transform=train_transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Create the model
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
# Change model head classifier to dataset num_classes
model.fc = nn.Linear(512, 10)
# Move the model to device
model = model.to(0)
# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Train the model
print('Training started')
progress_bar = tqdm(range(num_epochs))
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, data in enumerate(train_loader):
        inputs, labels = data
        inputs, labels = inputs.to(0), labels.to(0)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Print statistics
    progress_bar.update(1)
    progress_bar.set_description(f"Epoch {epoch+1}/{num_epochs} train_loss: {running_loss / len(train_loader)}")
    running_loss = 0.0
print("Saving model...")
torch.save(model.state_dict(), "model.pth")
print('\nFinished training')
You can save this script to a train.py file and run it with python train.py.
Now that we have the base code for training, I will explain how to adapt it to multi-GPU data parallel training.
Distributed environment setup
First, we will add the following piece of code at the beginning of the training script:
# Libraries used in the distributed training
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
import torch.distributed as dist
import os

# Initializes the distributed backend which will take care of synchronizing nodes/GPUs
# only works with torch.distributed.launch // torchrun
# Set the URL for communication
dist_url = "env://" # default
# Retrieve world_size, rank and local_rank
world_size = int(os.environ['WORLD_SIZE'])
rank = int(os.environ["RANK"])
local_rank = int(os.environ['LOCAL_RANK'])
# Initialize the process group
dist.init_process_group(
    backend="nccl",
    init_method=dist_url,
    world_size=world_size,
    rank=rank)
# synchronizes all the processes to reach this point before moving on
dist.barrier()
At the beginning, we set the URL for the communication between nodes/GPUs. The default value is “env://”, which works with torch.distributed.launch and torchrun (torch.distributed.run).
The next few lines of code retrieve the rank, world_size, and local_rank of the current process from the environment variables. These values specify, respectively, the order of the current process among all processes, the total number of processes, and the order of the current process within the node it is running on. In this case, with a single node with 8 GPUs, the world_size will be 8, and the rank and local_rank values will range from 0 to 7, with each process running on its respective GPU (e.g. the process with rank 3 will run on GPU 3).
The dist.init_process_group() function initializes the process group with the specified backend, init_method, world_size, and rank. This function creates a communication group for all the processes, allowing them to communicate and synchronize with each other during training.
The dist.barrier() function synchronizes all the processes to reach this point before moving on, ensuring that all processes are ready for training and preventing any process from starting before the others are ready.
Data sampling
In distributed training, it is important to ensure that each process sees a unique subset of the training data during each epoch to avoid duplicated work and to make sure the model is exposed to the entire dataset. This is why we need to use a sampler to shuffle and partition the training dataset across the different processes.
train_dataset = CIFAR10(root='./data', train=True, transform=train_transform, download=False)
train_sampler = DistributedSampler(dataset=train_dataset, shuffle=True, num_replicas=world_size, rank=rank)
train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
In this code snippet, we are creating a DistributedSampler object with num_replicas set to world_size (the number of processes) and rank set to the rank of the current process. The DistributedSampler will divide the training dataset into num_replicas chunks, and each process will receive one of those chunks based on its rank. By setting shuffle to True, the sampler will randomly shuffle the data before partitioning it, which further ensures that each process sees a unique subset of the data.
If we did not use the sampler, each model replica would see the same sequence of the dataset on each process, which would lead to duplicated work and harm the model training.
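If you want to see the partitioning yourself, a quick (purely illustrative) check is to iterate over the sampler on each process and look at the indices it yields; each rank should get a disjoint slice of roughly len(train_dataset) / world_size samples:
# Illustrative sanity check, not needed for training
indices = list(train_sampler)
print(f"rank {rank}: {len(indices)} samples, first indices: {indices[:5]}")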
Model
In this data parallel training, we will use DistributedDataParallel, a PyTorch module that allows you to parallelize the training of your deep learning models across multiple GPUs.
# Create the model
model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=False)
# Change model head classifier to dataset num_classes
model.fc = nn.Linear(512, 10)
# Move the model to device
model.to(local_rank)
model = DistributedDataParallel(model, device_ids=[local_rank])
This modification wraps the model with DistributedDataParallel to enable distributed training. The device_ids argument specifies the GPU that the model replica will be trained on. Thus, each model replica runs in a different process on its respective GPU, identified by local_rank.
In our single-node, 8-GPU case, we have 8 model replicas, each run by a different process, and each process runs on a different GPU.
Training
for epoch in range(num_epochs):
    train_loader.sampler.set_epoch(epoch)
    running_loss = 0.0
    for i, data in enumerate(train_loader):
        inputs, labels = data
        inputs, labels = inputs.to(local_rank), labels.to(local_rank)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
In distributed training, each process has its own instance of the data loader, and the sampler ensures that each process gets a different subset of the data to train on. By setting the epoch for the sampler, we ensure that each process gets a different set of data samples in each epoch, which helps to improve the diversity of the data seen by the model during training. We do that by calling train_loader.sampler.set_epoch(epoch) at the start of each epoch.
The other modification is that inputs and labels are now moved to the GPU associated with the current process, identified by the local_rank variable.
Saving data to disk
if local_rank == 0:
    print("Saving model...")
    torch.save(model.state_dict(), "model.pth")
    print('\nFinished training')
We use local_rank == 0 as a condition to ensure that only one process (specifically, the one with local rank 0) saves the final trained model and prints the message “Finished training”.
In a distributed training setting, all processes are training the model in parallel, and it would be redundant for each process to save the model or print the message. Therefore, we use the local_rank == 0 condition to ensure that only one process performs these actions.
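One detail worth knowing: DistributedDataParallel wraps the original model, so the keys of model.state_dict() carry a "module." prefix. An optional variation is to save the underlying model's weights so the checkpoint loads cleanly in a non-distributed script; and in a multi-node job you would typically check the global rank instead of local_rank, so that only a single process across all nodes writes the file. A minimal sketch:
# Optional variant: save the unwrapped model's weights (keys without "module.")
# and do it only on global rank 0, which matters once you move to multiple nodes.
if rank == 0:
    torch.save(model.module.state_dict(), "model.pth")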
Running the script
In order to run the training, we will use torch.distributed.run, a PyTorch utility that simplifies the process of running distributed training. It provides a simple API for launching a distributed training job and automatically handles tasks such as setting up the distributed environment, initializing processes, and aggregating results.
We will save the data parallel training script in a train.py file and run it in a terminal with:
python -m torch.distributed.run --nnodes=1 --nproc-per-node=8 train.py
The --nnodes argument specifies the number of nodes being used, while the --nproc-per-node argument specifies the number of processes to be launched per node.
Results and discussions
As we are using data parallel training with multiple GPUs, training is sped up in comparison with the same settings running on a single GPU. Since each GPU processes batch_size=64 samples per step, using 8 GPUs results in processing a total of 64*8=512 samples per step.
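To make the arithmetic explicit: the batch_size passed to each process's DataLoader is a per-GPU value, so the number of samples processed per optimizer step grows with the world size. A tiny illustrative calculation (not part of the training script):
per_gpu_batch_size = 64
num_gpus = 8  # world_size in our single-node example
global_batch_size = per_gpu_batch_size * num_gpus
print(global_batch_size)  # 512 samples per optimizer step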
As we have learned, an approach to increase the computational power available to speed up training is by using multiple GPUs. However, using multiple GPUs in a single machine can sometimes be limited by hardware constraints. For example, most AWS instance types have a maximum of 8 GPUs available per machine.
To overcome these limitations, it is common to use multiple machines connected over a network to run distributed GPU training. This approach is known as distributed training and involves dividing the training workload across multiple machines, each with their own set of GPUs.
We will now learn how to implement this in practice!
Setup
The first step is to create a security group so that all machines can communicate with each other. In order to do that you can go to Security Groups → Create security group.
During the setup of the security group, you will define a security group name:
Then, you need to add Inbound and Outbound rules defined as below, to allow traffic between the nodes:
After that, you need to set up the machines (nodes) you will use for training. In our example we will use two g4dn.metal instances, with 8 GPUs each, totaling 16 GPUs. To do that, you simply launch two instances following the step-by-step process we learned earlier. During the setup of these machines, you must select the security group you created in the previous step.
Then, you connect to the machines you have launched, copy the training files, install the packages, and run the torch.distributed.run command on each of them, with some modifications.
python -m torch.distributed.run \
    --nproc_per_node=8 \
    --nnodes=2 \
    --node_rank=$NODE_RANK \
    --master_addr=172.41.27.152 \
    --master_port=12345 \
    train.py
- --nproc_per_node: Specifies the number of processes (GPUs) to use on the node you are running the command on. In our example, both nodes run with 8 GPUs. You can set different values on each node individually.
- --nnodes: Specifies the total number of nodes participating in the distributed training job. In our example, 2.
- --node_rank: Specifies the rank of the current node; you must set a unique value for each node, ranging from 0 to nnodes - 1.
- --master_addr: Specifies the IP address of the machine that is running the master process. To set it, you need to choose one of the machines to be the master node. You can choose any instance to be the master, but it is usually a good idea to choose one with good network connectivity and sufficient resources to handle the coordination tasks. You can obtain the IP address of the master instance on the console page.
- --master_port: Specifies the port number that the master process is listening on.
- train.py: Specifies the training script to run.
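To verify that the multi-node launch is wired up correctly, you could temporarily add a few lines like the following at the top of train.py (purely a debugging aid; the environment variables are the same ones the launcher already sets). Each of the 16 processes should print a unique rank.
import os
import socket

# Print the identity of this process: hostname, global rank, local rank, world size
print(
    f"host={socket.gethostname()} "
    f"rank={os.environ['RANK']} "
    f"local_rank={os.environ['LOCAL_RANK']} "
    f"world_size={os.environ['WORLD_SIZE']}"
)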
Slurm
Now you know how to run multi-node training. However, a manual approach is not really scalable. Imagine you want to use 20 nodes… You would need to set up all 20 machines manually and run the commands on all of them, one by one. That is where Slurm comes in.
Slurm is an open-source workload manager and job scheduler that helps manage the allocation of computing resources.
In our context, we use Slurm to automatically launch the machines and run the commands.
To do that, we first need to set up an AWS cluster. We will use the tutorial available at https://github.com/pytorch/examples/blob/main/distributed/ddp-tutorial-series/slurm/setup_pcluster_slurm.md
1. Sign in to an AWS instance. You can use the Cloud9 service to facilitate the process. Search for Cloud9, click on Create environment, and set up the new instance. After that, select the instance you have launched and click on Open in Cloud9.
2. Create an SSH key
aws ec2 create-key-pair --key-name my_key_pair --query KeyMaterial --output text > my_key_pair.txt
chmod 600 my_key_pair.txt
3. Install pcluster
pip install aws-parallelcluster --upgrade
4. Create a cluster config.yaml file
There is a template config file on the tutorial page. In our example we configured it as follows:
Region: us-east-2
Image:
  Os: ubuntu1804
SharedStorage:
  - MountDir: /shared
    Name: shared-fs
    StorageType: FsxLustre
    FsxLustreSettings:
      StorageCapacity: 1200
      DeploymentType: SCRATCH_1
      StorageType: SSD
HeadNode:
  InstanceType: m5.large
  Networking:
    SubnetId: subnet-0123d1a06567289b1
  Ssh:
    KeyName: my_key_pair
Scheduling:
  Scheduler: slurm
  SlurmQueues:
    - Name: train
      ComputeResources:
        - Name: g4dnmetal
          InstanceType: g4dn.metal
          MinCount: 2
          MaxCount: 2
      Networking:
        SubnetIds:
          - subnet-0123d1a06567289b1
To set the SubnetIds, you can go to the VPC console page at https://console.aws.amazon.com/vpc/ (or search for VPC) and get the Subnet ID of an available subnet (or create a new one if none is available).
Also, you need to set the Region in the config.yaml file to match the location specified in the Availability Zone field of the subnet you selected.
Set the instance types according to your project requirements. We set the head node to a simple m5.large and the worker nodes to g4dn.metal instances, which have 8 GPUs each.
Set the MinCount and MaxCount parameters to the number of nodes you want to use.
5. Create the cluster
pcluster create-cluster --cluster-name clusterexample --cluster-configuration config.yaml
5a. Track progress
pcluster list-clusters
Wait a few minutes until the cluster initialization is complete
6. Log in to the cluster master node
pcluster ssh --cluster-name clusterexample -i my_key_pair.txt
After connecting to the master node, you need to install the packages you will need and copy the training files to the /shared/ folder. This way, all saved files and installed packages will already be shared with all worker nodes. You can also use the nano command to create the files.
Then, instead of directly running torch.distributed.run, you will use a Slurm script to do that for you. To do so, you need to create a slurm-example.sh file and run it with sbatch slurm-example.sh.
We configured the slurm-example.sh file as follows:
#!/bin/bash
#SBATCH --job-name=multinode-example
#SBATCH --nodes=2 # number of nodes
#SBATCH --ntasks-per-node=1 # one launcher task per node; torch.distributed.run spawns the 8 training processes
#SBATCH --cpus-per-task=8 # CPUs allocated to the launcher task on each node
#SBATCH --gres=gpu:8 # number of GPUs per node (gres=gpu:N)
nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
echo Node IP: $head_node_ip
export LOGLEVEL=INFO
srun python3 -m torch.distributed.run \
    --nnodes 2 \
    --nproc_per_node 8 \
    --rdzv_id=$RANDOM \
    --rdzv_backend=c10d \
    --rdzv_endpoint=$head_node_ip:12345 \
    /shared/train.py
The logs are saved to a slurm-<job_id>.out file in the current directory, and you can use cat slurm-<job_id>.out to see the logs.
IMPORTANT: Do not forget to always terminate instances after trainings, in order to avoid unnecessary AWS charges.
In this tutorial, we explored the world of distributed training and delved into various strategies for improving training efficiency and accelerating model convergence. From single GPU training to multi-GPU setups and even distributed training across multiple nodes using Slurm, we covered a range of techniques that can significantly enhance the training process.