0


Datawhale X 李宏毅苹果书 AI夏令营(Task2)

一、学前概览

    任务内容:critical point并不一定是训练神经网络遇到的最大的阻碍,还有一种叫Adaptive Learning Rate的技术。

    任务目的:了解掌握Learning Rate和分类损失的计算。

    本节出现术语:自适应学习率(root mean square、RMSProp、Adam策略)、学习率调度策略、分类、回归、softmax。

1.学中疑问

    A.训练时候很少卡到saddle point或者local minima,那么上一个task中的图6是怎么画出来的?

    B.lr如何自动根据gradient的大小做调整?

    C.parameter dependent的lr有什么常见的计算方式

    D.分类问题中,为什么cross entropy会比mse常用?

2.学后解答

    A.一般的gradient descent无法做到,多数的training在走到critical point之前其实就已经停止了。

    B.见知识点1

    C.见知识点2

    D.见图10

二、Task2.1 自适应学习率

    大原则:如果在某一方向上,gradient的值非常小,非常平坦,那么lr会调大一点;如果在某一个方向上坡度很大,lr就可以设的小一点。

1.知识点1:不同的参数需要不同的lr

    训练过程中很容易遇到loss无法下降的情况,这种情况从大量的已知实验来看,很少是因为critical point导致的,所以原先的梯度下降的formulation已经不满足目前的需求,需要随着参数进行克制化的learning rate加入。此处以一个参数为例子,如图1所示,然后依次类推出所有参数的表现形式。


图1:加入克制化的learning rate

    那么,parameter dependent的lr有什么常见的计算方式呢?

2.知识点2:parameter dependent的lr常见的计算方式

   方法1:root mean square

    有点基础的话,看图2其实就能看得懂了。


图2:root mean square方式

    使用这种方式的一个前提是同一个参数的gradient大小固定。但是实际情况中,很难做到,因此衍生出第二种方法。

    方法2:RMSProp

    这种方法并没有文献来源,第一步跟方法一是一样的,区别在第二步以后。见图3。


图3:使用RMSProp动态调整参数

     方法3:Adam策略

    Adam策略,实际上就是RMSProp+Momentum

    

3.知识点3:学习率调度

   出现学习率调度的原因:梯度爆炸,在书籍57页展示了原因,还是很容易理解的。这里放出原文,如图4所示。


图4:梯度爆炸的原因

    解决办法也很简单,让η与时间扯上关系,有两种方法,一种是learning rate decay,使得η随着时间的增加作衰减。;另一种是warm up(远古时代就有,bert使用了它),η先递增后衰减。两者的特征如图5所示,但是对于warm up,其可解释性比较差,目前仍在探索。


图5:learning rate decay和warm up的特征

小结

    见图6


图6:公式总结

三、Task2.2:分类

1.知识点4:分类?回归?

    我们已经知道,回归的本质是模型输出一个数值,这个数值与真实标签的数值差距越小越好,那么接下来的目标便是缩小预测值与真实值之间的差距。

    在多分类的情况下,如果每一分类的输出是一个标量的结果,那我们可以变相把它当成回归问题去看。但这种方式存在一个问题,就是标量与标量之间存在有某种关系,当作预测结果时,说服力会很低。因此,比较常用的做法是将class用独热编码去表示。

    从模型输出的角度,回归问题输出的是一个数值,分类问题可以看成回归问题的输出重复n次。但是在做分类问题的时候,往往会把输出通过softmax,将输出限制在0~1之间。如图7所示。


图7:回归和分类的区别

2.知识点5:softmax的运作过程(简单版)

   softmax除了让分类结果限制在0~1之外,还会让大小值之间的差距变大。如图8所示。公式在右上角。


图8:softmax的运作过程

3.知识点6:分类损失

    分类损失有两种计算方式,一种是均方根误差,另一种是交叉熵损失。如图9。


图9:损失函数的计算方式

    从优化的角度,交叉熵是被更常用在分类上。解释如下:

    如图10所示:假设有一个三类的分类,网络先输出y1、y2 和y3,在通过softmax以后,产生 y′1、y′2 和y′3。假设y1、y2的变化都是从-10到10,y3固定设成-1000。正确答案是[1, 0, 0]T,因为y3的值很小,通过softmax后,y′3非常趋近于0,与正确答案非常接近,所以此时我们只需要看y1和y2有变化时对损失的影响。

    图10中,左上角损失大,右下角损失小,所以希望最后在训练的时候,参数可以“走” 到右下角的地方。假设参数优化开始的时候,对应的损失都是左上角。

    如果选择交叉熵(图10右),左上角圆圈所在的点有斜率的,所以可以通过梯度,一路往右下的地方“走”。

    如果选均方误差(图10左),左上角圆圈就卡住了,均方误差在这种损失很大的地方, 是非常平坦的,其梯度是趋近于 0 的。如果初始时在圆圈的位置,离目标非常远,其梯度又很小,是无法用梯度下降顺利地“走”到右下角的。


