import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from craftsman.utils.typing import *
from craftsman.utils.checkpoint import checkpoint
from .utils import init_linear, MLP
from timm.models.vision_transformer import Attention


def scaled_dot_product_gqa(
    query: Tensor,
    key: Tensor,
    value: Tensor,
    dropout: float = 0.0,
    scale: Optional[float] = None,
    mask: Optional[Tensor] = None,
    is_causal: Optional[bool] = None,
    need_weights: bool = False,
    average_attn_weights: bool = False,
    force_grouped: bool = False,
):
    """Scaled dot product attention with support for grouped queries.

    Axis names used below:
    - b: batch size
    - n / s: query / key-value sequence length
    - h: number of heads
    - g: number of query-head groups per key/value head
    - d: dimension of query/key/value

    Args:
        query: Query tensor of shape (b, n, h_q, d).
        key: Key tensor of shape (b, s, h_kv, d).
        value: Value tensor of shape (b, s, h_kv, d).
        dropout: Dropout probability applied to the attention matrix. It is
            applied whenever it is > 0; this function has no train/eval
            switch. (default: 0.0)
        scale: Divisor applied to the query before the dot product
            (default: d ** 0.5).
        mask: Boolean mask of shape (b, n, s) or (b, s); positions where the
            mask is False are excluded from attention. A 2-D mask is applied
            to every query row. (default: None)
        is_causal: If truthy, a lower-triangular (n, s) mask is built
            internally. Mutually exclusive with ``mask``: passing any
            non-None value together with ``mask`` raises.
        need_weights: If True, also return the attention weights.
        average_attn_weights: If True (and ``need_weights``), average the
            returned weights across heads.
        force_grouped: Accepted for interface compatibility; it is not read
            by this implementation and has no effect.

    Returns:
        2-tuple of:
        - Attention output with shape (b, n, h_q, d).
        - Attention weights with shape (b, n, s, h_q), or (b, n, s) when
          ``average_attn_weights`` is True; ``None`` unless ``need_weights``.

    Raises:
        ValueError: If both ``mask`` and ``is_causal`` are given, if an input
            is not 4-dimensional, or if the input shapes are inconsistent.
    """
    if (mask is not None) and (is_causal is not None):
        raise ValueError(
            "Only one of 'mask' and 'is_causal' should be provided, but got both."
        )
    elif not query.ndim == key.ndim == value.ndim == 4:
        raise ValueError(
            f"Expected query, key, and value to be 4-dimensional, but got shapes "
            f"{query.shape}, {key.shape}, and {value.shape}."
        )

    # Move the sequence-length dimension to axis 2: (b, n, h, d) -> (b, h, n, d).
    # Plain torch ops are used for all reshaping in this function so that it
    # does not rely on `rearrange` / `einsum` helpers that this file never
    # imports explicitly.
    query = query.permute(0, 2, 1, 3)
    key = key.permute(0, 2, 1, 3)
    value = value.permute(0, 2, 1, 3)

    bq, hq, nq, dq = query.shape
    bk, hk, nk, dk = key.shape
    bv, hv, nv, dv = value.shape
    if not (bq == bk == bv and dq == dk == dv):
        raise ValueError(
            "Expected query, key, and value to have the same batch size (dim=0) and "
            f"embedding dimension (dim=3), but got query: {query.shape}, "
            f"key: {key.shape}, and value: {value.shape}."
        )
    elif (hk != hv) or (nk != nv):
        raise ValueError(
            "Expected key and value to have the same size in dimensions 1 and 2, but "
            f"got key: {key.shape} and value: {value.shape}."
        )
    elif hq % hk != 0:
        raise ValueError(
            "Expected query heads to be a multiple of key/value heads, but got "
            f"query: {query.shape} and key/value: {key.shape}."
        )

    if scale is None:
        scale = query.size(-1) ** 0.5
    query = query / scale

    # Split the query heads into (kv_head, group) with the group index varying
    # fastest, then put the group axis first: (b, h*g, n, d) -> (b, g, h, n, d).
    num_head_groups = hq // hk
    query = query.reshape(bq, hk, num_head_groups, nq, dq).transpose(1, 2)
    similarity = torch.einsum("bghnd,bhsd->bghns", query, key)

    if is_causal:
        # Mask out the upper triangular portion of the attention matrix. This
        # prevents a position from attending to later positions.
        mask = torch.ones((bq, nq, nk), device=query.device, dtype=torch.bool).tril_()

    if mask is not None:
        # Insert singleton axes so the mask broadcasts against (b, g, h, n, s).
        # A 2-D mask is taken to index the key/value sequence; a 3-D mask is
        # taken to index (query, key) positions and is shared by all heads.
        # Masks of any other rank are used as given.
        if mask.ndim == 2:
            mask = mask[:, None, None, None, :]
        elif mask.ndim == 3:
            mask = mask[:, None, None, :, :]
        # Fill excluded positions with the most negative finite value of the
        # dtype so they receive (effectively) zero weight after the softmax.
        similarity.masked_fill_(~mask, torch.finfo(similarity.dtype).min)

    attention = F.softmax(similarity, dim=-1)
    if dropout > 0.0:
        attention = F.dropout(attention, p=dropout)

    # Apply the attention matrix to the values, then merge (h, g) back into a
    # single head axis at position 2: (b, g, h, n, d) -> (b, n, h*g, d).
    out = torch.einsum("bghns,bhsd->bghnd", attention, value)
    out = out.permute(0, 3, 2, 1, 4).reshape(bq, nq, hq, dq)

    attn_weights: Optional[Tensor] = None
    if need_weights:
        # Lay the weights out like the output, heads last:
        # (b, g, h, n, s) -> (b, n, s, h*g).
        attn_weights = attention.permute(0, 3, 4, 2, 1).reshape(bq, nq, nk, hq)
        if average_attn_weights:
            # Heads live on the last axis in this layout, so that is the axis
            # to average over (dim=1 would average over query positions).
            attn_weights = attn_weights.mean(dim=-1)

    return out, attn_weights


