# Text2Human/models/archs/transformer_arch.py
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
class CausalSelfAttention(nn.Module):
"""
A vanilla multi-head masked self-attention layer with a projection at the end.
It is possible to use torch.nn.MultiheadAttention here but I am including an
explicit implementation here to show that there is nothing too scary here.
"""
def __init__(self, bert_n_emb, bert_n_head, attn_pdrop, resid_pdrop,
latent_shape, sampler):
super().__init__()
assert bert_n_emb % bert_n_head == 0
# key, query, value projections for all heads
self.key = nn.Linear(bert_n_emb, bert_n_emb)
self.query = nn.Linear(bert_n_emb, bert_n_emb)
self.value = nn.Linear(bert_n_emb, bert_n_emb)
# regularization
self.attn_drop = nn.Dropout(attn_pdrop)
self.resid_drop = nn.Dropout(resid_pdrop)
# output projection
self.proj = nn.Linear(bert_n_emb, bert_n_emb)
self.n_head = bert_n_head
        self.causal = sampler == 'autoregressive'
        if self.causal:
            # causal mask: position i may only attend to positions <= i
            block_size = int(np.prod(latent_shape))
            mask = torch.tril(torch.ones(block_size, block_size))
            self.register_buffer("mask",
                                 mask.view(1, 1, block_size, block_size))
def forward(self, x, layer_past=None):
B, T, C = x.size()
        # calculate query, key, values for all heads in batch and move head
        # forward to be the batch dim
        k = self.key(x).view(B, T, self.n_head,
                             C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
        q = self.query(x).view(B, T, self.n_head,
                               C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
        v = self.value(x).view(B, T, self.n_head,
                               C // self.n_head).transpose(1, 2)  # (B, nh, T, hs)
        present = torch.stack((k, v))  # key/value cache for incremental decoding
        if self.causal and layer_past is not None:
            # append the cached keys/values from previous decoding steps
            past_key, past_value = layer_past
            k = torch.cat((past_key, k), dim=-2)
            v = torch.cat((past_value, v), dim=-2)
# causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
if self.causal and layer_past is None:
att = att.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf'))
att = F.softmax(att, dim=-1)
att = self.attn_drop(att)
y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
# re-assemble all head outputs side by side
y = y.transpose(1, 2).contiguous().view(B, T, C)
# output projection
y = self.resid_drop(self.proj(y))
return y, present
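
# A minimal shape sanity check for CausalSelfAttention (not part of the
# original model; all dimensions below are illustrative assumptions). With
# sampler='autoregressive' the layer builds a lower-triangular mask over
# prod(latent_shape) positions and also returns the stacked key/value cache.
if __name__ == "__main__":
    _attn = CausalSelfAttention(bert_n_emb=64, bert_n_head=4, attn_pdrop=0.0,
                                resid_pdrop=0.0, latent_shape=(4, 4),
                                sampler='autoregressive')
    _x = torch.randn(2, 16, 64)  # (B, T, C), with T == prod(latent_shape)
    _y, _present = _attn(_x)
    assert _y.shape == (2, 16, 64)              # output keeps the input shape
    assert _present.shape == (2, 2, 4, 16, 16)  # (k/v, B, n_head, T, head_dim)
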
class Block(nn.Module):
""" an unassuming Transformer block """
def __init__(self, bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
latent_shape, sampler):
super().__init__()
self.ln1 = nn.LayerNorm(bert_n_emb)
self.ln2 = nn.LayerNorm(bert_n_emb)
self.attn = CausalSelfAttention(bert_n_emb, bert_n_head, attn_pdrop,
resid_pdrop, latent_shape, sampler)
self.mlp = nn.Sequential(
nn.Linear(bert_n_emb, 4 * bert_n_emb),
            nn.GELU(),
nn.Linear(4 * bert_n_emb, bert_n_emb),
nn.Dropout(resid_pdrop),
)
    def forward(self, x, layer_past=None, return_present=False):
        # pre-LayerNorm residual connections around attention and the MLP
        attn, present = self.attn(self.ln1(x), layer_past)
        x = x + attn
        x = x + self.mlp(self.ln2(x))
if layer_past is not None or return_present:
return x, present
return x
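
# A small usage sketch for Block (illustrative dimensions, not from the
# original file). The block applies pre-LayerNorm residual attention followed
# by a residual MLP; return_present=True also returns the key/value cache.
if __name__ == "__main__":
    _block = Block(bert_n_emb=64, resid_pdrop=0.0, bert_n_head=4,
                   attn_pdrop=0.0, latent_shape=(4, 4), sampler='absorbing')
    _x = torch.randn(2, 16, 64)
    _out, _present = _block(_x, return_present=True)
    assert _out.shape == _x.shape  # residual blocks preserve the shape
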
class Transformer(nn.Module):
""" the full GPT language model, with a context size of block_size """
def __init__(self,
codebook_size,
segm_codebook_size,
bert_n_emb,
bert_n_layers,
bert_n_head,
block_size,
latent_shape,
embd_pdrop,
resid_pdrop,
attn_pdrop,
sampler='absorbing'):
super().__init__()
        self.vocab_size = codebook_size + 1  # extra index for the mask token
self.n_embd = bert_n_emb
self.block_size = block_size
self.n_layers = bert_n_layers
self.codebook_size = codebook_size
self.segm_codebook_size = segm_codebook_size
self.causal = sampler == 'autoregressive'
        if self.causal:
            self.vocab_size = codebook_size  # no mask token when autoregressive
self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd)
self.pos_emb = nn.Parameter(
torch.zeros(1, self.block_size, self.n_embd))
self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd)
self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd))
self.drop = nn.Dropout(embd_pdrop)
# transformer
self.blocks = nn.Sequential(*[
Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
latent_shape, sampler) for _ in range(self.n_layers)
])
# decoder head
self.ln_f = nn.LayerNorm(self.n_embd)
        self.head = nn.Linear(self.n_embd, self.codebook_size, bias=False)
        self.apply(self._init_weights)  # GPT-style weight initialization
def get_block_size(self):
return self.block_size
def _init_weights(self, module):
if isinstance(module, (nn.Linear, nn.Embedding)):
module.weight.data.normal_(mean=0.0, std=0.02)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
def forward(self, idx, segm_tokens, t=None):
# each index maps to a (learnable) vector
token_embeddings = self.tok_emb(idx)
segm_embeddings = self.segm_emb(segm_tokens)
        if self.causal:
            # prepend the learned start-of-sequence token
            token_embeddings = torch.cat(
                (self.start_tok.repeat(token_embeddings.size(0), 1, 1),
                 token_embeddings),
                dim=1)
        t = token_embeddings.shape[1]
assert t <= self.block_size, "Cannot forward, model block size is exhausted."
# each position maps to a (learnable) vector
position_embeddings = self.pos_emb[:, :t, :]
x = token_embeddings + position_embeddings + segm_embeddings
x = self.drop(x)
for block in self.blocks:
x = block(x)
x = self.ln_f(x)
logits = self.head(x)
return logits
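
# A minimal end-to-end sketch of the Transformer with the absorbing sampler
# (hyperparameters here are illustrative assumptions, not values from the
# original repo). With sampler='absorbing', vocab_size is codebook_size + 1,
# so index codebook_size presumably serves as the extra mask token, while the
# output head predicts over the codebook only.
if __name__ == "__main__":
    _model = Transformer(codebook_size=512, segm_codebook_size=32,
                         bert_n_emb=64, bert_n_layers=2, bert_n_head=4,
                         block_size=16, latent_shape=(4, 4), embd_pdrop=0.0,
                         resid_pdrop=0.0, attn_pdrop=0.0, sampler='absorbing')
    _idx = torch.randint(0, 513, (2, 16))   # index 512 = assumed mask token
    _segm = torch.randint(0, 32, (2, 16))   # segmentation condition tokens
    _logits = _model(_idx, _segm)
    assert _logits.shape == (2, 16, 512)    # per-position codebook logits
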
class TransformerMultiHead(nn.Module):
""" the full GPT language model, with a context size of block_size """
def __init__(self,
codebook_size,
segm_codebook_size,
texture_codebook_size,
bert_n_emb,
bert_n_layers,
bert_n_head,
block_size,
latent_shape,
embd_pdrop,
resid_pdrop,
attn_pdrop,
num_head,
sampler='absorbing'):
super().__init__()
        self.vocab_size = codebook_size + 1  # extra index for the mask token
self.n_embd = bert_n_emb
self.block_size = block_size
self.n_layers = bert_n_layers
self.codebook_size = codebook_size
self.segm_codebook_size = segm_codebook_size
self.texture_codebook_size = texture_codebook_size
self.causal = sampler == 'autoregressive'
        if self.causal:
            self.vocab_size = codebook_size  # no mask token when autoregressive
self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd)
self.pos_emb = nn.Parameter(
torch.zeros(1, self.block_size, self.n_embd))
self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd)
self.texture_emb = nn.Embedding(self.texture_codebook_size,
self.n_embd)
self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd))
self.drop = nn.Dropout(embd_pdrop)
# transformer
self.blocks = nn.Sequential(*[
Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
latent_shape, sampler) for _ in range(self.n_layers)
])
# decoder head
self.num_head = num_head
self.head_class_num = codebook_size // self.num_head
self.ln_f = nn.LayerNorm(self.n_embd)
        self.head_list = nn.ModuleList([
            nn.Linear(self.n_embd, self.head_class_num, bias=False)
            for _ in range(self.num_head)
        ])
        self.apply(self._init_weights)  # GPT-style weight initialization
def get_block_size(self):
return self.block_size
def _init_weights(self, module):
if isinstance(module, (nn.Linear, nn.Embedding)):
module.weight.data.normal_(mean=0.0, std=0.02)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
def forward(self, idx, segm_tokens, texture_tokens, t=None):
# each index maps to a (learnable) vector
token_embeddings = self.tok_emb(idx)
segm_embeddings = self.segm_emb(segm_tokens)
texture_embeddings = self.texture_emb(texture_tokens)
        if self.causal:
            # prepend the learned start-of-sequence token
            token_embeddings = torch.cat(
                (self.start_tok.repeat(token_embeddings.size(0), 1, 1),
                 token_embeddings),
                dim=1)
        t = token_embeddings.shape[1]
assert t <= self.block_size, "Cannot forward, model block size is exhausted."
# each position maps to a (learnable) vector
position_embeddings = self.pos_emb[:, :t, :]
x = token_embeddings + position_embeddings + segm_embeddings + texture_embeddings
x = self.drop(x)
for block in self.blocks:
x = block(x)
x = self.ln_f(x)
        logits_list = [head(x) for head in self.head_list]
return logits_list
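
# A minimal sketch for TransformerMultiHead (illustrative sizes, assumed for
# this example only). The decoder is split into num_head linear heads, each
# classifying over codebook_size // num_head classes, and an extra
# texture-token embedding is added to the input sum.
if __name__ == "__main__":
    _model = TransformerMultiHead(codebook_size=512, segm_codebook_size=32,
                                  texture_codebook_size=8, bert_n_emb=64,
                                  bert_n_layers=2, bert_n_head=4,
                                  block_size=16, latent_shape=(4, 4),
                                  embd_pdrop=0.0, resid_pdrop=0.0,
                                  attn_pdrop=0.0, num_head=4,
                                  sampler='absorbing')
    _idx = torch.randint(0, 513, (2, 16))
    _segm = torch.randint(0, 32, (2, 16))
    _texture = torch.randint(0, 8, (2, 16))
    _logits_list = _model(_idx, _segm, _texture)
    assert len(_logits_list) == 4
    assert _logits_list[0].shape == (2, 16, 128)  # 512 // 4 classes per head
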