Multi-GPU Distributed Training with WebDataset

For my experiments I needed WebDataset for efficient training on multiple GPUs (mainly to cut the I/O time the dataset wastes loading images). There are already plenty of tutorials on single-GPU training, but after a lot of searching I found that even the official example (WebDataset + Distributed PyTorch Training) does not really explain what some of the parameters mean. So I ran my own experiments, combined them with my own understanding, and summarized the WebDataset training pipeline and the meaning of its parameters.

Official page: WebDataset + Distributed PyTorch Training - webdataset

References:

pytorch_lightning 全程笔记 (complete notes on pytorch_lightning) - Zhihu

第六章 番外篇:webdataset (Chapter 6, extra: webdataset) - CSDN blog

The official example code for make_dataloader_train is as follows:


# The dataloader pipeline is a fairly typical `IterableDataset` pipeline
# for PyTorch
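# Note: `trainset_url`, `cache_dir`, and `batch_size` referenced below are
# defined in earlier cells of the official notebook and are not shown here.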

def make_dataloader_train():
    """Create a DataLoader for training on the ImageNet dataset using WebDataset."""

    transform = transforms.Compose(
        [
            transforms.RandomResizedCrop(224),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
        ]
    )

    def make_sample(sample):
        return transform(sample["jpg"]), sample["cls"]

    # This is the basic WebDataset definition: it starts with a URL and add shuffling,
    # decoding, and augmentation. Note `resampled=True`; this is essential for
    # distributed training to work correctly.
    trainset = wds.WebDataset(trainset_url, resampled=True, shardshuffle=True, cache_dir=cache_dir, nodesplitter=wds.split_by_node)
    trainset = trainset.shuffle(1000).decode("pil").map(make_sample)

    # For IterableDataset objects, the batching needs to happen in the dataset.
    trainset = trainset.batched(64)
    trainloader = wds.WebLoader(trainset, batch_size=None, num_workers=4)

    # We unbatch, shuffle, and rebatch to mix samples from different workers.
    trainloader = trainloader.unbatched().shuffle(1000).batched(batch_size)

    # A resampled dataset is infinite size, but we can recreate a fixed epoch length.
    trainloader = trainloader.with_epoch(1282 * 100 // 64)

    return trainloader

Here we can see the three parameters that matter most for distributed training:

  1. resampled=True: passed when constructing the WebDataset; it enables resampling mode, which is essential for distributed training.
  2. batch_size=None: the loader itself does no batching on each iteration and simply passes through whatever the dataset yields (wds.WebLoader could be replaced with a regular DataLoader here, although the chained .unbatched()/.with_epoch() calls that follow require WebLoader).
  3. with_epoch(1282 * 100 // 64): 1282 is the amount of data, 100 is a multiplier factor, and 64 is the batch size.

My understanding of the multiplier factor 100 is that it allows samples to be drawn repeatedly: WebDataset is designed primarily for infinite or streaming datasets.

Note, however, that this also means frameworks like PyTorch Lightning cannot show a progress bar during training. Multi-GPU training needs an estimate of the number of batches per epoch, which in turn requires the batch size, but because the WebLoader/DataLoader is instantiated with batch_size=None the framework may be unable to compute the per-epoch batch count automatically. With distributed training the calculation would be:

TotalBatch = \frac{DatasetNums}{GPUNums \times BatchSize}
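Following this formula, a per-GPU value for with_epoch() could be computed like this (a minimal sketch; the numbers are illustrative, not taken from a real run):

# Minimal sketch: per-GPU batches per epoch, following the formula above.
# Substitute your own dataset size, batch size, and GPU count.
dataset_size = 1282 * 100   # DatasetNums
batch_size = 64             # BatchSize
num_gpus = 2                # GPUNums (world size)

batches_per_epoch = dataset_size // (num_gpus * batch_size)
print(batches_per_epoch)    # value you could pass to .with_epoch(...)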


I tested the official pipeline on a dataset I had packaged myself (the comments reflect my own understanding):

import json

import numpy as np
import torch
import torchvision.transforms as T
import webdataset as wds
from tqdm import tqdm


class CARLA_Data_web(torch.utils.data.IterableDataset):
    def __init__(self, urls, shuffle=1000, batchsize=None, epochs=None,
                 img_aug=False, cache_dir='./Web-train-data-cache'):
        self.urls = urls
        self.img_aug = img_aug
        self._batch_read_number = 0
        self.length = 0
        self._im_transform = T.Compose([
            T.ToTensor(),
            T.Resize((256, 512)),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])  # RGB
        self.datasets = (
            wds.WebDataset(self.urls, resampled=True, shardshuffle=True,
                           cache_dir=cache_dir,
                           nodesplitter=wds.shardlists.single_node_only)  # default; fine for a single node
            .shuffle(shuffle)
            .decode('rgb')
            .to_tuple(
                'front.png',          # 0
                'rear.png',           # 1
                'left.png',           # 2
                'right.png',          # 3
                'speed.json',         # 4
                'target_point.npy',   # 5
                'target_command.pth',
                'waypoints.npy',      # 7
                'waypoints_diff.npy'
            )
            .map_tuple(self.img_tran, self.img_tran, self.img_tran, self.img_tran,
                       self.safe_identity,
                       self.safe_identity,
                       self.safe_identity,
                       self.safe_identity, self.safe_identity)
            # .batched(batchsize)
            # .with_epoch(epochs)
        )

    def __len__(self):
        # Count samples by iterating once. Note that with resampled=True the
        # dataset is effectively infinite, so this only terminates when
        # resampling is disabled.
        progress = tqdm(total=44240, desc='computing', position=0)
        for _ in self.datasets:
            self.length += 1
            progress.set_postfix_str('num:{}/{}'.format(self.length, 44240))
            progress.update()
        return self.length
    
    def safe_json_decode(self, data):
        try:
            return json.loads(data)
        except json.JSONDecodeError:
            return None

    def safe_identity(self, x):
        return x if x is not None else {}
    
    def img_tran(self, x):
        if self.img_aug:
            # `augmenter` is an image-augmentation helper defined elsewhere in the
            # author's codebase (not included in this snippet).
            return self._im_transform(
                augmenter(self._batch_read_number).augment_image(x.astype(np.uint8)))
        else:
            return self._im_transform(x)


def collate_fn(batch):
    return batch

root_dir_all = "./mydata_path"
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']  # replace with the actual path to your dataset
dataset = CARLA_Data_web(urls, img_aug=True)
dataset_test = dataset.datasets.batched(64)
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)
dataset_test = dataset_test.unbatched().shuffle(1000).batched(32).with_epoch(10)
for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # print the shape of the waypoints field in this batch
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # index 7 of the tuple is waypoints.npy
        print(f"waypoints_sample: {waypoints_sample.shape}")

The output is as follows:

(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
waypoints_sample: (32, 4, 2)
...
...
waypoints_sample: (32, 4, 2)
Batch 10:len(batch)=9
waypoints_sample: (32, 4, 2)

From this we can see:

  1. The first batched() call actually has no effect; the batch size that takes effect is the one set after the shuffle. No effect: dataset_test = dataset.datasets.batched(64). Takes effect: dataset_test.unbatched().shuffle(1000).batched(32).
  2. The argument to with_epoch() is the number of batches per epoch, not the number of epochs. Because WebDataset resamples, the number of batches per epoch can exceed total data / batch size, and every GPU produces this configured number of batches; in the code above it was set to 10.
  3. The two shuffles do different things: the first shuffles the samples of the original dataset before the workers read them, and the second shuffles again after un-batching so that samples fetched by different workers get mixed. Personally I am not convinced the second shuffle is necessary, since it multiplies the read time; corrections from people who understand this better are welcome. (A sketch of the pipeline without it follows this list.)
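For reference, this is roughly what the loader side would look like if the cross-worker shuffle were dropped (a sketch only; without it, each re-batch of 32 is cut from a single worker's 64-sample batch, so each batch mixes samples from fewer shards):

# Sketch: the same pipeline as above but without the second (cross-worker) shuffle.
dataset = CARLA_Data_web(urls, img_aug=True)
loader = wds.WebLoader(dataset.datasets.batched(64), batch_size=None,
                       collate_fn=collate_fn, num_workers=4)
loader = loader.unbatched().batched(32).with_epoch(10)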

Next, let's look at the official documentation of the wds.WebDataset parameters. The ones most important for distributed training are:

  1. cache_dir: a directory in which WebDataset stores a cached copy of the data it has already loaded, so that each run does not have to download and unpack the data from scratch; this reduces data-loading time.
  2. nodesplitter: distributes shards across nodes, ensuring that different compute nodes process different chunks of data. (The default is single-node only; you only need to change it when training across several servers or machines; see the sketch after this list.)
  3. workersplitter: distributes data across the worker processes of a single node so they can load in parallel, ensuring each worker gets different data.
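As a sketch of what a multi-node configuration might look like (the shard pattern and cache directory are placeholders; nodesplitter=wds.split_by_node is the same splitter used in the official example above):

import webdataset as wds

# Sketch: split shards across nodes for multi-node training. wds.split_by_node
# uses the PyTorch distributed environment (rank / world size) to give each
# node a different subset of shards; splitting within a node's workers is
# handled by WebDataset's default worker splitting.
trainset = (
    wds.WebDataset("shards/train-{000000..000999}.tar",  # placeholder shard pattern
                   resampled=True,
                   shardshuffle=True,
                   cache_dir="./wds-cache",               # placeholder cache directory
                   nodesplitter=wds.split_by_node)
    .shuffle(1000)
    .decode("pil")
)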

Regarding the order of the stages in the pipeline above: my advice is not to change it. For example, if you swap the order of with_epoch and wds.WebLoader:

def collate_fn(batch):
    return batch


root_dir_all = "./mydata_path"
urls = [root_dir_all + '/data_collect_2_town04_results/packaged_data_10443.tar']  # replace with the actual path to your dataset
dataset = CARLA_Data_web(urls, img_aug=True)
dataset_test = dataset.datasets.batched(64)
dataset_test = dataset_test.unbatched().batched(32).with_epoch(10)  # with_epoch() applied first
dataset_test = wds.WebLoader(dataset_test, batch_size=None, collate_fn=collate_fn, num_workers=4)
for i, batch in enumerate(dataset_test):
    print(f"Batch {i+1}:len(batch)={len(batch)}")
    # print the shape of the waypoints field in this batch
    if len(batch) > 0:
        waypoints_sample = np.array(batch[7])  # index 7 of the tuple is waypoints.npy
        print(f"waypoints_sample: {waypoints_sample.shape}")

The output now looks like this:

(RSH_TCP) (base) great74@server19:/server19/gyl/01-E2E-AutoDrive/DiffAD$ /nvme0n1/anaconda3/envs/RSH_TCP/bin/python /server19/gyl/01-E2E-AutoDrive/DiffAD/TCP/data_web_test_gyl.py
Batch 1:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 2:len(batch)=9
...
...
Batch 39:len(batch)=9
waypoints_sample: (32, 4, 2)
Batch 40:len(batch)=9
waypoints_sample: (32, 4, 2)

Q:

The number of batches has become 40, which does not match the with_epoch(10) we set! But careful readers will have noticed that the WebLoader uses num_workers=4.

A:

My understanding is that applying with_epoch() first fixes the number of batches of the otherwise infinite web dataset (but not the data itself: with shuffling disabled I printed 80 batches in both orderings and found resampled duplicates in both cases). The workers then fetch data concurrently, each worker produces 10 batches, and dataset_test ends up with 4 × 10 = 40 batches.

If the WebLoader comes first instead, the batch count is not fixed per worker; with_epoch() is applied last and limits the total number of batches, so dataset_test yields 10 batches (in the distributed case, this is the batch count per GPU).
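In short, a back-of-the-envelope check with the numbers from the example above:

# with_epoch() applied to the dataset, before the WebLoader: every worker runs
# the whole pipeline independently, so the loader emits
# with_epoch_batches * num_workers batches.
with_epoch_batches = 10
num_workers = 4
print(with_epoch_batches * num_workers)  # 40, as observed above

# with_epoch() applied after the WebLoader: the limit applies to the merged
# stream, so the loader emits exactly with_epoch_batches batches per epoch.
print(with_epoch_batches)                # 10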


Finally, if you are not training through a framework, the complete official distributed-training code is shown below.

# Let's try it out

def make_dataloader(split="train"):
    """Make a dataloader for training or validation."""
    if split == "train":
        return make_dataloader_train()
    elif split == "val":
        return make_dataloader_val()  # not implemented for this notebook
    else:
        raise ValueError(f"unknown split {split}")

# Try it out.
os.environ["GOPEN_VERBOSE"] = "1"
sample = next(iter(make_dataloader()))
print(sample[0].shape, sample[1].shape)
os.environ["GOPEN_VERBOSE"] = "0"

Standard PyTorch Training

This is completely standard PyTorch training; nothing changes by using WebDataset.

# We gather all the configuration info into a single typed dataclass.
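# Note: the imports used below (dataclasses, Optional, deque, nn, resnet50,
# DistributedDataParallel, sys) and the enumerate_report helper are defined in
# earlier cells of the official notebook and are not shown here.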

@dataclasses.dataclass
class Config:
    epochs: int = 1
    max_steps: int = int(1e18)
    lr: float = 0.001
    momentum: float = 0.9
    rank: Optional[int] = None
    world_size: int = 2
    backend: str = "nccl"
    master_addr: str = "localhost"
    master_port: str = "12355"
    report_s: float = 15.0
    report_growth: float = 1.1


def train(config):
    # Define the model, loss function, and optimizer
    model = resnet50(pretrained=False).cuda()
    if config.rank is not None:
        model = DistributedDataParallel(model)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=config.lr)

    # Data loading code
    trainloader = make_dataloader(split="train")

    losses, accuracies, steps = deque(maxlen=100), deque(maxlen=100), 0

    # Training loop
    for epoch in range(config.epochs):
        for i, data, verbose in enumerate_report(trainloader, config.report_s):
            inputs, labels = data[0].cuda(), data[1].cuda()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)

            # update statistics
            loss = loss_fn(outputs, labels)
            accuracy = (
                (outputs.argmax(1) == labels).float().mean()
            )  # calculate accuracy
            losses.append(loss.item())
            accuracies.append(accuracy.item())

            if verbose and len(losses) > 0:
                avgloss = sum(losses) / len(losses)
                avgaccuracy = sum(accuracies) / len(accuracies)
                print(
                    f"rank {config.rank} epoch {epoch:5d}/{i:9d} loss {avgloss:8.3f} acc {avgaccuracy:8.3f} {steps:9d}",
                    file=sys.stderr,
                )
            loss.backward()
            optimizer.step()
            steps += len(labels)
            if steps > config.max_steps:
                print(
                    "finished training (max_steps)",
                    steps,
                    config.max_steps,
                    file=sys.stderr,
                )
                return

    print("finished Training", steps)
# A quick smoke test of the training function.

config = Config()
config.epochs = 1
config.max_steps = 1000
train(config)
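The smoke test above runs train() in a single process (config.rank stays None, so the model is not wrapped in DDP). For an actual multi-GPU run you still need to spawn one process per GPU and initialize the process group. A minimal sketch using the fields of Config (the run_worker helper and the use of torch.multiprocessing here are my own, not part of the official notebook):

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def run_worker(rank, config):
    # One process per GPU: set up the process group, pin the device, then train.
    os.environ["MASTER_ADDR"] = config.master_addr
    os.environ["MASTER_PORT"] = config.master_port
    dist.init_process_group(backend=config.backend, rank=rank,
                            world_size=config.world_size)
    torch.cuda.set_device(rank)
    config.rank = rank            # train() wraps the model in DDP when rank is not None
    train(config)
    dist.destroy_process_group()

if __name__ == "__main__":
    config = Config()
    config.epochs = 1
    config.max_steps = 1000
    mp.spawn(run_worker, args=(config,), nprocs=config.world_size)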

This article is reposted from: https://blog.csdn.net/qq_66556875/article/details/143534307
Copyright belongs to the original author, 要成功炼丹啊!. If there is any infringement, please contact us to have it removed.