图10:交叉熵和均方根误差的损失比较

三、Task2.3:CNN图像分类实践

   其实在任务教程里也有代码的解析,但是我觉得跟着走一遍把代码copy到这边印象会更深。(不会贴源码,只贴我不懂的代码)

   以下范式适用于广泛的深度学习任务:准备数据——>训练模型——>应用模型。具体如下:

    1.导入所需要的库/工具包
     2.数据准备与预处理
     3.定义模型
     4.定义损失函数和优化器等其他配置
     5.训练模型
     6.评估模型
     7.进行预测

1.导包

# 选择随机种子没有特定的规则,它可以是任何整数。有些种子值(如42)因为被知名书籍或文献提及而变得流行。
myseed = 6666

# 确保在使用CUDA时,卷积运算具有确定性,以增强实验结果的可重复性
# 保证了每次运行网络时都会得到相同的结果,代价是牺牲性能
torch.backends.cudnn.deterministic = True

# 设为False时,会告诉CuDNN不进行算法的搜索和选择,而是使用默认的卷积实现。这意味着不会根据当前的GPU和输入数据来选择最快的卷积算法。确保每次运行时都使用相同的算法
torch.backends.cudnn.benchmark = False

2.数据准备与预处理

    分三个部分(预设置、数据加载类和调用读取数据)。
# 图像预设置

# 在测试和验证阶段,通常不需要图像增强。
# 我们所需要的只是调整PIL图像的大小并将其转换为Tensor。
test_tfm = transforms.Compose([transforms.Resize((128, 128)),transforms.ToTensor(),])

# 不过,在测试阶段使用图像增强也是有可能的。
# 你可以使用train_tfm生成多种图像,然后使用集成方法进行测试。
train_tfm = transforms.Compose([
    # 将图像调整为固定大小(高度和宽度均为128)
    transforms.Resize((128, 128)),
    # 可以在此处添加一些图像增强的操作。

    # ToTensor()应该是所有变换中的最后一个。
    transforms.ToTensor(),
])
# 数据加载类
class FoodDataset(Dataset):
    """
    用于加载食品图像数据集的类。

    该类继承自Dataset,提供了对食品图像数据集的加载和预处理功能。
    它可以自动从指定路径加载所有的jpg图像,并对这些图像应用给定的变换。
    """

    def __init__(self, path, tfm=test_tfm, files=None):
        """
        参数:
        - path: 图像数据所在的目录路径。
        - tfm: 应用于图像的变换方法(默认为测试变换)。
        - files: 可选参数,用于直接指定图像文件的路径列表(默认为None)。
        """
        super(FoodDataset).__init__()
        self.path = path
        # 列出目录下所有jpg文件,并按顺序排序
        self.files = sorted([os.path.join(path, x) for x in os.listdir(path) if x.endswith(".jpg")])
        if files is not None:
            self.files = files  # 如果提供了文件列表,则使用该列表
        self.transform = tfm  # 图像变换方法

    def __len__(self):
        """返回数据集中图像的数量。"""
        return len(self.files)

    def __getitem__(self, idx):
        """
        获取给定索引的图像及其标签。

        参数:
            idx: 图像在数据集中的索引。

        返回:
            im: 应用了变换后的图像。
            label: 图像对应的标签(如果可用)。
        """
        fname = self.files[idx]
        im = Image.open(fname)
        im = self.transform(im)  # 应用图像变换

        # 尝试从文件名中提取标签
        try:
            label = int(fname.split("/")[-1].split("_")[0])
        except:
            label = -1  # 如果无法提取标签,则设置为-1(测试数据无标签)

        return im, label
# 加载数据

# 构建训练和验证数据集
# "loader" 参数定义了torchvision如何读取数据
train_set = FoodDataset("./hw3_data/train", tfm=train_tfm)
# 创建训练数据加载器,设置批量大小、是否打乱数据顺序、是否使用多线程加载以及是否固定内存地址
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
# 构建验证数据集
# "loader" 参数定义了torchvision如何读取数据
valid_set = FoodDataset("./hw3_data/valid", tfm=test_tfm)
# 创建验证数据加载器,设置批量大小、是否打乱数据顺序、是否使用多线程加载以及是否固定内存地址
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)