def _multihead_attend(q, k, v, scale, use_flash):
    """Run multi-head attention on per-head tensors and merge the heads.

    Shared by the self- and cross-attention modules below.

    Args:
        q: Queries, indexed as (batch, n_ctx, heads, channels).
        k: Keys, indexed as (batch, n_data, heads, channels).
        v: Values, indexed as (batch, n_data, heads, channels).
        scale: Factor multiplied into both ``q`` and ``k`` on the non-flash
            path. Unused on the flash path, which relies on the default
            scaling of ``F.scaled_dot_product_attention``.
        use_flash: If True, use ``F.scaled_dot_product_attention``.

    Returns:
        Tensor of shape (batch, n_ctx, heads * channels).
    """
    bs, n_ctx = q.shape[0], q.shape[1]
    if use_flash:
        # F.scaled_dot_product_attention expects (batch, heads, seq, channels).
        q = q.permute(0, 2, 1, 3)
        k = k.permute(0, 2, 1, 3)
        v = v.permute(0, 2, 1, 3)
        out = F.scaled_dot_product_attention(q, k, v)
        return out.permute(0, 2, 1, 3).reshape(bs, n_ctx, -1)

    weight = torch.einsum(
        "bthc,bshc->bhts", q * scale, k * scale
    )  # More stable with f16 than dividing afterwards
    wdtype = weight.dtype
    # Softmax is computed in float32 and cast back to the original dtype.
    weight = torch.softmax(weight.float(), dim=-1).type(wdtype)
    return torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1)


