본 포스팅은 다음 글을 기반으로 작성하였습니다.
좋은 자료 공유해주셔서 감사합니다.
Overview of Parallelism
`병렬화(Parallelism)`란, 여러 개를 동시에 처리하는 기술을 의미하며 머신러닝에서는 주로 여러개의 디바이스에서 연산을 병렬화하여 속도나 메모리 효율성을 개선하기 위해 사용한다. 병렬화는 크게 `Data Parallelism`, `Model Parallelism`, `Pipeline Parallelism`으로 구분된다.
Data Parallelism
`데이터 병렬화(Data Parallelism)`는 데이터의 수가 많을 때, 데이터를 병렬 처리하여 학습 속도를 빠르게 하는 방법을 말한다. 이는 모든 디바이스에 모델을 복제하고, 서로 다른 데이터를 각 디바이스에 입력하는 방식으로 동작한다. 이를 통해서 학습 시 배치 사이즈를 디바이스의 수의 배수만큼 더 많이 입력할 수 있다. 그러나 이러한 데이터 병렬화는 하나의 디바이스에 하나의 모델이 완전히 올라갈 수 있을 때 사용이 가능하다.
torch.nn.DataParallel
`torch.nn.DataParallel`은 single-node와 multi-GPU에서 동작하는 multi-thread 모듈이다.
Forward Pass
`torch.nn.DataParallel`에 대한 Forward Pass 과정은 다음과 같다.
- Mini-Batch에 대해 single node(GPU-0)에서 각 디바이스(GPU-0, GPU-1, GPU-2, GPU-3)로 `Scatter`
- Scatter는 mini-batch를 쪼개는 것을 의미
- 각 디바이스별로 배치 사이즈는 (batch//4)에 해당
- single node에 올라와 있는 모델의 파라미터를 각 디바이스로 `Broadcast`
- Broadcast는 모델 파라미터를 각 디바이스마다 복사하는 것을 의미
- 각 디바이스로 복제된 모델로 `Forward`하여 logit을 계산
- 계산된 logit을 `Gather`하여 single node로 모음
- Gather는 각 디바이스에 존재하는 logit 값을 하나로 모아주는 것을 의미
- single node에서 모인 logits으로부터 loss를 계산한 후, `Loss Reduction`을 통해 최종 loss를 산출
- Loss Reduction이란 주어진 배치에 대해 loss를 계산하고, 이를 어떻게 반환할 것인가를 의미
- 모든 logit을 합친 후 element의 개수로 나눠주는 `mean`
- 모든 logit을 합치는 `sum`
- PyTorch에서는 loss 함수마다 `reduction`이라는 파라미터를 통해 이를 정의(default='mean')
- 대부분 'mean'을 사용하는 것이 무난하다는 의견이 많음
- Loss Reduction이란 주어진 배치에 대해 loss를 계산하고, 이를 어떻게 반환할 것인가를 의미
이를 코드로 나타내면 다음과 같다.
import torch.nn as nn
def data_parallel(module, inputs, labels, device_ids, output_device):
inputs = nn.parallel.scatter(inputs, device_ids) # 입력 데이터를 각 디바이스로 쪼갬
replicas = nn.parallel.replicate(module, device_ids) # 모델을 각 디바이스로 복제
logit = nn.parallel.parallel_apply(replicas, inputs) # 각 디바이스에 복제된 모델이 각 디바이스의 데이터를 forward하여 logit 계산
logits = nn.parallel.gather(outputs, output_device) # 모델의 logit을 하나의 디바이스로 모음
return logits
이 과정에서 문제점은, single node에 해당하는 0번 GPU에 logits이 쏠려 GPU 메모리 불균형이 발생한다는 점이다. 이는 0번 GPU에 logits이 아닌 loss를 Gather하는 방식으로 변경하면, logits에 비해 loss가 scalar이기 때문에 크기가 훨씬 작아 어느 정도 완화가 가능하다.
이를 구현하는 방법은 forward 함수를 오버라이드하는 것으로 가능하다. 여기서의 핵심은 Loss Computation과 Loss Reduction을 multi-thread 안에서 작동시키는 것이다. 즉, 모델의 forward 자체는 multi-thread에서 작동되기 때문에 Loss Computation 부분을 forward 함수 안에 포함시켜 구현하는 것이다.
Backward Pass
- 계산된 loss를 각 디바이스에 `Scatter`
- 다시 각 디바이스마다 (batch // 4) 크기의 데이터에 대한 loss 값 구성
- 전달받은 loss를 통해 각 디바이스에서 `backward`를 수행해 gradient를 계산
- 계산된 모든 gradients를 single node로 `Reduce`하여 전부 더해줌
- Reduce는 각 디바이스의 gradients를 single node로 모아주는 연산에 해당
- 원래 Reduce에서는 연산 과정으로 sum, max, min을 사용 가능한데 해당 부분은 sum에 해당
- 더해진 gradient를 이용하여 single node에 있는 모델을 업데이트
이 과정을 코드로 구현하면 다음과 같다.
from torch import nn
from torch.optim import Adam
# 데이터(data_loader) 및 모델(model) 구성 이후 과정
model = nn.DataParallel(model, device_ids=[0,1,2,3], output_device=0) # output_device가 single node에 해당
optimizer = Adam(model.parameters(), lr=3e-5)
loss_fn = nn.CrossEntropyLoss(reduction='mean')
# 모델 학습
for i, data in enumerate(data_loader):
optimizer.zero_grad()
tokens = tokenizer(
data["premise"],
data["hypothesis"],
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
logits = model(
input_ids=tokens.input_ids.cuda(),
attention_mask=tokens.attention_mask.cuda(),
return_dict=False,
)[0]
loss = loss_fn(logits, data["labels"].cuda())
loss.backward()
optimizer.step()
if i % 10 == 0:
print(f"step:{i}, loss:{loss}")
if i == 300:
break
위의 경우에서는 loss의 reduction이 2번 계산된다. 그림 4번에서 각 디바이스마다 loss reduction을 수행하는 부분, 그리고 그림 5번에서 각 디바이스에서 출력된 4개의 loss를 1개로 reduction하는 부분이 이에 해당한다. 이렇게 loss reduction을 2번 계산한다고 하더라도, loss computation 과정을 병렬화시킬 수 있고, single node(GPU-0)에 가해지는 메모리 부담이 훨씬 적기 때문에 효율적이다.
이를 코드로 구현하면 다음과 같다.
from torch import nn
# logits을 출력하는 일반적인 모델
class Model(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(768,3)
def forward(self, inputs):
outputs = self.linear(inputs)
return outputs
# forward pass에서 loss를 출력하는 parallel 모델
class ParallelLossModel(Model):
def __init__(self):
super().__init__()
def forward(self, inputs, labels):
logits = super(ParallelLossModel, self).forward(inputs)
loss = nn.CrossEntropyLoss(reduction='mean')(logits, labels)
return loss
`Huggingface Transformers` 모델들은 forward pass에서 곧바로 loss를 구하는 기능을 내장하고 있다. 위의 학습 코드에서 Transformer 모델의 `labels` 인자에 라벨을 입력하여 loss를 바로 출력할 수 있다.
# 기존 학습 코드
for i, data in enumerate(data_loader):
optimizer.zero_grad()
tokens = tokenizer(
data["premise"],
data["hypothesis"],
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
logits = model(
input_ids=tokens.input_ids.cuda(),
attention_mask=tokens.attention_mask.cuda(),
return_dict=False,
)[0]
loss = loss_fn(logits, data["labels"].cuda())
loss.backward()
optimizer.step()
if i % 10 == 0:
print(f"step:{i}, loss:{loss}")
if i == 300:
break
# 허깅페이스 모델들의 학습 코드
for i, data in enumerate(data_loader):
optimizer.zero_grad()
tokens = tokenizer(
data["premise"],
data["hypothesis"],
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
loss = model(
input_ids=tokens.input_ids.cuda(),
attention_mask=tokens.attention_mask.cuda(),
labels=data["labels"],
).loss
loss = loss.mean()
loss.backward()
optimizer.step()
if i % 10 == 0:
print(f"step:{i}, loss:{loss}")
if i == 300:
break
torch.nn.DataParallel의 문제점
Multi-thread 모듈이기 때문에 파이썬에서 비효율적
파이썬은 `Global Interpreter Lock(GIL)`에 의해 하나의 프로세스에서 동시에 여러 개의 thread가 작동할 수 없다. 따라서 근본적으로 multi-thread가 아닌 multi-process 프로그램으로 만들어서 여러 개의 프로세스를 동시에 실행하게 해야한다. 여기서 GIL이란 여러 개의 thread가 파이썬 바이트코드를 한번에 하나만 사용할 수 있게 락을 거는 것을 의미한다. GIL은 파이썬의 메모리 안정성을 보장하기 위해 설계되었는데, 이로 인해 파이썬 multi-thread 프로그램에서 multi-thread가 single-thread처럼 동작하는 성능 병목 현상을 발견할 수 있다.
하나의 모델에서 업데이트된 모델이 다른 디바이스로 매 step마다 복제되어야 함
위의 방식은 각 디바이스에서 계산된 gradient를 single node로 `Gather`하여 업데이트하기 때문에 매번 업데이트된 모델을 다른 디바이스들로 `Broadcast`해야 한다. 이 과정은 매우 비싼 과정에 속하며, gradient를 Gather하지 않고, 각 디바이스에서 자체적으로 업데이트, 즉 `step()`을 수행할 수 있다면 모델을 매번 복제할 필요가 없다.
이를 구현하기 위해 `All-reduce` 연산을 사용할 수 있다. 앞서 작성한 포스팅(https://sonstory.tistory.com/122)에도 설명되어 있듯이, 앞에 `All-`이 붙은 연산들은 연산을 수행한 뒤 모든 디바이스에 `Broadcast`하는 과정을 거친다. 따라서 All-reduce는 출력 값을 모두 모아서 더해주는 `Reduce` 연산을 수행한 뒤에 이를 모든 디바이스에 복제하게 되고, 이를 통해 자체적으로 `step()`을 수행할 수 있다. 그러나 문제점은, 이 또한 매우 비용이 높은 연산에 속한다는 것이다.
결론적으로, `Data Parallel(DP)`의 장단점을 정리하면 다음과 같다.
- 장점
- n개의 GPU에 동일한 모델을 올려 학습하기 때문에 학습 속도가 빨라짐
- 동일한 이유로 모델의 검증 및 예측 속도 또한 빨라짐
- 여러 개의 GPU에 데이터를 분할한 뒤 학습하여 배치 사이즈를 크게 구성할 수 있음(데이터 병렬화)
- 단점
- Multi-thread이기 때문에 파이썬에서 효율적이지 않음
- 하나의 모델에서 업데이트된 모델이 다른 GPU로 매 step마다 복제되어야 함
위 사진은 필자가 80GB A100 GPU 3대를 통해 `yanolja/EEVE-Korean-10.8B-v1.0`의 학습 과정에서 DP를 적용했을 때 메모리 구성 모습이다. 위에서 언급한 것과 같이 single node(GPU-0)에 메모리가 쏠리는 것을 확인할 수 있다. 다른 디바이스의 경우 `fp16`을 적용한 상태에서 모델만 올라간 상황으로 보인다.
Distributed Data Parallel(DDP)
`Distributed Data Parallel(DDP)`는 기존 DP의 문제점을 개선하기 위한 데이터 병렬처리 모듈로, single/multi-node & multi-GPU에서 동작하는 multi-process 모듈이다. 여기서 multi-process 모듈이란, 각 multi-GPU를 쓸 경우에 각 GPU가 자신을 위한 하나의 프로세스를 갖게 되는 것을 말한다. 이는 DP에서 각 GPU가 thread를 갖는 것과 차이가 있다. 이러한 프로세스 간의 통신을 위해 `torch.distributed.init_process_group()` 함수를 사용해야한다.
프로세스 간의 통신이 가능해진 상황에서, DDP가 작동하는 원리는 다음과 같다.
- 각기 다른 GPU를 가진 프로세스에 모델을 복제하고, Rank 0의 프로스세가 다른 프로세스들에게 가중치를 Broadcast
- 각 프로세스의 Rank는 고유한 숫자로, 프로세스의 고유 id에 해당
- 각 프로세스는 `DistributedSampler()`를 통해 overlapping 없이 서로 다른 mini-batch data를 load
- 각 프로세스의 GPU에서 Forward pass와 loss를 계산
- 매 iteration마다 Backward 시에 각 GPU에서 gradient를 구하고 서로 All-reduce
- 각 GPU에서 mini-batch에 대한 gradient를 계산하고, 통신을 통해 gradient의 평균 값을 구하여 각 프로세스의 모델이 동일한 값으로 weight update
여기서 DDP는 `Ring All-reduce`를 사용한다. 이는 마스터 프로세스를 사용하지 않기 때문에 특정 디바이스로 부하가 쏠리지 않고, 효율적인 방식으로 모든 디바이스의 파라미터를 동시에 업데이트하기 때문에 파라미터를 매번 replicate하지 않아도 된다는 장점을 가지고 있다.
또한 `backward`와 `all-reduce`를 중첩하는 이유는, `all_reduce`는 네트워크 통신에 해당하며 `backward()`와 `step()`은 GPU 연산이기 때문에 동시에 처리가 가능하기 때문이다. 이를 통해 computation과 communication이 최대한으로 overlap되어 연산 효율이 크게 증가한다. `backward` 연산과 `all-reduce` 연산을 중첩하는 경우에 `backward` 연산이 뒤쪽 레이어부터 순차적으로 이루어지기 때문에 계산이 끝난 뒤쪽 레이어부터 먼저 전송하게 된다.
여기서 `all-reduce` 연산의 경우 layer마다 이루어지는 것이 아니라 `Gradient Bucketing`을 수행하여 Bucket이 가득찰 때 수행하게 된다. Gradient Bucketing은 일정한 사이즈의 bucket에 gradient를 저장해두고, 가득차면 다른 프로세스로 전송하는 방식을 말한다. backward 연산 과정에서 뒤쪽부터 계산된 gradient들을 차례대로 bucket에 저장하다가 bucket의 용량이 가득차면 All-reduce를 수행해서 각 디바이스에 gradient의 평균 값을 계산하여 전달한다. 이는 `bucket_size_mb` 인자를 통해 메가바이트 단위로 용량을 설정할 수 있다. 아래 그림에서 bucket에 저장되는 것은 모델의 파라미터가 아닌 해당 레이어에서 출력된 gradient이다.
DDP의 코드 구현은 다음과 같다.
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
from torch.optim import Adam
from torch.utils.data import DataLoader, DistributedSampler
from transformers import BertForSequenceClassification, BertTokenizer
from datasets import load_dataset
# initialized process group
dist.init_process_group("nccl")
rank = dist.get_rank()
world_size = dist.get_world_size()
torch.cuda.set_device(rank)
device = torch.cuda.current_device()
# dataset 생성
datasets = load_dataset("multi_nli").data["train"]
datasets = [
{
"premise": str(p),
"hypothesis": str(h),
"labels": l.as_py(),
}
for p, h, l in zip(datasets[2], datasets[5], datasets[9])
]
# DistributedSamler 생성(데이터를 쪼개서 다른 프로세스로 전송)
sampler = DistributedSampler(
datasets,
batch_size=32,
num_workers=4,
sampler=sampler,
shuffle=False,
pin_memory=True
)
# 모델과 토크나이저 생성
model_name = 'bert-base-cased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name, num_labels=3).cuda()
# distributed data parallel module
model = DistributedDataParallel(model, device_ids=[device], output_device=device)
# optimizer
optimizer = Adam(model.parameters(), lr=3e-5)
# training
for i, data in enumerate(data_loader):
optimizer.zero_grad()
tokens = tokenizer(
data["premise"],
data["hypothesis"],
padding=True,
truncation=True,
max_length=512,
return_tensors="pt",
)
loss = model(
input_ids=tokens.input_ids.cuda(),
attention_mask=tokens.attention_mask.cuda(),
labels=data["labels"],
).loss
loss.backward()
optimizer.step()
if i % 10 == 0 and rank == 0:
print(f"step:{i}, loss:{loss}")
if i == 300:
break
여기서 주의할 점은, 멀티 프로세스 어플리케이션이기 때문에 `torch.distributed.lanuch`를 사용해야 한다는 점이다.
!python -m torch.distributed.launch --nproc_per_node=4 ..src/ddp.py