

Deep Learning in Practice: Building a Multi-Task, Multi-Label Model Step by Step

Multi-task, multi-label models are a foundational architecture in modern machine learning. The task is conceptually simple: train one model to predict multiple outputs for multiple tasks at the same time.

In this article, we will build a multi-task, multi-label model on the popular MovieLens dataset using sparse features, walking through the whole process step by step: data preparation, model construction, the training loop, model diagnostics, and finally deploying the model with Ray Serve.

1. Setting Up the Environment

Before diving into the code, make sure the necessary libraries are installed (this is not an exhaustive list):

```shell
pip install pandas scikit-learn torch "ray[serve]" matplotlib requests tensorboard
```

The dataset we use here is small enough that training on a CPU is practical.

2. Preparing the Dataset

We will start by creating a class that handles downloading and preprocessing the MovieLens dataset, then splits the data into training and test sets.

The MovieLens dataset contains information about users, movies, and their ratings. We will use it to predict the rating (a regression task) and whether the user liked the movie (a binary classification task).
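To make the two targets concrete, here is a tiny sketch (with made-up values) of how the binary "liked" label is derived from the rating, and how sparse raw IDs are mapped to contiguous indices with `LabelEncoder`, mirroring what the dataset class below does:

```python
import pandas as pd
from sklearn.preprocessing import LabelEncoder

# Toy ratings frame standing in for MovieLens (hypothetical values)
df = pd.DataFrame({
    "userId": [10, 10, 42],
    "movieId": [101, 205, 101],
    "rating": [4.5, 2.0, 4.0],
})

# Binary target for the classification head: liked means rating >= 4
df["liked"] = (df["rating"] >= 4).astype(int)

# Raw IDs are sparse; LabelEncoder maps them to contiguous indices
# so they can serve as row numbers into an embedding table
user_enc = LabelEncoder()
df["user"] = user_enc.fit_transform(df["userId"])

print(df["liked"].tolist())  # [1, 0, 1]
print(df["user"].tolist())   # [0, 0, 1]
```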

```python
import os
import io
import zipfile
import requests
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader

class MovieLensDataset(Dataset):
    def __init__(self, dataset_version="small", data_dir="data"):
        print("Initializing MovieLensDataset...")
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)

        if dataset_version == "small":
            url = "https://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
            local_zip_path = os.path.join(data_dir, "ml-latest-small.zip")
            file_path = 'ml-latest-small/ratings.csv'
            parquet_path = os.path.join(data_dir, "ml-latest-small.parquet")
        elif dataset_version == "full":
            url = "https://files.grouplens.org/datasets/movielens/ml-latest.zip"
            local_zip_path = os.path.join(data_dir, "ml-latest.zip")
            file_path = 'ml-latest/ratings.csv'
            parquet_path = os.path.join(data_dir, "ml-latest.parquet")
        else:
            raise ValueError("Invalid dataset_version. Choose 'small' or 'full'.")

        if os.path.exists(parquet_path):
            print(f"Loading dataset from {parquet_path}...")
            movielens = pd.read_parquet(parquet_path)
        else:
            if not os.path.exists(local_zip_path):
                print(f"Downloading {dataset_version} dataset from {url}...")
                response = requests.get(url)
                with open(local_zip_path, "wb") as f:
                    f.write(response.content)
            with zipfile.ZipFile(local_zip_path, "r") as z:
                with z.open(file_path) as f:
                    movielens = pd.read_csv(f, usecols=['userId', 'movieId', 'rating'], low_memory=True)
            movielens.to_parquet(parquet_path, index=False)

        movielens['liked'] = (movielens['rating'] >= 4).astype(int)

        self.user_encoder = LabelEncoder()
        self.movie_encoder = LabelEncoder()
        movielens['user'] = self.user_encoder.fit_transform(movielens['userId'])
        movielens['movie'] = self.movie_encoder.fit_transform(movielens['movieId'])

        self.train_df, self.test_df = train_test_split(movielens, test_size=0.2, random_state=42)

    def get_data(self, split="train"):
        if split == "train":
            data = self.train_df
        elif split == "test":
            data = self.test_df
        else:
            raise ValueError("Invalid split. Choose 'train' or 'test'.")
        dense_features = torch.tensor(data[['user', 'movie']].values, dtype=torch.long)
        labels = torch.tensor(data[['rating', 'liked']].values, dtype=torch.float32)
        return dense_features, labels

    def get_encoders(self):
        return self.user_encoder, self.movie_encoder
```

With `MovieLensDataset` defined, we can load the training and evaluation splits into memory:

```python
# Example usage with a single dataset object
print("Creating MovieLens dataset...")
# Feel free to use dataset_version="full" if you are using a GPU
dataset = MovieLensDataset(dataset_version="small")

print("Getting training data...")
train_dense_features, train_labels = dataset.get_data(split="train")
print("Getting testing data...")
test_dense_features, test_labels = dataset.get_data(split="test")

# Create DataLoaders for training and testing
train_loader = DataLoader(torch.utils.data.TensorDataset(train_dense_features, train_labels), batch_size=64, shuffle=True)
test_loader = DataLoader(torch.utils.data.TensorDataset(test_dense_features, test_labels), batch_size=64, shuffle=False)

print("Accessing encoders...")
user_encoder, movie_encoder = dataset.get_encoders()
print("Setup complete.")
```

3. Defining the Multi-Task, Multi-Label Model

We will define a basic PyTorch model that handles two tasks: predicting the rating (regression) and whether the user liked the movie (binary classification).

The model uses sparse embeddings to represent users and movies, plus shared layers that feed into two separate output heads.

By sharing some layers across tasks while keeping a separate output layer per task, the model exploits a shared representation while still tailoring its predictions to each task.
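As a small shape sketch of the sparse-embedding idea (toy sizes, not the real model): an `nn.Embedding` is just a trainable lookup table indexed by ID, and the two lookups are concatenated along the feature dimension before hitting the shared layer:

```python
import torch
from torch import nn

# Hypothetical tiny tables: 5 users and 7 movies, embedding size 4
user_embedding = nn.Embedding(5, 4)
movie_embedding = nn.Embedding(7, 4)

# A batch of 3 (user, movie) index pairs
x = torch.tensor([[0, 1], [2, 6], [4, 3]], dtype=torch.long)
user_embed = user_embedding(x[:, 0])    # shape: (3, 4)
movie_embed = movie_embedding(x[:, 1])  # shape: (3, 4)

# Concatenate along the feature dimension -> input to the shared layer
combined = torch.cat((user_embed, movie_embed), dim=1)
print(combined.shape)  # torch.Size([3, 8])
```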

```python
import torch
from torch import nn

class MultiTaskMovieLensModel(nn.Module):
    def __init__(self, n_users, n_movies, embedding_size, hidden_size):
        super(MultiTaskMovieLensModel, self).__init__()
        self.user_embedding = nn.Embedding(n_users, embedding_size)
        self.movie_embedding = nn.Embedding(n_movies, embedding_size)
        self.shared_layer = nn.Linear(embedding_size * 2, hidden_size)
        self.shared_activation = nn.ReLU()
        self.task1_fc = nn.Linear(hidden_size, 1)
        self.task2_fc = nn.Linear(hidden_size, 1)
        self.task2_activation = nn.Sigmoid()

    def forward(self, x):
        user = x[:, 0]
        movie = x[:, 1]
        user_embed = self.user_embedding(user)
        movie_embed = self.movie_embedding(movie)
        combined = torch.cat((user_embed, movie_embed), dim=1)
        shared_out = self.shared_activation(self.shared_layer(combined))
        rating_out = self.task1_fc(shared_out)
        liked_out = self.task2_fc(shared_out)
        liked_out = self.task2_activation(liked_out)
        return rating_out, liked_out
```

**Input (`x`)**:

  • The input `x` is expected to be a 2D tensor where each row contains a user ID and a movie ID.

User and movie embeddings:

  • user = x[:, 0]: extracts the user IDs from the first column.
  • movie = x[:, 1]: extracts the movie IDs from the second column.
  • `user_embed` and `movie_embed` are the embeddings corresponding to these IDs.

Concatenation:

  • combined = torch.cat((user_embed, movie_embed), dim=1): concatenates the user and movie embeddings along the feature dimension.

Shared layer:

  • shared_out = self.shared_activation(self.shared_layer(combined)): passes the combined embeddings through a shared fully connected layer and activation function.

Task-specific outputs:

  • rating_out = self.task1_fc(shared_out): the predicted rating from the first task-specific layer.
  • liked_out = self.task2_fc(shared_out): the raw score for whether the user liked the movie.
  • liked_out = self.task2_activation(liked_out): the raw score is passed through a sigmoid to turn it into a probability.

Return values:

The model returns two outputs:

  • rating_out: the predicted rating (regression output).
  • liked_out: the probability that the user liked the movie (classification output).
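To make the two-headed output concrete, here is a self-contained miniature of the architecture above (same structure, toy sizes) run on a small batch; each head emits one value per example, and the sigmoid keeps the classification head in (0, 1):

```python
import torch
from torch import nn

class MiniTwoHeadModel(nn.Module):
    # Compact copy of the architecture above, with toy sizes
    def __init__(self, n_users=10, n_movies=20, embedding_size=4, hidden_size=8):
        super().__init__()
        self.user_embedding = nn.Embedding(n_users, embedding_size)
        self.movie_embedding = nn.Embedding(n_movies, embedding_size)
        self.shared_layer = nn.Linear(embedding_size * 2, hidden_size)
        self.shared_activation = nn.ReLU()
        self.task1_fc = nn.Linear(hidden_size, 1)  # rating head (regression)
        self.task2_fc = nn.Linear(hidden_size, 1)  # liked head (classification)
        self.task2_activation = nn.Sigmoid()

    def forward(self, x):
        combined = torch.cat(
            (self.user_embedding(x[:, 0]), self.movie_embedding(x[:, 1])), dim=1)
        shared_out = self.shared_activation(self.shared_layer(combined))
        return self.task1_fc(shared_out), self.task2_activation(self.task2_fc(shared_out))

model = MiniTwoHeadModel()
batch = torch.tensor([[0, 1], [3, 7]], dtype=torch.long)  # two (user, movie) pairs
rating_out, liked_out = model(batch)
print(rating_out.shape, liked_out.shape)  # torch.Size([2, 1]) torch.Size([2, 1])
```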

4. The Training Loop

First, we instantiate the model with some arbitrarily chosen hyperparameters (the embedding dimension and the number of neurons in the hidden layer). We will use mean squared error loss for the regression task and binary cross-entropy for the classification task.

We can normalize the two losses by their initial values to keep both on a roughly similar scale (uncertainty weighting could also be used to normalize the losses here).
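As a quick numeric sketch of this normalization (the loss values are made up): dividing each loss by its first observed value puts both terms near 1.0 at the start of training, so the larger-magnitude MSE term does not dominate the gradient:

```python
# Hypothetical first-batch losses: MSE on ratings is much larger than BCE
initial_loss_rating, initial_loss_liked = 12.5, 0.69

# A later batch
loss_rating, loss_liked = 6.25, 0.46

# Each term becomes a relative factor, so both sit on a comparable scale
total = (loss_rating / initial_loss_rating) + (loss_liked / initial_loss_liked)
print(loss_rating / initial_loss_rating)  # 0.5
print(round(total, 3))
```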

We then train the model with the data loaders and track the losses for both tasks. The losses are plotted to visualize how the model learns and generalizes on the evaluation set over time.

```python
import torch.optim as optim
import matplotlib.pyplot as plt

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

embedding_size = 16
hidden_size = 32
n_users = len(dataset.get_encoders()[0].classes_)
n_movies = len(dataset.get_encoders()[1].classes_)

model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size).to(device)
criterion_rating = nn.MSELoss()
criterion_liked = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

train_rating_losses, train_liked_losses = [], []
eval_rating_losses, eval_liked_losses = [], []
epochs = 10

# used for loss normalization
initial_loss_rating = None
initial_loss_liked = None

for epoch in range(epochs):
    model.train()
    running_loss_rating = 0.0
    running_loss_liked = 0.0
    for dense_features, labels in train_loader:
        optimizer.zero_grad()
        dense_features = dense_features.to(device)
        labels = labels.to(device)

        rating_pred, liked_pred = model(dense_features)
        rating_target = labels[:, 0].unsqueeze(1)
        liked_target = labels[:, 1].unsqueeze(1)

        loss_rating = criterion_rating(rating_pred, rating_target)
        loss_liked = criterion_liked(liked_pred, liked_target)

        # Set initial losses
        if initial_loss_rating is None:
            initial_loss_rating = loss_rating.item()
        if initial_loss_liked is None:
            initial_loss_liked = loss_liked.item()

        # Normalize losses
        loss = (loss_rating / initial_loss_rating) + (loss_liked / initial_loss_liked)
        loss.backward()
        optimizer.step()

        running_loss_rating += loss_rating.item()
        running_loss_liked += loss_liked.item()

    train_rating_losses.append(running_loss_rating / len(train_loader))
    train_liked_losses.append(running_loss_liked / len(train_loader))

    model.eval()
    eval_loss_rating = 0.0
    eval_loss_liked = 0.0
    with torch.no_grad():
        for dense_features, labels in test_loader:
            dense_features = dense_features.to(device)
            labels = labels.to(device)
            rating_pred, liked_pred = model(dense_features)
            rating_target = labels[:, 0].unsqueeze(1)
            liked_target = labels[:, 1].unsqueeze(1)
            loss_rating = criterion_rating(rating_pred, rating_target)
            loss_liked = criterion_liked(liked_pred, liked_target)
            eval_loss_rating += loss_rating.item()
            eval_loss_liked += loss_liked.item()
    eval_rating_losses.append(eval_loss_rating / len(test_loader))
    eval_liked_losses.append(eval_loss_liked / len(test_loader))

    print(f'Epoch {epoch+1}, Train Rating Loss: {train_rating_losses[-1]}, Train Liked Loss: {train_liked_losses[-1]}, Eval Rating Loss: {eval_rating_losses[-1]}, Eval Liked Loss: {eval_liked_losses[-1]}')

# Plotting losses
plt.figure(figsize=(14, 6))

plt.subplot(1, 2, 1)
plt.plot(train_rating_losses, label='Train Rating Loss')
plt.plot(eval_rating_losses, label='Eval Rating Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Rating Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_liked_losses, label='Train Liked Loss')
plt.plot(eval_liked_losses, label='Eval Liked Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Liked Loss')
plt.legend()

plt.tight_layout()
plt.show()
```

We can also monitor the training process with TensorBoard:

```python
from torch.utils.tensorboard import SummaryWriter

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Model and Training Setup
embedding_size = 16
hidden_size = 32
n_users = len(user_encoder.classes_)
n_movies = len(movie_encoder.classes_)

model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size).to(device)
criterion_rating = nn.MSELoss()
criterion_liked = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epochs = 10

# used for loss normalization
initial_loss_rating = None
initial_loss_liked = None

# TensorBoard setup
writer = SummaryWriter(log_dir='runs/multitask_movie_lens')

# Training Loop with TensorBoard Logging
for epoch in range(epochs):
    model.train()
    running_loss_rating = 0.0
    running_loss_liked = 0.0
    for batch_idx, (dense_features, labels) in enumerate(train_loader):
        # Move data to GPU
        dense_features = dense_features.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        rating_pred, liked_pred = model(dense_features)
        rating_target = labels[:, 0].unsqueeze(1)
        liked_target = labels[:, 1].unsqueeze(1)

        loss_rating = criterion_rating(rating_pred, rating_target)
        loss_liked = criterion_liked(liked_pred, liked_target)

        # Set initial losses
        if initial_loss_rating is None:
            initial_loss_rating = loss_rating.item()
        if initial_loss_liked is None:
            initial_loss_liked = loss_liked.item()

        # Normalize losses
        loss = (loss_rating / initial_loss_rating) + (loss_liked / initial_loss_liked)
        loss.backward()
        optimizer.step()

        running_loss_rating += loss_rating.item()
        running_loss_liked += loss_liked.item()

        # Log loss to TensorBoard
        writer.add_scalar('Loss/Train_Rating', loss_rating.item(), epoch * len(train_loader) + batch_idx)
        writer.add_scalar('Loss/Train_Liked', loss_liked.item(), epoch * len(train_loader) + batch_idx)

    print(f'Epoch {epoch+1}/{epochs}, Train Rating Loss: {running_loss_rating/len(train_loader)}, Train Liked Loss: {running_loss_liked/len(train_loader)}')

    # Evaluate on the test set
    model.eval()
    eval_loss_rating = 0.0
    eval_loss_liked = 0.0
    with torch.no_grad():
        for dense_features, labels in test_loader:
            # Move data to GPU
            dense_features = dense_features.to(device)
            labels = labels.to(device)
            rating_pred, liked_pred = model(dense_features)
            rating_target = labels[:, 0].unsqueeze(1)
            liked_target = labels[:, 1].unsqueeze(1)
            loss_rating = criterion_rating(rating_pred, rating_target)
            loss_liked = criterion_liked(liked_pred, liked_target)
            eval_loss_rating += loss_rating.item()
            eval_loss_liked += loss_liked.item()
    eval_loss_avg_rating = eval_loss_rating / len(test_loader)
    eval_loss_avg_liked = eval_loss_liked / len(test_loader)
    print(f'Epoch {epoch+1}/{epochs}, Eval Rating Loss: {eval_loss_avg_rating}, Eval Liked Loss: {eval_loss_avg_liked}')

    # Log evaluation loss to TensorBoard
    writer.add_scalar('Loss/Eval_Rating', eval_loss_avg_rating, epoch)
    writer.add_scalar('Loss/Eval_Liked', eval_loss_avg_liked, epoch)

# Close the TensorBoard writer
writer.close()
```

We launch the TensorBoard server from the same directory and inspect the training and evaluation curves in a web browser. In the bash command below, replace `runs/multitask_movie_lens` with the path to the directory containing the event files (logs).

```shell
(base) $ tensorboard --logdir=runs/multitask_movie_lens
TensorFlow installation not found - running with reduced feature set.
```

Running it produces output like this:

```
NOTE: Using experimental fast data loading logic. To disable, pass
"--load_fast=false" and report issues on GitHub. More details:
https://github.com/tensorflow/tensorboard/issues/4784
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.12.0 at http://localhost:6006/ (Press CTRL+C to quit)
```

TensorBoard's scalar view then shows the training and evaluation loss curves for both tasks.

5. Inference

After training, we use the `torch.save` function to save the model to disk. This function lets you save the model's state dictionary, which contains all of the model's parameters and buffers. The saved file conventionally uses a `.pth` or `.pt` extension.

```python
import torch

torch.save(model.state_dict(), "model.pth")
```

The state dictionary contains all model parameters (weights and biases). To load the model back into your code, use the following steps:

```python
# Initialize the model (make sure the architecture matches the saved model)
model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size)

# Load the saved state dictionary into the model
model.load_state_dict(torch.load("model.pth"))

# Set the model to evaluation mode (important for inference)
model.eval()
```

To evaluate the model on some unseen data, we can make predictions for individual user-movie pairs and compare them with the actual values.

```python
def predict_and_compare(user_id, movie_id, model, user_encoder, movie_encoder, dataset):
    # Map raw IDs to the encoded indices the embeddings were trained on
    user_idx = user_encoder.transform([user_id])[0]
    movie_idx = movie_encoder.transform([movie_id])[0]
    example_dense_features = torch.tensor([[user_idx, movie_idx]], dtype=torch.long)

    model.eval()
    with torch.no_grad():
        rating_pred, liked_pred = model(example_dense_features)
        predicted_rating = rating_pred.item()
        predicted_liked = liked_pred.item()

    # Look up the actual rating in the train split first, then the test split
    actual_row = dataset.train_df[(dataset.train_df['userId'] == user_id) & (dataset.train_df['movieId'] == movie_id)]
    if actual_row.empty:
        actual_row = dataset.test_df[(dataset.test_df['userId'] == user_id) & (dataset.test_df['movieId'] == movie_id)]
    if not actual_row.empty:
        actual_rating = actual_row['rating'].values[0]
        actual_liked = actual_row['liked'].values[0]
        return {
            'User ID': user_id,
            'Movie ID': movie_id,
            'Predicted Rating': round(predicted_rating, 2),
            'Actual Rating': actual_rating,
            'Predicted Liked': 'Yes' if predicted_liked >= 0.5 else 'No',
            'Actual Liked': 'Yes' if actual_liked == 1 else 'No'
        }
    else:
        return None

example_pairs = dataset.test_df.sample(n=5)
results = []
for _, row in example_pairs.iterrows():
    result = predict_and_compare(row['userId'], row['movieId'], model, user_encoder, movie_encoder, dataset)
    if result:
        results.append(result)
results_df = pd.DataFrame(results)
results_df.head()
```

6. Deploying the Model with Ray Serve

The final step is to deploy the model as a service so it can be reached through an API, using Ray Serve.

We use Ray Serve because it scales seamlessly from a single machine to a large cluster and can handle growing load. Ray Serve also integrates with Ray's dashboard, which provides a user-friendly interface for monitoring the health, performance, and resource usage of deployments.

Step 1: Load the trained model

```python
# Load your trained model (assuming it's saved as 'model.pth')
n_users = 1000   # example value, replace with the actual number of users
n_movies = 1000  # example value, replace with the actual number of movies
embedding_size = 16
hidden_size = 32
model = MultiTaskMovieLensModel(n_users, n_movies, embedding_size, hidden_size)
model.load_state_dict(torch.load("model.pth"))
model.eval()
```

Step 2: Define the model-serving class

```python
import torch
import ray
from ray import serve

@serve.deployment
class ModelServeDeployment:
    def __init__(self, model):
        self.model = model
        self.model.eval()

    async def __call__(self, request):
        json_input = await request.json()
        # The model expects a [batch, 2] tensor of encoded (user, movie) indices
        features = torch.tensor([[json_input["user_id"], json_input["movie_id"]]], dtype=torch.long)
        with torch.no_grad():
            rating_pred, liked_pred = self.model(features)
        return {
            "rating_prediction": rating_pred.item(),
            "liked_prediction": liked_pred.item()
        }
```

Step 3: Start the Ray server

```python
# Initialize Ray and Ray Serve
ray.init()
serve.start()

# Deploy the model
model_deployment = ModelServeDeployment.bind(model)
serve.run(model_deployment)
```

You should now be able to see the Ray dashboard at localhost:8265.

Step 4: Query the model

The last step is to test the API. When you run the lines below, you should see a response containing the rating and liked predictions for the queried user and movie.

```python
import requests

# Server address (Ray Serve defaults to http://127.0.0.1:8000)
url = "http://127.0.0.1:8000/ModelServeDeployment"

# Example input (encoded indices)
data = {
    "user_id": 123,   # replace with an actual user index
    "movie_id": 456   # replace with an actual movie index
}

# Send a POST request to the model server
response = requests.post(url, json=data)

# Print the model's response
print(response.json())
```

And that's it: we have just trained and deployed a multi-task, multi-label model!

Author: Cole Diamond