class MultiheadAttention(nn.Module):
    """Self-attention layer: fused QKV projection, attention, output projection."""

    def __init__(
        self,
        *,
        n_ctx: int,
        width: int,
        heads: int,
        init_scale: float,
        qkv_bias: bool,
        use_flash: bool = False
    ):
        """
        Args:
            n_ctx: Stored on the module; not used in the computation here.
            width: Feature width of the input and output.
            heads: Number of attention heads.
            init_scale: Passed to ``init_linear`` for both projections.
            qkv_bias: Whether the fused QKV projection has a bias.
            use_flash: Forwarded to ``QKVMultiheadAttention``.
        """
        super().__init__()
        self.n_ctx = n_ctx
        self.width = width
        self.heads = heads
        self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias)
        self.c_proj = nn.Linear(width, width)
        self.attention = QKVMultiheadAttention(heads=heads, n_ctx=n_ctx, use_flash=use_flash)
        init_linear(self.c_qkv, init_scale)
        init_linear(self.c_proj, init_scale)

    def forward(self, x):
        """Project ``x`` to QKV, attend, and project back to ``width`` features."""
        x = self.c_qkv(x)
        # NOTE(review): `checkpoint` is a project helper; the trailing `True`
        # presumably enables activation checkpointing unconditionally here —
        # confirm against craftsman.utils.checkpoint.
        x = checkpoint(self.attention, (x,), (), True)
        x = self.c_proj(x)
        return x


class QKVMultiheadAttention(nn.Module):
    """Parameter-free multi-head self-attention over a fused QKV tensor."""

    def __init__(self, *, heads: int, n_ctx: int, use_flash: bool = False):
        super().__init__()
        self.heads = heads
        self.n_ctx = n_ctx
        self.use_flash = use_flash

    def forward(self, qkv):
        """Attend over ``qkv`` of shape (batch, n_ctx, 3 * heads * channels).

        Returns a tensor of shape (batch, n_ctx, heads * channels).
        """
        bs, n_ctx, width = qkv.shape
        attn_ch = width // self.heads // 3
        # Fourth root of 1/attn_ch: applied to both q and k, so the logits are
        # effectively scaled by 1/sqrt(attn_ch).
        scale = 1 / math.sqrt(math.sqrt(attn_ch))
        qkv = qkv.view(bs, n_ctx, self.heads, -1)
        # Within each head the channels are laid out as [q | k | v].
        q, k, v = torch.split(qkv, attn_ch, dim=-1)
        return _multihead_attend(q, k, v, scale, self.use_flash)


class ResidualAttentionBlock(nn.Module):
    """Pre-LayerNorm residual block: self-attention followed by an MLP."""

    def __init__(
        self,
        *,
        n_ctx: int,
        width: int,
        heads: int,
        init_scale: float = 1.0,
        qkv_bias: bool = True,
        use_flash: bool = False,
        use_checkpoint: bool = False
    ):
        """
        Args:
            n_ctx: Forwarded to ``MultiheadAttention``.
            width: Feature width of the block.
            heads: Number of attention heads.
            init_scale: Forwarded to the attention layer and the MLP.
            qkv_bias: Whether the QKV projection has a bias.
            use_flash: Forwarded to the attention layer.
            use_checkpoint: Flag passed to ``checkpoint`` in ``forward``.
        """
        super().__init__()

        self.use_checkpoint = use_checkpoint

        self.attn = MultiheadAttention(
            n_ctx=n_ctx,
            width=width,
            heads=heads,
            init_scale=init_scale,
            qkv_bias=qkv_bias,
            use_flash=use_flash
        )
        self.ln_1 = nn.LayerNorm(width)
        self.mlp = MLP(width=width, init_scale=init_scale)
        self.ln_2 = nn.LayerNorm(width)

    def _forward(self, x: torch.Tensor):
        """Apply the attention and MLP sub-layers, each with a residual add."""
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

    def forward(self, x: torch.Tensor):
        """Run ``_forward`` through the project's ``checkpoint`` helper."""
        return checkpoint(self._forward, (x,), self.parameters(), self.use_checkpoint)


