Spaces:
Runtime error
Runtime error
import math | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
class CausalSelfAttention(nn.Module):
    """Multi-head self-attention with an output projection.

    The layer is written out explicitly instead of wrapping
    ``torch.nn.MultiheadAttention``.  When ``sampler == 'autoregressive'`` a
    lower-triangular mask is registered so each position can only attend to
    itself and earlier positions; for any other sampler the attention is
    unmasked (bidirectional).

    Args:
        bert_n_emb: Embedding width ``C``; must be divisible by ``bert_n_head``.
        bert_n_head: Number of attention heads.
        attn_pdrop: Dropout probability applied to the attention weights.
        resid_pdrop: Dropout probability applied after the output projection.
        latent_shape: Shape whose element count gives the side length of the
            causal mask (only used when the layer is causal).
        sampler: Sampler name; ``'autoregressive'`` enables causal masking.
    """

    def __init__(self, bert_n_emb, bert_n_head, attn_pdrop, resid_pdrop,
                 latent_shape, sampler):
        super().__init__()
        assert bert_n_emb % bert_n_head == 0
        # key, query, value projections for all heads
        self.key = nn.Linear(bert_n_emb, bert_n_emb)
        self.query = nn.Linear(bert_n_emb, bert_n_emb)
        self.value = nn.Linear(bert_n_emb, bert_n_emb)
        # regularization
        self.attn_drop = nn.Dropout(attn_pdrop)
        self.resid_drop = nn.Dropout(resid_pdrop)
        # output projection
        self.proj = nn.Linear(bert_n_emb, bert_n_emb)
        self.n_head = bert_n_head
        self.causal = sampler == 'autoregressive'
        if self.causal:
            # np.prod returns a NumPy scalar; convert to a plain int before
            # using it as a tensor dimension.
            block_size = int(np.prod(latent_shape))
            mask = torch.tril(torch.ones(block_size, block_size))
            self.register_buffer(
                "mask", mask.view(1, 1, block_size, block_size))

    def _split_heads(self, projected):
        """Reshape ``(B, T, C)`` into per-head layout ``(B, nh, T, hs)``."""
        B, T, C = projected.size()
        return projected.view(B, T, self.n_head,
                              C // self.n_head).transpose(1, 2)

    def forward(self, x, layer_past=None):
        """Apply self-attention to ``x``.

        Args:
            x: Input of shape ``(B, T, C)``.
            layer_past: Optional cached ``(past_key, past_value)`` pair, each
                ``(B, nh, T_past, hs)``.  Only consulted when the layer is
                causal; the new keys/values are appended after the cached ones.

        Returns:
            Tuple ``(y, present)``: ``y`` has shape ``(B, T, C)`` and
            ``present`` stacks the keys and values computed for the ``T`` new
            positions only (cached entries are not included).
        """
        B, T, C = x.size()
        # calculate query, key, values for all heads in batch and move the
        # head dimension forward so it acts as a batch dimension
        k = self._split_heads(self.key(x))  # (B, nh, T, hs)
        q = self._split_heads(self.query(x))  # (B, nh, T, hs)
        v = self._split_heads(self.value(x))  # (B, nh, T, hs)
        present = torch.stack((k, v))
        if self.causal and layer_past is not None:
            past_key, past_value = layer_past
            k = torch.cat((past_key, k), dim=-2)
            v = torch.cat((past_value, v), dim=-2)
        # (B, nh, T, hs) x (B, nh, hs, T_total) -> (B, nh, T, T_total)
        att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # A single new token on top of a cache may see every key, so no mask
        # is needed there.  Several new tokens must still be masked against
        # each other, so the mask rows are offset by the cached length.
        if self.causal and (layer_past is None or T > 1):
            total_len = k.size(-2)
            past_len = total_len - T
            causal_mask = self.mask[:, :, past_len:total_len, :total_len]
            att = att.masked_fill(causal_mask == 0, float('-inf'))
        att = F.softmax(att, dim=-1)
        att = self.attn_drop(att)
        y = att @ v  # (B, nh, T, T_total) x (B, nh, T_total, hs) -> (B, nh, T, hs)
        # re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)
        # output projection
        y = self.resid_drop(self.proj(y))
        return y, present
class Block(nn.Module):
    """Pre-norm Transformer block: attention and MLP, each with a residual.

    Args:
        bert_n_emb: Embedding width of the block's input and output.
        resid_pdrop: Dropout probability used after the attention projection
            and at the end of the MLP.
        bert_n_head: Number of attention heads.
        attn_pdrop: Dropout probability on the attention weights.
        latent_shape: Forwarded to the attention layer.
        sampler: Forwarded to the attention layer.
    """

    def __init__(self, bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
                 latent_shape, sampler):
        super().__init__()
        self.ln1 = nn.LayerNorm(bert_n_emb)
        self.ln2 = nn.LayerNorm(bert_n_emb)
        self.attn = CausalSelfAttention(bert_n_emb, bert_n_head, attn_pdrop,
                                        resid_pdrop, latent_shape, sampler)
        # Feed-forward sub-layer widens to four times the embedding size.
        hidden_width = 4 * bert_n_emb
        self.mlp = nn.Sequential(
            nn.Linear(bert_n_emb, hidden_width),
            nn.GELU(),
            nn.Linear(hidden_width, bert_n_emb),
            nn.Dropout(resid_pdrop),
        )

    def forward(self, x, layer_past=None, return_present=False):
        """Run the block on ``x``.

        Returns ``x`` alone by default.  When ``layer_past`` is given or
        ``return_present`` is true, returns ``(x, present)`` where ``present``
        is the second value produced by the attention layer.
        """
        attn_out, present = self.attn(self.ln1(x), layer_past)
        after_attn = x + attn_out
        out = after_attn + self.mlp(self.ln2(after_attn))
        if layer_past is None and not return_present:
            return out
        return out, present
class Transformer(nn.Module):
    """Transformer over codebook indices, conditioned on segmentation tokens.

    Token, position and segmentation embeddings are summed, passed through
    ``bert_n_layers`` blocks and a final LayerNorm, then projected to
    ``codebook_size`` logits per position.  The context length is bounded by
    ``block_size``.
    """

    def __init__(self,
                 codebook_size,
                 segm_codebook_size,
                 bert_n_emb,
                 bert_n_layers,
                 bert_n_head,
                 block_size,
                 latent_shape,
                 embd_pdrop,
                 resid_pdrop,
                 attn_pdrop,
                 sampler='absorbing'):
        super().__init__()
        self.causal = sampler == 'autoregressive'
        # Non-causal samplers get one extra input index beyond the codebook
        # (presumably the mask token of the absorbing sampler — TODO confirm).
        self.vocab_size = codebook_size if self.causal else codebook_size + 1
        self.n_embd = bert_n_emb
        self.block_size = block_size
        self.n_layers = bert_n_layers
        self.codebook_size = codebook_size
        self.segm_codebook_size = segm_codebook_size

        # input embeddings
        self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd)
        self.pos_emb = nn.Parameter(
            torch.zeros(1, self.block_size, self.n_embd))
        self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd)
        self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd))
        self.drop = nn.Dropout(embd_pdrop)

        # transformer
        layers = [
            Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
                  latent_shape, sampler) for _ in range(self.n_layers)
        ]
        self.blocks = nn.Sequential(*layers)

        # decoder head
        self.ln_f = nn.LayerNorm(self.n_embd)
        self.head = nn.Linear(self.n_embd, self.codebook_size, bias=False)

    def get_block_size(self):
        """Return the maximum sequence length the model accepts."""
        return self.block_size

    def _init_weights(self, module):
        """Initialise one sub-module: N(0, 0.02) weights, zero biases.

        NOTE(review): nothing in the visible code applies this hook (for
        example via ``self.apply``) — confirm whether that is intentional.
        """
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
            return
        if isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, idx, segm_tokens, t=None):
        """Compute logits of shape ``(B, T, codebook_size)``.

        Args:
            idx: Integer token indices, looked up in ``tok_emb``.
            segm_tokens: Integer segmentation indices, looked up in
                ``segm_emb``.
            t: Ignored; it is overwritten with the sequence length.
        """
        # each index maps to a (learnable) vector
        tok = self.tok_emb(idx)
        segm = self.segm_emb(segm_tokens)
        if self.causal:
            # Prepend the learned start token; the sequence grows by one.
            # NOTE(review): segm_tokens must then match the grown length for
            # the sum below to broadcast — verify against the caller.
            start = self.start_tok.repeat(tok.size(0), 1, 1)
            tok = torch.cat((start, tok), dim=1)
        t = tok.shape[1]
        assert t <= self.block_size, "Cannot forward, model block size is exhausted."
        # each position maps to a (learnable) vector
        pos = self.pos_emb[:, :t, :]
        hidden = self.drop(tok + pos + segm)
        for layer in self.blocks:
            hidden = layer(hidden)
        return self.head(self.ln_f(hidden))
