PyTorch并行训练DistributedDataParallel完整demo

 更新时间:2023年06月01日 14:43:52   作者:Joseph El Kettaneh  
这篇文章主要为大家介绍了PyTorch并行训练DistributedDataParallel完整demo,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

大型数据集训练

使用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习领域的主要挑战。 随着 DNN 和数据集规模的增加,训练这些模型的计算和内存需求也会增加。 这使得在计算资源有限的单台机器上训练这些模型变得困难甚至不可能。

使用大型数据集训练大型 DNN 的一些主要挑战包括:

  • 训练时间长:训练过程可能需要数周甚至数月才能完成,具体取决于模型的复杂性和数据集的大小。
  • 内存限制:大型 DNN 可能需要大量内存来存储训练期间的所有模型参数、梯度和中间激活。 这可能会导致内存不足错误并限制可在单台机器上训练的模型的大小。

为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。

PyTorch 的数据并行性和模型并行性

在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。

我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。

下面我们用用ResNet50和CIFAR10数据集来进行完整的代码示例:

在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。

PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。

导入必须要的库

importos
 fromdatetimeimportdatetime
 fromtimeimporttime
 importargparse
 importtorchvision
 importtorchvision.transformsastransforms
 importtorch
 importtorch.nnasnn
 importtorch.distributedasdist
 fromtorch.nn.parallelimportDistributedDataParallel

检查GPU

importsubprocess
 result=subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向Linux和Unix类似内核的免费和开源工作调度程序),

defmain():
     # get distributed configuration from Slurm environment
     parser=argparse.ArgumentParser()
     parser.add_argument('-b', '--batch-size', default=128, type=int,
                         help='batch size. it will be divided in mini-batch for each worker')
     parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
                         help='number of total epochs to run')
     parser.add_argument('-c','--checkpoint', default=None, type=str,
                         help='path to checkpoint to load')
     args=parser.parse_args()
     rank=int(os.environ['SLURM_PROCID'])
     local_rank=int(os.environ['SLURM_LOCALID'])
     size=int(os.environ['SLURM_NTASKS'])
     master_addr=os.environ["SLURM_SRUN_COMM_HOST"]
     port="29500"
     node_id=os.environ['SLURM_NODEID']
     ddp_arg= [rank, local_rank, size, master_addr, port, node_id]
     train(args, ddp_arg)

然后我们使用DistributedDataParallel 库来执行分布式训练。

deftrain(args, ddp_arg):
     rank, local_rank, size, MASTER_ADDR, port, NODE_ID=ddp_arg
     # display info
     ifrank==0:
         #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
         print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
     #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
     # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
     #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
     dist.init_process_group(
         backend='nccl',
         init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
         world_size=size,
         rank=rank
     )
     # distribute model
     torch.cuda.set_device(local_rank)
     gpu=torch.device("cuda")
     #model = ResNet18(classes=10).to(gpu)
     model=torchvision.models.resnet50(pretrained=False).to(gpu)
     ddp_model=DistributedDataParallel(model, device_ids=[local_rank])
     ifargs.checkpointisnotNone:
         map_location= {'cuda:%d'%0: 'cuda:%d'%local_rank}
         ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
     # distribute batch size (mini-batch)
     batch_size=args.batch_size
     batch_size_per_gpu=batch_size//size
     # define loss function (criterion) and optimizer
     criterion=nn.CrossEntropyLoss()  
     optimizer=torch.optim.SGD(ddp_model.parameters(), 1e-4)
     transform_train=transforms.Compose([
         transforms.RandomCrop(32, padding=4),
         transforms.RandomHorizontalFlip(),
         transforms.ToTensor(),
         transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
     ])
     # load data with distributed sampler
     #train_dataset = torchvision.datasets.CIFAR10(root='./data',
     #                                           train=True,
     #                                           transform=transform_train,
     #                                           download=False)
     # load data with distributed sampler
     train_dataset=torchvision.datasets.CIFAR10(root='./data',
                                                train=True,
                                                transform=transform_train,
                                                download=False)
     train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                     num_replicas=size,
                                                                     rank=rank)
     train_loader=torch.utils.data.DataLoader(dataset=train_dataset,
                                                batch_size=batch_size_per_gpu,
                                                shuffle=False,
                                                num_workers=0,
                                                pin_memory=True,
                                                sampler=train_sampler)
     # training (timers and display handled by process 0)
     ifrank==0: start=datetime.now()         
     total_step=len(train_loader)
     forepochinrange(args.epochs):
         ifrank==0: start_dataload=time()
         fori, (images, labels) inenumerate(train_loader):
             # distribution of images and labels to all GPUs
             images=images.to(gpu, non_blocking=True)
             labels=labels.to(gpu, non_blocking=True) 
             ifrank==0: stop_dataload=time()
             ifrank==0: start_training=time()
             # forward pass
             outputs=ddp_model(images)
             loss=criterion(outputs, labels)
             # backward and optimize
             optimizer.zero_grad()
             loss.backward()
             optimizer.step()
             ifrank==0: stop_training=time() 
             if (i+1) %10==0andrank==0:
                 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch+1, args.epochs,
                                                                         i+1, total_step, loss.item(), (stop_dataload-start_dataload)*1000,
                                                                         (stop_training-start_training)*1000))
             ifrank==0: start_dataload=time()
         #Save checkpoint at every end of epoch
         ifrank==0:
             torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
     ifrank==0:
         print(">>> Training complete in: "+str(datetime.now() -start))
 if__name__=='__main__':
     main()

