Dağıtılmış, çok düğümlü ve çok GPU’lu eğitim
PyTorch, tek ve birden çok cihazda birden çok GPU kullanan eğitim modellerine olanak tanır. Bu örnekte, bir modelin birden fazla GPU’ya kopyalanması ve her bir GPU’nun modeli, eğitim örneklerinin bir kısmı üzerinde, eğitmesi işlemlerinin nasıl gerçekleştirildiğini keşfedeceğiz. Modelin gradyanları, katılan tüm GPU’lar arasında senkronize edilecek ve eğitimin geriye doğru geçişi sırasında ortalaması alınacaktır. Bu da modeli tüm GPU’larda aynı tutar. Bu örnekte, MNIST basamaklı veri kümesinde sınıflandırma yapmak için her bir düğümde üç GPU bulunan iki düğüm kullanacağız.
Yürütme modeli
Eğitimde yer alacak her düğümde, eğitim komut dosyasının bir örneğini yürüteceğiz. Her eğitim komut dosyasında, komut dosyasını çalıştıran düğümde eğitime katılacak her GPU için tek bir işlem çatallanacak. Örneğin, eğitimde iki düğüm yer alacaksa ve her düğümde üç GPU kullanılmışsa, her düğümde bir tane olmak üzere iki komut dosyası çalıştıracağız ve her komut dosyası üç eğitim sürecini çatallayacak ve her işlem bir tane GPU kullanacaktır. Toplamda, eğitimi gerçekleştiren altı süreç ve her düğümde iki ana süreç olacaktır. GPU’larda eğitimi çalıştıran altı süreç, MPI, gloo veya NCCL arka uçlarını çalıştıran aynı iletişim grubunun parçası olacaktır. Aşağıdaki şekil bu senaryoyu göstermektedir.

