|
import torch |
|
import torch.nn as nn |
|
|
|
from einops import rearrange, repeat |
|
from torch import einsum, nn |
|
from einops_exts import rearrange_many |
|
|
|
from .utils import getattr_recursive, setattr_recursive |
|
|
|
def exists(val):
    """Return True when ``val`` is not ``None``.

    Only ``None`` counts as missing; falsy values such as ``0``, ``""`` or
    ``False`` are still reported as existing.
    """
    return val is not None
|
|
|
|
|
def FeedForward(
    dim,
    mult=4,
    use_ft_layernorm=False,
    enable_init_network_params=False,
    initializer_range=0.02,
):
    """Build the feed-forward sub-network used after cross-attention.

    The returned module is
    ``LayerNorm(dim) -> Linear(dim, dim * mult) -> GELU -> Linear(dim * mult, dim)``
    with both linear layers created without a bias.

    Args:
        dim: Size of the last dimension of the input and of the output.
        mult: Expansion factor of the hidden layer; the hidden width is
            ``int(dim * mult)``.
        use_ft_layernorm: The explicit initialization below only runs when
            this flag AND ``enable_init_network_params`` are both true. It
            does not change which LayerNorm implementation is used.
        enable_init_network_params: See ``use_ft_layernorm``.
        initializer_range: Standard deviation of the zero-mean normal
            distribution used for the linear weights when explicit
            initialization runs.

    Returns:
        nn.Sequential: the four-layer network described above.
    """
    inner_dim = int(dim * mult)
    net = nn.Sequential(
        nn.LayerNorm(dim),
        nn.Linear(dim, inner_dim, bias=False),
        nn.GELU(),
        nn.Linear(inner_dim, dim, bias=False),
    )

    if use_ft_layernorm and enable_init_network_params:
        layer_norm, proj_in, _, proj_out = net
        # The LayerNorm gain must start at 1 (identity scaling), matching
        # MaskedCrossAttention._init_weights. Sampling it from
        # N(0, initializer_range) would shrink the normalised activations to
        # ~initializer_range with a random sign per channel.
        nn.init.ones_(layer_norm.weight)
        nn.init.zeros_(layer_norm.bias)
        nn.init.normal_(proj_in.weight, mean=0.0, std=initializer_range)
        nn.init.normal_(proj_out.weight, mean=0.0, std=initializer_range)
    return net
