|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
|
|
|
|
|
|
|
|
|
|
class xATGLU(nn.Module): |
|
def __init__(self, input_dim, output_dim, bias=True): |
|
super().__init__() |
|
|
|
self.proj = nn.Linear(input_dim, output_dim * 2, bias=bias) |
|
nn.init.kaiming_normal_(self.proj.weight, nonlinearity='linear') |
|
|
|
self.alpha = nn.Parameter(torch.zeros(1)) |
|
self.half_pi = torch.pi / 2 |
|
self.inv_pi = 1 / torch.pi |
|
|
|
def forward(self, x): |
|
projected = self.proj(x) |
|
gate_path, value_path = projected.chunk(2, dim=-1) |
|
|
|
|
|
gate = (torch.arctan(gate_path) + self.half_pi) * self.inv_pi |
|
expanded_gate = gate * (1 + 2 * self.alpha) - self.alpha |
|
|
|
return expanded_gate * value_path |
|
|
|
class ResBlock(nn.Module): |
|
def __init__(self, channels): |
|
super().__init__() |
|
self.conv1 = nn.Conv2d(channels, channels, 3, padding=1) |
|
self.norm1 = nn.GroupNorm(32, channels) |
|
self.conv2 = nn.Conv2d(channels, channels, 3, padding=1) |
|
self.norm2 = nn.GroupNorm(32, channels) |
|
|
|
def forward(self, x): |
|
h = self.conv1(F.silu(self.norm1(x))) |
|
h = self.conv2(F.silu(self.norm2(h))) |
|
return x + h |
|
|
|
class TransformerBlock(nn.Module): |
|
def __init__(self, channels, num_heads=8): |
|
super().__init__() |
|
self.norm1 = nn.LayerNorm(channels) |
|
self.attn = nn.MultiheadAttention(channels, num_heads) |
|
self.norm2 = nn.LayerNorm(channels) |
|
self.mlp = nn.Sequential( |
|
xATGLU(channels, 4 * channels), |
|
nn.Linear(4 * channels, channels) |
|
) |
|
|
|
def forward(self, x): |
|
|
|
b, c, h, w = x.shape |
|
spatial_size = h * w |
|
x = x.flatten(2).permute(2, 0, 1) |
|
|
|
|
|
h_attn = self.norm1(x) |
|
h_attn, _ = self.attn(h_attn, h_attn, h_attn) |
|
x = x + h_attn |
|
|
|
|
|
h_mlp = self.norm2(x) |
|
h_mlp = self.mlp(h_mlp) |
|
x = x + h_mlp |
|
|
|
|
|
return x.permute(1, 2, 0).reshape(b, c, h, w) |
|
|
|
class LevelBlock(nn.Module): |
|
def __init__(self, channels, num_blocks, block_type='res'): |
|
super().__init__() |
|
self.blocks = nn.ModuleList() |
|
for _ in range(num_blocks): |
|
if block_type == 'transformer': |
|
self.blocks.append(TransformerBlock(channels)) |
|
else: |
|
self.blocks.append(ResBlock(channels)) |
|
|
|
def forward(self, x): |
|
for block in self.blocks: |
|
x = block(x) |
|
return x |
|
|
|
class AsymmetricResidualUDiT(nn.Module): |
|
def __init__(self, |
|
in_channels=3, |
|
base_channels=128, |
|
patch_size=2, |
|
num_levels=3, |
|
encoder_blocks=3, |
|
decoder_blocks=7, |
|
encoder_transformer_thresh=2, |
|
decoder_transformer_thresh=4, |
|
mid_blocks=16 |
|
): |
|
super().__init__() |
|
|
|
|
|
self.patch_embed = nn.Conv2d(in_channels, base_channels, |
|
kernel_size=patch_size, stride=patch_size) |
|
|
|
|
|
self.encoders = nn.ModuleList() |
|
curr_channels = base_channels |
|
|
|
for level in range(num_levels): |
|
|
|
use_transformer = level >= encoder_transformer_thresh |
|
|
|
|
|
self.encoders.append( |
|
LevelBlock(curr_channels, encoder_blocks, use_transformer) |
|
) |
|
|
|
|
|
if level < num_levels - 1: |
|
self.encoders.append( |
|
nn.Conv2d(curr_channels, curr_channels * 2, 1) |
|
) |
|
curr_channels *= 2 |
|
|
|
|
|
self.middle = nn.ModuleList([ |
|
TransformerBlock(curr_channels) for _ in range(mid_blocks) |
|
]) |
|
|
|
|
|
self.decoders = nn.ModuleList() |
|
|
|
for level in range(num_levels): |
|
|
|
use_transformer = level <= decoder_transformer_thresh |
|
|
|
|
|
self.decoders.append( |
|
LevelBlock(curr_channels, decoder_blocks, use_transformer) |
|
) |
|
|
|
|
|
|
|
if level < num_levels - 1: |
|
self.decoders.append( |
|
nn.Conv2d(curr_channels, curr_channels // 2, 1) |
|
) |
|
curr_channels //= 2 |
|
|
|
|
|
self.final_proj = nn.ConvTranspose2d(base_channels, in_channels, |
|
kernel_size=patch_size, stride=patch_size) |
|
|
|
def downsample(self, x): |
|
return F.avg_pool2d(x, kernel_size=2) |
|
|
|
def upsample(self, x): |
|
return F.interpolate(x, scale_factor=2, mode='nearest') |
|
|
|
def forward(self, x, t=None): |
|
|
|
x = self.patch_embed(x) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
residuals = [] |
|
curr_res = x |
|
|
|
|
|
h = x |
|
for i, blocks in enumerate(self.encoders): |
|
if isinstance(blocks, LevelBlock): |
|
h = blocks(h) |
|
else: |
|
|
|
residuals.append(curr_res) |
|
|
|
h = self.downsample(blocks(h)) |
|
curr_res = h |
|
|
|
|
|
x = h |
|
for block in self.middle: |
|
x = block(x) |
|
|
|
|
|
x = x - curr_res |
|
|
|
|
|
for i, blocks in enumerate(self.decoders): |
|
if isinstance(blocks, LevelBlock): |
|
x = blocks(x) |
|
else: |
|
|
|
x = blocks(x) |
|
|
|
x = self.upsample(x) |
|
|
|
curr_res = residuals.pop() |
|
x = x + curr_res |
|
|
|
|
|
x = self.final_proj(x) |
|
|
|
return x |