|
""" |
|
transformer based model, but with few minimal tweaks |
|
trained a 2.5billion parameters model with current set configurations |
|
""" |
|
|
|
import os
import json

import torch
import torch.nn as nn
from torch.nn import functional as F

# resolve paths relative to this file so the config loads regardless of the caller's cwd
current_directory = os.path.dirname(os.path.abspath(__file__))
os.chdir(current_directory)
|
|
|
with open('config_enigma.json', 'r', encoding='utf-8') as file: |
|
params = json.load(file) |
|
|
|
batch_size = params['batch_size'] |
|
block_size = params['block_size'] |
|
n_head = params['n_head'] |
|
d_model = params['d_model'] |
|
n_layers = params['n_layer'] |
|
dropout = params['dropout'] |
|
norm_eps = params['norm_eps'] |
|
device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
|
class AttentionHead(nn.Module): |
|
""" |
|
    initialize a single head of self-attention.
|
|
|
Args: |
|
- d_model (int): dimensionality of the model's hidden layers |
|
- head_size (int): dimensionality of each attention head |
|
- dropout (float): dropout probability |
|
- block_size (int): the maximum sequence length for positional encoding |
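
    Example (a minimal shape check with toy sizes, not the trained config):
        >>> head = AttentionHead(d_model=64, head_size=16, dropout=0.1, block_size=32)
        >>> head(torch.randn(2, 10, 64), mask=True).shape
        torch.Size([2, 10, 16])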
|
""" |
|
def __init__(self, d_model, head_size, dropout, block_size): |
|
super().__init__() |
|
self.key = nn.Linear(d_model, head_size, bias=True) |
|
self.query = nn.Linear(d_model, head_size, bias=True) |
|
self.value = nn.Linear(d_model, head_size, bias=False) |
|
self.dropout = nn.Dropout(dropout) |
|
self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size))) |
|
|
|
        # learned relative position embedding: one head_size-dim vector per (query, key) position pair
        self.rel_pos_emb = nn.Parameter(torch.randn(block_size, block_size, head_size))
|
|
|
def forward(self, x, mask=False): |
|
""" |
|
forward pass of a single attention head. |
|
|
|
Args: |
|
- x (Tensor): input tensor. |
|
- mask (bool): flag indicating whether to apply masking |
|
|
|
Returns: |
|
- out (Tensor): output tensor after self attention |
|
""" |
|
B, T, C = x.shape |
|
key = self.key(x) |
|
query = self.query(x) |
|
|
|
        # scale the dot products by 1/sqrt(head_size)
        scores = torch.matmul(query, key.transpose(-2, -1)) * (key.shape[-1] ** -0.5)
        # add the learned relative position bias to the content-based scores
        rel_pos_scores = torch.einsum('btc,tvc->btv', query, self.rel_pos_emb[:T, :T])
        scores = scores + rel_pos_scores
|
|
|
if mask: |
|
scores = scores.masked_fill(self.tril[:T, :T] == 0, float('-inf')) |
|
|
|
weights = F.softmax(scores, dim=-1) |
|
weights = self.dropout(weights) |
|
|
|
value = self.value(x) |
|
out = torch.matmul(weights, value) |
|
return out |
|
|
|
class MultiHeadAttention(nn.Module): |
|
""" |
|
initialize a multi-head attention module. |
|
|
|
Args: |
|
- d_model (int): dimensionality of the model's hidden layers |
|
    - n_head (int): number of attention heads
|
- dropout (float): dropout probability |
|
- block_size (int): context length |
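
    Example (a minimal shape check with toy sizes):
        >>> mha = MultiHeadAttention(d_model=64, n_head=4, dropout=0.1, block_size=32)
        >>> mha(torch.randn(2, 10, 64), mask=False).shape
        torch.Size([2, 10, 64])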
|
""" |
|
    def __init__(self, d_model, n_head, dropout, block_size):
        super().__init__()
        head_size = d_model // n_head
        self.heads = nn.ModuleList([
            AttentionHead(d_model=d_model, head_size=head_size, dropout=dropout, block_size=block_size)
            for _ in range(n_head)
        ])
        self.proj = nn.Linear(n_head * head_size, d_model)
        self.dropout = nn.Dropout(dropout)
|
|
|
def forward(self, x, mask): |
|
""" |
|
forward pass of the multi-head attention module |
|
|
|
Args: |
|
- x (Tensor): input tensor |
|
- mask (bool): flag indicating whether to apply masking |
|
|
|
Returns: |
|
- out (Tensor): output tensor after multi-head attention |
|
|
|
""" |
|
out = torch.cat([h(x, mask=mask) for h in self.heads], dim=-1) |
|
out = self.dropout(self.proj(out)) |
|
return out |
|
|
|
class FeedForward(nn.Module): |
|
""" |
|
initialize a feedforward network module |
|
|
|
Args: |
|
- d_model (int): the dimensionality of the model's hidden layers |
|
- dropout (float): dropout probability |
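
    Example (a minimal shape check with toy sizes):
        >>> ff = FeedForward(d_model=64, dropout=0.1)
        >>> ff(torch.randn(2, 10, 64)).shape
        torch.Size([2, 10, 64])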
|
|
|
""" |
|
def __init__(self, d_model, dropout): |
|
super().__init__() |
|
        self.net = nn.Sequential(
            nn.Linear(d_model, 10 * d_model),   # 10x expansion, wider than the conventional 4x
            nn.GELU(),
            nn.Linear(10 * d_model, d_model),
            nn.Dropout(dropout)
        )
|
|
|
def forward(self, x): |
|
""" |
|
forward pass of the feedforward network module |
|
|
|
Args: |
|
- x (Tensor): input tensor |
|
|
|
Returns: |
|
- out (Tensor): output tensor after passing through the feedforward network |
|
""" |
|
return self.net(x) |
|
|
|
class EncoderNetwork(nn.Module): |
|
""" |
|
initialize an encoder network module |
|
|
|
Args: |
|
- d_model (int): dimensionality of the model's hidden layers |
|
    - n_head (int): number of attention heads in multi-head attention layers
|
- norm_eps (float): epsilon value for layer normalization |
|
- dropout (float): dropout probability |
|
- block_size (int): the maximum sequence length for positional encoding |
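
    Example (a minimal shape check with toy sizes):
        >>> enc = EncoderNetwork(d_model=64, n_head=4, norm_eps=1e-5, dropout=0.1, block_size=32)
        >>> enc(torch.randn(2, 10, 64)).shape
        torch.Size([2, 10, 64])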
|
""" |
|
def __init__(self, d_model, n_head, norm_eps, dropout, block_size): |
|
super().__init__() |
|
self.s_att = MultiHeadAttention(n_head=n_head, d_model=d_model, dropout=dropout, block_size=block_size) |
|
self.ffwd = FeedForward(d_model, dropout) |
|
self.dropout = nn.Dropout(dropout) |
|
self.norm1 = nn.LayerNorm(d_model, eps=norm_eps) |
|
self.norm2 = nn.LayerNorm(d_model, eps=norm_eps) |
|
|
|
def forward(self, src): |
|
""" |
|
forward pass of the encoder network module. |
|
|
|
Args: |
|
- src (Tensor): input tensor representing source data |
|
|
|
Returns: |
|
- src (Tensor): output tensor after passing through the encoder network |
|
""" |
|
src2 = self.s_att(src, mask=False) |
|
src = src + self.dropout(src2) |
|
src = self.norm1(src) |
|
|
|
src2 = self.ffwd(src) |
|
src = src + self.dropout(src2) |
|
src = self.norm2(src) |
|
|
|
return src |
|
|
|
class DecoderNetwork(nn.Module): |
|
""" |
|
initialize a decoder network module |
|
|
|
Args: |
|
- d_model (int): dimensionality of the model's hidden layers |
|
    - n_head (int): number of attention heads in multi-head attention layers
|
- norm_eps (float): epsilon value for layer normalization |
|
- dropout (float): dropout probability |
|
- block_size (int): the maximum sequence length for positional encoding |
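
    Example (a minimal shape check with toy sizes; the second argument stands in for the encoder output):
        >>> dec = DecoderNetwork(d_model=64, n_head=4, norm_eps=1e-5, dropout=0.1, block_size=32)
        >>> dec(torch.randn(2, 10, 64), torch.randn(2, 10, 64)).shape
        torch.Size([2, 10, 64])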
|
""" |
|
def __init__(self, d_model, n_head, norm_eps, dropout, block_size): |
|
super().__init__() |
|
self.s_att = MultiHeadAttention(n_head=n_head, d_model=d_model, dropout=dropout, block_size=block_size) |
|
self.ffwd = FeedForward(d_model, dropout) |
|
self.dropout = nn.Dropout(dropout) |
|
self.norm1 = nn.LayerNorm(d_model, eps=norm_eps) |
|
self.norm2 = nn.LayerNorm(d_model, eps=norm_eps) |
|
|
|
def forward(self, src, att): |
|
""" |
|
forward pass of the decoder network module. |
|
|
|
Args: |
|
        - src (Tensor): input tensor, same as the encoder's input
        - att (Tensor): output tensor from the encoder stack
|
|
|
Returns: |
|
- src_f (Tensor): final output tensor |
|
""" |
|
        # masked self-attention over the decoder input
        src2 = self.s_att(src, mask=True)
        src = src + self.dropout(src2)
        src = self.norm1(src)

        # fold in the encoder output, then attend over the combined representation
        att = src + att
        att2 = self.s_att(att, mask=False)
        trg = self.norm1(att + self.dropout(att2))

        # position-wise feed-forward with a residual connection
        src_f2 = self.ffwd(self.norm2(trg))
        src_f = trg + self.dropout(src_f2)
        src_f = self.norm2(src_f)

        return src_f
|
|
|
class Transformer(nn.Module): |
|
""" |
|
initialize a Transformer model |
|
|
|
    Args:
    - vocab_size (int): size of the vocabulary

    The remaining hyperparameters (d_model, block_size, n_layers, n_head,
    norm_eps, dropout) are read from the module-level values loaded from
    `config_enigma.json`.
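
    Example (a minimal sketch; assumes the module-level config has been loaded):
        >>> model = Transformer(vocab_size=100).to(device)
        >>> idx = torch.randint(0, 100, (1, 8), device=device)
        >>> logits, loss = model(idx)   # loss is None without targets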
|
""" |
|
def __init__(self, vocab_size): |
|
super().__init__() |
|
self.block_size = block_size |
|
self.toked_model = nn.Embedding(vocab_size, d_model) |
|
self.pos_encod = nn.Embedding(block_size, d_model) |
|
self.enc_layer = nn.ModuleList([EncoderNetwork(n_head=n_head, norm_eps=norm_eps, block_size=block_size, dropout=dropout, d_model=d_model) for _ in range(n_layers)]) |
|
self.dec_layer = nn.ModuleList([DecoderNetwork(n_head=n_head, norm_eps=norm_eps, block_size=block_size, dropout=dropout, d_model=d_model) for _ in range(n_layers)]) |
|
|
|
self.norm_final = nn.LayerNorm(d_model) |
|
self.linear_final = nn.Linear(d_model, vocab_size) |
|
self.dropout = nn.Dropout(dropout) |
|
self.apply(self._init_weights) |
|
|
|
def _init_weights(self, module): |
|
""" |
|
initialize weights of linear and embedding layers |
|
|
|
Args: |
|
- module (nn.Module): the module to initialize weights for |
|
""" |
|
if isinstance(module, nn.Linear): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
if module.bias is not None: |
|
torch.nn.init.zeros_(module.bias.data) |
|
elif isinstance(module, nn.Embedding): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
|
|
def forward(self, idx, targets=None): |
|
""" |
|
forward pass of the transformer model |
|
|
|
Args: |
|
- idx (Tensor): input tensor representing token indices |
|
- targets (Tensor): target tensor for computing loss during training |
|
|
|
Returns: |
|
- logits (Tensor): output logits from the final linear layer |
|
        - loss (Tensor or None): cross-entropy loss if targets are provided, else None
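
        Example (toy sizes; continues the class-level example above):
            >>> targets = torch.randint(0, 100, (1, 8), device=device)
            >>> logits, loss = model(idx, targets)   # logits flattened to (B*T, vocab_size), loss is a scalar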
|
""" |
|
B, T = idx.shape |
|
|
|
        toked_model = self.toked_model(idx)                          # token embeddings (B, T, d_model)
        pos_encod = self.pos_encod(torch.arange(T, device=device))   # learned positional embeddings (T, d_model)
        x = toked_model + pos_encod
|
|
|
        # run the embeddings through the stacked encoder layers
        x_out = x
        for layer in self.enc_layer:
            x_out = layer(x_out)

        # each decoder layer consumes the running decoder state plus the encoder output
        x_final = x
        for layer in self.dec_layer:
            x_final = layer(x_final, x_out)
|
|
|
x_final = self.norm_final(x_final) |
|
logits = self.linear_final(x_final) |
|
|
|
if targets is None: |
|
loss = None |
|
|
|
else: |
|
B, T, C = logits.shape |
|
logits = logits.view(B*T, C) |
|
targets = targets.view(B*T) |
|
loss = F.cross_entropy(logits, targets) |
|
|
|
return logits, loss |
|
|
|
def generate(self, idx, max_new_tokens, temperature=1.0, top_k=0): |
|
""" |
|
generate new tokens using the trained model |
|
|
|
Args: |
|
- idx (Tensor): input tensor representing initial token indices |
|
        - max_new_tokens (int): maximum number of new tokens to generate
        - temperature (float): softmax temperature for sampling
        - top_k (int): number of top tokens to keep when sampling (0 disables filtering)
|
|
|
Returns: |
|
- generated_tokens (list): list of generated token indices |
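
        Example (a minimal sketch; continues the class-level example, batch size must be 1):
            >>> tokens = model.generate(idx, max_new_tokens=5, temperature=0.9, top_k=5)
            >>> len(tokens)
            5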
|
""" |
|
generated_tokens = [] |
|
|
|
        for _ in range(max_new_tokens):
            idx_cond = idx[:, -self.block_size:]            # crop the context to the last block_size tokens
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :]                       # keep only the final position

            scaled_logits = logits / temperature
            if top_k > 0:
                scaled_logits = self._top_k_filtering(scaled_logits, top_k)

            probs = F.softmax(scaled_logits, dim=-1)
            sampled_idx = torch.multinomial(probs, num_samples=1)
            generated_tokens.append(sampled_idx.item())     # assumes batch size 1
            idx = torch.cat((idx, sampled_idx), dim=1)
|
|
|
return generated_tokens |
|
|
|
def generate_masked_tokens(self, idx, masked_indices, temperature=1.0, top_k=0): |
|
""" |
|
Generate predictions for masked tokens using the trained model. |
|
|
|
Args: |
|
- idx (Tensor): input tensor representing token indices |
|
        - masked_indices (Tensor): boolean mask of shape (B, T) marking the masked positions
|
- temperature (float): softmax temperature for sampling |
|
        - top_k (int): number of top tokens to consider in sampling
|
|
|
Returns: |
|
- predicted_tokens (Tensor): tensor of predicted token indices |
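
        Example (a minimal sketch; continues the class-level example):
            >>> mask = torch.zeros(1, 8, dtype=torch.bool, device=device)
            >>> mask[0, 3] = True
            >>> predicted = model.generate_masked_tokens(idx, mask)   # one token id per masked position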
|
""" |
|
B, T = idx.shape |
|
|
|
toked_model = self.toked_model(idx) |
|
pos_encod = self.pos_encod(torch.arange(T, device=device)) |
|
x = toked_model + pos_encod |
|
|
|
        # run the embeddings through the stacked encoder layers
        x_out = x
        for layer in self.enc_layer:
            x_out = layer(x_out)

        # each decoder layer consumes the running decoder state plus the encoder output
        x_final = x
        for layer in self.dec_layer:
            x_final = layer(x_final, x_out)
|
|
|
        x_masked = x_final.clone()
        # token id 6 is assumed to be the mask token in this tokenizer's vocabulary
        x_masked[masked_indices] = self.toked_model(torch.tensor([6], device=device))
|
|
|
x_masked = self.norm_final(x_masked) |
|
logits = self.linear_final(x_masked) |
|
|
|
masked_logits = logits[masked_indices].view(-1, logits.size(-1)) |
|
scaled_logits = masked_logits / temperature |
|
if top_k > 0: |
|
scaled_logits = self._top_k_filtering(scaled_logits, top_k) |
|
|
|
        probs = F.softmax(scaled_logits, dim=-1)
        # greedy decoding: temperature/top_k reshape the distribution, but argmax always takes the most likely token
        predicted_indices = torch.argmax(probs, dim=-1)
|
|
|
return predicted_indices |
|
|
|
def _top_k_filtering(self, logits, top_k): |
|
""" |
|
filter logits to keep only the top-k tokens |
|
|
|
Args: |
|
- logits (Tensor): input tensor representing unscaled logits |
|
        - top_k (int): number of top tokens to keep
|
|
|
Returns: |
|
- filtered_logits (Tensor): filtered logits with only top-k tokens remaining |
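
        Example (toy values; the two largest logits survive, the rest become -inf):
            >>> filtered = model._top_k_filtering(torch.tensor([[1.0, 3.0, 2.0, 0.5]]), top_k=2)
            >>> # keeps 3.0 and 2.0, masks 1.0 and 0.5 to -inf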
|
""" |
|
        values, indices = torch.topk(logits, top_k, dim=-1)
        # threshold at the k-th largest value; everything strictly below it is masked out
        min_value = values[:, -1].unsqueeze(-1).expand_as(logits)
        filtered_logits = torch.where(logits < min_value, torch.full_like(logits, float('-inf')), logits)
|
|
|
return filtered_logits |