Dağıtılmış, çok düğümlü ve çok GPU’lu eğitim

PyTorch, tek ve birden çok cihazda birden çok GPU kullanan eğitim modellerine olanak tanır. Bu örnekte, bir modelin birden fazla GPU’ya kopyalanması ve her bir GPU’nun modeli, eğitim örneklerinin bir kısmı üzerinde, eğitmesi işlemlerinin nasıl gerçekleştirildiğini keşfedeceğiz. Modelin gradyanları, katılan tüm GPU’lar arasında senkronize edilecek ve eğitimin geriye doğru geçişi sırasında ortalaması alınacaktır. Bu da modeli tüm GPU’larda aynı tutar. Bu örnekte, MNIST basamaklı veri kümesinde sınıflandırma yapmak için her bir düğümde üç GPU bulunan iki düğüm kullanacağız.

Yürütme modeli

Eğitimde yer alacak her düğümde, eğitim komut dosyasının bir örneğini yürüteceğiz. Her eğitim komut dosyasında, komut dosyasını çalıştıran düğümde eğitime katılacak her GPU için tek bir işlem çatallanacak. Örneğin, eğitimde iki düğüm yer alacaksa ve her düğümde üç GPU kullanılmışsa, her düğümde bir tane olmak üzere iki komut dosyası çalıştıracağız ve her komut dosyası üç eğitim sürecini çatallayacak ve her işlem bir tane GPU kullanacaktır. Toplamda, eğitimi gerçekleştiren altı süreç ve her düğümde iki ana süreç olacaktır. GPU’larda eğitimi çalıştıran altı süreç, MPI, gloo veya NCCL arka uçlarını çalıştıran aynı iletişim grubunun parçası olacaktır. Aşağıdaki şekil bu senaryoyu göstermektedir.

/assets/pytorch-education/dist.png

Sinir ağı modeli (Neural network model)

Derin Sinir Ağı örneğinde kullanılan sinir ağı modelini kullanacağız:

import torch

class DeepNeuralNetwork(torch.nn.Module):
    def __init__(self, num_layers, input_features, num_hidden_features, num_classes):
        super(DeepNeuralNetwork, self).__init__()
        self.hidden_layers = torch.nn.ModuleList()
        self.hidden_layers.append(torch.nn.Linear(input_features, num_hidden_features))

        for _ in range(num_layers-2):
            self.hidden_layers.append(torch.nn.Linear(num_hidden_features, num_hidden_features))
        self.hidden_layers.append(torch.nn.Linear(num_hidden_features, num_classes))
        self.relu_activation = torch.nn.ReLU()

    def forward(self, samples):
        out = samples
        for layer in self.hidden_layers[:-1]:
            out = layer(out)
            out = self.relu_activation(out)
        out = self.hidden_layers[-1](out)
        return out

Üst süreç - ana komut dosyası

Her düğümde çalıştırılacak komut dosyası, eğitimde kullanılan veri kümesini düğümüne indirecek ve eğitime katılan düğüm sayısını belirlemek için bazı ortam değişkenlerini okuyacaktır. En önemlisi, eğitimi gerçekleştirecek alt süreçleri çatallayacak (fork) ve düğümündeki her GPU için bir işlem başlatacaktır.

Bu örnekte, eğitim işini yürütmek için SLURM’un kullanıldığını varsayıyoruz. Paralel bir işi yürütmek için SLURM kullanıldığında, işin meta verilerinin bir kısmı ortam değişkenleri olarak saklanır. "SLURM_NPROCS" ortam değişkenini kullanarak eğitim işinde yer alan görevlerin sayısını okuyabiliriz. Bu değer, bu iş adımını çalıştırırken kullanılan SLURM parametresi -n/--ntasks değerine eşit olacaktır. Ayrıca, "SLURM_PROCID" ortam değişkenini kullanarak tüm katılan görevler arasındaki geçerli görevlerin sayısını okuyacağız. Yukarıdaki değişkenlerin her ikisi de SLURM tarafından otomatik olarak ayarlanmaktadır.

