1.导入模块、库和框架
import numpy as np
from sklearn.datasets import load_iris, load_digits
import torch
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import train_test_split
from transformers import get_cosine_schedule_with_warmup
from transformers import AdamW
import warnings
import torch.nn.functional as F
warnings.filterwarnings('ignore')
2.定义一个类用来设置随机种子以及复现代码
class SetSeed:
def same_seeds(self, seed):
torch.manual_seed(seed) # 固定随机种子(CPU)
if torch.cuda.is_available(): # 固定随机种子(GPU)
torch.cuda.manual_seed(seed) # 为当前GPU设置
torch.cuda.manual_seed_all(seed) # 为所有GPU设置
np.random.seed(seed) # 保证后续使用random函数时,产生固定的随机数
torch.backends.cudnn.benchmark = False # GPU、网络结构固定,可设置为True
torch.backends.cudnn.deterministic = True # 固定网络结构
3. 定义数据处理类
class ProcessData: # 数据处理类
def __init__(self):
self.dataX, self.dataY = load_digits(return_X_y=True) # 导入数据:手写字体数据集(分类任务)
self.dataX, self.dataY = np.float32(self.dataX), torch.Tensor(self.dataY).long() # 将数字转为tensor或者可以输入进网络的格式
def dataToLoader(self): # 这个方法可以作为DataLoader的中间件,其中包含数据集划分等操作
self.trainDataX, self.midDataX, self.trainDataY, self.midDataY = train_test_split(
self.dataX,
self.dataY,
stratify=self.dataY, # 采样分布同总体一样,克服采样不均匀问题
random_state=1,
test_size=0.2
)
self.testDataX, self.devDataX, self.testDataY, self.devDataY = train_test_split(
self.midDataX,
self.midDataY,
stratify=self.midDataY,
random_state=1,
test_size=0.5
)
trainDataForLoader = list(zip(self.trainDataX, self.trainDataY))
devDataForLoader = list(zip(self.devDataX, self.devDataY))
testDataForLoader = list(zip(self.testDataX, self.testDataY))
self.trainLoader = torch.utils.data.DataLoader(dataset=trainDataForLoader,
batch_size=8,
# collate_fn=False,
shuffle=True,
drop_last=True)
self.devLoader = torch.utils.data.DataLoader(dataset=devDataForLoader,
batch_size=8,
# collate_fn=False,
shuffle=True,
drop_last=True)
self.testLoader = torch.utils.data.DataLoader(dataset=testDataForLoader,
batch_size=8,
# collate_fn=False,
shuffle=True,
drop_last=True)
return self.trainLoader, self.devLoader, self.testLoader
4.定义一些基础的模型类
class FNN(torch.nn.Module):
def __init__(self):
super(FNN, self).__init__()
self.fc = torch.nn.Linear(64, 256)
self.fc2 = torch.nn.Linear(256, 10)
self.softmax = torch.nn.Softmax()
self.dropout = torch.nn.Dropout(p=0.5)
self.tanh = torch.nn.Tanh()
# self.bn = torch.nn.BatchNorm1d()
def forward(self, x):
# out = self.bn(x)
# print(x)
out = self.fc(x)
out = self.dropout(out)
out = self.tanh(out)
out = self.fc2(out)
out = self.dropout(out)
logits = self.tanh(out)
# print(logits)
# prob = self.softmax(logits)
return logits
class SelfAttention(torch.nn.Module):
def __init__(self, hidden_dim):
super().__init__()
self.projection = torch.nn.Sequential(
torch.nn.Linear(hidden_dim, 64),
torch.nn.ReLU(True),
torch.nn.Linear(64, 1))
def forward(self, encoder_outputs):
energy = self.projection(encoder_outputs) # (B, L, H) -> (B , L, 1)
weights = F.softmax(energy, dim=1)
outputs = encoder_outputs * weights # (B, L, H)
return outputs
class RNN(torch.nn.Module):
def __init__(self):
super(RNN, self).__init__()
self.fc = torch.nn.Linear(200, 10)
self.fc2 = torch.nn.Linear(256, 10)
self.softmax = torch.nn.Softmax()
self.dropout = torch.nn.Dropout(p=0.5)
self.tanh = torch.nn.Tanh()
self.bn = torch.nn.BatchNorm1d(64)
self.lstm = torch.nn.LSTM(64, 100, 2, bidirectional=True, batch_first=True, dropout=0.5)
def forward(self, x):
# out = self.bn(x)
x = x.unsqueeze(dim=1)
out, _ = self.lstm(x)
out = self.tanh(out)
out = self.fc(out)
out = self.dropout(out)
out = self.tanh(out)
# out = self.fc2(out)
# out = self.tanh(out)
# logits = self.dropout(out)
# prob = self.softmax(logits)
prob = out.squeeze(dim=1)
return prob
class RNN_ATT(torch.nn.Module):
def __init__(self):
super(RNN_ATT, self).__init__()
self.fc = torch.nn.Linear(200, 10)
self.fc2 = torch.nn.Linear(256, 10)
self.softmax = torch.nn.Softmax()
self.dropout = torch.nn.Dropout(p=0.5)
self.tanh = torch.nn.Tanh()
self.bn = torch.nn.BatchNorm1d(64)
self.lstm = torch.nn.LSTM(64, 100, 2, bidirectional=True, batch_first=True, dropout=0.5)
self.attention = SelfAttention(100 * 2)
def forward(self, x):
# out = self.bn(x)
x = x.unsqueeze(dim=1)
out, _ = self.lstm(x)
out = self.tanh(out)
out=self.attention(out)
out = self.tanh(out)
out = self.fc(out)
out = self.dropout(out)
out = self.tanh(out)
# out = self.fc2(out)
# out = self.tanh(out)
# logits = self.dropout(out)
# prob = self.softmax(logits)
prob = out.squeeze(dim=1)
return prob
class CNN(torch.nn.Module):
def __init__(self):
super(CNN, self).__init__()
self.fc = torch.nn.Linear(64, 256)
self.fc2 = torch.nn.Linear(64, 10)
self.softmax = torch.nn.Softmax()
self.dropout = torch.nn.Dropout(p=0.5)
self.tanh = torch.nn.Tanh()
self.conv1d = torch.nn.Conv1d(64, 10, kernel_size=1)
self.conv1d_t = torch.nn.Conv1d(8, 8, kernel_size=1)
# self.maxpool1d = torch.nn.MaxPool1d(kernel_size=4, stride=None, padding=0, dilation=1,
# return_indices=False, ceil_mode=False)
self.flatten = torch.nn.Flatten(start_dim=0, end_dim=1)
# self.bn = torch.nn.BatchNorm1d()
def forward(self, x):
x = x.unsqueeze(dim=2)
print(x.size())
out = self.conv1d(x)
out = self.flatten(out)
prob = self.softmax(out)
# ------------------------------------
# x = x.unsqueeze(dim=0)
# out = self.conv1d_t(x)
# # out = self.maxpool1d(out)
# out = self.flatten(out)
# out = self.fc2(out)
# # print(out)
# # print(out.size())
# prob = self.softmax(out)
return prob
class CNN2(torch.nn.Module):
def __init__(self):
super(CNN2, self).__init__()
self.conv1 = torch.nn.Conv2d(1, 10, kernel_size=2)
self.conv2 = torch.nn.Conv2d(10, 20, kernel_size=2)
self.conv2_drop = torch.nn.Dropout2d()
self.fc1 = torch.nn.Linear(720, 50)
self.fc2 = torch.nn.Linear(490, 10)
self.flatten = torch.nn.Flatten()
def forward(self, x):
x = np.array(x)
x = x.reshape(8, 1, 8, 8)
x = np.float32(x)
x = torch.tensor(x)
x = self.conv1(x)
# x = F.dropout2d(x)
x = F.max_pool2d(x, kernel_size=1)
x = F.relu(x)
x = self.flatten(x)
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return x
5.定义主类(包括训练、验证和测试方法)
class Main(ProcessData, SetSeed):
def __init__(self, model):
super(Main, self).__init__()
self.model = model
self.trainLoader = self.dataToLoader()[0]
self.devLoader = self.dataToLoader()[1]
self.testLoader = self.dataToLoader()[2]
self.optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=1e-4) # 使用Adam优化器
# 设置学习率
self.schedule = get_cosine_schedule_with_warmup(self.optimizer, num_warmup_steps=len(self.trainLoader),
num_training_steps=10 * len(self.testLoader))
self.criterion = torch.nn.CrossEntropyLoss()
def train(self):
self.same_seeds(20)
bestAcc = 0.0
for i in range(400):
self.model.train() # 开始训练
print("***************Running training epoch{}************".format(i + 1))
train_loss_sum = 0.0
for idx, (X, y) in enumerate(self.trainLoader):
yPred = self.model(X)
# print(yPred)
# print(y)
loss = self.criterion(yPred, y)
# loss=self.criterion()
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
self.schedule.step()
train_loss_sum += loss.item()
devAcc = self.__evaluate(self.model, self.devLoader)
print(devAcc)
if devAcc > bestAcc:
bestAcc = devAcc
torch.save(self.model.state_dict(), "checkpoint/best_model.pth")
testAcc = self.__predict(self.model, self.testLoader)
torch.save(self.model.state_dict(), "checkpoint/best_model2.pth")
print(testAcc)
def __evaluate(self, model, data_loader):
model.eval() # 防止模型训练改变权值
devTrue, devPred = [], []
with torch.no_grad(): # 计算的结构在计算图中,可以进行梯度反转等操作
for idx, (X, y) in enumerate(data_loader): # 得到的y要转换一下数据格式
yPred = model(X) # 此时得到的是概率矩阵
yPred = torch.argmax(yPred, dim=1).detach().cpu().numpy().tolist() # 将概率矩阵转换成标签并变成list类型
devPred.extend(yPred) # 将标签值放入列表
devTrue.extend(y.squeeze().cpu().numpy().tolist()) # 将真实标签转换成list放在列表中
return accuracy_score(devTrue, devPred)
def __predict(self, model, data_loader):
model.eval() # 防止模型训练改变权值
testTrue, testPred = [], []
with torch.no_grad(): # 计算的结构在计算图中,可以进行梯度反转等操作
for idx, (X, y) in enumerate(data_loader): # 得到的y要转换一下数据格式
yPred = model(X) # 此时得到的是概率矩阵
yPred = torch.argmax(yPred, dim=1).detach().cpu().numpy().tolist() # 将概率矩阵转换成标签并变成list类型
testPred.extend(yPred) # 将标签值放入列表
testTrue.extend(y.squeeze().cpu().numpy().tolist()) # 将真实标签转换成list放在列表中
return classification_report(testTrue, testPred)
定义测试类,可供调试以及部署API模型
class Test(SetSeed):
def __init__(self):
self.path = 'checkpoint/best_model2.pth'
def test(self, model, data):
self.same_seeds(1)
data = torch.Tensor(data)
model = model
model.eval()
model.load_state_dict(torch.load(self.path))
y_pred_prob = model(data) # 此时得到的是概率矩阵
y_pred = torch.argmax(y_pred_prob, dim=1).detach().cpu().numpy().tolist()
# print(y_pred_prob)
print('预测值:', y_pred[0])
6.主函数入口
if __name__ == '__main__':
model = RNN_ATT()
main = Main(model)
main.train()
# index = 348
# dataX, _ = load_digits(return_X_y=True)
# dataMidList = []
# data = dataX[index, :].reshape(8, -1)
# print('真实数据:', _[index])
# import matplotlib.pyplot as plt
# plt.imshow(data, cmap='gray', interpolation='none')
# plt.show()
# data = dataX[index, :]
# dataMidList.append(data)
# for _ in range(7):
# vecMid = [0 for _ in range(64)]
# dataMidList.append(vecMid)
# # print(dataMidList)
# # data = dataX[1, :].reshape(1, -1)
# data = np.array(dataMidList).reshape(8, 1, 8, 8)
# # print(data)
# test = Test()
# test.test(model, data)
# data = torch.rand([3, 3])
# # print(data)
# data1 = data.unsqueeze(dim=0)
# print(data1)
本文转载自: https://blog.csdn.net/qq_45134942/article/details/125174216
版权归原作者 spacex丶 所有, 如有侵权,请联系我们删除。
版权归原作者 spacex丶 所有, 如有侵权,请联系我们删除。