3.定义模型

class Classifier(nn.Module):
    """
    定义一个图像分类器类,继承自PyTorch的nn.Module。
    该分类器包含卷积层和全连接层,用于对图像进行分类。
    """
    def __init__(self):
        """
        初始化函数,构建卷积神经网络的结构。
        包含一系列的卷积层、批归一化层、激活函数和池化层。
        """
        super(Classifier, self).__init__()
        # 定义卷积神经网络的序列结构
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 64, 3, 1, 1),  # 输入通道3,输出通道64,卷积核大小3,步长1,填充1
            nn.BatchNorm2d(64),        # 批归一化,作用于64个通道
            nn.ReLU(),                 # ReLU激活函数
            nn.MaxPool2d(2, 2, 0),      # 最大池化,池化窗口大小2,步长2,填充0
            
            nn.Conv2d(64, 128, 3, 1, 1), # 输入通道64,输出通道128,卷积核大小3,步长1,填充1
            nn.BatchNorm2d(128),        # 批归一化,作用于128个通道
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # 最大池化,池化窗口大小2,步长2,填充0
            
            nn.Conv2d(128, 256, 3, 1, 1), # 输入通道128,输出通道256,卷积核大小3,步长1,填充1
            nn.BatchNorm2d(256),        # 批归一化,作用于256个通道
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),      # 最大池化,池化窗口大小2,步长2,填充0
            
            nn.Conv2d(256, 512, 3, 1, 1), # 输入通道256,输出通道512,卷积核大小3,步长1,填充1
            nn.BatchNorm2d(512),        # 批归一化,作用于512个通道
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),       # 最大池化,池化窗口大小2,步长2,填充0
            
            nn.Conv2d(512, 512, 3, 1, 1), # 输入通道512,输出通道512,卷积核大小3,步长1,填充1
            nn.BatchNorm2d(512),        # 批归一化,作用于512个通道
            nn.ReLU(),
            nn.MaxPool2d(2, 2, 0),       # 最大池化,池化窗口大小2,步长2,填充0
        )
        # 定义全连接神经网络的序列结构
        self.fc = nn.Sequential(
            nn.Linear(512*4*4, 1024),    # 输入大小512*4*4,输出大小1024
            nn.ReLU(),
            nn.Linear(1024, 512),        # 输入大小1024,输出大小512
            nn.ReLU(),
            nn.Linear(512, 11)           # 输入大小512,输出大小11,最终输出11个类别的概率
        )

    def forward(self, x):
        """
        前向传播函数,对输入进行处理。
        
        参数:
        x -- 输入的图像数据,形状为(batch_size, 3, 128, 128)
        
        返回:
        输出的分类结果,形状为(batch_size, 11)
        """
        out = self.cnn(x)               # 通过卷积神经网络处理输入
        out = out.view(out.size()[0], -1)  # 展平输出,以适配全连接层的输入要求
        return self.fc(out)             # 通过全连接神经网络得到最终输出

4.定义损失函数和优化器等其他配置

# 根据GPU是否可用选择设备类型
device = "cuda" if torch.cuda.is_available() else "cpu"

# 初始化模型,并将其放置在指定的设备上
model = Classifier().to(device)

# 定义批量大小
batch_size = 64

# 定义训练轮数
n_epochs = 8

# 如果在'patience'轮中没有改进,则提前停止
patience = 5

# 对于分类任务,我们使用交叉熵作为性能衡量标准
criterion = nn.CrossEntropyLoss()

# 初始化优化器,您可以自行调整一些超参数,如学习率
optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)

5.训练模型

# 初始化追踪器,这些不是参数,不应该被更改
stale = 0
best_acc = 0