class TransformerMultiHead(nn.Module):
    """Transformer with several output heads, conditioned on two token maps.

    Token, position, segmentation and texture embeddings are summed, passed
    through ``bert_n_layers`` blocks and a final LayerNorm, then fed to
    ``num_head`` independent linear heads, each producing
    ``codebook_size // num_head`` logits per position.
    """

    def __init__(self,
                 codebook_size,
                 segm_codebook_size,
                 texture_codebook_size,
                 bert_n_emb,
                 bert_n_layers,
                 bert_n_head,
                 block_size,
                 latent_shape,
                 embd_pdrop,
                 resid_pdrop,
                 attn_pdrop,
                 num_head,
                 sampler='absorbing'):
        super().__init__()
        self.causal = sampler == 'autoregressive'
        # Non-causal samplers get one extra input index beyond the codebook
        # (presumably the mask token of the absorbing sampler — TODO confirm).
        self.vocab_size = codebook_size if self.causal else codebook_size + 1
        self.n_embd = bert_n_emb
        self.block_size = block_size
        self.n_layers = bert_n_layers
        self.codebook_size = codebook_size
        self.segm_codebook_size = segm_codebook_size
        self.texture_codebook_size = texture_codebook_size

        # input embeddings
        self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd)
        self.pos_emb = nn.Parameter(
            torch.zeros(1, self.block_size, self.n_embd))
        self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd)
        self.texture_emb = nn.Embedding(self.texture_codebook_size,
                                        self.n_embd)
        self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd))
        self.drop = nn.Dropout(embd_pdrop)

        # transformer
        layers = [
            Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop,
                  latent_shape, sampler) for _ in range(self.n_layers)
        ]
        self.blocks = nn.Sequential(*layers)

        # decoder heads: the codebook is divided evenly between the heads
        # (integer division, so any remainder is dropped).
        self.num_head = num_head
        self.head_class_num = codebook_size // self.num_head
        self.ln_f = nn.LayerNorm(self.n_embd)
        heads = [
            nn.Linear(self.n_embd, self.head_class_num, bias=False)
            for _ in range(self.num_head)
        ]
        self.head_list = nn.ModuleList(heads)

    def get_block_size(self):
        """Return the maximum sequence length the model accepts."""
        return self.block_size

    def _init_weights(self, module):
        """Initialise one sub-module: N(0, 0.02) weights, zero biases.

        NOTE(review): nothing in the visible code applies this hook (for
        example via ``self.apply``) — confirm whether that is intentional.
        """
        if isinstance(module, (nn.Linear, nn.Embedding)):
            module.weight.data.normal_(mean=0.0, std=0.02)
            if isinstance(module, nn.Linear) and module.bias is not None:
                module.bias.data.zero_()
            return
        if isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, idx, segm_tokens, texture_tokens, t=None):
        """Compute one logits tensor per output head.

        Args:
            idx: Integer token indices, looked up in ``tok_emb``.
            segm_tokens: Integer segmentation indices for ``segm_emb``.
            texture_tokens: Integer texture indices for ``texture_emb``.
            t: Ignored; it is overwritten with the sequence length.

        Returns:
            List of ``num_head`` tensors, each ``(B, T, head_class_num)``.
        """
        # each index maps to a (learnable) vector
        tok = self.tok_emb(idx)
        segm = self.segm_emb(segm_tokens)
        texture = self.texture_emb(texture_tokens)
        if self.causal:
            # Prepend the learned start token; the sequence grows by one.
            # NOTE(review): the conditioning tokens must then match the grown
            # length for the sum below to broadcast — verify against caller.
            start = self.start_tok.repeat(tok.size(0), 1, 1)
            tok = torch.cat((start, tok), dim=1)
        t = tok.shape[1]
        assert t <= self.block_size, "Cannot forward, model block size is exhausted."
        # each position maps to a (learnable) vector
        pos = self.pos_emb[:, :t, :]
        hidden = self.drop(tok + pos + segm + texture)
        for layer in self.blocks:
            hidden = layer(hidden)
        hidden = self.ln_f(hidden)
        return [head(hidden) for head in self.head_list]