Cihaz üzerindeki her bir GPU için daha sonra tanımlayacağımız train_process_on_gpu fonksiyonunu çalıştırmak için bir işlem çatallıyoruz. Bu işlem, tek bir GPU kullanarak eğitim yapacaktır. Bu aşamada çatallama için torch.multiprocessing.spawn işlemi kullanılacaktır. Her ana işlem, GPU sayısı kadar çok train_process_on_gpu işlemi çatallayacaktır.

if __name__=="__main__":
    # "group_size", eğitime katılan SLURM görevlerinin sayısıdır. Bizim durumumuzda, aynı zamanda eğitime katılan düğüm sayısıdır.
    group_size = int(os.environ["SLURM_NPROCS"])
    # 'id_in_group', yürütülen tüm görevler arasındaki mevcut görevin kimliğidir. Yani düğüm numarasıdır.
    id_in_group = int(os.environ["SLURM_PROCID"])
    # Bu düğümde eğitim için kullanılabilen GPU sayısı
    gpus_per_device = torch.cuda.device_count()
    # Eğitime katılacak toplam GPU sayısı. İletişim grubunun parçası olacak süreçlerin sayısıdır.
    world_size = gpus_per_device * group_size
    print(f"Bu, eğitim için kullanılacak {id_in_group} düğümden {group_size} düğüm numarasıdır. Toplamda, eğitim için kullanılacak toplam {world_size} GPU vardır. ")
    # torch.multiprocessing, yerel Python multiprocessing kitaplığı üzerindeki bir sarmalayıcıdır. PyTorch eğitiminde kullanılan süreçleri oluşturmak için kullanılır.
    import torch.multiprocessing as mp
    args = (world_size, id_in_group, gpus_per_device)
    # Her train_process_on_gpu işlevi, eğitimi tek bir GPU üzerinde çalıştıran bir süreç olacaktır. Mevcut düğümde ne kadar GPU varsa o kadar süreç yaratıyoruz.
    mp.spawn(train_process_on_gpu, args=args, nprocs=gpus_per_device, join=True)

Alt süreç - GPU’da eğitim fonksiyonu

Train fonksiyonu, eğitimin tamamından sorumludur ve her GPU için bir örnekle tüm düğümlerde yürütülecektir. İşlem, iletişim grubunu oluşturmalı, veri kümesini yüklemeli, veri örnekleyicileri oluşturmalı ve eğitim döngüsünü yürütmelidir. Bu durumda düğümlerden birinin test yapması gerekir.

İletişim grupları oluşturma

İşlem çatallandığında, ilk argümanı aynı ana süreç tarafından tüm çatallı süreçler arasındaki indeksi olacaktır. Bu sayıyı bu işlemin kullanacağı GPU’nun kimliği olarak kullanabiliriz. Kalan argümanlar, spawn işlevindeki süreçleri çatallarken kullanılan args adlı parametreden iletilir.

Bu fonksiyonda yaptığımız ilk şey, bu işlemin eğitimi çalıştıran tüm süreçler arasındaki sırasını hesaplamaktır. Daha sonra init_process_group fonksiyonunu kullanarak eğitim için kullanılacak iletişim grubunu oluşturuyoruz ve iletişim arka ucu olarak glooyu kullanıyoruz. Unutulmamalıyız ki, iletişim grubunun oluşturulabilmesi için ana düğümün IP adresi ve iletişim için bir bağlantı noktası belirtilmelidir. İşi çalıştırmak için kullanılanları SLURM betiğinde belirteceğiz ve betiği bu eğitimin sonunda göstereceğiz.

def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
    # Eğitim için kullanılan tüm 'train_process_on_gpu' süreçleri arasında bu sürecin sıralaması
    rank = gpu_id_in_node + id_in_group*gpus_per_device
    print(f"rank {rank}, gpu_id_in_node {gpu_id_in_node}")
    dist.init_process_group('gloo', rank=rank, world_size=world_size)
    ...