for epoch in range(n_epochs):
    # ---------- 训练阶段 ----------
    # 确保模型处于训练模式
    model.train()

    # 这些用于记录训练过程中的信息
    train_loss = []
    train_accs = []

    for batch in tqdm(train_loader):
        # 每个批次包含图像数据及其对应的标签
        imgs, labels = batch
        # imgs = imgs.half()
        # print(imgs.shape,labels.shape)

        # 前向传播数据。(确保数据和模型位于同一设备上)
        logits = model(imgs.to(device))

        # 计算交叉熵损失。
        # 在计算交叉熵之前不需要应用softmax,因为它会自动完成。
        loss = criterion(logits, labels.to(device))

        # 清除上一步中参数中存储的梯度
        optimizer.zero_grad()

        # 计算参数的梯度
        loss.backward()

        # 为了稳定训练,限制梯度范数
        grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)

        # 使用计算出的梯度更新参数
        optimizer.step()

        # 计算当前批次的准确率
        acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

        # 记录损失和准确率
        train_loss.append(loss.item())
        train_accs.append(acc)

    train_loss = sum(train_loss) / len(train_loss)
    train_acc = sum(train_accs) / len(train_accs)

    # 打印信息
    print(f"[ 训练 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {train_loss:.5f}, acc = {train_acc:.5f}")

    # ---------- 验证阶段 ----------
    # 确保模型处于评估模式,以便某些模块如dropout能够正常工作
    model.eval()

    # 这些用于记录验证过程中的信息
    valid_loss = []
    valid_accs = []

    # 按批次迭代验证集
    for batch in tqdm(valid_loader):
        # 每个批次包含图像数据及其对应的标签
        imgs, labels = batch
        # imgs = imgs.half()

        # 我们在验证阶段不需要梯度。
        # 使用 torch.no_grad() 加速前向传播过程。
        with torch.no_grad():
            logits = model(imgs.to(device))

        # 我们仍然可以计算损失(但不计算梯度)。
        loss = criterion(logits, labels.to(device))

        # 计算当前批次的准确率
        acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

        # 记录损失和准确率
        valid_loss.append(loss.item())
        valid_accs.append(acc)
        # break

    # 整个验证集的平均损失和准确率是所记录值的平均
    valid_loss = sum(valid_loss) / len(valid_loss)
    valid_acc = sum(valid_accs) / len(valid_accs)

    # 打印信息
    print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

    # 更新日志
    if valid_acc > best_acc:
        with open(f"./{_exp_name}_log.txt", "a"):
            print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f} -> 最佳")
    else:
        with open(f"./{_exp_name}_log.txt", "a"):
            print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

    # 保存模型
    if valid_acc > best_acc:
        print(f"在第 {epoch} 轮找到最佳模型,正在保存模型")
        torch.save(model.state_dict(), f"{_exp_name}_best.ckpt")  # 只保存最佳模型以防止输出内存超出错误
        best_acc = valid_acc
        stale = 0
    else:
        stale += 1
        if stale > patience:
            print(f"连续 {patience} 轮没有改进,提前停止")
            break

6.评估模型

for epoch in range(n_epochs):
    # ---------- 训练阶段 ----------
    
    ···

    # ---------- 验证阶段 ----------
    # 确保模型处于评估模式,以便某些模块如dropout能够正常工作
    model.eval()

    # 这些用于记录验证过程中的信息
    valid_loss = []
    valid_accs = []

    # 按批次迭代验证集
    for batch in tqdm(valid_loader):
        # 每个批次包含图像数据及其对应的标签
        imgs, labels = batch
        # imgs = imgs.half()

        # 我们在验证阶段不需要梯度。
        # 使用 torch.no_grad() 加速前向传播过程。
        with torch.no_grad():
            logits = model(imgs.to(device))

        # 我们仍然可以计算损失(但不计算梯度)。
        loss = criterion(logits, labels.to(device))

        # 计算当前批次的准确率
        acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

        # 记录损失和准确率
        valid_loss.append(loss.item())
        valid_accs.append(acc)
        # break

    # 整个验证集的平均损失和准确率是所记录值的平均
    valid_loss = sum(valid_loss) / len(valid_loss)
    valid_acc = sum(valid_accs) / len(valid_accs)

    # 打印信息
    print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

    # 更新日志
    if valid_acc > best_acc:
        with open(f"./{_exp_name}_log.txt", "a"):
            print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f} -> 最佳")
    else:
        with open(f"./{_exp_name}_log.txt", "a"):
            print(f"[ 验证 | {epoch + 1:03d}/{n_epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

    # 保存模型
    if valid_acc > best_acc:
        print(f"在第 {epoch} 轮找到最佳模型,正在保存模型")
        torch.save(model.state_dict(), f"{_exp_name}_best.ckpt")  # 只保存最佳模型以防止输出内存超出错误
        best_acc = valid_acc
        stale = 0
    else:
        stale += 1
        if stale > patience:
            print(f"连续 {patience} 轮没有改进,提前停止")
            break

7.预测模型

    分两步(加载测试数据、测试并生成预测)
# 构建测试数据集
# "loader"参数指定了torchvision如何读取数据
test_set = FoodDataset("./hw3_data/test", tfm=test_tfm)
# 创建测试数据加载器,批量大小为batch_size,不打乱数据顺序,不使用多线程,启用pin_memory以提高数据加载效率
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)
# 实例化分类器模型,并将其转移到指定的设备上
model_best = Classifier().to(device)

