For my experiments I need WebDataset for efficient multi-GPU training (mainly to cut the time a regular map-style Dataset wastes on image-loading I/O). There are already plenty of tutorials for single-GPU training, but after searching around I found that even the official example (WebDataset + Distributed PyTorch Training) does not really explain what some of the parameters mean. So I ran my own experiments, and this post summarizes my understanding of the WebDataset training pipeline and of the meaning of its parameters.
Official page: WebDataset + Distributed PyTorch Training - webdataset
References:
pytorch_lightning complete walkthrough notes - Zhihu
Chapter 6, extra: webdataset - CSDN blog
The official example code for dataloader_train is as follows:
# The dataloader pipeline is a fairly typical `IterableDataset` pipeline
# for PyTorch
def make_dataloader_train():
    """Create a DataLoader for training on the ImageNet dataset using WebDataset."""

    transform = transforms.Compose(
        [
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
        ]
    )

    def make_sample(sample):
        return transform(sample["jpg"]), sample["cls"]

    # This is the basic WebDataset definition: it starts with a URL and adds shuffling,
    # decoding, and augmentation. Note `resampled=True`; this is essential for
    # distributed training to work correctly.
    trainset = wds.WebDataset(
        trainset_url,
        resampled=True,
        shardshuffle=True,
        cache_dir=cache_dir,
        nodesplitter=wds.split_by_node,
    )
    trainset = trainset.shuffle(1000).decode("pil").map(make_sample)

    # For IterableDataset objects, the batching needs to happen in the dataset.
    trainset = trainset.batched(64)
    trainloader = wds.WebLoader(trainset, batch_size=None, num_workers=4)

    # We unbatch, shuffle, and rebatch to mix samples from different workers.
    trainloader = trainloader.unbatched().shuffle(1000).batched(batch_size)

    # A resampled dataset is infinite size, but we can recreate a fixed epoch length.
    trainloader = trainloader.with_epoch(1282 * 100 // 64)
    return trainloader
From this we can see the three parameters that matter most for distributed training:
- resampled=True: initializes the WebDataset in resampling mode, so shards are drawn with replacement and the stream never runs dry;
- batch_size=None: the loader does no batching of its own at iteration time and simply passes through whatever the dataset yields (wds.WebLoader can be replaced with a regular DataLoader here);
- with_epoch(1282 * 100 // 64): 1282 is the number of samples, 100 is a repetition factor, and 64 is the batch size.
My understanding of the factor 100 is that it allows repeated sampling: WebDataset is designed around infinite or streaming datasets.
(This also means that frameworks like pytorch_lightning cannot display a training progress bar: multi-GPU training requires an estimate of the number of batches per epoch, which in turn requires the batch size, but since the WebLoader/DataLoader is instantiated with batch_size=None the framework cannot derive it automatically. With distributed training the estimate would roughly be steps_per_epoch = num_samples × repeat_factor / (batch_size × num_gpus); a small sketch of this calculation follows.)
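To make that concrete, here is a small sketch (my own helper, not part of webdataset or the official example) of how the with_epoch value could be chosen for a DDP run; dataset_size, repeat_factor, batch_size and world_size are placeholders for your own setup:

def nominal_batches_per_epoch(dataset_size, batch_size, world_size, repeat_factor=1):
    # A resampled WebDataset is infinite, so an "epoch" is whatever you declare it
    # to be; here the (optionally repeated) dataset is spread across GPUs.
    return dataset_size * repeat_factor // (batch_size * world_size)

# With the numbers from the official example as read above (1282 samples, batch size 64), on 2 GPUs:
steps = nominal_batches_per_epoch(1282, 64, world_size=2, repeat_factor=100)
# trainloader = trainloader.with_epoch(steps)   # 1001 batches per GPU per "epoch"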
I tested the official pipeline on a dataset I had packaged myself (the comments reflect my own understanding):
import json

import numpy as np
import torch
import torchvision.transforms as T
import webdataset as wds
from tqdm import tqdm


class CARLA_Data_web(torch.utils.data.IterableDataset):
    def __init__(self, urls, shuffle=1000, batchsize=None, epochs=None,
                 img_aug=False, cache_dir='./Web-train-data-cache'):
        self.urls = urls
        self.img_aug = img_aug
        self._batch_read_number = 0
        self.length = 0
        self._im_transform = T.Compose([
            T.ToTensor(),
            T.Resize((256, 512)),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])  # RGB
        self.datasets = (wds.WebDataset(self.urls, resampled=True, shardshuffle=True,
                                        cache_dir=cache_dir,
                                        nodesplitter=wds.shardlists.single_node_only)  # default value, for single-node training
                         .shuffle(shuffle)
                         .decode('rgb')
                         .to_tuple(
                             'front.png',          # 0
                             'rear.png',           # 1
                             'left.png',           # 2
                             'right.png',          # 3
                             'speed.json',         # 4
                             'target_point.npy',   # 5
                             'target_command.pth',
                             'waypoints.npy',      # 7
                             'waypoints_diff.npy')
                         .map_tuple(self.img_tran, self.img_tran, self.img_tran, self.img_tran,
                                    self.safe_identity,
                                    self.safe_identity,
                                    self.safe_identity,
                                    self.safe_identity, self.safe_identity)
                         # .batched(batchsize)
                         # .with_epoch(epochs)
                         )

    def __len__(self):
        # Counting means iterating the stream once (44240 is the known sample count);
        # note that with resampled=True the stream is effectively infinite, so this
        # only makes sense for a non-resampled dataset.
        progress = tqdm(total=44240, desc='computing', position=0)
        for _ in self.datasets:
            self.length += 1
            progress.set_postfix_str('num:{}/{}'.format(self.length, 44240))
            progress.update()
        return self.length

    def safe_json_decode(self, data):
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return None

    def safe_identity(self, x):
        return x if x is not None else {}

    def img_tran(self, x):
        if self.img_aug:
            # `augmenter` is the author's external image-augmentation helper
            return self._im_transform(
                augmenter(self._batch_read_number).augment_image(x.astype(np.uint8)))
        return self._im_transform(x)


def collate_fn(batch):
    return batch


root_dir_all = "./mydata_path"
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']  # replace with the actual path to your dataset
dataset = CARLA_Data_web(urls, img_aug=True)
dataset_test = dataset.datasets.batched(64)
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)
dataset_test = dataset_test.unbatched().shuffle(1000).batched(32).with_epoch(10)

for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # print the shape of the waypoints in this batch
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # waypoints field of the batch (tuple index 7)
        print(f"waypoints_sample: {waypoints_sample.shape}")
The output is as follows:
(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
waypoints_sample: (32, 4, 2)
...
...
waypoints_sample: (32, 4, 2)
Batch 10:len(batch)=9
waypoints_sample: (32, 4, 2)
From this we can observe:
- The first batched(64) has no effect on the final batch size; the batch size that actually takes effect is the one set after the second shuffle:
no effect: dataset_test = dataset.datasets.batched(64)
takes effect: dataset_test = dataset_test.unbatched().shuffle(1000).batched(32).with_epoch(10)
- The argument of with_epoch specifies the number of batches per epoch, not the number of epochs. Because WebDataset resamples, the batch count per epoch may exceed total_samples / batch_size, and every GPU gets this many batches; in the code above it was set to 10.
- The two shuffles do different jobs: the first shuffles the samples of the raw dataset before the workers pull from it; the second shuffles again after unbatching, so that samples fetched by different workers get mixed. Personally I feel the second shuffle is not really necessary and multiplies the read time; corrections from people who know this better are welcome. (A variant without the second shuffle, for timing comparisons, is sketched below.)
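To check whether the second shuffle is worth its cost, a rough timing comparison like the one below can be used (a sketch only: it reuses dataset and collate_fn from the test script above, and the numbers will depend on your disk and num_workers):

import time

# Variant A: no cross-worker unbatch/shuffle/rebatch; each batch then comes from a single worker.
loader_plain = wds.WebLoader(dataset.datasets.batched(32), batch_size=None,
                             collate_fn=collate_fn, num_workers=4).with_epoch(10)

# Variant B: the official cross-worker re-shuffle, as in the test script above.
loader_mixed = wds.WebLoader(dataset.datasets.batched(64), batch_size=None,
                             collate_fn=collate_fn, num_workers=4)
loader_mixed = loader_mixed.unbatched().shuffle(1000).batched(32).with_epoch(10)

for name, loader in [("no second shuffle", loader_plain), ("with second shuffle", loader_mixed)]:
    t0 = time.time()
    for _ in loader:
        pass
    print(f"{name}: {time.time() - t0:.1f}s for 10 batches")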
Next, let's look at the official explanation of the wds.WebDataset parameters.
The ones most relevant to distributed training are:
- cache_dir: a directory where WebDataset stores a cached copy of data it has already loaded, so the data does not have to be re-downloaded and unpacked on every run, reducing loading time.
- nodesplitter: splits the shards across nodes, so that different compute nodes process different chunks of data. (The default assumes a single node; only change it if you train across multiple servers/machines.)
- workersplitter: splits the data across the worker processes of one node, so that each worker handles different data in parallel. (A multi-node sketch follows this list.)
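For actual multi-node training, the main change to the pipeline above is the nodesplitter (a sketch based on my understanding: wds.split_by_node takes the node's rank and world size from the PyTorch distributed environment, while worker-level splitting/resampling is handled by webdataset itself):

import webdataset as wds

# Same dataset definition as before, but shards are now split across nodes.
trainset = wds.WebDataset(
    urls,                              # shard list, as defined earlier
    resampled=True,
    shardshuffle=True,
    cache_dir='./Web-train-data-cache',
    nodesplitter=wds.split_by_node,    # instead of wds.shardlists.single_node_only
)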
About the order of the stages in the pipeline above:
my advice is not to reorder them. For example, if you swap the order of with_epoch and wds.WebLoader:
def collate_fn(batch):
    return batch


root_dir_all = "./mydata_path"
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']  # replace with the actual path to your dataset
dataset = CARLA_Data_web(urls, img_aug=True)
dataset_test = dataset.datasets.batched(64)
dataset_test = dataset_test.unbatched().batched(32).with_epoch(10)  # with_epoch() applied before WebLoader
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)

for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # print the shape of the waypoints in this batch
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # waypoints field of the batch (tuple index 7)
        print(f"waypoints_sample: {waypoints_sample.shape}")
The result is now:
(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
...
...
Batch 39:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 40:len(batch)=9
waypoints_sample: (32, 4, 2)
Q:
The number of batches is now 40, which does not match our with_epoch(10) setting! But careful readers will have noticed that the WebLoader uses num_workers=4.
A:
My understanding is that applying with_epoch first fixes the number of batches of the (infinite) web dataset, but not the data itself (I printed 80 batches with shuffling disabled in both configurations and found repeated samples in both cases). The workers then fetch data concurrently, and each of them produces 10 batches, so dataset_test ends up with 4 × 10 = 40 batches.
If the WebLoader comes first, the batch count is not fixed at that point; with_epoch is applied last and caps the merged stream, so dataset_test ends up with 10 batches (with distributed training, that is 10 batches per GPU).
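The rule these two experiments suggest can be written down compactly (my summary of the observed behavior, not an official API):

# with_epoch() limits whichever stream it is attached to: attached to the dataset
# (before WebLoader), each of the num_workers workers yields that many batches;
# attached to the WebLoader afterwards, it limits the merged stream.
def expected_batches(with_epoch_n, num_workers, applied_before_loader):
    return with_epoch_n * (num_workers if applied_before_loader else 1)

assert expected_batches(10, 4, applied_before_loader=True) == 40   # second experiment
assert expected_batches(10, 4, applied_before_loader=False) == 10  # first experiment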
Finally, if you are not using a framework for training, the complete official distributed training code is as follows:
# Let's try it out
def make_dataloader(split="train"):
    """Make a dataloader for training or validation."""
    if split == "train":
        return make_dataloader_train()
    elif split == "val":
        return make_dataloader_val()  # not implemented for this notebook
    else:
        raise ValueError(f"unknown split {split}")


# Try it out.
os.environ["GOPEN_VERBOSE"] = "1"
sample = next(iter(make_dataloader()))
print(sample[0].shape, sample[1].shape)
os.environ["GOPEN_VERBOSE"] = "0"
Standard PyTorch Training
This is completely standard PyTorch training; nothing changes by using WebDataset.
# We gather all the configuration info into a single typed dataclass.
@dataclasses.dataclass
class Config:
    epochs: int = 1
    max_steps: int = int(1e18)
    lr: float = 0.001
    momentum: float = 0.9
    rank: Optional[int] = None
    world_size: int = 2
    backend: str = "nccl"
    master_addr: str = "localhost"
    master_port: str = "12355"
    report_s: float = 15.0
    report_growth: float = 1.1


def train(config):
    # Define the model, loss function, and optimizer
    model = resnet50(pretrained=False).cuda()
    if config.rank is not None:
        model = DistributedDataParallel(model)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.lr)

    # Data loading code
    trainloader = make_dataloader(split="train")

    losses, accuracies, steps = deque(maxlen=100), deque(maxlen=100), 0

    # Training loop
    for epoch in range(config.epochs):
        for i, data, verbose in enumerate_report(trainloader, config.report_s):
            inputs, labels = data[0].cuda(), data[1].cuda()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)

            # update statistics
            loss = loss_fn(outputs, labels)
            accuracy = (
                (outputs.argmax(1) == labels).float().mean()
            )  # calculate accuracy
            losses.append(loss.item())
            accuracies.append(accuracy.item())
            if verbose and len(losses) > 0:
                avgloss = sum(losses) / len(losses)
                avgaccuracy = sum(accuracies) / len(accuracies)
                print(
                    f"rank {config.rank} epoch {epoch:5d}/{i:9d} loss {avgloss:8.3f} acc {avgaccuracy:8.3f} {steps:9d}",
                    file=sys.stderr,
                )
            loss.backward()
            optimizer.step()
            steps += len(labels)
            if steps > config.max_steps:
                print(
                    "finished training (max_steps)",
                    steps,
                    config.max_steps,
                    file=sys.stderr,
                )
                return
    print("finished Training", steps)


# A quick smoke test of the training function.
config = Config()
config.epochs = 1
config.max_steps = 1000
train(config)
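The official notebook launches train() with its own multiprocessing helper, which is not reproduced here. As a rough substitute, a plain torch.multiprocessing launcher for a single machine could look like this (a sketch, assuming the Config and train() defined above):

import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank, config):
    # Each spawned process sets up its own process group and GPU, then trains.
    os.environ["MASTER_ADDR"] = config.master_addr
    os.environ["MASTER_PORT"] = config.master_port
    dist.init_process_group(config.backend, rank=rank, world_size=config.world_size)
    torch.cuda.set_device(rank)
    config.rank = rank
    train(config)
    dist.destroy_process_group()


if __name__ == "__main__":
    config = Config(epochs=1, max_steps=1000)
    mp.spawn(worker, args=(config,), nprocs=config.world_size)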