0


使用Pytorch进行多卡训练

  当一块GPU不够用时,我们就需要使用多卡进行并行训练。其中多卡并行可分为数据并行和模型并行。具体区别如下图所示:

  由于模型并行比较少用,这里只对数据并行进行记录。对于pytorch,有两种方式可以进行数据并行:数据并行(DataParallel, DP)和分布式数据并行(DistributedDataParallel, DDP)。

  在多卡训练的实现上,DP与DDP的思路是相似的:

  1、每张卡都复制一个有相同参数的模型副本。

  2、每次迭代,每张卡分别输入不同批次数据,分别计算梯度。

  3、DP与DDP的主要不同在于接下来的多卡通信:

  DP的多卡交互实现在一个进程之中,它将一张卡视为主卡,维护单独模型优化器。所有卡计算完梯度后,主卡汇聚其它卡的梯度进行平均并用优化器更新模型参数,再将模型参数更新至其它卡上。

  DDP则分别为每张卡创建一个进程,每个进程相应的卡上都独立维护模型和优化器。在每次每张卡计算完梯度之后,进程之间以NCLL(NVIDIA GPU通信)为通信后端,使各卡获取其它卡的梯度。各卡对获取的梯度进行平均,然后执行后续的参数更新。由于每张卡上的模型与优化器参数在初始化时就保持一致,而每次迭代的平均梯度也保持一致,那么即使没有进行参数复制,所有卡的模型参数也是保持一致的。

  Pytorch官方推荐我们使用DDP。DP经过我的实验,两块GPU甚至比一块还慢。当然不同模型可能有不同的结果。下面分别对DP和DDP进行记录。

DP

  Pytorch的DP实现多GPU训练十分简单,只需在单GPU的基础上加一行代码即可。以下是一个DEMO的代码。

  1. import torch
  2. from torch import nn
  3. from torch.optim import Adam
  4. from torch.nn.parallel import DataParallel
  5. class DEMO_model(nn.Module):
  6. def __init__(self, in_size, out_size):
  7. super().__init__()
  8. self.fc = nn.Linear(in_size, out_size)
  9. def forward(self, inp):
  10. outp = self.fc(inp)
  11. print(inp.shape, outp.device)
  12. return outp
  13. model = DEMO_model(10, 5).to('cuda')
  14. model = DataParallel(model, device_ids=[0, 1]) # 额外加这一行
  15. adam = Adam(model.parameters())
  16. # 进行训练
  17. for i in range(1):
  18. x = torch.rand([128, 10]) # 获取训练数据,无需指定设备
  19. y = model(x) # 自动均匀划分数据批量并分配至各GPU,输出结果y会聚集到GPU0中
  20. loss = torch.norm(y)
  21. loss.backward()
  22. adam.step()

  其中model = DataParallel(model, device_ids=[0, 1])这行将模型复制到0,1号GPU上。输入数据x无需指定设备,它将会被均匀分配至各块GPU模型,进行前向传播。之后各块GPU的输出再合并到GPU0中,得到输出y。输出y在GPU0中计算损失,并进行反向传播计算梯度、优化器更新参数。

DDP

  为了对分布式编程有基本概念,首先使用pytorch内部的方法实现一个多进程程序,再使用DDP模块实现模型的分布式训练。

Pytorch分布式基础

  首先使用pytorch内部的方法编写一个多进程程序作为编写分布式训练的基础。

  1. import os, torch
  2. import torch.multiprocessing as mp
  3. import torch.distributed as dist
  4. def run(rank, size):
  5. tensor = torch.tensor([1,2,3,4], device='cuda:'+str(rank)) # ——1——
  6. group = dist.new_group(range(size)) # ——2——
  7. dist.all_reduce(tensor=tensor, group=group, op=dist.ReduceOp.SUM) # ——3——
  8. print(str(rank)+ ': ' + str(tensor) + '\n')
  9. def ini_process(rank, size, fn, backend = 'nccl'):
  10. os.environ['MASTER_ADDR'] = '127.0.0.1' # ——4——
  11. os.environ['MASTER_PORT'] = '1234'
  12. dist.init_process_group(backend, rank=rank, world_size=size) # ——5——
  13. fn(rank, size) # ——6——
  14. if __name__ == '__main__': # ——7——
  15. mp.set_start_method('spawn') # ——8——
  16. size = 2 # ——9——
  17. ps = []
  18. for rank in range(size):
  19. p = mp.Process(target=ini_process, args=(rank, size, run)) # ——10——
  20. p.start()
  21. ps.append(p)
  22. for p in ps: # ——11——
  23. p.join()

  以上代码主进程创建了两个子进程,子进程之间使用NCCL后端进行通信。每个子进程各占用一个GPU资源,实现了所有GPU张量求和的功能。细节注释如下:

  1、为每个子进程定义相同名称的张量,并分别分配至不同的GPU,从而能进行后续的GPU间通信。

  2、定义一个通信组,用于后面的all_reduce通信操作。

  3、all_reduce操作以及其它通信方式请看下图:

  4、定义编号(rank)为0的ip和端口地址,让每个子进程都知道。ip和端口地址可以随意定义,不冲突即可。如果不设置,子进程在涉及进程通信时会出错。

  5、初始化子进程组,定义进程间的通信后端(还有GLOO、MPI,只有NCCL支持GPU间通信)、子进程rank、子进程数量。只有当该函数在size个进程中被调用时,各进程才会继续从这里执行下去。这个函数统一了各子进程后续代码的开始时间。

  6、执行子进程代码。

  7、由于创建子进程会执行本程序,因此主进程的执行需要放在__main__里,防止子进程执行。

  8、开始创建子进程的方式:spawn、fork。windows默认spawn,linux默认fork。具体区别请百度。

  9、由于是以NCCL为通信后端的分布式训练,如果不同进程中相同名称的张量在同一GPU上,当这个张量进行进程间通信时就会出错。为了防止出错,限制每张卡独占一个进程,每个进程独占一张卡。这里有两张卡,所以最多只能创建两个进程。

  10、创建子进程,传入子进程的初始化方法,及子进程调用该方法的参数。

  11、等待子进程全部运行完毕后再退出主进程。

  输出结果如下:

  正是各进程保存在不同GPU上的张量的广播求和(all_reduce)的结果。

  参考: https://pytorch.org/tutorials/intermediate/dist_tuto.html

