wyysf's picture
update to v1.5
8133633
raw
history blame
11.4 kB
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from craftsman.utils.typing import *
from craftsman.utils.checkpoint import checkpoint
from .utils import init_linear, MLP
from timm.models.vision_transformer import Attention
def scaled_dot_product_gqa(
query: Tensor,
key: Tensor,
value: Tensor,
dropout: float = 0.0,
scale: Optional[float] = None,
mask: Optional[Tensor] = None,
is_causal: Optional[bool] = None,
need_weights: bool = False,
average_attn_weights: bool = False,
force_grouped: bool = False,
):
"""Scaled dot product attention with support for grouped queries.
Einstein notation:
- b: batch size
- n / s: sequence length
- h: number of heads
- g: number of groups
- d: dimension of query/key/value
Args:
query: Query tensor of shape (b, n, h, d)
key: Key tensor of shape (b, s, h, d)
value: Value tensor of shape (b, s, h, d)
dropout: Dropout probability (default: 0.0)
scale: Scale factor for query (default: d_query ** 0.5)
mask: Mask tensor of shape (b, n, s) or (b, s). If 'ndim == 2', the mask is
applied to all 'n' rows of the attention matrix. (default: None)
force_grouped: If True, apply grouped-query attention even if the number of
heads is equal for query, key, and value. (default: False)
Returns:
2-tuple of:
- Attention output with shape (b, n, h, d)
- (Optional) Attention weights with shape (b, h, n, s). Only returned if
'need_weights' is True.
"""
if (mask is not None) and (is_causal is not None):
raise ValueError(
"Only one of 'mask' and 'is_causal' should be provided, but got both."
)
elif not query.ndim == key.ndim == value.ndim == 4:
raise ValueError(
f"Expected query, key, and value to be 4-dimensional, but got shapes "
f"{query.shape}, {key.shape}, and {value.shape}."
)
# Move sequence length dimension to axis 2.
# This makes the attention operations below *much* faster.
query = rearrange(query, "b n h d -> b h n d")
key = rearrange(key, "b s h d -> b h s d")
value = rearrange(value, "b s h d -> b h s d")
bq, hq, nq, dq = query.shape
bk, hk, nk, dk = key.shape
bv, hv, nv, dv = value.shape
if not (bq == bk == bv and dq == dk == dv):
raise ValueError(
"Expected query, key, and value to have the same batch size (dim=0) and "
f"embedding dimension (dim=3), but got query: {query.shape}, "
f"key: {key.shape}, and value: {value.shape}."
)
elif (hk != hv) or (nk != nv):
raise ValueError(
"Expected key and value to have the same size in dimensions 1 and 2, but "
f"got key: {key.shape} and value: {value.shape}."
)
elif hq % hk != 0:
raise ValueError(
"Expected query heads to be a multiple of key/value heads, but got "
f"query: {query.shape} and key/value: {key.shape}."
)
if scale is None:
scale = query.size(-1) ** 0.5
query = query / scale
num_head_groups = hq // hk
query = rearrange(query, "b (h g) n d -> b g h n d", g=num_head_groups)
similarity = einsum(query, key, "b g h n d, b h s d -> b g h n s")
if is_causal:
# Mask out the upper triangular portion of the attention matrix. This prevents
# the model from attending to tokens in the future.
mask = torch.ones((bq, nq, nk), device=query.device, dtype=torch.bool).tril_()
if mask is not None:
# Expand mask to match the shape of the attention matrix.
# If mask is 2D, assume that it is applied to the key/value sequence dimension.
# Else if mask is 3D, assume that it is applied to the query/key/value sequence
# dimension for all attention heads.
#
if mask.ndim == 2:
mask = rearrange(mask, "b s -> b () () () s")
elif mask.ndim == 3:
mask = rearrange(mask, "b n s -> b () () n s")
# Mask similarity values by setting them to negative infinity. This guarantees
# that they will not contribute to the softmax computation below.
similarity.masked_fill_(~mask, torch.finfo(similarity.dtype).min)
attention = F.softmax(similarity, dim=-1)
if dropout > 0.0:
attention = F.dropout(attention, p=dropout)
# Apply attention matrix to the value Tensor.
out = einsum(attention, value, "b g h n s, b h s d -> b g h n d")
# Move head dimension back to axis 2
out = rearrange(out, "b g h n d -> b n (h g) d")
attn_weights: Optional[Tensor] = None
if need_weights:
# Move the sequence dimensions back to positions 1, 2. Move the head dimension
# to position 3. This more closely matches the return shape of the attention
# output: (b, n, h, d).
attn_weights = rearrange(attention, "b g h n s -> b n s (h g)")
if average_attn_weights:
attn_weights = attn_weights.mean(dim=1)
return out, attn_weights
class MultiheadAttention(nn.Module):
def __init__(
self,
*,
n_ctx: int,
width: int,
heads: int,
init_scale: float,
qkv_bias: bool,
use_flash: bool = False
):
super().__init__()
self.n_ctx = n_ctx
self.width = width
self.heads = heads
self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias)
self.c_proj = nn.Linear(width, width)
self.attention = QKVMultiheadAttention(heads=heads, n_ctx=n_ctx, use_flash=use_flash)
init_linear(self.c_qkv, init_scale)
init_linear(self.c_proj, init_scale)
def forward(self, x):
x = self.c_qkv(x)
x = checkpoint(self.attention, (x,), (), True)
x = self.c_proj(x)
return x
class QKVMultiheadAttention(nn.Module):
def __init__(self, *, heads: int, n_ctx: int, use_flash: bool = False):
super().__init__()
self.heads = heads
self.n_ctx = n_ctx
self.use_flash = use_flash
def forward(self, qkv):
bs, n_ctx, width = qkv.shape
attn_ch = width // self.heads // 3
scale = 1 / math.sqrt(math.sqrt(attn_ch))
qkv = qkv.view(bs, n_ctx, self.heads, -1)
q, k, v = torch.split(qkv, attn_ch, dim=-1)
if self.use_flash:
q = q.permute(0, 2, 1, 3)
k = k.permute(0, 2, 1, 3)
v = v.permute(0, 2, 1, 3)
out = F.scaled_dot_product_attention(q, k, v).permute(0, 2, 1, 3).reshape(bs, n_ctx, -1)
else:
weight = torch.einsum(
"bthc,bshc->bhts", q * scale, k * scale
) # More stable with f16 than dividing afterwards
wdtype = weight.dtype
weight = torch.softmax(weight.float(), dim=-1).type(wdtype)
out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1)
return out
class ResidualAttentionBlock(nn.Module):
def __init__(
self,
*,
n_ctx: int,
width: int,
heads: int,
init_scale: float = 1.0,
qkv_bias: bool = True,
use_flash: bool = False,
use_checkpoint: bool = False
):
super().__init__()
self.use_checkpoint = use_checkpoint
self.attn = MultiheadAttention(
n_ctx=n_ctx,
width=width,
heads=heads,
init_scale=init_scale,
qkv_bias=qkv_bias,
use_flash=use_flash
)
self.ln_1 = nn.LayerNorm(width)
self.mlp = MLP(width=width, init_scale=init_scale)
self.ln_2 = nn.LayerNorm(width)
def _forward(self, x: torch.Tensor):
x = x + self.attn(self.ln_1(x))
x = x + self.mlp(self.ln_2(x))
return x
def forward(self, x: torch.Tensor):
return checkpoint(self._forward, (x,), self.parameters(), self.use_checkpoint)
class MultiheadCrossAttention(nn.Module):
def __init__(
self,
*,
width: int,
heads: int,
init_scale: float,
qkv_bias: bool = True,
use_flash: bool = False,
n_data: Optional[int] = None,
data_width: Optional[int] = None,
):
super().__init__()
self.n_data = n_data
self.width = width
self.heads = heads
self.data_width = width if data_width is None else data_width
self.c_q = nn.Linear(width, width, bias=qkv_bias)
self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias)
self.c_proj = nn.Linear(width, width)
self.attention = QKVMultiheadCrossAttention(
heads=heads, n_data=n_data, use_flash=use_flash
)
init_linear(self.c_q, init_scale)
init_linear(self.c_kv, init_scale)
init_linear(self.c_proj, init_scale)
def forward(self, x, data):
x = self.c_q(x)
data = self.c_kv(data)
x = checkpoint(self.attention, (x, data), (), True)
x = self.c_proj(x)
return x
class QKVMultiheadCrossAttention(nn.Module):
def __init__(self, *, heads: int, use_flash: bool = False, n_data: Optional[int] = None):
super().__init__()
self.heads = heads
self.n_data = n_data
self.use_flash = use_flash
def forward(self, q, kv):
_, n_ctx, _ = q.shape
bs, n_data, width = kv.shape
attn_ch = width // self.heads // 2
scale = 1 / math.sqrt(math.sqrt(attn_ch))
q = q.view(bs, n_ctx, self.heads, -1)
kv = kv.view(bs, n_data, self.heads, -1)
k, v = torch.split(kv, attn_ch, dim=-1)
if self.use_flash:
q = q.permute(0, 2, 1, 3)
k = k.permute(0, 2, 1, 3)
v = v.permute(0, 2, 1, 3)
out = F.scaled_dot_product_attention(q, k, v).permute(0, 2, 1, 3).reshape(bs, n_ctx, -1)
else:
weight = torch.einsum(
"bthc,bshc->bhts", q * scale, k * scale
) # More stable with f16 than dividing afterwards
wdtype = weight.dtype
weight = torch.softmax(weight.float(), dim=-1).type(wdtype)
out = torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1)
return out
class ResidualCrossAttentionBlock(nn.Module):
def __init__(
self,
*,
n_data: Optional[int] = None,
width: int,
heads: int,
data_width: Optional[int] = None,
init_scale: float = 0.25,
qkv_bias: bool = True,
use_flash: bool = False
):
super().__init__()
if data_width is None:
data_width = width
self.attn = MultiheadCrossAttention(
n_data=n_data,
width=width,
heads=heads,
data_width=data_width,
init_scale=init_scale,
qkv_bias=qkv_bias,
use_flash=use_flash,
)
self.ln_1 = nn.LayerNorm(width)
self.ln_2 = nn.LayerNorm(data_width)
self.mlp = MLP(width=width, init_scale=init_scale)
self.ln_3 = nn.LayerNorm(width)
def forward(self, x: torch.Tensor, data: torch.Tensor):
x = x + self.attn(self.ln_1(x), self.ln_2(data))
x = x + self.mlp(self.ln_3(x))
return x