

Multi-GPU Distributed Training with WebDataset

For experimental reasons I needed WebDataset for efficient multi-GPU training (mainly to cut down the time the dataset wastes on image-loading I/O). There are already plenty of tutorials for single-GPU training, but after searching around I found that the official example (WebDataset + Distributed PyTorch Training) does not really explain what some of the parameters mean. So, based on my own experiments and understanding, I summarize the WebDataset training pipeline and the meaning of its parameters below.

Official example: WebDataset + Distributed PyTorch Training - webdataset

Reference articles:

pytorch_lightning 全程笔记 (complete notes on pytorch_lightning) - Zhihu

第六章 番外篇: webdataset (Chapter 6, extra: webdataset) - CSDN blog

The official make_dataloader_train example code is as follows:


```python
# The dataloader pipeline is a fairly typical `IterableDataset` pipeline
# for PyTorch
def make_dataloader_train():
    """Create a DataLoader for training on the ImageNet dataset using WebDataset."""
    transform = transforms.Compose(
        [
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
        ]
    )

    def make_sample(sample):
        return transform(sample["jpg"]), sample["cls"]

    # This is the basic WebDataset definition: it starts with a URL and add shuffling,
    # decoding, and augmentation. Note `resampled=True`; this is essential for
    # distributed training to work correctly.
    trainset = wds.WebDataset(trainset_url, resampled=True, shardshuffle=True,
                              cache_dir=cache_dir, nodesplitter=wds.split_by_node)
    trainset = trainset.shuffle(1000).decode("pil").map(make_sample)

    # For IterableDataset objects, the batching needs to happen in the dataset.
    trainset = trainset.batched(64)
    trainloader = wds.WebLoader(trainset, batch_size=None, num_workers=4)

    # We unbatch, shuffle, and rebatch to mix samples from different workers.
    trainloader = trainloader.unbatched().shuffle(1000).batched(batch_size)

    # A resampled dataset is infinite size, but we can recreate a fixed epoch length.
    trainloader = trainloader.with_epoch(1282 * 100 // 64)
    return trainloader
```

Here we can see the three parameters most important for distributed training:

  1. resampled=True: initialize the WebDataset in resampling mode;
  2. batch_size=None: the loader does no batching of its own at iteration time and returns the items yielded by the dataset directly (wds.WebLoader can be replaced by a plain DataLoader);
  3. with_epoch(1282 * 100 // 64): 1282 is the amount of data, 100 is a multiplier factor, and 64 is the batch size.

As for the multiplier factor of 100, my understanding is that it allows repeated sampling: WebDataset is designed primarily to serve infinite or streaming datasets.

Note that this can also prevent a framework such as PyTorch Lightning from showing a progress bar during training: multi-GPU training needs an estimate of the number of batches per epoch, and computing that requires knowing the batch size. Because batch_size=None is passed when the WebLoader/DataLoader is instantiated, the framework may be unable to work out the per-epoch batch count automatically. With distributed training involved, the formula should be:

$$\mathrm{TotalBatch} = \frac{\mathrm{DatasetNums}}{\mathrm{GPUNums} \times \mathrm{BatchSize}}$$
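
As a minimal sketch of this arithmetic (the dataset size, GPU count, and batch size below are placeholder values of my own, not from the original post), the per-GPU value passed to with_epoch() could be derived like this:

```python
# Placeholder values for illustration only.
dataset_size = 1_281_167   # total number of training samples (assumption)
num_gpus     = 4           # world size (assumption)
batch_size   = 64

# Batches each GPU should see per epoch, following the formula above.
batches_per_gpu = dataset_size // (num_gpus * batch_size)

# trainloader = trainloader.with_epoch(batches_per_gpu)
print(batches_per_gpu)
```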


I tested the official pipeline on a dataset I had packaged myself (with my own understanding added as comments):

```python
import json

import numpy as np
import torch
import torchvision.transforms as T
import webdataset as wds
from tqdm import tqdm


class CARLA_Data_web(torch.utils.data.IterableDataset):
    def __init__(self, urls, shuffle=1000, batchsize=None, epochs=None,
                 img_aug=False, cache_dir='./Web-train-data-cache'):
        self.urls = urls
        self.img_aug = img_aug
        self._batch_read_number = 0
        self.length = 0
        self._im_transform = T.Compose([
            T.ToTensor(),
            T.Resize((256, 512)),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])  # RGB
        self.datasets = (wds.WebDataset(self.urls, resampled=True, shardshuffle=True,
                                        cache_dir=cache_dir,
                                        # default value, suitable for a single node
                                        nodesplitter=wds.shardlists.single_node_only)
                         .shuffle(shuffle)
                         .decode('rgb')
                         .to_tuple(
                             'front.png',           # 0
                             'rear.png',            # 1
                             'left.png',            # 2
                             'right.png',           # 3
                             'speed.json',          # 4
                             'target_point.npy',    # 5
                             'target_command.pth',  # 6
                             'waypoints.npy',       # 7
                             'waypoints_diff.npy'   # 8
                         )
                         .map_tuple(self.img_tran, self.img_tran, self.img_tran, self.img_tran,
                                    self.safe_identity,
                                    self.safe_identity,
                                    self.safe_identity,
                                    self.safe_identity, self.safe_identity)
                         # .batched(batchsize)
                         # .with_epoch(epochs)
                         )

    def __len__(self):
        # Count samples by iterating the dataset (44240 is the expected total).
        progress = tqdm(total=44240, desc='computing', position=0)
        for _ in self.datasets:
            self.length += 1
            progress.set_postfix_str('num:{}/{}'.format(self.length, 44240))
            progress.update()
        return self.length

    def safe_json_decode(self, data):
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return None

    def safe_identity(self, x):
        return x if x is not None else {}

    def img_tran(self, x):
        if self.img_aug:
            # `augmenter` is the author's custom augmentation helper, defined elsewhere.
            return self._im_transform(
                augmenter(self._batch_read_number).augment_image(x.astype(np.uint8)))
        else:
            return self._im_transform(x)
```
```python
def collate_fn(batch):
    return batch

root_dir_all = "./mydata_path"
# Replace with the actual path to your packaged dataset.
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']
dataset = CARLA_Data_web(urls, img_aug=True)

dataset_test = dataset.datasets.batched(64)
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)
dataset_test = dataset_test.unbatched().shuffle(1000).batched(32).with_epoch(10)

for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # Print the shape of the waypoints field in the batch.
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # the waypoints.npy field (tuple index 7)
        print(f"waypoints_sample: {waypoints_sample.shape}")
```

The output is as follows:

```text
(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
waypoints_sample: (32, 4, 2)
...
...
waypoints_sample: (32, 4, 2)
Batch 10:len(batch)=9
waypoints_sample: (32, 4, 2)
```

From this we can see:

  1. The first batched() call has no real effect; the batch size that actually takes effect is the one set after the second shuffle. Ineffective: dataset_test = dataset.datasets.batched(64). Effective: the .batched(32) in dataset_test.unbatched().shuffle(1000).batched(32).with_epoch(10). (A condensed sketch of the resulting order follows after this list.)
  2. The argument of with_epoch() is the number of batches per epoch, not the number of epochs. Because WebDataset resamples, the number of batches per epoch may exceed total samples / batch size, and every GPU gets this many batches; in the code above it is set to 10.
  3. The two shuffles serve different purposes. The first shuffles the sample order of the original data stream before the different workers fetch it; the second shuffles again after unbatching, to mix the samples fetched by different workers. Personally I don't think the second shuffle is strictly necessary, since it can multiply the read time; corrections from those who know better are welcome.
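
A condensed sketch of the ordering these observations (and the official example) suggest; worker_batch, final_batch_size, and batches_per_gpu are placeholder names of my own, and `dataset.datasets` is assumed to be the WebDataset pipeline built above:

```python
worker_batch     = 64    # pre-batch size inside the dataset (placeholder)
final_batch_size = 32    # the batch size that actually takes effect (placeholder)
batches_per_gpu  = 10    # per-GPU batches per epoch (placeholder)

loader = dataset.datasets.batched(worker_batch)                       # per-worker pre-batching
loader = wds.WebLoader(loader, batch_size=None, num_workers=4)        # no re-batching here
loader = loader.unbatched().shuffle(1000).batched(final_batch_size)   # effective batch size
loader = loader.with_epoch(batches_per_gpu)                           # applied last, on the loader
```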

Next, let's look at the official explanation of the wds.WebDataset parameters.

Of these, the ones most relevant to distributed training are:

  1. cache_dir: a directory path in which WebDataset stores cached copies of the data it has loaded, so the data does not have to be downloaded and unpacked from scratch on every run, reducing data-loading time.
  2. nodesplitter: distributes shards across nodes, ensuring that different compute nodes process different chunks of data (the default is single-node, unless you train across multiple servers/machines).
  3. workersplitter: distributes shards among the DataLoader workers on the same node, ensuring that each worker processes different data in parallel. A sketch of where these two splits sit in a pipeline is shown below.
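
As a minimal sketch (my own illustration, not from the original post) of where node and worker splitting happen, the explicit wds.DataPipeline form makes both stages visible; wds.split_by_node and wds.split_by_worker are the stock splitters that the nodesplitter/workersplitter arguments correspond to, and the shard pattern and field names are placeholders:

```python
import webdataset as wds

urls = "shards-{000000..000999}.tar"  # placeholder shard pattern

dataset = wds.DataPipeline(
    wds.SimpleShardList(urls),
    wds.shuffle(100),          # shuffle the shard order
    wds.split_by_node,         # each node (rank) keeps a different subset of shards
    wds.split_by_worker,       # each DataLoader worker keeps a different subset again
    wds.tarfile_to_samples(),  # expand shards into individual samples
    wds.shuffle(1000),         # shuffle samples within each worker
    wds.decode("rgb"),
    wds.to_tuple("front.png", "waypoints.npy"),  # placeholder field names
    wds.batched(32),
)
```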

About the order of the pipeline stages above:

My suggestion is not to reorder them. For example, if you swap the order of with_epoch and wds.WebLoader:

```python
def collate_fn(batch):
    return batch

root_dir_all = "./mydata_path"
# Replace with the actual path to your packaged dataset.
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']
dataset = CARLA_Data_web(urls, img_aug=True)

dataset_test = dataset.datasets.batched(64)
dataset_test = dataset_test.unbatched().batched(32).with_epoch(10)  # with_epoch() first
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)

for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # Print the shape of the waypoints field in the batch.
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # the waypoints.npy field (tuple index 7)
        print(f"waypoints_sample: {waypoints_sample.shape}")
```

The output is now:

```text
(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
...
...
Batch 39:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 40:len(batch)=9
waypoints_sample: (32, 4, 2)
```

Q:

The number of batches has become 40, which does not match our with_epoch(10) setting! But careful readers will have noticed that num_workers=4 in our WebLoader.

A:

My understanding is that applying with_epoch() first fixes the number of batches of the (otherwise infinite) web dataset itself (though not the data: with shuffling disabled I printed 80 batches in each configuration and found repeatedly sampled data in both). The workers then fetch data concurrently, and each worker yields 10 batches, so dataset_test ends up with 40 batches.

If the WebLoader comes first instead, the batch count is not fixed at that point, and the with_epoch() applied at the end limits it, so the final dataset_test yields 10 batches (in the distributed case, 10 batches per GPU).
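
A tiny sketch of that arithmetic, using the values from the experiment above:

```python
num_workers = 4

# with_epoch() applied to the dataset, before the WebLoader:
# each worker yields that many batches, so the counts multiply.
per_worker_epoch = 10
print(per_worker_epoch * num_workers)   # -> 40 batches observed

# with_epoch() applied to the WebLoader, after the workers:
# it caps the combined stream, so the total is exactly the value given.
loader_epoch = 10
print(loader_epoch)                     # -> 10 batches observed (per GPU)
```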


Finally, if you are not training through a framework, the complete official distributed-training code is as follows:

```python
# Let's try it out
def make_dataloader(split="train"):
    """Make a dataloader for training or validation."""
    if split == "train":
        return make_dataloader_train()
    elif split == "val":
        return make_dataloader_val()  # not implemented for this notebook
    else:
        raise ValueError(f"unknown split {split}")

# Try it out.
os.environ["GOPEN_VERBOSE"] = "1"
sample = next(iter(make_dataloader()))
print(sample[0].shape, sample[1].shape)
os.environ["GOPEN_VERBOSE"] = "0"
```

Standard PyTorch Training

This is completely standard PyTorch training; nothing changes by using WebDataset.

```python
# We gather all the configuration info into a single typed dataclass.
@dataclasses.dataclass
class Config:
    epochs: int = 1
    max_steps: int = int(1e18)
    lr: float = 0.001
    momentum: float = 0.9
    rank: Optional[int] = None
    world_size: int = 2
    backend: str = "nccl"
    master_addr: str = "localhost"
    master_port: str = "12355"
    report_s: float = 15.0
    report_growth: float = 1.1


def train(config):
    # Define the model, loss function, and optimizer
    model = resnet50(pretrained=False).cuda()
    if config.rank is not None:
        model = DistributedDataParallel(model)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.lr)

    # Data loading code
    trainloader = make_dataloader(split="train")

    losses, accuracies, steps = deque(maxlen=100), deque(maxlen=100), 0

    # Training loop
    for epoch in range(config.epochs):
        for i, data, verbose in enumerate_report(trainloader, config.report_s):
            inputs, labels = data[0].cuda(), data[1].cuda()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)

            # update statistics
            loss = loss_fn(outputs, labels)
            accuracy = (
                (outputs.argmax(1) == labels).float().mean()
            )  # calculate accuracy
            losses.append(loss.item())
            accuracies.append(accuracy.item())
            if verbose and len(losses) > 0:
                avgloss = sum(losses) / len(losses)
                avgaccuracy = sum(accuracies) / len(accuracies)
                print(
                    f"rank {config.rank} epoch {epoch:5d}/{i:9d} loss {avgloss:8.3f} acc {avgaccuracy:8.3f} {steps:9d}",
                    file=sys.stderr,
                )
            loss.backward()
            optimizer.step()
            steps += len(labels)
            if steps > config.max_steps:
                print(
                    "finished training (max_steps)",
                    steps,
                    config.max_steps,
                    file=sys.stderr,
                )
                return
    print("finished Training", steps)


# A quick smoke test of the training function.
config = Config()
config.epochs = 1
config.max_steps = 1000
train(config)
```
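
The Config above carries rank, world_size, backend, master_addr, and master_port, but the snippet does not show how the processes are launched. The following is a minimal sketch of the standard PyTorch process-group setup these fields are typically consumed by; it is my own illustration, not code from the official notebook, and assumes Config and train are defined as in the block above:

```python
import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank, config):
    # One process per GPU: initialize the process group from the Config fields.
    os.environ["MASTER_ADDR"] = config.master_addr
    os.environ["MASTER_PORT"] = config.master_port
    dist.init_process_group(config.backend, rank=rank, world_size=config.world_size)

    config.rank = rank
    torch.cuda.set_device(rank)
    train(config)   # `train` and `Config` come from the code above

    dist.destroy_process_group()


if __name__ == "__main__":
    config = Config()
    mp.spawn(worker, args=(config,), nprocs=config.world_size)
```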

This article is reposted from: https://blog.csdn.net/qq_66556875/article/details/143534307
Copyright belongs to the original author 要成功炼丹啊!; in case of infringement, please contact us for removal.
