**源码来自 <https://blog.csdn.net/qq_34467412/article/details/90738232>，该博主的代码也是对原论文 ResNet 框架的复现；我在 ChatGPT 的帮助下把博主的 TensorFlow 代码改写成了 PyTorch 代码。
由于硬件限制，没有使用完整的数据集，仅对前 10 种调制方式进行识别：全信噪比情况下测试集识别率可达 72%；仅考虑 0~30 dB 信噪比时，测试集识别率可达 94%。**
训练过程
测试集上的混淆矩阵
不同信噪比下的识别率
信噪比为 0 dB 时的混淆矩阵
网络部分
class ResidualStack(nn.Module):
    """A 1x1 convolution, two residual units and a max-pool.

    Every convolution uses ``stride=1`` and ``padding='same'``, so only the
    final max-pool changes the spatial size of the feature map.

    Args:
        input_channels: Number of channels of the incoming feature map.
        output_channels: Number of channels produced by the stack; this is
            also the width carried by both skip connections.
        kernel_size: Kernel size of the four convolutions inside the two
            residual units (the entry convolution is always 1x1).
        seq: Name tag stored on ``self.seq``; it is not used in ``forward``.
        pool_size: Kernel size and stride of the final max-pool.
        hidden_channels: Width of the intermediate convolution inside each
            residual unit. Defaults to 32, the value that used to be
            hard-coded, so existing checkpoints keep their shapes.
    """

    def __init__(self, input_channels, output_channels, kernel_size, seq,
                 pool_size, hidden_channels=32):
        super().__init__()
        # 1x1 convolution that maps the input to `output_channels` so that the
        # skip connections below can be added without any projection.
        self.conv1 = nn.Conv2d(input_channels, output_channels,
                               kernel_size=1, stride=1, padding='same')
        # Residual unit 1
        self.conv2 = nn.Conv2d(output_channels, hidden_channels,
                               kernel_size=kernel_size, stride=1, padding='same')
        self.conv3 = nn.Conv2d(hidden_channels, output_channels,
                               kernel_size=kernel_size, stride=1, padding='same')
        # Residual unit 2
        self.conv4 = nn.Conv2d(output_channels, hidden_channels,
                               kernel_size=kernel_size, stride=1, padding='same')
        self.conv5 = nn.Conv2d(hidden_channels, output_channels,
                               kernel_size=kernel_size, stride=1, padding='same')
        self.maxpool = nn.MaxPool2d(kernel_size=pool_size, stride=pool_size)
        self.seq = seq

    def forward(self, x):
        """Apply the stack to ``x`` and return the pooled feature map."""
        x = self.conv1(x)
        # Residual unit 1: conv -> ReLU -> conv, add the skip, ReLU.
        x = F.relu(self.conv3(F.relu(self.conv2(x))) + x)
        # Residual unit 2: same pattern, skipping from the output of unit 1.
        x = F.relu(self.conv5(F.relu(self.conv4(x))) + x)
        return self.maxpool(x)
class MyResNet(nn.Module):
    """Six ``ResidualStack`` stages followed by a two-layer classifier head.

    The original note on this class gives the per-sample input shape as
    ``1, 1024, 2``. With that input the first stack pools by (2, 2) and the
    other five by (2, 1), leaving 32 channels x 16 x 1 = 512 features for the
    first linear layer.

    Args:
        num_classes: Number of output logits.
        fc_in_features: Number of features after flattening the output of the
            last stack. Defaults to 512 (the value noted for "rml2018"); the
            original inline note lists 64 for "rml" and 192 for "mnist".
    """

    def __init__(self, num_classes, fc_in_features=512):
        super().__init__()
        self.num_classes = num_classes
        self.seq1 = ResidualStack(1, 32, kernel_size=(3, 2), seq="ReStk0", pool_size=(2, 2))
        self.seq2 = ResidualStack(32, 32, kernel_size=(3, 1), seq="ReStk1", pool_size=(2, 1))
        self.seq3 = ResidualStack(32, 32, kernel_size=(3, 1), seq="ReStk2", pool_size=(2, 1))
        self.seq4 = ResidualStack(32, 32, kernel_size=(3, 1), seq="ReStk3", pool_size=(2, 1))
        self.seq5 = ResidualStack(32, 32, kernel_size=(3, 1), seq="ReStk4", pool_size=(2, 1))
        self.seq6 = ResidualStack(32, 32, kernel_size=(3, 1), seq="ReStk5", pool_size=(2, 1))
        self.fc1 = nn.Linear(fc_in_features, 128)
        self.fc2 = nn.Linear(128, num_classes)
        # AlphaDropout is applied to the SELU-activated features in forward().
        self.dropout = nn.AlphaDropout(0.2)

    def forward(self, x):
        """Return the ``num_classes`` logits for the batch ``x``."""
        stacks = (self.seq1, self.seq2, self.seq3, self.seq4, self.seq5, self.seq6)
        for stack in stacks:
            x = stack(x)
        x = torch.flatten(x, start_dim=1)
        x = F.selu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)