|
|
|
|
|
|
|
class MaskedCrossAttention(nn.Module):
    """Multi-head cross-attention from text tokens (queries) to media latents.

    Queries are computed from the layer-normalised text features; keys and
    values are computed from the flattened media features.
    """

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=8,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
    ):
        """
        Args:
            dim: Feature size of the text stream (input and output).
            dim_visual: Feature size of the media stream.
            dim_head: Size of each attention head.
            heads: Number of attention heads.
            only_attend_immediate_media: When true, text tokens that occur
                before the first media token get a zero attention output.
            use_ft_layernorm: Accepted for interface compatibility; not used
                by this class.
            use_ft_flash_attention: Accepted for interface compatibility;
                the stored attribute is always ``False``.
            enable_init_network_params: When true, re-initialise all
                sub-modules with ``_init_weights``.
            initializer_range: Standard deviation used by ``_init_weights``
                for linear weights.
        """
        super().__init__()
        self.scale = dim_head**-0.5
        self.heads = heads
        # The constructor argument is intentionally ignored here: this
        # implementation always takes the plain softmax attention path.
        self.use_ft_flash_attention = False
        self.initializer_range = initializer_range
        inner_dim = dim_head * heads

        self.norm = nn.LayerNorm(dim)

        self.to_q = nn.Linear(dim, inner_dim, bias=False)
        self.to_kv = nn.Linear(dim_visual, inner_dim * 2, bias=False)
        self.to_out = nn.Linear(inner_dim, dim, bias=False)

        self.only_attend_immediate_media = only_attend_immediate_media

        if enable_init_network_params:
            self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise one sub-module; meant to be passed to ``nn.Module.apply``.

        Linear weights are drawn from N(0, ``initializer_range``) and their
        biases (if any) zeroed; LayerNorm is reset to gain 1 and bias 0.
        Other module types are left untouched.
        """
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, x, media, media_locations=None, use_cached_media=False, image_mask=None):
        """
        Args:
            x (torch.Tensor): text features
                shape (B, T_txt, D_txt)
            media (torch.Tensor): image features
                shape (B, T_img, n, D_img) where n is the dim of the latents
            media_locations: boolean mask identifying the media tokens in x
                shape (B, T_txt). May be None, in which case no
                "text before any media" zeroing is performed.
            use_cached_media: bool
                If true, treat all of x as if they occur after the last media
                registered in media_locations. T_txt does not need to exactly
                equal media_locations.shape[1] in this case
            image_mask: optional mask over media positions; positions whose
                mask is false are excluded from attention. It is unsqueezed
                twice and repeated along its last dim to the key length, so
                it is presumably (B, T_img) -- confirm against the caller.

        Returns:
            torch.Tensor: attended text features of shape (B, T_txt, D_txt).
        """
        # Only validate the length when there is a mask to validate; the
        # default ``media_locations=None`` must not crash on ``.shape``.
        if exists(media_locations) and not use_cached_media:
            assert media_locations.shape[1] == x.shape[1], (
                f"media_location.shape is {media_locations.shape} but x.shape is"
                f" {x.shape}"
            )

        T_txt = x.shape[1]
        h = self.heads

        x = self.norm(x.contiguous())
        q = self.to_q(x)
        media = rearrange(media, "b t n d -> b (t n) d")

        k, v = self.to_kv(media).chunk(2, dim=-1)

        # NOTE(review): no per-token text-to-media mask is applied to the
        # attention logits in this implementation; only ``image_mask``
        # restricts which media positions are visible. The mask that used to
        # be built here was never consumed, so it is no longer computed.
        text_without_media_mask = None
        if exists(media_locations) and self.only_attend_immediate_media:
            if use_cached_media:
                # Every token of x comes after all cached media, so each
                # position gets the total media count of its sample.
                text_time = repeat(
                    torch.count_nonzero(media_locations, dim=1),
                    "b -> b i",
                    i=T_txt,
                )
            else:
                # Number of media tokens seen up to and including each
                # text position.
                text_time = media_locations.cumsum(dim=-1)

            # Positions with a count of 0 precede the first media token.
            text_without_media_mask = rearrange(text_time == 0, "b i -> b 1 i 1")

        q, k, v = rearrange_many((q, k, v), "b n (h d) -> b h n d", h=h)
        q = q * self.scale
        sim = einsum("... i d, ... j d -> ... i j", q, k)

        if exists(image_mask):
            image_mask = image_mask.unsqueeze(1).unsqueeze(1).bool()
            # Expand the mask so it has one entry per key position.
            image_mask = image_mask.repeat_interleave(
                sim.shape[3] // image_mask.shape[3], dim=-1
            )
            sim = sim.masked_fill(~image_mask, -torch.finfo(sim.dtype).max)

        # Subtract the row maximum before softmax for numerical stability.
        sim = sim - sim.amax(dim=-1, keepdim=True).detach()
        attn = sim.softmax(dim=-1)

        if text_without_media_mask is not None:
            # Text that precedes every media token must not receive any
            # visual information: zero its attention weights entirely.
            attn = attn.masked_fill(text_without_media_mask, 0.0)

        out = einsum("... i j, ... j d -> ... i d", attn, v)
        out = rearrange(out, "b h n d -> b n (h d)")

        return self.to_out(out)
|
|
|
|
|
|
|
class GatedCrossAttentionBlock(nn.Module):
    """Cross-attention plus feed-forward, each behind a learned tanh gate.

    Both gates are zero-initialised parameters, so ``tanh(gate)`` is 0 at
    construction and the block starts out as the identity on ``x``.
    """

    def __init__(
        self,
        *,
        dim,
        dim_visual,
        dim_head=64,
        heads=12,
        ff_mult=1,
        only_attend_immediate_media=True,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Args:
            dim: Feature size of the text stream.
            dim_visual: Feature size of the media stream.
            dim_head: Size of each attention head.
            heads: Number of attention heads.
            ff_mult: Hidden-layer expansion factor of the feed-forward net.
            only_attend_immediate_media: Forwarded to MaskedCrossAttention.
            use_ft_layernorm: Forwarded to MaskedCrossAttention.
            use_ft_flash_attention: Forwarded to MaskedCrossAttention.
            enable_init_network_params: Forwarded to MaskedCrossAttention.
            initializer_range: Forwarded to MaskedCrossAttention.
            gradient_checkpointing: Stored on the instance; not read by
                this class itself.
        """
        super().__init__()
        self.attn = MaskedCrossAttention(
            dim=dim,
            dim_visual=dim_visual,
            dim_head=dim_head,
            heads=heads,
            only_attend_immediate_media=only_attend_immediate_media,
            use_ft_flash_attention=use_ft_flash_attention,
            use_ft_layernorm=use_ft_layernorm,
            enable_init_network_params=enable_init_network_params,
            initializer_range=initializer_range,
        )
        self.attn_gate = nn.Parameter(torch.zeros(dim))

        # The feed-forward net is built with its default initialization;
        # the init-related flags are only forwarded to the attention module.
        self.ff = FeedForward(dim, mult=ff_mult)
        self.ff_gate = nn.Parameter(torch.zeros(dim))

        self.gradient_checkpointing = gradient_checkpointing

    def forward(
        self,
        x,
        media,
        media_locations=None,
        use_cached_media=False,
        image_mask=None,
    ):
        """Apply gated cross-attention and gated feed-forward to ``x``.

        Args:
            x: Text features; forwarded to MaskedCrossAttention.
            media: Media features; forwarded to MaskedCrossAttention.
            media_locations: Boolean mask of media tokens. Despite the
                ``None`` default it is required here, because the
                per-sample "has media" flag is computed from it.
            use_cached_media: Forwarded to MaskedCrossAttention.
            image_mask: Forwarded to MaskedCrossAttention.

        Returns:
            ``x`` plus the gated attention and feed-forward residuals.
            Samples whose ``media_locations`` row contains no media token
            are returned unchanged.
        """
        # 1 for samples that contain at least one media token, else 0;
        # shaped (B, 1, 1) so it broadcasts over sequence and features.
        # The flag follows x's dtype: a hard-coded bfloat16 flag would
        # promote float16 activations to float32.
        has_media = torch.sum(media_locations, dim=-1) > 0
        flag = has_media.to(x.dtype).unsqueeze(1).unsqueeze(1)

        attended = self.attn(
            x,
            media,
            media_locations=media_locations,
            use_cached_media=use_cached_media,
            image_mask=image_mask,
        )
        x = flag * attended * self.attn_gate.tanh() + x

        x = flag * self.ff(x) * self.ff_gate.tanh() + x

        return x
