FastSpeech2 Reimplementation on GitHub: Model Construction

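After studying the FastSpeech2 paper, I worked through a reimplementation on GitHub to understand some details of how the algorithm is implemented. The repository I chose is written in PyTorch: https://github.com/ming024/FastSpeech2. It builds on the FastSpeech reimplementation at https://github.com/xcmyz/FastSpeech, and much of the code is essentially the same. I have previously annotated and analyzed that FastSpeech repository; interested readers can find it in this column.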

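According to the paper, the overall architecture of FastSpeech2 is largely the same as that of FastSpeech. Besides the Duration Predictor, it adds a Pitch Predictor and an Energy Predictor, and all three share the same network architecture. The files under the transformer directory of this repository are therefore essentially the same as those in https://github.com/xcmyz/FastSpeech. Building the FastSpeech2 model mainly uses the Encoder, Decoder, and PostNet modules defined there; see the column for a detailed walkthrough. In this repository, model construction mainly involves two files under the model directory: fastspeech2.py and modules.py.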

model/modules.py

This file defines the Variance Adaptor, which consists of the Duration Predictor, Length Regulator, Pitch Predictor, and Energy Predictor. The code with explanatory comments follows.

import os
import json
import copy
import math
from collections import OrderedDict

import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F

from utils.tools import get_mask_from_lengths, pad

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# The complete Variance Adaptor
class VarianceAdaptor(nn.Module):
    """Variance Adaptor"""

    def __init__(self, preprocess_config, model_config):
        super(VarianceAdaptor, self).__init__()
        self.duration_predictor = VariancePredictor(model_config)
        self.length_regulator = LengthRegulator()
        self.pitch_predictor = VariancePredictor(model_config)
        self.energy_predictor = VariancePredictor(model_config)

        # Granularity of the pitch and energy features
        self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"]["feature"]
        self.energy_feature_level = preprocess_config["preprocessing"]["energy"]["feature"]
        assert self.pitch_feature_level in ["phoneme_level", "frame_level"]
        assert self.energy_feature_level in ["phoneme_level", "frame_level"]

        # Quantization scheme for pitch and energy
        pitch_quantization = model_config["variance_embedding"]["pitch_quantization"]
        energy_quantization = model_config["variance_embedding"]["energy_quantization"]
        n_bins = model_config["variance_embedding"]["n_bins"]
        assert pitch_quantization in ["linear", "log"]
        assert energy_quantization in ["linear", "log"]

        # Load the pitch and energy statistics (min and max) saved during preprocessing
        with open(os.path.join(preprocess_config["path"]["preprocessed_path"], "stats.json")) as f:
            stats = json.load(f)
            pitch_min, pitch_max = stats["pitch"][:2]
            energy_min, energy_max = stats["energy"][:2]

        # n_bins - 1 boundaries split the value range into n_bins buckets
        if pitch_quantization == "log":
            self.pitch_bins = nn.Parameter(
                torch.exp(torch.linspace(np.log(pitch_min), np.log(pitch_max), n_bins - 1)),
                requires_grad=False,
            )
        else:
            self.pitch_bins = nn.Parameter(
                torch.linspace(pitch_min, pitch_max, n_bins - 1),
                requires_grad=False,
            )
        if energy_quantization == "log":
            self.energy_bins = nn.Parameter(
                torch.exp(torch.linspace(np.log(energy_min), np.log(energy_max), n_bins - 1)),
                requires_grad=False,
            )
        else:
            self.energy_bins = nn.Parameter(
                torch.linspace(energy_min, energy_max, n_bins - 1),
                requires_grad=False,
            )

        # Embedding layers for pitch and energy
        self.pitch_embedding = nn.Embedding(n_bins, model_config["transformer"]["encoder_hidden"])
        self.energy_embedding = nn.Embedding(n_bins, model_config["transformer"]["encoder_hidden"])

    # Compute the pitch embedding
    def get_pitch_embedding(self, x, target, mask, control):
        prediction = self.pitch_predictor(x, mask)  # value predicted by the pitch predictor
        if target is not None:  # training: a target exists, so embed the ground-truth target
            embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins))
        else:  # inference: no target, so embed the prediction
            prediction = prediction * control  # control is a scaling coefficient
            embedding = self.pitch_embedding(torch.bucketize(prediction, self.pitch_bins))
        return prediction, embedding  # prediction feeds the training loss; embedding is added to x

    # Compute the energy embedding
    def get_energy_embedding(self, x, target, mask, control):
        prediction = self.energy_predictor(x, mask)  # value predicted by the energy predictor
        if target is not None:  # training: a target exists, so embed the ground-truth target
            embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins))
        else:  # inference: no target, so embed the prediction
            prediction = prediction * control  # control is a scaling coefficient
            embedding = self.energy_embedding(torch.bucketize(prediction, self.energy_bins))
        return prediction, embedding  # prediction feeds the training loss; embedding is added to x

    def forward(
        self,
        x,
        src_mask,
        mel_mask=None,
        max_len=None,
        pitch_target=None,
        energy_target=None,
        duration_target=None,
        p_control=1.0,
        e_control=1.0,
        d_control=1.0,
    ):

        log_duration_prediction = self.duration_predictor(x, src_mask)  # predicted log-duration of each phoneme
        if self.pitch_feature_level == "phoneme_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(x, pitch_target, src_mask, p_control)
            x = x + pitch_embedding  # add the pitch embedding
        if self.energy_feature_level == "phoneme_level":
            # e_control scales energy (the listing as posted passed p_control here, leaving e_control unused)
            energy_prediction, energy_embedding = self.get_energy_embedding(x, energy_target, src_mask, e_control)
            x = x + energy_embedding  # add the energy embedding

        if duration_target is not None:  # training: expand x with the ground-truth durations
            x, mel_len = self.length_regulator(x, duration_target, max_len)
            duration_rounded = duration_target
        else:  # inference: build duration_rounded from log_duration_prediction and use it to expand x
            duration_rounded = torch.clamp(
                (torch.round(torch.exp(log_duration_prediction) - 1) * d_control),
                min=0,
            )
            x, mel_len = self.length_regulator(x, duration_rounded, max_len)
            mel_mask = get_mask_from_lengths(mel_len)

        # Same as the phoneme-level branches above, but applied after length regulation
        if self.pitch_feature_level == "frame_level":
            pitch_prediction, pitch_embedding = self.get_pitch_embedding(x, pitch_target, mel_mask, p_control)
            x = x + pitch_embedding
        if self.energy_feature_level == "frame_level":
            # e_control scales energy (the listing as posted passed p_control here as well)
            energy_prediction, energy_embedding = self.get_energy_embedding(x, energy_target, mel_mask, e_control)
            x = x + energy_embedding

        return (
            x,
            pitch_prediction,  # these three predictions are used later to compute the loss
            energy_prediction,
            log_duration_prediction,
            duration_rounded,
            mel_len,
            mel_mask,
        )


# Length regulator
class LengthRegulator(nn.Module):
    """Length Regulator"""

    def __init__(self):
        super(LengthRegulator, self).__init__()

    # Adjust the length of the input phoneme sequence x
    def LR(self, x, duration, max_len):
        """
        Align the phoneme sequence length with the mel-spectrogram length using the phoneme durations
        @param x: phoneme sequence after the FFT blocks, [batch_size, max_sequence_len, encoder_dim]
        @param duration: phoneme duration matrix, [batch_size, max_sequence_len]
        @param max_len: maximum mel-spectrogram length in the batch
        @return: length-adjusted phoneme sequence, [batch_size, max_len, encoder_dim], and the mel lengths
        """
        output = list()
        mel_len = list()
        for batch, expand_target in zip(x, duration):
            expanded = self.expand(batch, expand_target)  # fully expanded sequence for one utterance
            output.append(expanded)
            mel_len.append(expanded.shape[0])  # record the mel length so a mask can be built later

        # Pad to max_len if it is given; otherwise pad to the longest sequence in output
        if max_len is not None:
            output = pad(output, max_len)
        else:
            output = pad(output)

        return output, torch.LongTensor(mel_len).to(device)

    def expand(self, batch, predicted):
        """
        Expand one phoneme sequence according to its durations
        @param batch: phoneme sequence of the text for one utterance, [max_sequence_len, encoder_dim]
        @param predicted: duration of each phoneme in the sequence, length max_sequence_len
        @return: expanded phoneme sequence whose length matches the mel-spectrogram length
        """
        out = list()
        for i, vec in enumerate(batch):
            expand_size = predicted[i].item()  # duration of phoneme i, i.e. how many times to repeat it
            out.append(vec.expand(max(int(expand_size), 0), -1))  # repeat the vector of phoneme i expand_size times
        out = torch.cat(out, 0)  # concatenate the whole expanded sequence
        return out

    def forward(self, x, duration, max_len):
        output, mel_len = self.LR(x, duration, max_len)
        return output, mel_len
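The inference path through the duration predictor and the length regulator can be illustrated without PyTorch. The following is a minimal sketch in plain Python, not the repository's code: the helper names `decode_durations` and `expand` and the toy vectors are assumptions made for illustration. It mirrors the expression `clamp(round(exp(log_d) - 1) * d_control, min=0)` and the per-phoneme repetition done in `LengthRegulator.expand`.

```python
import math


def decode_durations(log_durations, d_control=1.0):
    """Turn predicted log-durations into non-negative frame counts."""
    # Mirrors clamp(round(exp(log_d) - 1) * d_control, min=0) from the inference branch.
    return [max(round(math.exp(ld) - 1) * d_control, 0) for ld in log_durations]


def expand(phonemes, durations):
    """Repeat each phoneme vector int(duration) times (negative counts become 0)."""
    frames = []
    for vec, dur in zip(phonemes, durations):
        frames.extend([vec] * max(int(dur), 0))
    return frames


# Three hypothetical 2-dimensional phoneme vectors.
phonemes = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
# log(d + 1) for durations 2, 0 and 3.
log_durations = [math.log(3), math.log(1), math.log(4)]

durations = decode_durations(log_durations)
frames = expand(phonemes, durations)
print(durations)    # [2.0, 0.0, 3.0]
print(len(frames))  # 5
```

Because `d_control` is applied after rounding, a value such as 1.5 can produce a fractional duration (3 * 1.5 = 4.5); the `int(...)` call in `expand` then truncates it to 4 frames.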

class VariancePredictor(nn.Module):
    """Duration, Pitch and Energy Predictor"""

    def __init__(self, model_config):
        super(VariancePredictor, self).__init__()

        self.input_size = model_config["transformer"]["encoder_hidden"]  # input size
        self.filter_size = model_config["variance_predictor"]["filter_size"]  # output size
        self.kernel = model_config["variance_predictor"]["kernel_size"]  # convolution kernel size
        self.conv_output_size = model_config["variance_predictor"]["filter_size"]
        self.dropout = model_config["variance_predictor"]["dropout"]

        # Two convolution blocks with activation and regularization:
        # [Conv1D + ReLU + LN + Dropout] + [Conv1D + ReLU + LN + Dropout]
        self.conv_layer = nn.Sequential(
            OrderedDict(
                [
                    (
                        "conv1d_1",
                        Conv(
                            self.input_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=(self.kernel - 1) // 2,
                        ),
                    ),
                    ("relu_1", nn.ReLU()),
                    ("layer_norm_1", nn.LayerNorm(self.filter_size)),
                    ("dropout_1", nn.Dropout(self.dropout)),
                    (
                        "conv1d_2",
                        Conv(
                            self.filter_size,
                            self.filter_size,
                            kernel_size=self.kernel,
                            padding=1,
                        ),
                    ),
                    ("relu_2", nn.ReLU()),
                    ("layer_norm_2", nn.LayerNorm(self.filter_size)),
                    ("dropout_2", nn.Dropout(self.dropout)),
                ]
            )
        )

        self.linear_layer = nn.Linear(self.conv_output_size, 1)

    def forward(self, encoder_output, mask):
        out = self.conv_layer(encoder_output)  # [Conv1D + ReLU + LN + Dropout] + [Conv1D + ReLU + LN + Dropout]
        out = self.linear_layer(out)  # final linear layer before the output
        out = out.squeeze(-1)  # the linear layer outputs size 1 in the last dimension, so squeeze it away

        if mask is not None:
            out = out.masked_fill(mask, 0.0)  # set the masked positions to 0

        return out
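The two convolutions in `VariancePredictor` use different padding expressions: `(self.kernel - 1) // 2` for the first and a hard-coded `1` for the second. The sketch below uses the output-length formula from the `nn.Conv1d` documentation to show when the two agree; the helper name `conv1d_out_len` and the sequence length of 50 are assumptions for illustration only.

```python
def conv1d_out_len(length, kernel_size, padding, stride=1, dilation=1):
    """Output length of a 1-D convolution (formula from the nn.Conv1d documentation)."""
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1


seq_len = 50
for k in (3, 5):
    same = conv1d_out_len(seq_len, k, padding=(k - 1) // 2)  # padding used by conv1d_1
    fixed = conv1d_out_len(seq_len, k, padding=1)            # padding used by conv1d_2
    print(k, same, fixed)
# prints:
# 3 50 50
# 5 50 48
```

With a kernel size of 3 both paddings preserve the sequence length, so the prediction lines up with the mask. With a larger odd kernel, the hard-coded `padding=1` would shorten the sequence and `masked_fill` would no longer match in shape.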

# Custom 1-D convolution module
class Conv(nn.Module):
    """
    Convolution Module
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size=1,
        stride=1,
        padding=0,
        dilation=1,
        bias=True,
        w_init="linear",
    ):
        """
        :param in_channels: dimension of input
        :param out_channels: dimension of output
        :param kernel_size: size of kernel
        :param stride: size of stride
        :param padding: size of padding
        :param dilation: dilation rate
        :param bias: boolean. if True, bias is included.
        :param w_init: str. weight inits with xavier initialization.
        """
        super(Conv, self).__init__()

        self.conv = nn.Conv1d(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            bias=bias,
        )

    def forward(self, x):
        # nn.Conv1d expects [batch, channels, time], so swap the last two dimensions around the convolution
        x = x.contiguous().transpose(1, 2)
        x = self.conv(x)
        x = x.contiguous().transpose(1, 2)

        return x
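Returning to the quantization step in `VarianceAdaptor`: pitch and energy values are mapped to bucket indices with `torch.bucketize` and then looked up in an embedding table of `n_bins` rows. The following is a minimal plain-Python sketch of that mapping, not the repository's code. The helpers `linspace`, `make_bins` and `bucketize` are stand-ins written for illustration, the statistics are made up (the real ones come from stats.json), and `bisect.bisect_left` is used because it matches the default `right=False` behavior of `torch.bucketize`.

```python
import bisect
import math


def linspace(start, stop, num):
    """Evenly spaced values from start to stop inclusive (like torch.linspace)."""
    step = (stop - start) / (num - 1)
    return [start + i * step for i in range(num)]


def make_bins(v_min, v_max, n_bins, scale="linear"):
    """Return the n_bins - 1 boundaries that split the value range into n_bins buckets."""
    if scale == "log":
        return [math.exp(v) for v in linspace(math.log(v_min), math.log(v_max), n_bins - 1)]
    return linspace(v_min, v_max, n_bins - 1)


def bucketize(value, boundaries):
    """Index i with boundaries[i-1] < value <= boundaries[i] (torch.bucketize, right=False)."""
    return bisect.bisect_left(boundaries, value)


# Hypothetical statistics; the real minimum and maximum are read from stats.json.
bins = make_bins(0.0, 4.0, n_bins=5)  # 4 boundaries, so indices range over 0..4
ids = [bucketize(v, bins) for v in (-1.0, 0.5, 2.0, 3.9, 10.0)]
print(ids)  # [0, 1, 2, 3, 4]
```

Values below the smallest boundary fall into bucket 0 and values above the largest fall into bucket `n_bins - 1`, so every index is valid for an embedding with `n_bins` rows. Note that the `"log"` option takes the logarithm of the minimum and maximum, so it only makes sense when those statistics are positive.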

model/fastspeech2.py

This file combines the Encoder, Decoder, PostNet, and Variance Adaptor modules to build the complete FastSpeech2 model.

import os
import json

import torch
import torch.nn as nn
import torch.nn.functional as F

from transformer import Encoder, Decoder, PostNet
from .modules import VarianceAdaptor
from utils.tools import get_mask_from_lengths


class FastSpeech2(nn.Module):
    """FastSpeech2"""

    def __init__(self, preprocess_config, model_config):
        super(FastSpeech2, self).__init__()
        self.model_config = model_config

        self.encoder = Encoder(model_config)  # network before the Variance Adaptor: the encoder
        self.variance_adaptor = VarianceAdaptor(preprocess_config, model_config)  # Variance Adaptor
        self.decoder = Decoder(model_config)  # network after the Variance Adaptor: the decoder
        self.mel_linear = nn.Linear(
            model_config["transformer"]["decoder_hidden"],
            preprocess_config["preprocessing"]["mel"]["n_mel_channels"],
        )
        self.postnet = PostNet()

        self.speaker_emb = None
        if model_config["multi_speaker"]:  # multi-speaker model
            # Load the speaker file
            with open(
                os.path.join(preprocess_config["path"]["preprocessed_path"], "speakers.json"),
            ) as f:
                n_speaker = len(json.load(f))
            # Build the speaker embedding layer
            self.speaker_emb = nn.Embedding(
                n_speaker,
                model_config["transformer"]["encoder_hidden"],
            )

    def forward(
        self,
        speakers,
        texts,
        src_lens,
        max_src_len,
        mels=None,
        mel_lens=None,
        max_mel_len=None,
        p_targets=None,
        e_targets=None,
        d_targets=None,
        p_control=1.0,  # control coefficients
        e_control=1.0,
        d_control=1.0,
    ):
        src_masks = get_mask_from_lengths(src_lens, max_src_len)  # mask for the source text sequence
        mel_masks = (
            get_mask_from_lengths(mel_lens, max_mel_len)
            if mel_lens is not None
            else None
        )  # mask for the mel-spectrogram sequence

        output = self.encoder(texts, src_masks)  # encode

        if self.speaker_emb is not None:  # if a speaker embedding layer exists, add it to output
            output = output + self.speaker_emb(speakers).unsqueeze(1).expand(-1, max_src_len, -1)

        # Run the Variance Adaptor
        (
            output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            mel_lens,
            mel_masks,
        ) = self.variance_adaptor(
            output,
            src_masks,
            mel_masks,
            max_mel_len,
            p_targets,
            e_targets,
            d_targets,
            p_control,
            e_control,
            d_control,
        )

        output, mel_masks = self.decoder(output, mel_masks)  # decode
        output = self.mel_linear(output)  # linear projection to the mel channels

        postnet_output = self.postnet(output) + output  # post-processing

        return (
            output,
            postnet_output,
            p_predictions,
            e_predictions,
            log_d_predictions,
            d_rounded,
            src_masks,
            mel_masks,
            src_lens,
            mel_lens,
        )
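Both `forward` methods rely on `get_mask_from_lengths` from utils.tools, which is not shown in this article. The sketch below is an assumed plain-Python equivalent, not the repository's implementation: the names `mask_from_lengths_sketch` and `masked_fill` are hypothetical, and the convention that `True` marks padded positions is inferred from the call `out.masked_fill(mask, 0.0)` in `VariancePredictor`, which zeroes the masked entries.

```python
def mask_from_lengths_sketch(lengths, max_len=None):
    """Return a [batch, max_len] boolean mask that is True at padded positions."""
    if max_len is None:
        max_len = max(lengths)
    return [[pos >= length for pos in range(max_len)] for length in lengths]


def masked_fill(row, mask_row, value=0.0):
    """Replace the entries of row where mask_row is True (list version of Tensor.masked_fill)."""
    return [value if masked else v for v, masked in zip(row, mask_row)]


src_lens = [3, 5]  # hypothetical phoneme counts for a batch of two utterances
mask = mask_from_lengths_sketch(src_lens)
print(mask[0])  # [False, False, False, True, True]
print(masked_fill([0.7, 0.2, 0.9, 0.4, 0.1], mask[0]))  # [0.7, 0.2, 0.9, 0.0, 0.0]
```

Under this assumption, the predictions at padded positions are forced to zero before they reach the loss or the embedding lookup.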

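These notes record the model-construction code in the chosen FastSpeech2 reimplementation and are best read alongside the model section of my earlier FastSpeech2 paper reading notes. They consist mainly of detailed code comments; if you spot problems or errors, please point them out in the comments so we can learn from each other.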

Reposted from: https://blog.csdn.net/zzfive/article/details/126860712
Copyright belongs to the original author, zzfive. In case of infringement, please contact us for removal.
