|
import math
|
|
import torch
|
|
import numpy as np
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
from typing import List, Optional, Tuple
|
|
from .configuration import AVHubertConfig
|
|
from fairseq.utils import index_put
|
|
from fairseq.modules import LayerNorm, SamePad
|
|
from fairseq.models.wav2vec.wav2vec2 import TransformerSentenceEncoderLayer
|
|
from fairseq.modules.transformer_sentence_encoder import init_bert_params
|
|
|
|
|
|
class TransformerEncoder(nn.Module):
    """Transformer encoder with a convolutional positional embedding.

    The input is enriched with the output of a weight-normalised grouped
    ``Conv1d`` (followed by ``SamePad`` and ``GELU``), optionally layer
    normalised, passed through dropout and then through a stack of
    ``TransformerSentenceEncoderLayer`` modules. During training each layer
    may be skipped at random according to ``config.encoder_layerdrop``.
    """

    def __init__(self, config: AVHubertConfig) -> None:
        """Create the positional convolution, the encoder layers and the layer norm.

        Args:
            config: Provides ``dropout``, ``encoder_embed_dim``, ``conv_pos``,
                ``conv_pos_groups``, ``encoder_ffn_embed_dim``,
                ``encoder_attention_heads``, ``attention_dropout``,
                ``activation_dropout``, ``activation_fn``, ``layer_norm_first``,
                ``encoder_layers`` and ``encoder_layerdrop``.
        """
        super().__init__()

        self.dropout = config.dropout
        self.embedding_dim = config.encoder_embed_dim

        # Positional embedding: a grouped 1-D convolution over the sequence.
        kernel_size = config.conv_pos
        conv = nn.Conv1d(
            self.embedding_dim,
            self.embedding_dim,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            groups=config.conv_pos_groups,
        )

        # Explicit initialisation of the convolution: zero-mean normal weights
        # scaled by kernel size and embedding width, and a zero bias.
        init_dropout = 0
        init_std = math.sqrt(
            (4 * (1.0 - init_dropout)) / (kernel_size * self.embedding_dim)
        )
        nn.init.normal_(conv.weight, mean=0, std=init_std)
        nn.init.constant_(conv.bias, 0)

        conv = nn.utils.weight_norm(conv, name="weight", dim=2)
        self.pos_conv = nn.Sequential(conv, SamePad(kernel_size), nn.GELU())

        # Every encoder layer is built from the same keyword arguments.
        layer_kwargs = dict(
            embedding_dim=self.embedding_dim,
            ffn_embedding_dim=config.encoder_ffn_embed_dim,
            num_attention_heads=config.encoder_attention_heads,
            dropout=self.dropout,
            attention_dropout=config.attention_dropout,
            activation_dropout=config.activation_dropout,
            activation_fn=config.activation_fn,
            layer_norm_first=config.layer_norm_first,
        )
        self.layers = nn.ModuleList(
            [
                TransformerSentenceEncoderLayer(**layer_kwargs)
                for _ in range(config.encoder_layers)
            ]
        )

        self.layer_norm_first = config.layer_norm_first
        self.layer_norm = LayerNorm(self.embedding_dim)
        self.layerdrop = config.encoder_layerdrop

        # Run fairseq's init_bert_params over this module and all submodules.
        self.apply(init_bert_params)

    def forward(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Encode ``x`` and apply the final layer norm when appropriate.

        Args:
            x: Input features, forwarded unchanged to ``extract_features``.
            padding_mask: Optional mask forwarded to ``extract_features``.
            layer: Optional index of the layer at which to stop. When given,
                the final layer norm is not applied.

        Returns:
            A tuple ``(features, layer_results)`` as produced by
            ``extract_features``; ``features`` is additionally layer normalised
            when ``layer_norm_first`` is set and ``layer`` is ``None``.
        """
        features, layer_results = self.extract_features(x, padding_mask, layer)

        needs_final_norm = self.layer_norm_first and layer is None
        if needs_final_norm:
            features = self.layer_norm(features)

        return features, layer_results

    def extract_features(
        self,
        x: torch.Tensor,
        padding_mask: Optional[torch.Tensor] = None,
        tgt_layer: Optional[int] = None,
    ) -> Tuple[torch.Tensor, List[Tuple[torch.Tensor, torch.Tensor]]]:
        """Run the positional convolution and the encoder layer stack.

        Args:
            x: Input features. Axis 2 is moved to position 1 before the
                ``Conv1d``, so a (batch, time, embed_dim) layout is assumed
                -- TODO confirm against the caller.
            padding_mask: Optional mask; when given, ``index_put`` writes 0
                into ``x`` at the masked positions and the mask is passed to
                every layer as ``self_attn_padding_mask``.
            tgt_layer: Optional index of the last layer to visit. When given,
                the per-layer outputs are collected and the loop stops after
                that index.

        Returns:
            A tuple ``(features, layer_results)``. ``features`` has axes 0 and
            1 swapped back to the input order. ``layer_results`` holds one
            ``(output, extra)`` pair per executed layer when ``tgt_layer`` is
            not ``None`` and is empty otherwise; the stored outputs keep the
            layer-internal axis order (axes 0 and 1 swapped relative to ``x``).
        """
        hidden = x if padding_mask is None else index_put(x, padding_mask, 0)

        # Add the convolutional positional embedding (Conv1d wants channels on axis 1).
        positional = self.pos_conv(hidden.transpose(1, 2)).transpose(1, 2)
        hidden = hidden + positional

        if not self.layer_norm_first:
            hidden = self.layer_norm(hidden)

        hidden = F.dropout(hidden, p=self.dropout, training=self.training)

        # The encoder layers operate with axes 0 and 1 swapped.
        hidden = hidden.transpose(0, 1)

        layer_results = []
        for index, encoder_layer in enumerate(self.layers):
            # One random draw per layer, taken in training and eval mode alike.
            layerdrop_draw = np.random.random()
            run_layer = (not self.training) or layerdrop_draw > self.layerdrop
            if run_layer:
                hidden, extra = encoder_layer(
                    hidden, self_attn_padding_mask=padding_mask, need_weights=False
                )
                if tgt_layer is not None:
                    layer_results.append((hidden, extra))
            # Stop at the requested index even if that layer was skipped.
            if index == tgt_layer:
                break

        # Restore the original order of axes 0 and 1.
        return hidden.transpose(0, 1), layer_results
|
|
|