代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。

代码解释

train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。

如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。

torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了

加载CIFAR-10数据集并应用数据增强转换。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler

为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。

rank0在每个轮次结束时保存一个检查点。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。

以上就是PyTorch并行训练DistributedDataParallel完整demo的详细内容,更多关于PyTorch并行训练的资料请关注脚本之家其它相关文章!

相关文章

  • 如何在conda虚拟环境中配置cuda+cudnn+pytorch深度学习环境

    如何在conda虚拟环境中配置cuda+cudnn+pytorch深度学习环境

    这篇文章主要介绍了如何在conda虚拟环境中配置cuda+cudnn+pytorch深度学习环境,想在服务器上配置深度学习的环境,看了很多资料后总结出来了对于新手比较友好的配置流程,需要的朋友可以参考下
    2023-03-03
  • pygame实现成语填空游戏

    pygame实现成语填空游戏

    这篇文章主要介绍了pygame实现成语填空游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-10-10
  • Python使用OpenPyXL处理Excel表格

    Python使用OpenPyXL处理Excel表格

    这篇文章主要介绍了Python使用OpenPyXL处理Excel表格,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • python 单线程和异步协程工作方式解析

    python 单线程和异步协程工作方式解析

    这篇文章主要介绍了python 单线程和异步协程工作方式解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-09-09
  • flask入门之文件上传与邮件发送示例

    flask入门之文件上传与邮件发送示例

    本篇文章主要介绍了flask入门之文件上传与邮件发送示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07
  • Python中常用的字典键和值排的方法

    Python中常用的字典键和值排的方法

    这篇文章主要为大家详细介绍了5种使用最多的Python字典 “键“ 和 “值“ 排序的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2023-03-03
  • 在VScode中配置Python开发环境的超详细指南

    在VScode中配置Python开发环境的超详细指南

    在使用VSCode编写Python代码前,我们需要先配置Python环境,这篇文章主要给大家介绍了关于在VScode中配置Python开发环境的相关资料,需要的朋友可以参考下
    2023-12-12
  • Python实现批量提取Word文档表格数据

    Python实现批量提取Word文档表格数据

    在大数据处理与信息抽取领域中,Word文档是各类机构和个人普遍采用的一种信息存储格式,本文将介绍如何使用Python实现对Word文档中表格的提取,感兴趣的可以了解下
    2024-03-03
  • python列表,字典,元组简单用法示例

    python列表,字典,元组简单用法示例

    这篇文章主要介绍了python列表,字典,元组简单用法,结合实例形式分析了Python列表,字典,元组的功能及相关函数使用技巧,需要的朋友可以参考下
    2019-07-07
  • python处理xml文件操作详解

    python处理xml文件操作详解

    这篇文章主要介绍了python处理xml文件操作详解,文章围绕主题展开详细内容,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-07-07

最新评论