Veri kümesi ve veri örnekleyici

Eğitim prosedürüne katılan tüm süreçlerde aynı modelin kopyaları olacaktır. Ancak, her biri farklı bir örnek seti kullanarak eğitim işlemini gerçekleştirecektir. Bu dağıtılmış örneklemeyi DistributedDataSampler nesnesini kullanarak oluşturuyoruz. u nesne, birden fazla işlem tarafından kullanılacak bir veri kümesini alır ve her işlemin eğitmek için farklı bir örnek kümesi alacağı şekilde bir örnekleme modeli oluşturur. Sampler’ı oluşturup kaç işlemin kullanılacağını ve mevcut işlemin sırasını söyledikten sonra, eğitim aşamasında eğitim partilerini almak için kullanacağımız sampler’ı data loader’a aktarıyoruz.

def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
    # ....
    from torch.utils.data.distributed import DistributedSampler
    train_dataset = MNIST(root='../data', train = True, download = True,
        transform=transforms.ToTensor())

    # Örnekleyiciyi oluşturuyoruz ve eğitimde kullanılan toplam süreç sayısı ve mevcut sürecin sıralaması ile birlikte veri kümesini iletiyoruz. Bu iş için bir örnekleme yöntemi oluşturacaktır.
    data_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)

    # Verileri, eğitim gruplarını döndürürken kullanması gereken örnekleme şeması hakkında bilgilendirmek için örnekleyicinin yanı sıra DataLoader'a da iletiyoruz.
    dataloader = DataLoader(train_dataset, batch_size=128, sampler=data_sampler)
    num_features = 28*28
    num_classes = len(train_dataset.classes)
    # ....

Dağıtılmış model

Dağıtık bir eğitim için tüm süreçlerde özdeş bir sinir ağı modeli oluşturuyoruz, ancak bunu bir DistributedDataParallel nesnesi ile sarıyoruz ve o nesneyi eğitim için kullanıyoruz. Ayrıca mevcut cihazda hangi GPU’nun eğitim için kullanılacağını da bildiriyoruz. Bu sarılmış model, eğitime katılan tüm GPU’lardaki modellerin eğitilebilir parametrelerini senkronize edecektir. Daha spesifik olarak, geriye doğru geçişi senkronize edecek ve tüm gradyan kümelerinin ortalamasını alarak, geriye doğru geçişin sonunda modelin tüm kopyalarının aynı gradyanlara sahip olmasını garanti edecektir.

def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
    # ....
    model = DeepNeuralNetwork(4, num_features, 128, num_classes=num_classes).to(gpu_id_in_node)
    parallel_model = DistributedDataParallel(model, device_ids=[gpu_id_in_node])
    # ....

Eğitim döngüsü

Dağıtılmış eğitim için kullanılan eğitim döngüsü, tek düğümlü, tek GPU eğitimi için kullanılanla aynı görünmektedir. Bu döngüde sırasıyla eğitim örneklerini getirmek için DataLoader‘ı kullanacağız, onları eğitim modeline geçireceğiz, bir kayıp hesaplayacağız, modelden geriye doğru bir geçiş yapacağız ve gradyanları güncelleyeceğiz. Tüm iletişim ve senkronizasyon DistributedDataParallel objesi tarafından otomatik olarak yapılır. Aslında, sadece modelden geriye doğru geçişte senkronizsyon gereklidir. Bu noktada, tüm katılan süreçlerden gelen gradyanlar senkronize edilir ve geriye doğru geçişin sonunda tüm süreçlerin aynı gradyan kümesine sahip olacağı şekilde ortalaması alınır.