混淆矩阵代码
def plot_confusion_matrix(dataloader, model, classes):
    """Plot and return the row-normalised confusion matrix of ``model``.

    Args:
        dataloader: Iterable of ``(x, y)`` batches; ``y`` is one-hot encoded
            (its arg-max along dim 1 is taken as the true class).
        model: Module called as ``model(x)``; the arg-max of its output along
            dim 1 is taken as the predicted class.
        classes: Class names, used for the tick labels and the matrix size.

    Returns:
        A float tensor of shape ``(len(classes), len(classes))`` in which
        entry ``[t, p]`` is the fraction of samples of true class ``t`` that
        were predicted as class ``p``. Rows of classes that never occur are
        all zero.
    """
    num_classes = len(classes)
    num_cells = num_classes * num_classes
    counts = torch.zeros(num_cells, dtype=torch.long)

    # Evaluate without dropout noise and without building autograd graphs,
    # then put the model back into whatever mode the caller had it in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for x, y in dataloader:
                predicted = model(x).argmax(dim=1).cpu()
                actual = y.argmax(dim=1).cpu()
                # Row = true class, column = predicted class, which is what
                # the axis labels below promise (imshow draws rows along y).
                counts += torch.bincount(actual * num_classes + predicted,
                                         minlength=num_cells)
    finally:
        model.train(was_training)

    matrix = counts.reshape(num_classes, num_classes).float()
    # Normalise every true-class row; clamp so empty rows stay 0, not NaN.
    matrix = matrix / matrix.sum(dim=1, keepdim=True).clamp(min=1)

    # configuration of plot
    plt.figure(figsize=(10, 10))
    # `interpolation` only affects how the image is rendered.
    plt.imshow(matrix.numpy(), interpolation='nearest', cmap=plt.cm.Blues)
    tick_marks = np.arange(num_classes)
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    plt.title('Confusion Matrix')
    plt.colorbar()
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    # Lay out after all labels exist so none of them is clipped.
    plt.tight_layout()
    plt.show()
    return matrix
信噪比准确率,信噪比混淆矩阵代码
def plot_snr_curves(x, y, snr, model, classes):
    """Plot accuracy against SNR and one per-SNR confusion matrix.

    Args:
        x: Input samples, indexable along the first dimension.
        y: One-hot labels aligned with ``x``.
        snr: One SNR value per sample, of shape ``(N,)`` or ``(N, 1)``.
        model: Module called as ``model(batch)``. It is moved to the CPU when
            ``x`` is not a CUDA tensor.
        classes: Class names, used for tick labels and the matrix size.

    Returns:
        A float tensor of shape ``(num_snrs, len(classes), len(classes))``.
        ``matrix[i, t, p]`` is the number of samples at the i-th distinct SNR
        (ascending order) with true class ``t`` predicted as class ``p``.
    """
    if not x[0].is_cuda:
        model.cpu()
    num_classes = len(classes)
    num_cells = num_classes * num_classes
    snr = np.asarray(snr).reshape(len(snr))
    snrs, counts = np.unique(snr, return_counts=True)
    num_snrs = len(snrs)
    acc = np.zeros(num_snrs)
    matrix = torch.zeros(size=(num_snrs, num_classes, num_classes))

    # Evaluate in eval mode and without autograd, then restore the mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for i, snr_value in enumerate(snrs):
                mask = torch.from_numpy(snr == snr_value)
                temp_dataset = Data.TensorDataset(x[mask], y[mask])
                temp_dataloader = DataLoader(dataset=temp_dataset, batch_size=256)
                cell_counts = torch.zeros(num_cells, dtype=torch.long)
                for temp_x, temp_y in temp_dataloader:
                    predicted = model(temp_x).argmax(1).cpu()
                    actual = temp_y.argmax(1).cpu()
                    # .item() keeps the numpy accumulator free of tensors,
                    # which would otherwise fail for CUDA results.
                    acc[i] += (predicted == actual).sum().item()
                    # Row = true class, column = predicted class, matching
                    # the axis labels of the plot below.
                    cell_counts += torch.bincount(actual * num_classes + predicted,
                                                  minlength=num_cells)
                matrix[i] = cell_counts.reshape(num_classes, num_classes).float()
    finally:
        model.train(was_training)

    acc = acc / counts
    plt.plot(snrs, acc)
    plt.xlabel('SNR')
    plt.ylabel('Acc')
    plt.show()

    if num_snrs:
        # Show the 0 dB matrix when the data contains 0 dB; otherwise fall
        # back to the lowest SNR and say so in the title instead of
        # mislabelling it as 0 dB.
        zero_hits = np.flatnonzero(snrs == 0)
        if zero_hits.size:
            shown = int(zero_hits[0])
            title = 'Confusion Matrix SNR 0dB'
        else:
            shown = 0
            title = f'Confusion Matrix SNR {snrs[shown]}dB'
        plt.figure(figsize=(10, 10))
        # `interpolation` only affects how the image is rendered.
        plt.imshow(matrix[shown].numpy(), interpolation='nearest', cmap=plt.cm.Blues)
        tick_marks = np.arange(num_classes)
        plt.xticks(tick_marks, classes, rotation=45)
        plt.yticks(tick_marks, classes)
        plt.title(title)
        plt.colorbar()
        plt.xlabel('Predicted label')
        plt.ylabel('True label')
        # Lay out after all labels exist so none of them is clipped.
        plt.tight_layout()
        plt.show()
    return matrix