Pytorch分布式训练DEMO

  我们实际上可以根据上面的分布式基础写一个分布式训练,但由于不知道pytorch如何实现GPU间模型梯度的求和,即官方教程中所谓的ring_reduce(没找到相关API),时间原因,就不再去搜索相关方法了。这里仅记录pytorh内部的分布式模型训练,即利用DDP模块实现。Pytorch版本1.12.1。

  1. import torch,os
  2. import torch.distributed as dist
  3. import torch.multiprocessing as mp
  4. import torch.optim as optim
  5. from torch.nn.parallel import DistributedDataParallel as DDP
  6. from torch import nn
  7. def example(rank, world_size):
  8. dist.init_process_group("nccl", rank=rank, world_size=world_size) # ——1——
  9. model = nn.Linear(2, 1, False).to(rank)
  10. if rank == 0: # ——2——
  11. model.load_state_dict(torch.load('model_weight'))
  12. # model_stat = torch.load('model_weight', {'cuda:0':'cuda:%d'%rank}) #这样读取保险一点
  13. # model.load_state_dict(model_stat)
  14. opt = optim.Adam(model.parameters(), lr=0.0001) # ——3——
  15. opt_stat = torch.load('opt_weight', {'cuda:0':'cuda:%d'%rank}) # ——4——
  16. opt.load_state_dict(opt_stat) # ——5——
  17. ddp_model = DDP(model, device_ids=[rank])# ——6
  18. inp = torch.tensor([[1.,2]]).to(rank) # ——7——
  19. labels = torch.tensor([[5.]]).to(rank)
  20. outp = ddp_model(inp)
  21. loss = torch.mean((outp - labels)**2)
  22. opt.zero_grad()
  23. loss.backward() # ——8——
  24. opt.step() # ——9
  25. if rank == 0:# ——10——
  26. torch.save(model.state_dict(), 'model_weight')
  27. torch.save(opt.state_dict(), 'opt_weight')
  28. if __name__=="__main__":
  29. os.environ["MASTER_ADDR"] = "localhost"# ——11——
  30. os.environ["MASTER_PORT"] = "29500"
  31. world_size = 2
  32. mp.spawn(example, args=(world_size,), nprocs=world_size, join=True) # ——12——

  以上代码包含模型在多GPU上读取权重、进行分布式训练、保存权重等过程。细节注释如下:

  1、初始化进程组,由于使用GPU通信,后端应该写为NCCL。不过经过实验,即使错写为gloo,DDP内部也会自动使用NCCL作为通信模块。

  2、由于后面使用DDP包裹模型进行训练,其内部会自动将所有rank的模型权重同步为rank 0的权重,因此我们只需在rank 0上读取模型权重即可。这是基于Pytorch版本1.12.1,低级版本似乎没有这个特性,需要在不同rank分别导入权重,则load需要传入map_location,如下面注释的两行代码所示。

  3、这里创建model的优化器,而不是创建用ddp包裹后的ddp_model的优化器,是为了兼容单GPU训练,读取优化器权重更方便。

  4、将优化器权重读取至该进程占用的GPU。如果没有map_location参数,load会将权重读取到原本保存它时的设备。

  5、优化器获取权重。经过实验,即使权重不在优化器所在的GPU,权重也会迁移过去而不会报错。当然load直接读取到相应GPU会减少数据传输。

  6、DDP包裹模型,为模型复制一个副本到相应GPU中。所有rank的模型副本会与rank 0保持一致。注意,DDP并不复制模型优化器的副本,因此各进程的优化器需要我们在初始化时保持一致。权重要么不读取,要么都读取。

  7、这里开始模型的训练。数据需转移到相应的GPU设备。

  8、在backward中,所有进程的模型计算梯度后,会进行平均(不是相加)。也就是说,DDP在backward函数添加了hook,所有进程的模型梯度的ring_reduce将在这里执行。这个可以通过给各进程模型分别输入不同的数据进行验证,backward后这些模型有相同的梯度,且验算的确是所有进程梯度的平均。此外,还可以验证backward函数会阻断(block)各进程使用梯度,只有当所有进程都完成backward之后,各进程才能读取和使用梯度。这保证了所有进程在梯度上的一致性。

  9、各进程优化器使用梯度更新其模型副本权重。由于初始化时各进程模型、优化器权重一致,每次反向传播梯度也保持一致,则所有进程的模型在整个训练过程中都能保持一致。

  10、由于所有进程权重保持一致,我们只需通过一个进程保存即可。

  11、定义rank 0的IP和端口,使用mp.spawn,只需在主进程中定义即可,无需分别在子进程中定义。

  12、创建子进程,传入:子进程调用的函数(该函数第一个参数必须是rank)、子进程函数的参数(除了rank参数外)、子进程数、是否等待所有子进程创建完毕再开始执行。

  参考: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html


本文转载自: https://blog.csdn.net/qq_37189298/article/details/127282184
版权归原作者 cnblogs.com/qizhou/ 所有, 如有侵权,请联系我们删除。

“使用Pytorch进行多卡训练”的评论:

还没有评论