# 加载模型的最优状态字典
model_best.load_state_dict(torch.load(f"{_exp_name}_best.ckpt"))

# 将模型设置为评估模式
model_best.eval()

# 初始化一个空列表,用于存储所有预测标签
prediction = []

# 使用torch.no_grad()上下文管理器,禁用梯度计算
with torch.no_grad():
    # 遍历测试数据加载器
    for data, _ in tqdm(test_loader):
        # 将数据转移到指定设备上,并获得模型的预测结果
        test_pred = model_best(data.to(device))
        # 选择具有最高分数的类别作为预测标签
        test_label = np.argmax(test_pred.cpu().data.numpy(), axis=1)
        # 将预测标签添加到结果列表中
        prediction += test_label.squeeze().tolist()

# 创建测试csv文件
def pad4(i):
    """
    将输入数字i转换为长度为4的字符串,如果长度不足4,则在前面补0。
    :param i: 需要转换的数字
    :return: 补0后的字符串
    """
    return "0" * (4 - len(str(i))) + str(i)

# 创建一个空的DataFrame对象
df = pd.DataFrame()
# 使用列表推导式生成Id列,列表长度等于测试集的长度
df["Id"] = [pad4(i) for i in range(len(test_set))]
# 将预测结果赋值给Category列
df["Category"] = prediction
# 将DataFrame对象保存为submission.csv文件,不保存索引
df.to_csv("submission.csv", index=False)

8.可视化

# 导入必要的库和模块
import torch
import numpy as np
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from tqdm import tqdm
import matplotlib.cm as cm
import torch.nn as nn

# 根据CUDA是否可用选择执行设备
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# 加载训练好的模型
model = Classifier().to(device)
# 加载模型保存的参数
state_dict = torch.load(f"{_exp_name}_best.ckpt")
# 将参数加载到模型中
model.load_state_dict(state_dict)
# 设置模型为评估模式
model.eval()

# 打印模型结构
print(model)
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
import matplotlib.cm as cm
import torch

def forward_to_layer(model, input_tensor, layer_index):
    outputs = []
    for i, layer in enumerate(model.children()):
        input_tensor = layer(input_tensor)
        if i == layer_index:
            break
        outputs.append(input_tensor)
    return outputs[-1]  # 返回所选层的输出

# 假设model, test_tfm, FoodDataset, DataLoader已经被定义且正确初始化

# 加载由TA定义的验证集
valid_set = FoodDataset("./hw3_data/valid", tfm=test_tfm)
valid_loader = DataLoader(valid_set, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)

# 提取模型特定层的表示
index = 19  # 假设你想提取第19层的特征
features = []
labels = []

# 定义设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

for batch in tqdm(valid_loader):
    imgs, lbls = batch
    imgs, lbls = imgs.to(device), lbls.to(device)  # 确保数据在正确的设备上
    with torch.no_grad():
        # 获取特定层的特征
        logits = forward_to_layer(model.cnn, imgs, index)
        logits = logits.view(logits.size(0), -1)
    labels.extend(lbls.cpu().numpy())
    features.extend(logits.cpu().numpy())

# 将features和labels列表转换为numpy数组
features = np.array(features)
labels = np.array(labels)

# 应用t-SNE到特征上
features_tsne = TSNE(n_components=2, init='pca', random_state=42).fit_transform(features)

# 绘制t-SNE可视化图
plt.figure(figsize=(10, 8))
for label in np.unique(labels):
    # 使用布尔索引选择特定标签的数据点
    mask = (labels == label)
    plt.scatter(features_tsne[mask, 0], features_tsne[mask, 1], label=f'Class {label}', s=5)
plt.legend()
plt.title('All Classes t-SNE Visualization')
plt.show()

# 绘制特定类别的t-SNE可视化图
plt.figure(figsize=(10, 8))
selected_label = 5
mask = (labels == selected_label)
if mask.any():  # 使用 .any() 替代 .sum() 来检查是否有True值
    plt.scatter(features_tsne[mask, 0], features_tsne[mask, 1], label=f'Class {selected_label}', s=5)
plt.legend()
plt.title(f'Class {selected_label} t-SNE Visualization')
plt.show()

本文转载自: https://blog.csdn.net/mengluohuayexuan/article/details/141572395
版权归原作者 梦落花叶萱 所有, 如有侵权,请联系我们删除。

“Datawhale X 李宏毅苹果书 AI夏令营(Task2)”的评论:

还没有评论