重新配置信号的label
def select(y, classes, classes_included=True):
    """Drop unused one-hot columns from ``y`` and re-encode the labels.

    A column counts as used when its sum over all rows is at least 1. The
    used columns keep their relative order and are renumbered 0..k-1, so the
    result is correct for any subset of classes, not only the first k.

    Args:
        y: 2-D one-hot label array of shape ``(num_samples, num_classes)``.
        classes: Class names aligned with the columns of ``y``. Only read
            when ``classes_included`` is true.
        classes_included: When true, also return the names of the kept
            classes.

    Returns:
        ``new_y`` of shape ``(num_samples, k)`` as a float array, or the
        tuple ``(new_y, new_classes)`` when ``classes_included`` is true.
    """
    y = np.asarray(y)
    present = np.flatnonzero(y.sum(axis=0) >= 1)  # columns that hold a label

    # Map every original column to its compacted position (-1 = dropped).
    remap = np.full(y.shape[1], -1, dtype=np.intp)
    remap[present] = np.arange(present.size)

    new_y = np.zeros((y.shape[0], present.size))
    compact = remap[y.argmax(axis=1)]
    # An all-zero row has arg-max 0; if column 0 was dropped there is no
    # class to assign, so such a row is left all-zero.
    rows = np.flatnonzero(compact >= 0)
    new_y[rows, compact[rows]] = 1

    if classes_included:
        return new_y, [classes[i] for i in present]
    return new_y
训练和测试代码
def train(model, train_dataloader, itr, optimizer, loss_func):
    """Train ``model`` for one epoch and report loss and accuracy.

    Args:
        model: Module to train; it is switched to training mode.
        train_dataloader: Yields ``(x, y)`` batches with one-hot ``y``; its
            ``dataset`` length is used as the number of samples.
        itr: Zero-based epoch index, used only in the printed report.
        optimizer: Optimizer that updates the parameters of ``model``.
        loss_func: Callable ``loss_func(logits, class_indices)``.

    Returns:
        ``(ep_loss, ep_train_acc)``: the average loss per sample as a float
        and the training accuracy as a 0-dim tensor.
    """
    # Imported locally so the timing works even when the module-level name
    # `time` has been rebound by the calling script.
    from time import perf_counter

    started = perf_counter()
    train_loss = 0.0
    train_accuracy = 0
    model.train()
    # A loss built with reduction='sum' already returns the batch total;
    # otherwise the value is treated as a batch mean and weighted by the
    # batch size so that the epoch figure is a true per-sample average.
    loss_is_batch_sum = getattr(loss_func, "reduction", "mean") == "sum"
    for x, y in train_dataloader:
        target = y.argmax(dim=1)            # one-hot -> class index
        optimizer.zero_grad()               # clear gradients
        y_pred = model(x)                   # forward pass
        loss = loss_func(y_pred, target)
        loss.backward()                     # back-propagate
        optimizer.step()                    # update parameters
        train_accuracy += (y_pred.argmax(1) == target).sum()
        batch_loss = loss.item()
        train_loss += batch_loss if loss_is_batch_sum else batch_loss * y.size(0)
    num_samples = len(train_dataloader.dataset)
    ep_loss = train_loss / num_samples
    ep_train_acc = train_accuracy / num_samples
    elapsed = perf_counter() - started
    print("Epoch:", itr + 1,
          "\nTraining Loss: ", round(ep_loss, 5),
          "Training Accuracy: ", round(ep_train_acc.item(), 5))
    print("Training time consuming: {}".format(elapsed))
    return ep_loss, ep_train_acc
