

[Open Source] Python code for a transformer-based time series forecasting model

Sharing the Python code for a transformer-based time series forecasting model with everyone. If it helps you, remember to give it a like!

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: 帅帅的笔者
import copy
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import time
import math
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# Set random seeds for reproducibility
torch.manual_seed(0)
np.random.seed(0)

# Hyperparameters
input_window = 10    # number of past steps fed to the model
output_window = 1    # number of future steps to predict
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
epochs = 100
lr = 0.00005
# Load data: the first column is a date index, the 'value' column holds the series
df = pd.read_csv("data1.csv", index_col=0, parse_dates=True, encoding='gbk')
data = np.array(df['value']).reshape(-1, 1)

# Normalize data to [-1, 1]
scaler = MinMaxScaler(feature_range=(-1, 1))
data_normalized = scaler.fit_transform(data)

# Split the data into train and validation sets
train_ratio = 0.828
train_size = int(len(data) * train_ratio)
train_data_normalized = data_normalized[:train_size]
val_data_normalized = data_normalized[train_size:]
# Define the Transformer model
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  # (1, max_len, d_model): broadcasts over the batch dimension
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model)
        return x + self.pe[:, :x.size(1), :]
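
# For reference, the encoding above is the standard sinusoidal scheme:
#   pe[pos, 2i]   = sin(pos / 10000^(2i/d_model))
#   pe[pos, 2i+1] = cos(pos / 10000^(2i/d_model))
# Each position receives a distinct pattern of waves, which is what lets the
# attention layers tell time steps apart without recurrence.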
class TransAm(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(TransAm, self).__init__()
        self.model_type = 'Transformer'
        self.src_mask = None
        # Project the single input feature up to the model dimension
        self.input_projection = nn.Linear(1, feature_size)
        self.pos_encoder = PositionalEncoding(feature_size)
        # batch_first=True (PyTorch 1.9+) keeps tensors as (batch, seq_len, features)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10,
                                                        dropout=dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder = nn.Linear(feature_size, 1)
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src):
        # src: (batch, seq_len, 1)
        seq_len = src.size(1)
        if self.src_mask is None or self.src_mask.size(0) != seq_len:
            mask = self._generate_square_subsequent_mask(seq_len).to(src.device)
            self.src_mask = mask
        src = self.input_projection(src)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, self.src_mask)
        output = self.decoder(output)  # (batch, seq_len, 1)
        return output

    def _generate_square_subsequent_mask(self, sz):
        # Causal mask: position i may attend only to positions <= i
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask
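
# For illustration, _generate_square_subsequent_mask(4) returns
#   [[0., -inf, -inf, -inf],
#    [0.,   0., -inf, -inf],
#    [0.,   0.,   0., -inf],
#    [0.,   0.,   0.,   0.]]
# The -inf entries zero out attention to future positions after the softmax.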
# Build (input, label) pairs for the model with a sliding window
def create_inout_sequences(data, input_window, output_window):
    inout_seq = []
    length = len(data)
    for i in range(length - input_window - output_window + 1):
        train_seq = data[i:i+input_window]
        train_label = data[i+input_window:i+input_window+output_window]
        inout_seq.append((train_seq, train_label))
    return inout_seq

train_data = create_inout_sequences(train_data_normalized, input_window, output_window)
val_data = create_inout_sequences(val_data_normalized, input_window, output_window)
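
# For example, with input_window=3 and output_window=1 a series
# [x0, x1, x2, x3, x4] yields the pairs ([x0, x1, x2], [x3]) and
# ([x1, x2, x3], [x4]).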
# Train the model
model = TransAm().to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

def get_batch(dataset, i):
    batch = dataset[i:i+batch_size]
    data = torch.stack([torch.tensor(item[0], dtype=torch.float32) for item in batch]).to(device)
    targets = torch.stack([torch.tensor(item[1], dtype=torch.float32) for item in batch]).to(device)
    return data, targets  # (batch, input_window, 1) and (batch, output_window, 1)

def train(train_data):
    model.train()
    total_loss = 0.
    num_batches = 0
    for i in range(0, len(train_data) - 1, batch_size):
        data, targets = get_batch(train_data, i)
        optimizer.zero_grad()
        output = model(data)
        # Score only the predictions for the last output_window steps
        loss = criterion(output[:, -output_window:, :], targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    return total_loss / num_batches

def validate(val_data):
    model.eval()
    total_loss = 0.
    num_batches = 0
    with torch.no_grad():
        for i in range(0, len(val_data) - 1, batch_size):
            data, targets = get_batch(val_data, i)
            output = model(data)
            loss = criterion(output[:, -output_window:, :], targets)
            total_loss += loss.item()
            num_batches += 1
    return total_loss / num_batches

best_val_loss = float("inf")
best_model = None
for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train_loss = train(train_data)
    val_loss = validate(val_data)
    scheduler.step()
    print(f"epoch {epoch:3d} | {time.time() - epoch_start_time:5.2f}s | "
          f"train loss {train_loss:.6f} | val loss {val_loss:.6f}")
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        # Snapshot the weights; `best_model = model` would only alias the live model
        best_model = copy.deepcopy(model)
# Predict and denormalize the data
def predict(model, dataset):
    model.eval()
    predictions = []
    actuals = []
    with torch.no_grad():
        for data, target in dataset:
            data = torch.tensor(data, dtype=torch.float32).unsqueeze(0).to(device)  # (1, input_window, 1)
            output = model(data)
            prediction = output[:, -output_window:, :].squeeze().cpu().numpy()
            predictions.append(prediction)
            actuals.append(target)
    return np.array(predictions), np.array(actuals)

predictions, actuals = predict(best_model, val_data)
print("Predictions shape:", predictions.shape)
print("Actuals shape:", actuals.shape)

# The scaler was fitted on a single column, so flatten to (-1, 1) before inverting
predictions_denorm = scaler.inverse_transform(predictions.reshape(-1, 1))
actuals_denorm = scaler.inverse_transform(actuals.reshape(-1, 1))
# Plot the results
plt.plot(predictions_denorm, label='Predictions')
plt.plot(actuals_denorm, label='Actuals')
plt.xlabel('Timestep')
plt.ylabel('Value')
plt.legend()
plt.show()
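
The script assumes a data1.csv whose first column is a date index and which contains a value column; that file is not included with the post. To smoke-test the code without real data, a minimal sketch along the following lines will generate a compatible file (the noisy sine series and all of its parameters are made up purely for illustration):

import numpy as np
import pandas as pd

# Hypothetical stand-in for real data: 1000 days of a noisy sine wave
dates = pd.date_range("2020-01-01", periods=1000, freq="D")
values = np.sin(np.arange(1000) * 0.1) + np.random.normal(0, 0.1, size=1000)
pd.DataFrame({"value": values}, index=dates).to_csv("data1.csv")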

More time series forecasting code: Complete Collection of Time Series Forecasting Algorithms (Deep Learning)


This article is reposted from: https://blog.csdn.net/zjdssd/article/details/137546610
Copyright belongs to the original author, 那只呢. If there is any infringement, please contact us for removal.
