Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright 2019 Shigeki Karita | |
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0) | |
"""Multi-Head Attention layer definition.""" | |
import math | |
import numpy | |
import torch | |
from torch import nn | |
from typing import Optional, Tuple | |
import torch.nn.functional as F | |
from funasr_detach.models.transformer.utils.nets_utils import make_pad_mask | |
import funasr_detach.models.lora.layers as lora | |
class MultiHeadedAttention(nn.Module): | |
"""Multi-Head Attention layer. | |
Args: | |
n_head (int): The number of heads. | |
n_feat (int): The number of features. | |
dropout_rate (float): Dropout rate. | |
""" | |
def __init__(self, n_head, n_feat, dropout_rate): | |
"""Construct an MultiHeadedAttention object.""" | |
super(MultiHeadedAttention, self).__init__() | |
assert n_feat % n_head == 0 | |
# We assume d_v always equals d_k | |
self.d_k = n_feat // n_head | |
self.h = n_head | |
self.linear_q = nn.Linear(n_feat, n_feat) | |
self.linear_k = nn.Linear(n_feat, n_feat) | |
self.linear_v = nn.Linear(n_feat, n_feat) | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
self.attn = None | |
self.dropout = nn.Dropout(p=dropout_rate) | |
def forward_qkv(self, query, key, value): | |
"""Transform query, key and value. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
Returns: | |
torch.Tensor: Transformed query tensor (#batch, n_head, time1, d_k). | |
torch.Tensor: Transformed key tensor (#batch, n_head, time2, d_k). | |
torch.Tensor: Transformed value tensor (#batch, n_head, time2, d_k). | |
""" | |
n_batch = query.size(0) | |
q = self.linear_q(query).view(n_batch, -1, self.h, self.d_k) | |
k = self.linear_k(key).view(n_batch, -1, self.h, self.d_k) | |
v = self.linear_v(value).view(n_batch, -1, self.h, self.d_k) | |
q = q.transpose(1, 2) # (batch, head, time1, d_k) | |
k = k.transpose(1, 2) # (batch, head, time2, d_k) | |
v = v.transpose(1, 2) # (batch, head, time2, d_k) | |
return q, k, v | |
def forward_attention(self, value, scores, mask): | |
"""Compute attention context vector. | |
Args: | |
value (torch.Tensor): Transformed value (#batch, n_head, time2, d_k). | |
scores (torch.Tensor): Attention score (#batch, n_head, time1, time2). | |
mask (torch.Tensor): Mask (#batch, 1, time2) or (#batch, time1, time2). | |
Returns: | |
torch.Tensor: Transformed value (#batch, time1, d_model) | |
weighted by the attention score (#batch, time1, time2). | |
""" | |
n_batch = value.size(0) | |
if mask is not None: | |
mask = mask.unsqueeze(1).eq(0) # (batch, 1, *, time2) | |
min_value = float( | |
numpy.finfo(torch.tensor(0, dtype=scores.dtype).numpy().dtype).min | |
) | |
scores = scores.masked_fill(mask, min_value) | |
self.attn = torch.softmax(scores, dim=-1).masked_fill( | |
mask, 0.0 | |
) # (batch, head, time1, time2) | |
else: | |
self.attn = torch.softmax(scores, dim=-1) # (batch, head, time1, time2) | |
p_attn = self.dropout(self.attn) | |
x = torch.matmul(p_attn, value) # (batch, head, time1, d_k) | |
x = ( | |
x.transpose(1, 2).contiguous().view(n_batch, -1, self.h * self.d_k) | |
) # (batch, time1, d_model) | |
return self.linear_out(x) # (batch, time1, d_model) | |
def forward(self, query, key, value, mask): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q, k, v = self.forward_qkv(query, key, value) | |
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k) | |
return self.forward_attention(v, scores, mask) | |
class MultiHeadedAttentionSANM(nn.Module): | |
"""Multi-Head Attention layer. | |
Args: | |
n_head (int): The number of heads. | |
n_feat (int): The number of features. | |
dropout_rate (float): Dropout rate. | |
""" | |
def __init__( | |
self, | |
n_head, | |
in_feat, | |
n_feat, | |
dropout_rate, | |
kernel_size, | |
sanm_shfit=0, | |
lora_list=None, | |
lora_rank=8, | |
lora_alpha=16, | |
lora_dropout=0.1, | |
): | |
"""Construct an MultiHeadedAttention object.""" | |
super().__init__() | |
assert n_feat % n_head == 0 | |
# We assume d_v always equals d_k | |
self.d_k = n_feat // n_head | |
self.h = n_head | |
# self.linear_q = nn.Linear(n_feat, n_feat) | |
# self.linear_k = nn.Linear(n_feat, n_feat) | |
# self.linear_v = nn.Linear(n_feat, n_feat) | |
if lora_list is not None: | |
if "o" in lora_list: | |
self.linear_out = lora.Linear( | |
n_feat, | |
n_feat, | |
r=lora_rank, | |
lora_alpha=lora_alpha, | |
lora_dropout=lora_dropout, | |
) | |
else: | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
lora_qkv_list = ["q" in lora_list, "k" in lora_list, "v" in lora_list] | |
if lora_qkv_list == [False, False, False]: | |
self.linear_q_k_v = nn.Linear(in_feat, n_feat * 3) | |
else: | |
self.linear_q_k_v = lora.MergedLinear( | |
in_feat, | |
n_feat * 3, | |
r=lora_rank, | |
lora_alpha=lora_alpha, | |
lora_dropout=lora_dropout, | |
enable_lora=lora_qkv_list, | |
) | |
else: | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
self.linear_q_k_v = nn.Linear(in_feat, n_feat * 3) | |
self.attn = None | |
self.dropout = nn.Dropout(p=dropout_rate) | |
self.fsmn_block = nn.Conv1d( | |
n_feat, n_feat, kernel_size, stride=1, padding=0, groups=n_feat, bias=False | |
) | |
# padding | |
left_padding = (kernel_size - 1) // 2 | |
if sanm_shfit > 0: | |
left_padding = left_padding + sanm_shfit | |
right_padding = kernel_size - 1 - left_padding | |
self.pad_fn = nn.ConstantPad1d((left_padding, right_padding), 0.0) | |
def forward_fsmn(self, inputs, mask, mask_shfit_chunk=None): | |
b, t, d = inputs.size() | |
if mask is not None: | |
mask = torch.reshape(mask, (b, -1, 1)) | |
if mask_shfit_chunk is not None: | |
mask = mask * mask_shfit_chunk | |
inputs = inputs * mask | |
x = inputs.transpose(1, 2) | |
x = self.pad_fn(x) | |
x = self.fsmn_block(x) | |
x = x.transpose(1, 2) | |
x += inputs | |
x = self.dropout(x) | |
if mask is not None: | |
x = x * mask | |
return x | |
def forward_qkv(self, x): | |
"""Transform query, key and value. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
Returns: | |
torch.Tensor: Transformed query tensor (#batch, n_head, time1, d_k). | |
torch.Tensor: Transformed key tensor (#batch, n_head, time2, d_k). | |
torch.Tensor: Transformed value tensor (#batch, n_head, time2, d_k). | |
""" | |
b, t, d = x.size() | |
q_k_v = self.linear_q_k_v(x) | |
q, k, v = torch.split(q_k_v, int(self.h * self.d_k), dim=-1) | |
q_h = torch.reshape(q, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time1, d_k) | |
k_h = torch.reshape(k, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
v_h = torch.reshape(v, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
return q_h, k_h, v_h, v | |
def forward_attention(self, value, scores, mask, mask_att_chunk_encoder=None): | |
"""Compute attention context vector. | |
Args: | |
value (torch.Tensor): Transformed value (#batch, n_head, time2, d_k). | |
scores (torch.Tensor): Attention score (#batch, n_head, time1, time2). | |
mask (torch.Tensor): Mask (#batch, 1, time2) or (#batch, time1, time2). | |
Returns: | |
torch.Tensor: Transformed value (#batch, time1, d_model) | |
weighted by the attention score (#batch, time1, time2). | |
""" | |
n_batch = value.size(0) | |
if mask is not None: | |
if mask_att_chunk_encoder is not None: | |
mask = mask * mask_att_chunk_encoder | |
mask = mask.unsqueeze(1).eq(0) # (batch, 1, *, time2) | |
min_value = float( | |
numpy.finfo(torch.tensor(0, dtype=scores.dtype).numpy().dtype).min | |
) | |
scores = scores.masked_fill(mask, min_value) | |
self.attn = torch.softmax(scores, dim=-1).masked_fill( | |
mask, 0.0 | |
) # (batch, head, time1, time2) | |
else: | |
self.attn = torch.softmax(scores, dim=-1) # (batch, head, time1, time2) | |
p_attn = self.dropout(self.attn) | |
x = torch.matmul(p_attn, value) # (batch, head, time1, d_k) | |
x = ( | |
x.transpose(1, 2).contiguous().view(n_batch, -1, self.h * self.d_k) | |
) # (batch, time1, d_model) | |
return self.linear_out(x) # (batch, time1, d_model) | |
def forward(self, x, mask, mask_shfit_chunk=None, mask_att_chunk_encoder=None): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q_h, k_h, v_h, v = self.forward_qkv(x) | |
fsmn_memory = self.forward_fsmn(v, mask, mask_shfit_chunk) | |
q_h = q_h * self.d_k ** (-0.5) | |
scores = torch.matmul(q_h, k_h.transpose(-2, -1)) | |
att_outs = self.forward_attention(v_h, scores, mask, mask_att_chunk_encoder) | |
return att_outs + fsmn_memory | |
def forward_chunk(self, x, cache=None, chunk_size=None, look_back=0): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q_h, k_h, v_h, v = self.forward_qkv(x) | |
if chunk_size is not None and look_back > 0 or look_back == -1: | |
if cache is not None: | |
k_h_stride = k_h[:, :, : -(chunk_size[2]), :] | |
v_h_stride = v_h[:, :, : -(chunk_size[2]), :] | |
k_h = torch.cat((cache["k"], k_h), dim=2) | |
v_h = torch.cat((cache["v"], v_h), dim=2) | |
cache["k"] = torch.cat((cache["k"], k_h_stride), dim=2) | |
cache["v"] = torch.cat((cache["v"], v_h_stride), dim=2) | |
if look_back != -1: | |
cache["k"] = cache["k"][:, :, -(look_back * chunk_size[1]) :, :] | |
cache["v"] = cache["v"][:, :, -(look_back * chunk_size[1]) :, :] | |
else: | |
cache_tmp = { | |
"k": k_h[:, :, : -(chunk_size[2]), :], | |
"v": v_h[:, :, : -(chunk_size[2]), :], | |
} | |
cache = cache_tmp | |
fsmn_memory = self.forward_fsmn(v, None) | |
q_h = q_h * self.d_k ** (-0.5) | |
scores = torch.matmul(q_h, k_h.transpose(-2, -1)) | |
att_outs = self.forward_attention(v_h, scores, None) | |
return att_outs + fsmn_memory, cache | |
class MultiHeadedAttentionSANMDecoder(nn.Module): | |
"""Multi-Head Attention layer. | |
Args: | |
n_head (int): The number of heads. | |
n_feat (int): The number of features. | |
dropout_rate (float): Dropout rate. | |
""" | |
def __init__(self, n_feat, dropout_rate, kernel_size, sanm_shfit=0): | |
"""Construct an MultiHeadedAttention object.""" | |
super(MultiHeadedAttentionSANMDecoder, self).__init__() | |
self.dropout = nn.Dropout(p=dropout_rate) | |
self.fsmn_block = nn.Conv1d( | |
n_feat, n_feat, kernel_size, stride=1, padding=0, groups=n_feat, bias=False | |
) | |
# padding | |
# padding | |
left_padding = (kernel_size - 1) // 2 | |
if sanm_shfit > 0: | |
left_padding = left_padding + sanm_shfit | |
right_padding = kernel_size - 1 - left_padding | |
self.pad_fn = nn.ConstantPad1d((left_padding, right_padding), 0.0) | |
self.kernel_size = kernel_size | |
def forward(self, inputs, mask, cache=None, mask_shfit_chunk=None): | |
""" | |
:param x: (#batch, time1, size). | |
:param mask: Mask tensor (#batch, 1, time) | |
:return: | |
""" | |
# print("in fsmn, inputs", inputs.size()) | |
b, t, d = inputs.size() | |
# logging.info( | |
# "mask: {}".format(mask.size())) | |
if mask is not None: | |
mask = torch.reshape(mask, (b, -1, 1)) | |
# logging.info("in fsmn, mask: {}, {}".format(mask.size(), mask[0:100:50, :, :])) | |
if mask_shfit_chunk is not None: | |
# logging.info("in fsmn, mask_fsmn: {}, {}".format(mask_shfit_chunk.size(), mask_shfit_chunk[0:100:50, :, :])) | |
mask = mask * mask_shfit_chunk | |
# logging.info("in fsmn, mask_after_fsmn: {}, {}".format(mask.size(), mask[0:100:50, :, :])) | |
# print("in fsmn, mask", mask.size()) | |
# print("in fsmn, inputs", inputs.size()) | |
inputs = inputs * mask | |
x = inputs.transpose(1, 2) | |
b, d, t = x.size() | |
if cache is None: | |
# print("in fsmn, cache is None, x", x.size()) | |
x = self.pad_fn(x) | |
if not self.training: | |
cache = x | |
else: | |
# print("in fsmn, cache is not None, x", x.size()) | |
# x = torch.cat((x, cache), dim=2)[:, :, :-1] | |
# if t < self.kernel_size: | |
# x = self.pad_fn(x) | |
x = torch.cat((cache[:, :, 1:], x), dim=2) | |
x = x[:, :, -(self.kernel_size + t - 1) :] | |
# print("in fsmn, cache is not None, x_cat", x.size()) | |
cache = x | |
x = self.fsmn_block(x) | |
x = x.transpose(1, 2) | |
# print("in fsmn, fsmn_out", x.size()) | |
if x.size(1) != inputs.size(1): | |
inputs = inputs[:, -1, :] | |
x = x + inputs | |
x = self.dropout(x) | |
if mask is not None: | |
x = x * mask | |
return x, cache | |
class MultiHeadedAttentionCrossAtt(nn.Module): | |
"""Multi-Head Attention layer. | |
Args: | |
n_head (int): The number of heads. | |
n_feat (int): The number of features. | |
dropout_rate (float): Dropout rate. | |
""" | |
def __init__( | |
self, | |
n_head, | |
n_feat, | |
dropout_rate, | |
lora_list=None, | |
lora_rank=8, | |
lora_alpha=16, | |
lora_dropout=0.1, | |
encoder_output_size=None, | |
): | |
"""Construct an MultiHeadedAttention object.""" | |
super(MultiHeadedAttentionCrossAtt, self).__init__() | |
assert n_feat % n_head == 0 | |
# We assume d_v always equals d_k | |
self.d_k = n_feat // n_head | |
self.h = n_head | |
if lora_list is not None: | |
if "q" in lora_list: | |
self.linear_q = lora.Linear( | |
n_feat, | |
n_feat, | |
r=lora_rank, | |
lora_alpha=lora_alpha, | |
lora_dropout=lora_dropout, | |
) | |
else: | |
self.linear_q = nn.Linear(n_feat, n_feat) | |
lora_kv_list = ["k" in lora_list, "v" in lora_list] | |
if lora_kv_list == [False, False]: | |
self.linear_k_v = nn.Linear( | |
n_feat if encoder_output_size is None else encoder_output_size, | |
n_feat * 2, | |
) | |
else: | |
self.linear_k_v = lora.MergedLinear( | |
n_feat if encoder_output_size is None else encoder_output_size, | |
n_feat * 2, | |
r=lora_rank, | |
lora_alpha=lora_alpha, | |
lora_dropout=lora_dropout, | |
enable_lora=lora_kv_list, | |
) | |
if "o" in lora_list: | |
self.linear_out = lora.Linear( | |
n_feat, | |
n_feat, | |
r=lora_rank, | |
lora_alpha=lora_alpha, | |
lora_dropout=lora_dropout, | |
) | |
else: | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
else: | |
self.linear_q = nn.Linear(n_feat, n_feat) | |
self.linear_k_v = nn.Linear( | |
n_feat if encoder_output_size is None else encoder_output_size, | |
n_feat * 2, | |
) | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
self.attn = None | |
self.dropout = nn.Dropout(p=dropout_rate) | |
def forward_qkv(self, x, memory): | |
"""Transform query, key and value. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
Returns: | |
torch.Tensor: Transformed query tensor (#batch, n_head, time1, d_k). | |
torch.Tensor: Transformed key tensor (#batch, n_head, time2, d_k). | |
torch.Tensor: Transformed value tensor (#batch, n_head, time2, d_k). | |
""" | |
# print("in forward_qkv, x", x.size()) | |
b = x.size(0) | |
q = self.linear_q(x) | |
q_h = torch.reshape(q, (b, -1, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time1, d_k) | |
k_v = self.linear_k_v(memory) | |
k, v = torch.split(k_v, int(self.h * self.d_k), dim=-1) | |
k_h = torch.reshape(k, (b, -1, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
v_h = torch.reshape(v, (b, -1, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
return q_h, k_h, v_h | |
def forward_attention(self, value, scores, mask, ret_attn=False): | |
"""Compute attention context vector. | |
Args: | |
value (torch.Tensor): Transformed value (#batch, n_head, time2, d_k). | |
scores (torch.Tensor): Attention score (#batch, n_head, time1, time2). | |
mask (torch.Tensor): Mask (#batch, 1, time2) or (#batch, time1, time2). | |
Returns: | |
torch.Tensor: Transformed value (#batch, time1, d_model) | |
weighted by the attention score (#batch, time1, time2). | |
""" | |
n_batch = value.size(0) | |
if mask is not None: | |
mask = mask.unsqueeze(1).eq(0) # (batch, 1, *, time2) | |
min_value = float( | |
numpy.finfo(torch.tensor(0, dtype=scores.dtype).numpy().dtype).min | |
) | |
# logging.info( | |
# "scores: {}, mask_size: {}".format(scores.size(), mask.size())) | |
scores = scores.masked_fill(mask, min_value) | |
self.attn = torch.softmax(scores, dim=-1).masked_fill( | |
mask, 0.0 | |
) # (batch, head, time1, time2) | |
else: | |
self.attn = torch.softmax(scores, dim=-1) # (batch, head, time1, time2) | |
p_attn = self.dropout(self.attn) | |
x = torch.matmul(p_attn, value) # (batch, head, time1, d_k) | |
x = ( | |
x.transpose(1, 2).contiguous().view(n_batch, -1, self.h * self.d_k) | |
) # (batch, time1, d_model) | |
if ret_attn: | |
return self.linear_out(x), self.attn # (batch, time1, d_model) | |
return self.linear_out(x) # (batch, time1, d_model) | |
def forward(self, x, memory, memory_mask, ret_attn=False): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q_h, k_h, v_h = self.forward_qkv(x, memory) | |
q_h = q_h * self.d_k ** (-0.5) | |
scores = torch.matmul(q_h, k_h.transpose(-2, -1)) | |
return self.forward_attention(v_h, scores, memory_mask, ret_attn=ret_attn) | |
def forward_chunk(self, x, memory, cache=None, chunk_size=None, look_back=0): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q_h, k_h, v_h = self.forward_qkv(x, memory) | |
if chunk_size is not None and look_back > 0: | |
if cache is not None: | |
k_h = torch.cat((cache["k"], k_h), dim=2) | |
v_h = torch.cat((cache["v"], v_h), dim=2) | |
cache["k"] = k_h[:, :, -(look_back * chunk_size[1]) :, :] | |
cache["v"] = v_h[:, :, -(look_back * chunk_size[1]) :, :] | |
else: | |
cache_tmp = { | |
"k": k_h[:, :, -(look_back * chunk_size[1]) :, :], | |
"v": v_h[:, :, -(look_back * chunk_size[1]) :, :], | |
} | |
cache = cache_tmp | |
q_h = q_h * self.d_k ** (-0.5) | |
scores = torch.matmul(q_h, k_h.transpose(-2, -1)) | |
return self.forward_attention(v_h, scores, None), cache | |
class MultiHeadSelfAttention(nn.Module): | |
"""Multi-Head Attention layer. | |
Args: | |
n_head (int): The number of heads. | |
n_feat (int): The number of features. | |
dropout_rate (float): Dropout rate. | |
""" | |
def __init__(self, n_head, in_feat, n_feat, dropout_rate): | |
"""Construct an MultiHeadedAttention object.""" | |
super(MultiHeadSelfAttention, self).__init__() | |
assert n_feat % n_head == 0 | |
# We assume d_v always equals d_k | |
self.d_k = n_feat // n_head | |
self.h = n_head | |
self.linear_out = nn.Linear(n_feat, n_feat) | |
self.linear_q_k_v = nn.Linear(in_feat, n_feat * 3) | |
self.attn = None | |
self.dropout = nn.Dropout(p=dropout_rate) | |
def forward_qkv(self, x): | |
"""Transform query, key and value. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
Returns: | |
torch.Tensor: Transformed query tensor (#batch, n_head, time1, d_k). | |
torch.Tensor: Transformed key tensor (#batch, n_head, time2, d_k). | |
torch.Tensor: Transformed value tensor (#batch, n_head, time2, d_k). | |
""" | |
b, t, d = x.size() | |
q_k_v = self.linear_q_k_v(x) | |
q, k, v = torch.split(q_k_v, int(self.h * self.d_k), dim=-1) | |
q_h = torch.reshape(q, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time1, d_k) | |
k_h = torch.reshape(k, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
v_h = torch.reshape(v, (b, t, self.h, self.d_k)).transpose( | |
1, 2 | |
) # (batch, head, time2, d_k) | |
return q_h, k_h, v_h, v | |
def forward_attention(self, value, scores, mask, mask_att_chunk_encoder=None): | |
"""Compute attention context vector. | |
Args: | |
value (torch.Tensor): Transformed value (#batch, n_head, time2, d_k). | |
scores (torch.Tensor): Attention score (#batch, n_head, time1, time2). | |
mask (torch.Tensor): Mask (#batch, 1, time2) or (#batch, time1, time2). | |
Returns: | |
torch.Tensor: Transformed value (#batch, time1, d_model) | |
weighted by the attention score (#batch, time1, time2). | |
""" | |
n_batch = value.size(0) | |
if mask is not None: | |
if mask_att_chunk_encoder is not None: | |
mask = mask * mask_att_chunk_encoder | |
mask = mask.unsqueeze(1).eq(0) # (batch, 1, *, time2) | |
min_value = float( | |
numpy.finfo(torch.tensor(0, dtype=scores.dtype).numpy().dtype).min | |
) | |
scores = scores.masked_fill(mask, min_value) | |
self.attn = torch.softmax(scores, dim=-1).masked_fill( | |
mask, 0.0 | |
) # (batch, head, time1, time2) | |
else: | |
self.attn = torch.softmax(scores, dim=-1) # (batch, head, time1, time2) | |
p_attn = self.dropout(self.attn) | |
x = torch.matmul(p_attn, value) # (batch, head, time1, d_k) | |
x = ( | |
x.transpose(1, 2).contiguous().view(n_batch, -1, self.h * self.d_k) | |
) # (batch, time1, d_model) | |
return self.linear_out(x) # (batch, time1, d_model) | |
def forward(self, x, mask, mask_att_chunk_encoder=None): | |
"""Compute scaled dot product attention. | |
Args: | |
query (torch.Tensor): Query tensor (#batch, time1, size). | |
key (torch.Tensor): Key tensor (#batch, time2, size). | |
value (torch.Tensor): Value tensor (#batch, time2, size). | |
mask (torch.Tensor): Mask tensor (#batch, 1, time2) or | |
(#batch, time1, time2). | |
Returns: | |
torch.Tensor: Output tensor (#batch, time1, d_model). | |
""" | |
q_h, k_h, v_h, v = self.forward_qkv(x) | |
q_h = q_h * self.d_k ** (-0.5) | |
scores = torch.matmul(q_h, k_h.transpose(-2, -1)) | |
att_outs = self.forward_attention(v_h, scores, mask, mask_att_chunk_encoder) | |
return att_outs | |