Sinir ağı modeli (Neural network model)
Derin Sinir Ağı örneğinde kullanılan sinir ağı modelini kullanacağız:
import torch
class DeepNeuralNetwork(torch.nn.Module):
def __init__(self, num_layers, input_features, num_hidden_features, num_classes):
super(DeepNeuralNetwork, self).__init__()
self.hidden_layers = torch.nn.ModuleList()
self.hidden_layers.append(torch.nn.Linear(input_features, num_hidden_features))
for _ in range(num_layers-2):
self.hidden_layers.append(torch.nn.Linear(num_hidden_features, num_hidden_features))
self.hidden_layers.append(torch.nn.Linear(num_hidden_features, num_classes))
self.relu_activation = torch.nn.ReLU()
def forward(self, samples):
out = samples
for layer in self.hidden_layers[:-1]:
out = layer(out)
out = self.relu_activation(out)
out = self.hidden_layers[-1](out)
return out
Üst süreç - ana komut dosyası
Her düğümde çalıştırılacak komut dosyası, eğitimde kullanılan veri kümesini düğümüne indirecek ve eğitime katılan düğüm sayısını belirlemek için bazı ortam değişkenlerini okuyacaktır. En önemlisi, eğitimi gerçekleştirecek alt süreçleri çatallayacak (fork) ve düğümündeki her GPU için bir işlem başlatacaktır.
Bu örnekte, eğitim işini yürütmek için SLURM’un kullanıldığını varsayıyoruz. Paralel bir işi yürütmek için
SLURM kullanıldığında, işin meta verilerinin bir kısmı ortam değişkenleri olarak saklanır.
"SLURM_NPROCS"
ortam değişkenini kullanarak eğitim işinde yer alan görevlerin sayısını okuyabiliriz.
Bu değer, bu iş adımını çalıştırırken kullanılan SLURM parametresi -n/--ntasks
değerine eşit olacaktır.
Ayrıca, "SLURM_PROCID"
ortam değişkenini kullanarak tüm katılan görevler arasındaki geçerli görevlerin
sayısını okuyacağız. Yukarıdaki değişkenlerin her ikisi de SLURM tarafından otomatik olarak ayarlanmaktadır.
Cihaz üzerindeki her bir GPU için daha sonra tanımlayacağımız train_process_on_gpu
fonksiyonunu çalıştırmak
için bir işlem çatallıyoruz. Bu işlem, tek bir GPU kullanarak eğitim yapacaktır. Bu aşamada çatallama için torch.multiprocessing.spawn
işlemi kullanılacaktır. Her ana işlem, GPU sayısı kadar çok train_process_on_gpu
işlemi çatallayacaktır.
if __name__=="__main__":
# "group_size", eğitime katılan SLURM görevlerinin sayısıdır. Bizim durumumuzda, aynı zamanda eğitime katılan düğüm sayısıdır.
group_size = int(os.environ["SLURM_NPROCS"])
# 'id_in_group', yürütülen tüm görevler arasındaki mevcut görevin kimliğidir. Yani düğüm numarasıdır.
id_in_group = int(os.environ["SLURM_PROCID"])
# Bu düğümde eğitim için kullanılabilen GPU sayısı
gpus_per_device = torch.cuda.device_count()
# Eğitime katılacak toplam GPU sayısı. İletişim grubunun parçası olacak süreçlerin sayısıdır.
world_size = gpus_per_device * group_size
print(f"Bu, eğitim için kullanılacak {id_in_group} düğümden {group_size} düğüm numarasıdır. Toplamda, eğitim için kullanılacak toplam {world_size} GPU vardır. ")
# torch.multiprocessing, yerel Python multiprocessing kitaplığı üzerindeki bir sarmalayıcıdır. PyTorch eğitiminde kullanılan süreçleri oluşturmak için kullanılır.
import torch.multiprocessing as mp
args = (world_size, id_in_group, gpus_per_device)
# Her train_process_on_gpu işlevi, eğitimi tek bir GPU üzerinde çalıştıran bir süreç olacaktır. Mevcut düğümde ne kadar GPU varsa o kadar süreç yaratıyoruz.
mp.spawn(train_process_on_gpu, args=args, nprocs=gpus_per_device, join=True)
Alt süreç - GPU’da eğitim fonksiyonu
Train fonksiyonu, eğitimin tamamından sorumludur ve her GPU için bir örnekle tüm düğümlerde yürütülecektir. İşlem, iletişim grubunu oluşturmalı, veri kümesini yüklemeli, veri örnekleyicileri oluşturmalı ve eğitim döngüsünü yürütmelidir. Bu durumda düğümlerden birinin test yapması gerekir.
İletişim grupları oluşturma
İşlem çatallandığında, ilk argümanı aynı ana süreç tarafından tüm çatallı süreçler arasındaki indeksi olacaktır. Bu sayıyı bu işlemin kullanacağı GPU’nun kimliği olarak kullanabiliriz. Kalan argümanlar, spawn
işlevindeki süreçleri çatallarken kullanılan args
adlı parametreden iletilir.
Bu fonksiyonda yaptığımız ilk şey, bu işlemin eğitimi çalıştıran tüm süreçler arasındaki sırasını hesaplamaktır.
Daha sonra init_process_group
fonksiyonunu kullanarak eğitim için kullanılacak iletişim grubunu oluşturuyoruz ve
iletişim arka ucu olarak gloo
yu kullanıyoruz. Unutulmamalıyız ki, iletişim grubunun oluşturulabilmesi için ana
düğümün IP adresi ve iletişim için bir bağlantı noktası belirtilmelidir. İşi çalıştırmak için kullanılanları SLURM
betiğinde belirteceğiz ve betiği bu eğitimin sonunda göstereceğiz.
def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
# Eğitim için kullanılan tüm 'train_process_on_gpu' süreçleri arasında bu sürecin sıralaması
rank = gpu_id_in_node + id_in_group*gpus_per_device
print(f"rank {rank}, gpu_id_in_node {gpu_id_in_node}")
dist.init_process_group('gloo', rank=rank, world_size=world_size)
...
Veri kümesi ve veri örnekleyici
Eğitim prosedürüne katılan tüm süreçlerde aynı modelin kopyaları olacaktır. Ancak, her biri farklı bir örnek seti kullanarak
eğitim işlemini gerçekleştirecektir. Bu dağıtılmış örneklemeyi DistributedDataSampler
nesnesini kullanarak oluşturuyoruz.
u nesne, birden fazla işlem tarafından kullanılacak bir veri kümesini alır ve her işlemin eğitmek için farklı bir örnek kümesi
alacağı şekilde bir örnekleme modeli oluşturur. Sampler’ı oluşturup kaç işlemin kullanılacağını ve mevcut işlemin sırasını
söyledikten sonra, eğitim aşamasında eğitim partilerini almak için kullanacağımız sampler’ı data loader’a aktarıyoruz.
def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
# ....
from torch.utils.data.distributed import DistributedSampler
train_dataset = MNIST(root='../data', train = True, download = True,
transform=transforms.ToTensor())
# Örnekleyiciyi oluşturuyoruz ve eğitimde kullanılan toplam süreç sayısı ve mevcut sürecin sıralaması ile birlikte veri kümesini iletiyoruz. Bu iş için bir örnekleme yöntemi oluşturacaktır.
data_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
# Verileri, eğitim gruplarını döndürürken kullanması gereken örnekleme şeması hakkında bilgilendirmek için örnekleyicinin yanı sıra DataLoader'a da iletiyoruz.
dataloader = DataLoader(train_dataset, batch_size=128, sampler=data_sampler)
num_features = 28*28
num_classes = len(train_dataset.classes)
# ....
Dağıtılmış model
Dağıtık bir eğitim için tüm süreçlerde özdeş bir sinir ağı modeli oluşturuyoruz, ancak bunu bir DistributedDataParallel
nesnesi ile sarıyoruz ve o nesneyi eğitim için kullanıyoruz. Ayrıca mevcut cihazda hangi GPU’nun eğitim için kullanılacağını
da bildiriyoruz. Bu sarılmış model, eğitime katılan tüm GPU’lardaki modellerin eğitilebilir parametrelerini senkronize edecektir.
Daha spesifik olarak, geriye doğru geçişi senkronize edecek ve tüm gradyan kümelerinin ortalamasını alarak, geriye doğru geçişin
sonunda modelin tüm kopyalarının aynı gradyanlara sahip olmasını garanti edecektir.
def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
# ....
model = DeepNeuralNetwork(4, num_features, 128, num_classes=num_classes).to(gpu_id_in_node)
parallel_model = DistributedDataParallel(model, device_ids=[gpu_id_in_node])
# ....
Eğitim döngüsü
Dağıtılmış eğitim için kullanılan eğitim döngüsü, tek düğümlü, tek GPU eğitimi için kullanılanla aynı görünmektedir.
Bu döngüde sırasıyla eğitim örneklerini getirmek için DataLoader
‘ı kullanacağız, onları eğitim modeline geçireceğiz, bir kayıp hesaplayacağız,
modelden geriye doğru bir geçiş yapacağız ve gradyanları güncelleyeceğiz. Tüm iletişim ve senkronizasyon DistributedDataParallel
objesi tarafından otomatik olarak yapılır. Aslında, sadece modelden geriye doğru geçişte senkronizsyon gereklidir. Bu noktada,
tüm katılan süreçlerden gelen gradyanlar senkronize edilir ve geriye doğru geçişin sonunda tüm süreçlerin aynı gradyan kümesine
sahip olacağı şekilde ortalaması alınır.
def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
# ....
optimizer = torch.optim.Adam(parallel_model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()
epochs = 2
for epoch in range(epochs):
total_loss = 0
for data, labels in dataloader:
data = data.reshape(-1, 28*28)
data = data.to(gpu_id_in_node)
labels = labels.to(gpu_id_in_node)
optimizer.zero_grad()
y_score = parallel_model(data)
loss = criterion(y_score, labels)
# Geriye geçiş sırasında, tüm işlemler gradyanlarını senkronize edecek ve tüm GPU'lar sonunda aynı gradyanlara sahip olacaktır.
loss.backward()
optimizer.step()
total_loss+=loss
print(f"loss at rank {rank} is {total_loss}")
Modeli test etmek
Zamana duyarlı bir işlem olmadığı için yalnızca tek bir düğüm üzerinde test yapıyoruz. Bu test, tek düğümlü, tek GPU örneğinde yapılan testle tamamen aynı şekilde yapılır. Tek fark, yalnızca rank == 0 olan işlemin testi yürüteceğini belirtmemizdir.
def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
# ....
if rank == 0:
test_dataset = MNIST(root='../data', train = False, download = True,
transform=transforms.ToTensor())
test_dataloader = DataLoader(test_dataset, batch_size=128)
n_samples = 0
n_correct = 0
for data, labels, in test_dataloader:
data = data.reshape(-1, 28*28)
data = data.to(gpu_id_in_node)
labels = labels.to(gpu_id_in_node)
y_score = parallel_model(data)
_, predicted = torch.max(y_score.data, 1)
n_samples += labels.size(0)
n_correct += (predicted == labels).sum().item()
acc = 100.0 * n_correct / n_samples
print(f'10000 test görüntüsünde ağın doğruluğu : {acc} %')
SLURM komut dosyası
Son olarak, çoklu GPU, çok düğümlü işi yürütmek için TRUBA’da kullandığımız SLURM betiğini göstereceğiz.
Komut dosyasında, kullanmak istediğimiz düğüm sayısını ve eğitimde yer alan görev sayısını belirtiyoruz.
Ayrıca komut dosyası içerisinde iletişim grubundaki ana işlemin IP adresi olan "MASTER_ADDR"
ve "MASTER_PORT"
olmak üzere iki ortam değişkeni belirledik, bunlar iletişim grubunun iletişim kuracağı bağlantı noktası olarak kullanılacaktır.
Lütfen kullanmadan önce bağlantı noktasının boş olduğundan emin olun. Son olarak, srun
komutunu ve ayırdığımız
düğümleri kullanarak eğitim komut dosyasını çalıştırıyoruz. Bunun için PyTorch’un kurulu olduğu bir conda ortamı kullanıyoruz.
#!/bin/bash
#SBATCH --account=<account> #your_user_account
#SBATCH --job-name=pytorch
#SBATCH --partition=<partition>
#SBATCH --nodes=2
#SBATCH --ntasks=2 # Number of parent processes that will be used
# Kullanılacak üst işlem sayısı
#SBACTH --ntasks-per-node=1 # We specify that we want a single parent process to run on each node
# Her düğümde tek bir üst sürecin çalışmasını istediğimizi belirtiyoruz.
#SBATCH --gres=gpu:3 # number of GPUs used in each node
# her düğümde kullanılan GPU sayısı
#SBATCH --cpus-per-task=8
#SBATCH --time=02:00:00
# Setup
module purge
module add centos7.3/lib/cuda/10.1
hostname
source /truba/home/<account>/anaconda3/bin/activate
conda init
conda activate <conda_env> # PyTorch'un kurulu olduğu conda ortamının adı
export MASTER_PORT=12900 # portun boş olduğundan emin olun
export MASTER_ADDR=$(srun --ntasks=1 hostname 2>&1 | tail -n1)
# -N ve -n, bu iş adımında kullanılan düğüm sayısını ve görev sayısını belirtir
srun -N 2 -n 2 python 01_distributed.p
Çıktı
After dispatching the SLURM script on the Palamut-cuda partition, we receive the following output:
Palamut-cuda bölümünde SLURM betiğini gönderdikten sonra aşağıdaki çıktıyı alıyoruz:
Bu, eğitim için kullanılacak 1 düğümden 2 düğüm numarasıdır. Toplamda, eğitim için kullanılacak toplam 6 GPU vardır. Bu, eğitim için kullanılacak 0 düğümden 2 düğüm numarasıdır. Toplamda, eğitim için kullanılacak toplam 6 GPU vardır. bu düğümün sıralaması 5. Bu işlemin bu cihazda kullanacağı GPU numarası 2 loss at rank 5 is 72.71366119384766 loss at rank 5 is 22.71541976928711 bu düğümün sıralaması 4. Bu işlemin bu cihazda kullanacağı GPU numarası 1 loss at rank 4 is 73.31228637695312 loss at rank 4 is 23.012968063354492 bu düğümün sıralaması 1. Bu işlemin bu cihazda kullanacağı GPU numarası 1 loss at rank 1 is 72.8398208618164 loss at rank 1 is 21.87244415283203 bu düğümün sıralaması 3. Bu işlemin bu cihazda kullanacağı GPU numarası 0 loss at rank 3 is 71.55941009521484 loss at rank 3 is 21.87774085998535 bu düğümün sıralaması 2. Bu işlemin bu cihazda kullanacağı GPU numarası 2 loss at rank 2 is 72.82304382324219 loss at rank 2 is 21.772422790527344 bu düğümün sıralaması 0. Bu işlemin bu cihazda kullanacağı GPU numarası 0 loss at rank 0 is 73.36946105957031 loss at rank 0 is 23.250808715820312 10000 test görüntüsünde ağın doğruluğu : 92.79 %