|
|
|
|
|
class FlamingoLayer(nn.Module):
    """
    FlamingoLayer is a wrapper around the GatedCrossAttentionBlock and DecoderLayer.

    The visual features, media locations and cache flag are not passed to
    ``forward``; they are stored on the layer beforehand through the
    ``condition_*`` methods.
    """

    def __init__(
        self, gated_cross_attn_layer, decoder_layer, gradient_checkpointing=False
    ):
        """
        Args:
            gated_cross_attn_layer: Cross-attention block to run before the
                decoder layer, or None to run the decoder layer alone.
            decoder_layer: The wrapped language-model decoder layer.
            gradient_checkpointing: Value written to the
                ``_use_gradient_checkpointing`` attribute of both wrapped
                layers.
        """
        super().__init__()
        self.gated_cross_attn_layer = gated_cross_attn_layer
        self.decoder_layer = decoder_layer
        self.vis_x = None
        self.media_locations = None
        # Initialised here so that forward() never hits an AttributeError
        # when a caller conditions only part of the state.
        self.image_mask = None
        self.use_cached_media = False
        if self.gated_cross_attn_layer is not None:
            self.gated_cross_attn_layer._use_gradient_checkpointing = (
                gradient_checkpointing
            )
        self.decoder_layer._use_gradient_checkpointing = gradient_checkpointing

    def is_conditioned(self) -> bool:
        """Check whether the layer is conditioned."""
        return self.vis_x is not None and self.media_locations is not None

    def condition_vis_x(self, vis_x):
        """Store the visual conditioning.

        Args:
            vis_x: Either None (clears the conditioning) or a 2-item
                sequence ``(vis_x, image_mask)``.
        """
        if vis_x is not None:
            self.vis_x, self.image_mask = vis_x
        else:
            self.vis_x, self.image_mask = None, None

    def condition_media_locations(self, media_locations):
        """Store the media-token mask used by the cross-attention block."""
        self.media_locations = media_locations

    def condition_use_cached_media(self, use_cached_media):
        """Store the flag that is forwarded as ``use_cached_media``."""
        self.use_cached_media = use_cached_media

    def forward(
        self,
        lang_x,
        attention_mask=None,
        **decoder_layer_kwargs,
    ):
        """Run the optional cross-attention block, then the decoder layer.

        Args:
            lang_x: Language features passed through both layers.
            attention_mask: Forwarded to the decoder layer.
            **decoder_layer_kwargs: Forwarded to the decoder layer.

        Returns:
            Whatever the wrapped decoder layer returns.

        Raises:
            ValueError: If a cross-attention block is present but
                ``vis_x`` or ``media_locations`` has not been conditioned.
        """
        if self.gated_cross_attn_layer is not None:
            if self.vis_x is None:
                raise ValueError("vis_x must be conditioned before forward pass")

            if self.media_locations is None:
                raise ValueError(
                    "media_locations must be conditioned before forward pass"
                )

            lang_x = self.gated_cross_attn_layer(
                lang_x,
                self.vis_x,
                media_locations=self.media_locations,
                use_cached_media=self.use_cached_media,
                image_mask=self.image_mask,
            )

        lang_x = self.decoder_layer(
            lang_x, attention_mask=attention_mask, **decoder_layer_kwargs
        )
        return lang_x
