from typing import Optional

import torch
import torch.nn.functional as F
from torch import nn
from einops import rearrange
from torchvision.transforms.v2 import (
    Compose,
    Resize,
    InterpolationMode,
    ToImage,
    ToDtype,
    Normalize,
)
from transformers.utils import is_flash_attn_2_available

# FlashSelfAttention is optional: fall back to None when flash-attn is not
# installed or not usable, so the rest of the module can test for it.
try:
    if is_flash_attn_2_available():
        from flash_attn.modules.mha import FlashSelfAttention
    else:
        FlashSelfAttention = None
except ImportError:
    FlashSelfAttention = None


class Attention(nn.Module):
    """Multi-head self-attention with a fused QKV projection.

    Uses ``FlashSelfAttention`` when ``use_flash_attn`` is true and the
    flash-attn import succeeded; otherwise uses
    ``F.scaled_dot_product_attention``.

    Args:
        dim: Embedding width; must be divisible by ``num_heads``.
        num_heads: Number of attention heads.
        use_flash_attn: Request the flash-attn kernel when available.
    """

    def __init__(self, dim, num_heads=16, use_flash_attn=False):
        super().__init__()
        assert dim % num_heads == 0, "dim should be divisible by num_heads"

        self.num_heads = num_heads
        self.head_dim = dim // num_heads

        self.qkv = nn.Linear(dim, dim * 3)
        self.proj = nn.Linear(dim, dim)

        # Silently falls back to the SDPA path if flash-attn was requested
        # but could not be imported.
        if use_flash_attn and FlashSelfAttention is not None:
            self.flash_attn = FlashSelfAttention()
        else:
            self.flash_attn = None

        torch.nn.init.kaiming_normal_(
            self.qkv.weight, mode="fan_in", nonlinearity="relu"
        )
        torch.nn.init.kaiming_normal_(
            self.proj.weight, mode="fan_in", nonlinearity="relu"
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply self-attention to ``x`` and return a tensor of the same shape.

        The non-flash path unpacks ``x.shape`` as ``(B, N, C)``, so ``x`` must
        be 3-D there.
        """
        if self.flash_attn is not None:
            qkv = self.qkv(x)
            # Split the fused projection into separate (q/k/v, head, head_dim)
            # axes. NOTE(review): FlashSelfAttention presumably expects
            # (batch, seq, 3, heads, head_dim) half-precision CUDA input —
            # confirm against the flash-attn documentation.
            qkv = rearrange(
                qkv, "... (three h d) -> ... three h d", three=3, h=self.num_heads
            )
            attn_output = self.flash_attn(qkv)
            # Merge the head axes back into a single embedding axis.
            output = rearrange(attn_output, "... h d -> ... (h d)")
            output = self.proj(output)
            return output
        else:
            B, N, C = x.shape
            # (B, N, 3*C) -> (3, B, heads, N, head_dim)
            qkv = (
                self.qkv(x)
                .reshape(B, N, 3, self.num_heads, self.head_dim)
                .permute(2, 0, 3, 1, 4)
            )
            q, k, v = qkv.unbind(0)

            x = F.scaled_dot_product_attention(q, k, v)

            # (B, heads, N, head_dim) -> (B, N, C)
            x = x.transpose(1, 2).reshape(B, N, C)
            x = self.proj(x)
            return x


class VitBlock(nn.Module):
    """Transformer block: LayerNorm -> attention and LayerNorm -> MLP, each
    wrapped in a residual connection.

    Args:
        embed_dim: Embedding width of the block.
        use_flash_attn: Forwarded to :class:`Attention`.
    """

    def __init__(self, embed_dim, use_flash_attn=False):
        super().__init__()
        self.attn = Attention(embed_dim, use_flash_attn=use_flash_attn)
        # The MLP hidden width is fixed at 4304.
        self.mlp = MLP(embed_dim, 4304)
        self.norm1 = nn.LayerNorm(embed_dim)
        self.norm2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Return ``x`` after the attention and MLP residual updates."""
        x = x + self.attn(self.norm1(x))
        x = x + self.mlp(self.norm2(x))
        return x


class VisionTransformer(nn.Module):
    """Stack of 27 :class:`VitBlock` layers over linearly embedded patches.

    The positional embedding has shape ``(1, 729, 1152)``; 729 == 27 * 27,
    i.e. a 378x378 input cut into 14x14 patches.

    Args:
        use_flash_attn: Forwarded to every :class:`VitBlock`.
    """

    def __init__(self, use_flash_attn=False):
        super().__init__()

        embed_len = 729
        embed_dim = 1152

        self.patch_embed = LinearPatchEmbedding()
        self.pos_embed = nn.Parameter(torch.randn(1, embed_len, embed_dim) * 0.02)
        self.blocks = nn.Sequential(
            *[VitBlock(embed_dim, use_flash_attn=use_flash_attn) for _ in range(27)]
        )
        self.norm = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Embed patches, add positional embeddings, run all blocks and the
        final LayerNorm."""
        x = self.patch_embed(x)
        x = x + self.pos_embed
        for block in self.blocks:
            x = block(x)
        return self.norm(x)


class EncoderWrapper(nn.Module):
    """Holds the :class:`VisionTransformer` under ``model["visual"]``.

    NOTE(review): the ModuleDict nesting presumably exists so parameter names
    match a pretrained checkpoint's keys — confirm before flattening.
    """

    def __init__(self, use_flash_attn=False):
        super().__init__()
        self.model = nn.ModuleDict({"visual": VisionTransformer(use_flash_attn)})

    def forward(self, x):
        """Run ``x`` through the wrapped vision transformer."""
        return self.model["visual"](x)


class LinearPatchEmbedding(nn.Module):
    """Cut an image batch into 14x14 patches and project each one to 1152-d.

    The linear layer takes 588 inputs (588 == 3 * 14 * 14), so the input must
    have 3 channels.
    """

    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(588, 1152)

    def forward(self, x):
        """Map ``(b, c, H, W)`` to ``(b, (H // 14) * (W // 14), 1152)``.

        ``H`` and ``W`` must be multiples of 14 for the reshape to succeed.
        """
        b, c, hp1, wp2 = x.shape
        p1, p2 = 14, 14
        h, w = hp1 // p1, wp2 // p2
        # Split each spatial axis into (patch index, offset within patch).
        x = x.reshape(b, c, h, p1, w, p2)
        # Bring the patch-grid axes forward; each patch is laid out (c, p1, p2).
        x = x.permute(0, 2, 4, 1, 3, 5)
        x = x.reshape(b, h * w, c * p1 * p2)

        return self.linear(x)


class MLP(nn.Module):
    """Two-layer perceptron with a tanh-approximated GELU in between.

    Args:
        in_features: Input width.
        hidden_features: Hidden width; defaults to ``in_features`` when falsy.
        out_features: Output width; defaults to ``in_features`` when falsy.
    """

    def __init__(
        self,
        in_features: int,
        hidden_features: Optional[int] = None,
        out_features: Optional[int] = None,
    ) -> None:
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = nn.GELU(approximate="tanh")
        self.fc2 = nn.Linear(hidden_features, out_features)

        torch.nn.init.kaiming_normal_(
            self.fc1.weight, mode="fan_in", nonlinearity="relu"
        )
        torch.nn.init.kaiming_normal_(
            self.fc2.weight, mode="fan_in", nonlinearity="relu"
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``fc2(gelu(fc1(x)))``."""
        x = self.fc1(x)
        x = self.act(x)
        x = self.fc2(x)
        return x


class VisionProjection(nn.Module):
    """MLP mapping 1152-d image embeddings to 2048-d through an 8192-d
    hidden layer."""

    def __init__(self):
        super().__init__()

        image_embedding_dim = 1152
        model_dim = 2048
        hidden_dim = model_dim * 4

        self.mlp = MLP(image_embedding_dim, hidden_dim, model_dim)

    @property
    def device(self):
        """Device of the first projection layer's weight."""
        return self.mlp.fc1.weight.device

    def forward(self, x):
        """Project ``x`` with the MLP."""
        return self.mlp(x)


class VisionEncoder(nn.Module):
    """Preprocess images, encode them with the ViT and project the result.

    Args:
        use_flash_attn: Forwarded to the encoder's attention layers.
    """

    def __init__(self, use_flash_attn=False):
        super().__init__()

        self.encoder = EncoderWrapper(use_flash_attn)
        self.projection = VisionProjection()

        # Resize to 378x378, convert to float32 scaled by ToDtype, then
        # normalize each channel with mean 0.5 and std 0.5.
        self.preprocess = Compose(
            [
                Resize(size=(378, 378), interpolation=InterpolationMode.BICUBIC),
                ToImage(),
                ToDtype(torch.float32, scale=True),
                Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
            ]
        )

    @property
    def device(self):
        """Device of the projection's first layer weight."""
        return self.projection.mlp.fc1.weight.device

    @property
    def dtype(self):
        """Dtype of the projection's first layer weight."""
        return self.projection.mlp.fc1.weight.dtype

    # NOTE(review): this overrides ``__call__`` instead of defining
    # ``forward``, so ``nn.Module``'s call machinery (e.g. registered forward
    # hooks) is bypassed for this module. Kept as-is to preserve behavior.
    def __call__(self, images) -> torch.Tensor:
        """Encode one or more images under ``torch.no_grad()``.

        Args:
            images: A single image object, a list/tuple of image objects, a
                list/tuple of already-preprocessed tensors, or a batched
                tensor. Non-tensor images must provide ``.convert("RGB")``.
                Whether a sequence holds tensors is decided from its first
                element only.

        Returns:
            The projected embeddings produced by ``self.projection``.

        Raises:
            ValueError: If ``images`` is an empty list or tuple.
        """
        if isinstance(images, tuple):
            # Treat a tuple as a batch rather than as a single image.
            images = list(images)
        elif not isinstance(images, (list, torch.Tensor)):
            images = [images]

        if isinstance(images, list) and not images:
            raise ValueError("images must contain at least one image")

        with torch.no_grad():
            if isinstance(images, list):
                # Skip preprocess if images are already tensors
                if not isinstance(images[0], torch.Tensor):
                    images = [
                        self.preprocess(image.convert("RGB")) for image in images
                    ]
                images = torch.stack(images)

            x = images.to(self.device, dtype=self.dtype)
            x = self.encoder(x)
            x = self.projection(x)

        return x