def test(model, test_dataloader):
    """Evaluate ``model`` and return its accuracy on ``test_dataloader``.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        test_dataloader: Yields ``(x, y)`` batches with one-hot ``y``; its
            ``dataset`` length is used as the number of samples.

    Returns:
        The accuracy as a 0-dim tensor.
    """
    test_accuracy = 0
    model.eval()
    # No gradients are needed for evaluation; skipping autograd avoids
    # building a graph for every batch.
    with torch.no_grad():
        for x, y in test_dataloader:
            test_accuracy += (model(x).argmax(1) == y.argmax(1)).sum()
    ep_test_acc = test_accuracy / len(test_dataloader.dataset)
    print("Test Accuracy: ", round(ep_test_acc.item(), 5))
    return ep_test_acc
主程序
if __name__ == "__main__":
    # Date stamp for the checkpoint file name. It is deliberately not stored
    # under the name `time`: rebinding that module-level name would break the
    # `time.time()` calls made by train().
    now = datetime.datetime.now()
    month = now.month
    day = now.day

    # Configuration
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # File path
    path = 'data/SNR_greater_0_10data/'
    x_train = np.load(path + 'X_train.npy')
    x_test = np.load(path + 'X_test.npy')
    y_train = np.load(path + 'Y_train.npy')
    y_test = np.load(path + 'Y_test.npy')

    # NOTE(review): `classes` (the full list of class names, aligned with the
    # label columns) is not defined anywhere in this section - confirm it is
    # defined earlier in the file, otherwise this line raises NameError.
    all_classes = classes
    # Both label sets are reduced against the full class list; the reduced
    # list returned for the training labels is used from here on.
    y_train, classes = select(y_train, all_classes)
    y_test = select(y_test, all_classes, False)

    x_train, x_test = torch.from_numpy(x_train), torch.from_numpy(x_test)
    y_train, y_test = torch.from_numpy(y_train), torch.from_numpy(y_test)
    x_train, x_test = x_train.to(device), x_test.to(device)
    y_train, y_test = y_train.to(device), y_test.to(device)
    num_classes = len(classes)

    # Standardise both splits with the TRAINING statistics, so the test data
    # goes through exactly the transform the model was fitted on.
    train_mean, train_std = torch.mean(x_train), torch.std(x_train)
    x_train = (x_train - train_mean) / train_std
    x_test = (x_test - train_mean) / train_std

    x_train = resize(x_train, (x_train.shape[0], 1, 1024, 2))
    x_test = resize(x_test, (x_test.shape[0], 1, 1024, 2))
    print("Shape of x_train : {}".format(x_train.shape))

    train_dataset = Data.TensorDataset(x_train, y_train)
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=256, shuffle=True)
    test_dataset = Data.TensorDataset(x_test, y_test)
    # Evaluation does not depend on sample order, so no shuffling is needed.
    test_dataloader = DataLoader(dataset=test_dataset, batch_size=256, shuffle=False)

    # Model
    model = MyResNet(num_classes).to(device)
    loss_function = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)  # step decay

    itrs = 100
    train_loss = []
    train_acc = []
    test_acc = []
    best_accuracy = 0
    print("start training")
    for itr in range(itrs):
        epoch_loss, epoch_train_acc = train(model, train_dataloader, itr, optimizer, loss_function)
        epoch_test_acc = test(model, test_dataloader)
        train_loss.append(epoch_loss)
        train_acc.append(epoch_train_acc)
        test_acc.append(epoch_test_acc)
        # Save best model on test data
        if epoch_test_acc > best_accuracy:
            best_accuracy = epoch_test_acc
            torch.save(model, path + "ResNet_Identification_best_{}month_{}day.pth".format(month, day))
            print("-----The best accuracy now is {}-----".format(best_accuracy))
            print("-----The best model until now has been saved-----")
        # One scheduler step per epoch: the learning rate drops by 10x after
        # 50 epochs.
        lr_scheduler.step()

    confusion_matrix = plot_confusion_matrix(test_dataloader, model, classes)

    # Accuracy and Loss
    train_acc = [tensor.item() for tensor in train_acc]
    test_acc = [tensor.item() for tensor in test_acc]
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()
    epoch_axis = range(itrs)
    ax1.plot(epoch_axis, train_loss, label='train_loss')
    ax2.plot(epoch_axis, train_acc, label='train_acc')
    ax2.plot(epoch_axis, test_acc, label='test_acc')
    ax1.set_xlabel('Iteration')
    ax1.set_ylabel('Loss')
    ax2.set_ylabel('Accuracy')
    # A bare plt.legend() only sees the lines of the current (second) axes;
    # merge the handles of both axes so the loss curve is listed as well.
    loss_handles, loss_labels = ax1.get_legend_handles_labels()
    acc_handles, acc_labels = ax2.get_legend_handles_labels()
    ax2.legend(loss_handles + acc_handles, loss_labels + acc_labels)
    plt.show()
标签:
人工智能
本文转载自: https://blog.csdn.net/weixin_45121008/article/details/129493241
版权归原作者 Pγ 所有, 如有侵权,请联系我们删除。
版权归原作者 Pγ 所有, 如有侵权,请联系我们删除。