class MultiheadCrossAttention(nn.Module):
    """Cross-attention layer: queries from ``x``, keys/values from ``data``."""

    def __init__(
        self,
        *,
        width: int,
        heads: int,
        init_scale: float,
        qkv_bias: bool = True,
        use_flash: bool = False,
        n_data: Optional[int] = None,
        data_width: Optional[int] = None,
    ):
        """
        Args:
            width: Feature width of the queries and of the output.
            heads: Number of attention heads.
            init_scale: Passed to ``init_linear`` for all three projections.
            qkv_bias: Whether the query and key/value projections have a bias.
            use_flash: Forwarded to ``QKVMultiheadCrossAttention``.
            n_data: Stored on the module; not used in the computation here.
            data_width: Feature width of ``data``; defaults to ``width``.
        """
        super().__init__()
        self.n_data = n_data
        self.width = width
        self.heads = heads
        self.data_width = width if data_width is None else data_width
        self.c_q = nn.Linear(width, width, bias=qkv_bias)
        self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias)
        self.c_proj = nn.Linear(width, width)
        self.attention = QKVMultiheadCrossAttention(
            heads=heads, n_data=n_data, use_flash=use_flash
        )
        init_linear(self.c_q, init_scale)
        init_linear(self.c_kv, init_scale)
        init_linear(self.c_proj, init_scale)

    def forward(self, x, data):
        """Attend from ``x`` to ``data`` and project back to ``width`` features."""
        x = self.c_q(x)
        data = self.c_kv(data)
        # NOTE(review): as in MultiheadAttention, the trailing `True` is
        # presumably the "checkpointing enabled" flag — confirm in the helper.
        x = checkpoint(self.attention, (x, data), (), True)
        x = self.c_proj(x)
        return x


class QKVMultiheadCrossAttention(nn.Module):
    """Parameter-free multi-head cross-attention over a query and a fused KV tensor."""

    def __init__(self, *, heads: int, use_flash: bool = False, n_data: Optional[int] = None):
        super().__init__()
        self.heads = heads
        self.n_data = n_data
        self.use_flash = use_flash

    def forward(self, q, kv):
        """Attend from ``q`` to ``kv``.

        Args:
            q: Queries of shape (batch, n_ctx, heads * channels).
            kv: Fused keys/values of shape (batch, n_data, 2 * heads * channels).

        Returns:
            Tensor of shape (batch, n_ctx, heads * channels).
        """
        _, n_ctx, _ = q.shape
        bs, n_data, width = kv.shape
        attn_ch = width // self.heads // 2
        # Fourth root of 1/attn_ch: applied to both q and k, so the logits are
        # effectively scaled by 1/sqrt(attn_ch).
        scale = 1 / math.sqrt(math.sqrt(attn_ch))
        q = q.view(bs, n_ctx, self.heads, -1)
        kv = kv.view(bs, n_data, self.heads, -1)
        # Within each head the channels are laid out as [k | v].
        k, v = torch.split(kv, attn_ch, dim=-1)
        return _multihead_attend(q, k, v, scale, self.use_flash)


class ResidualCrossAttentionBlock(nn.Module):
    """Pre-LayerNorm residual block: cross-attention followed by an MLP."""

    def __init__(
        self,
        *,
        n_data: Optional[int] = None,
        width: int,
        heads: int,
        data_width: Optional[int] = None,
        init_scale: float = 0.25,
        qkv_bias: bool = True,
        use_flash: bool = False
    ):
        """
        Args:
            n_data: Forwarded to ``MultiheadCrossAttention``.
            width: Feature width of ``x``.
            heads: Number of attention heads.
            data_width: Feature width of ``data``; defaults to ``width``.
            init_scale: Forwarded to the attention layer and the MLP.
            qkv_bias: Whether the query and key/value projections have a bias.
            use_flash: Forwarded to the attention layer.
        """
        super().__init__()

        if data_width is None:
            data_width = width

        self.attn = MultiheadCrossAttention(
            n_data=n_data,
            width=width,
            heads=heads,
            data_width=data_width,
            init_scale=init_scale,
            qkv_bias=qkv_bias,
            use_flash=use_flash,
        )
        self.ln_1 = nn.LayerNorm(width)
        self.ln_2 = nn.LayerNorm(data_width)
        self.mlp = MLP(width=width, init_scale=init_scale)
        self.ln_3 = nn.LayerNorm(width)

    def forward(self, x: torch.Tensor, data: torch.Tensor):
        """Attend from normalized ``x`` to normalized ``data``, then apply the MLP."""
        x = x + self.attn(self.ln_1(x), self.ln_2(data))
        x = x + self.mlp(self.ln_3(x))
        return x