def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
    # ....
    optimizer = torch.optim.Adam(parallel_model.parameters(), lr=0.001)
    criterion = torch.nn.CrossEntropyLoss()
    epochs = 2
    for epoch in range(epochs):
        total_loss = 0
        for data, labels in dataloader:
            data = data.reshape(-1, 28*28)
            data = data.to(gpu_id_in_node)
            labels = labels.to(gpu_id_in_node)
            optimizer.zero_grad()
            y_score = parallel_model(data)
            loss = criterion(y_score, labels)
            # Geriye geçiş sırasında, tüm işlemler gradyanlarını senkronize edecek ve tüm GPU'lar sonunda aynı gradyanlara sahip olacaktır.
            loss.backward()
            optimizer.step()
            total_loss+=loss
        print(f"loss at rank {rank} is {total_loss}")

Modeli test etmek

Zamana duyarlı bir işlem olmadığı için yalnızca tek bir düğüm üzerinde test yapıyoruz. Bu test, tek düğümlü, tek GPU örneğinde yapılan testle tamamen aynı şekilde yapılır. Tek fark, yalnızca rank == 0 olan işlemin testi yürüteceğini belirtmemizdir.

def train_process_on_gpu(gpu_id_in_node, world_size, id_in_group, gpus_per_device):
    # ....
    if rank == 0:
        test_dataset = MNIST(root='../data', train = False, download = True,
            transform=transforms.ToTensor())
        test_dataloader = DataLoader(test_dataset, batch_size=128)
        n_samples = 0
        n_correct = 0
        for data, labels, in test_dataloader:
            data = data.reshape(-1, 28*28)
            data = data.to(gpu_id_in_node)
            labels = labels.to(gpu_id_in_node)
            y_score = parallel_model(data)
            _, predicted = torch.max(y_score.data, 1)
            n_samples += labels.size(0)
            n_correct += (predicted == labels).sum().item()
        acc = 100.0 * n_correct / n_samples
        print(f'10000 test görüntüsünde ağın doğruluğu : {acc} %')

SLURM komut dosyası

Son olarak, çoklu GPU, çok düğümlü işi yürütmek için TRUBA’da kullandığımız SLURM betiğini göstereceğiz.

Komut dosyasında, kullanmak istediğimiz düğüm sayısını ve eğitimde yer alan görev sayısını belirtiyoruz. Ayrıca komut dosyası içerisinde iletişim grubundaki ana işlemin IP adresi olan "MASTER_ADDR" ve "MASTER_PORT" olmak üzere iki ortam değişkeni belirledik, bunlar iletişim grubunun iletişim kuracağı bağlantı noktası olarak kullanılacaktır. Lütfen kullanmadan önce bağlantı noktasının boş olduğundan emin olun. Son olarak, srun komutunu ve ayırdığımız düğümleri kullanarak eğitim komut dosyasını çalıştırıyoruz. Bunun için PyTorch’un kurulu olduğu bir conda ortamı kullanıyoruz.

#!/bin/bash
#SBATCH --account=<account> #your_user_account
#SBATCH --job-name=pytorch
#SBATCH --partition=<partition>
#SBATCH --nodes=2
#SBATCH --ntasks=2 # Number of parent processes that will be used
           # Kullanılacak üst işlem sayısı
#SBACTH --ntasks-per-node=1 # We specify that we want a single parent process to run on each node
                # Her düğümde tek bir üst sürecin çalışmasını istediğimizi belirtiyoruz.
#SBATCH --gres=gpu:3 # number of GPUs used in each node
             # her düğümde kullanılan GPU sayısı

#SBATCH --cpus-per-task=8
#SBATCH --time=02:00:00

# Setup
module purge
module add centos7.3/lib/cuda/10.1

hostname
source /truba/home/<account>/anaconda3/bin/activate
conda init
conda activate <conda_env> # PyTorch'un kurulu olduğu conda ortamının adı

export MASTER_PORT=12900 # portun boş olduğundan emin olun
export MASTER_ADDR=$(srun --ntasks=1 hostname 2>&1 | tail -n1)
# -N ve -n, bu iş adımında kullanılan düğüm sayısını ve görev sayısını belirtir
srun -N 2 -n 2 python 01_distributed.p