|
|
|
|
|
class FlamingoLMMixin(nn.Module):
    """
    Mixin to add cross-attention layers to a language model.
    """

    def set_decoder_layers_attr_name(self, decoder_layers_attr_name):
        """Record the attribute path under which the decoder layers live."""
        self.decoder_layers_attr_name = decoder_layers_attr_name

    def _get_decoder_layers(self):
        """Return the decoder layers found at ``decoder_layers_attr_name``."""
        return getattr_recursive(self, self.decoder_layers_attr_name)

    def _set_decoder_layers(self, value):
        """Replace the decoder layers at ``decoder_layers_attr_name``."""
        setattr_recursive(self, self.decoder_layers_attr_name, value)

    def init_flamingo(
        self,
        media_token_id,
        lang_hidden_size,
        vis_hidden_size,
        cross_attn_every_n_layers,
        *,
        use_ft_layernorm=False,
        use_ft_flash_attention=False,
        enable_init_network_params=False,
        initializer_range=0.02,
        gradient_checkpointing=False,
    ):
        """
        Initialize Flamingo by adding a new gated cross attn to the decoder. Store the media token id for computing the media locations.

        A GatedCrossAttentionBlock is created for every
        ``cross_attn_every_n_layers``-th decoder layer (1-based); all other
        positions get ``None``. The remaining keyword arguments are
        forwarded to each GatedCrossAttentionBlock.
        """
        self.old_decoder_blocks = self._get_decoder_layers()
        self.gated_cross_attn_layers = nn.ModuleList(
            [
                (
                    GatedCrossAttentionBlock(
                        dim=lang_hidden_size,
                        dim_visual=vis_hidden_size,
                        use_ft_layernorm=use_ft_layernorm,
                        use_ft_flash_attention=use_ft_flash_attention,
                        enable_init_network_params=enable_init_network_params,
                        initializer_range=initializer_range,
                        gradient_checkpointing=gradient_checkpointing,
                    )
                    if (layer_idx + 1) % cross_attn_every_n_layers == 0
                    else None
                )
                for layer_idx, _ in enumerate(self.old_decoder_blocks)
            ]
        )
        self.init_flamingo_layers(gradient_checkpointing)
        self.media_token_id = media_token_id
        self.initialized_flamingo = True
        self._use_cached_vision_x = False

    def init_flamingo_layers(self, gradient_checkpointing):
        """
        Re initializes the FlamingoLayers.
        Propagates any changes made to self.gated_cross_attn_layers or self.old_decoder_blocks
        """
        self._set_decoder_layers(
            nn.ModuleList(
                [
                    FlamingoLayer(
                        gated_cross_attn_layer, decoder_layer, gradient_checkpointing
                    )
                    for gated_cross_attn_layer, decoder_layer in zip(
                        self.gated_cross_attn_layers, self.old_decoder_blocks
                    )
                ]
            )
        )

    def forward(self, input_ids, attention_mask, **kwargs):
        """Condition the Flamingo layers on the media locations before forward()

        Raises:
            ValueError: If ``init_flamingo`` has not been called yet.
        """
        # getattr with a default: before init_flamingo() the attribute does
        # not exist at all, and the caller should get the explanatory
        # ValueError rather than an AttributeError.
        if not getattr(self, "initialized_flamingo", False):
            raise ValueError(
                "Flamingo layers are not initialized. Please call `init_flamingo`"
                " first."
            )
        media_locations = input_ids == self.media_token_id

        # Reuse the previously conditioned media locations only when caching
        # is enabled, every layer is already conditioned, and the current
        # input contains no media token of its own.
        use_cached_media_locations = (
            self._use_cached_vision_x
            and self.is_conditioned()
            and not media_locations.any()
        )

        for layer in self._get_decoder_layers():
            if not use_cached_media_locations:
                layer.condition_media_locations(media_locations)
            layer.condition_use_cached_media(use_cached_media_locations)

        kwargs["input_ids"] = input_ids
        kwargs["attention_mask"] = attention_mask
        # Delegates to the next class in the MRO -- presumably the language
        # model this mixin is combined with.
        return super().forward(**kwargs)

    def is_conditioned(self) -> bool:
        """Check whether all decoder layers are already conditioned."""
        return all(layer.is_conditioned() for layer in self._get_decoder_layers())

    def clear_conditioned_layers(self):
        """Reset the conditioning state of every decoder layer to None."""
        for layer in self._get_decoder_layers():
            layer.condition_vis_x(None)
            layer.condition_media_locations(None)
            layer.condition_use_cached_media(None)
|
|