### Train file for enigma model

- Contains a k-mer tokenizer (k=4 by default; the value can be changed)
- Train data is available on the Hugging Face repo: [hf/enigma-1.5b](https://huggingface.co/shivendrra/enigma-1.5b)
- For now, training a decoder-based model only
- More about this on github repo: [github/enigma-1.5b](https://github.com/shivendrra/enigma-1.5b)
- Saves model after training in '.pth' & '.safetensors' file for later use
- Generate function works fine

In [None]:
# Mount Google Drive at /content/drive; the data and checkpoint paths used
# later in this notebook all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch

# Load the whole DNA corpus from Drive into memory as a single string.
file_path = '/content/drive/MyDrive/consolidated_dna.txt'
with open(file_path, 'r', encoding='utf-8') as file:
    dna_seq = file.read()
# The with-block already closed the file, so no explicit close() is needed.

print(f"{(len(dna_seq)/1e6):.2f} million letters")

In [None]:
import os
from tqdm import tqdm
import json

class KMerTokenizer:
    """Tokenizer that cuts a sequence into consecutive, non-overlapping k-mers.

    Ids are assigned by descending frequency, so id 0 is the most common
    k-mer seen by ``build_vocab``.

    Attributes:
        k_mers: length of each chunk.
        token_to_id: mapping from k-mer string to integer id.
        vocab: the same mapping object as ``token_to_id``.
        id_to_token: list whose index ``i`` holds the k-mer with id ``i``.
        vocab_size: number of entries in the vocabulary.
    """

    def __init__(self, k_mers: int = 4):
        self.k_mers = k_mers
        self.vocab = {}
        self.id_to_token = []
        self.token_to_id = {}
        self.vocab_size = 0

    def tokenize_sequence(self, sequence):
        """Return the list of chunks of length ``k_mers`` (the last may be shorter)."""
        step = self.k_mers
        starts = tqdm(range(0, len(sequence), step), desc="tokenizing k-mers")
        return [sequence[i:i + step] for i in starts]

    def build_vocab(self, sequences):
        """Build the vocabulary from an iterable of sequences.

        The vocabulary is rebuilt from scratch on every call so that the id
        tables always agree with each other. Ties in frequency keep the order
        in which the k-mers were first seen.
        """
        # Count per sequence instead of first collecting every k-mer of every
        # sequence in one list.
        token_count = {}
        for sequence in sequences:
            for kmer in self.tokenize_sequence(sequence):
                token_count[kmer] = token_count.get(kmer, 0) + 1

        # sorted() is stable, so equally frequent k-mers keep first-seen order.
        ranked = sorted(token_count.items(), key=lambda item: item[1], reverse=True)
        self.id_to_token = [kmer for kmer, _ in ranked]
        self.token_to_id = {kmer: idx for idx, kmer in enumerate(self.id_to_token)}
        self.vocab = self.token_to_id
        self.vocab_size = len(self.vocab)

    def encode(self, sequence):
        """Map a sequence to a list of k-mer ids.

        A k-mer that is not in the vocabulary is encoded as ``len(self.vocab)``.
        That id has no entry in ``id_to_token``, so ``decode`` raises
        ``IndexError`` on it.
        """
        unknown_id = len(self.vocab)
        kmers = self.tokenize_sequence(sequence)
        return [
            self.token_to_id.get(kmer, unknown_id)
            for kmer in tqdm(kmers, desc="encoding sequences")
        ]

    def decode(self, encoded_sequence):
        """Map a list of ids back to the list of k-mer strings."""
        return [self.id_to_token[token_id] for token_id in encoded_sequence]

    def save_model(self, model_path):
        """Write the vocabulary as JSON to ``<model_path>/base_<k>k.json``."""
        vocab_file = f"{model_path}/base_{self.k_mers}k.json"
        with open(vocab_file, 'w') as f:
            json.dump(self.vocab, f)

    def load_model(self, path):
        """Load a vocabulary previously written by ``save_model``.

        ``path`` must end in ``.json``. ``k_mers`` is not stored in the file
        and is left unchanged.
        """
        assert path.endswith('.json'), "vocabulary file must be a .json file"
        with open(path, 'r') as f:
            vocab = json.load(f)

        self.vocab = vocab
        self.token_to_id = self.vocab
        # Rebuild the reverse table so decode() works on a loaded tokenizer.
        # Assumes the ids are 0..n-1, which is what save_model writes.
        self.id_to_token = [
            kmer for kmer, _ in sorted(vocab.items(), key=lambda item: item[1])
        ]
        self.vocab_size = len(vocab)

In [None]:
# Fit the k-mer vocabulary on the full corpus, then show its size and the
# ten most frequent k-mers (ids are assigned by descending frequency).
token = KMerTokenizer()
token.build_vocab(sequences=[dna_seq])
print(f"vocab size: {len(token.vocab)}")
print(token.id_to_token[:10])

In [None]:
# Train and test splits
# Encode the whole corpus once into a 1-D tensor of k-mer ids.
data = torch.tensor(token.encode(dna_seq), dtype=torch.long)
# A stray closing quote here used to make the whole cell a SyntaxError.
print(f"{(len(data)/1e6):0f} million")
n = int(0.9*len(data))  # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
print(f"train data {(len(train_data)/1e6):.0f}million, val data {(len(val_data)/1e6):.0f}million")

In [None]:
# hyperparams
batch_size = 10  # sequences drawn per call to get_batch
block_size = 256  # tokens per training sequence / maximum context kept in generate
max_iters = 5000  # optimisation steps in the training loop
eval_interval = 100  # run estimate_loss every this many steps
learning_rate = 3e-5  # AdamW learning rate
eval_iters = 100  # batches averaged per split inside estimate_loss
d_model = 512  # embedding / residual width
n_layers = 12  # number of stacked DecoderBlocks
# NOTE(review): d_model is not a multiple of n_head (512 // 18 = 28 and
# 28 * 18 = 504) -- confirm this head count is intended.
n_head = 18  # attention heads per MultiHeadAttention
dropout = 0.25  # dropout probability used throughout the model
norm_eps = 1e-5  # epsilon passed to RMSNorm

In [None]:
import torch.nn as nn
from torch.nn import functional as F
# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learned gain.

    Args:
        dim: size of the last dimension; one gain value is learned per entry.
        eps: constant added to the mean square before the reciprocal square root.
    """

    def __init__(self, dim: int, eps: float = 1e-6):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def _norm(self, x):
        """Divide ``x`` by the root mean square of its last dimension."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        return x * inv_rms

    def forward(self, x):
        """Normalise in float32, cast back to the input dtype, then apply the gain."""
        normed = self._norm(x.float())
        return normed.type_as(x) * self.weight

class SingleHead(nn.Module):
    """One attention head with a learned relative-position bias.

    Args:
        head_size: width of the key/query/value projections.
        d_model: width of the input features.
        block_size: longest sequence length supported by the position table
            and the causal mask.
        dropout: dropout probability applied to the attention weights.
    """

    def __init__(self,
                 head_size: int,
                 d_model: int,
                 block_size: int,
                 dropout: float):
        super().__init__()
        self.key = nn.Linear(d_model, head_size, bias=True)
        self.query = nn.Linear(d_model, head_size, bias=True)
        self.value = nn.Linear(d_model, head_size, bias=True)
        self.dropout = nn.Dropout(dropout)
        # One learned vector per (query position, key position) pair.
        self.rel_pos_embd = nn.Parameter(torch.randn(block_size, block_size, head_size))
        # Lower-triangular ones; zeros mark the future positions to hide.
        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    def forward(self, x: torch.Tensor, mask: bool = False):
        """Attend over ``x`` of shape (B, T, d_model); returns (B, T, head_size).

        When ``mask`` is true, position ``t`` only attends to positions
        ``<= t``.
        """
        B, T, C = x.shape
        key = self.key(x)
        query = self.query(x)

        # Scaled dot-product attention: multiply by head_size ** -0.5.
        # Dividing by that factor (as before) multiplied the scores by
        # sqrt(head_size) instead of scaling them down.
        scale = key.shape[-1] ** -0.5
        scores = torch.matmul(query, key.transpose(-2, -1)) * scale

        # Position bias: dot product of each query with the learned vector
        # for its (query position, key position) pair.
        rel_pos_scores = torch.einsum('btc,tvc->btv', query, self.rel_pos_embd[:T, :T])
        scores = scores + rel_pos_scores

        if mask:
            scores = scores.masked_fill(self.tril[:T, :T] == 0, float('-inf'))

        att_mat = F.softmax(scores, dim=-1)
        att_mat = self.dropout(att_mat)
        value = self.value(x)
        return torch.matmul(att_mat, value)

class MultiHeadAttention(nn.Module):
    """Run ``n_head`` SingleHead modules in parallel and project the result.

    Args:
        d_model: width of the input and output features.
        block_size: maximum sequence length, forwarded to every head.
        n_head: number of attention heads.
        dropout: dropout probability for the heads and the output projection.
    """

    def __init__(self,
                 d_model: int,
                 block_size: int,
                 n_head: int,
                 dropout: float):
        super().__init__()
        head_size = d_model // n_head
        self.heads = nn.ModuleList([
            SingleHead(d_model=d_model, dropout=dropout, block_size=block_size, head_size=head_size)
            for _ in range(n_head)
        ])
        # The concatenated heads are n_head * head_size wide. That is smaller
        # than d_model when d_model is not a multiple of n_head (512 // 18
        # gives 504), so size the projection input from the real width.
        self.projection = nn.Linear(n_head * head_size, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor, mask: bool):
        """Concatenate every head's output on the last axis, then project to d_model."""
        out = torch.cat([h(x, mask) for h in self.heads], dim=-1)
        out = self.dropout(self.projection(out))
        return out

class FeedForward(nn.Module):
    """Position-wise MLP: expand to 5 * d_model, GELU, project back, dropout."""

    def __init__(self, d_model, dropout):
        super().__init__()
        hidden = 5 * d_model
        layers = [
            nn.Linear(d_model, hidden),
            nn.GELU(),
            nn.Linear(hidden, d_model),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor):
        """Apply the MLP to ``x``; the output has the same shape as the input."""
        return self.net(x)

class DecoderBlock(nn.Module):
    """Transformer block: masked attention, unmasked attention, feed-forward.

    Each sub-layer is applied pre-norm with a residual connection. The same
    attention module and the same RMSNorm are reused for every sub-layer.

    Args:
        d_model: feature width.
        block_size: maximum sequence length.
        n_head: number of attention heads.
        norm_eps: epsilon for RMSNorm.
        dropout: dropout probability.
    """

    def __init__(self, d_model: int,
                 block_size: int,
                 n_head: int,
                 norm_eps: float,
                 dropout: float):
        super().__init__()
        self.self_att = MultiHeadAttention(n_head=n_head, d_model=d_model, dropout=dropout, block_size=block_size)
        self.ffwd = FeedForward(d_model, dropout)
        self.dropout = nn.Dropout(dropout)
        self.norm = RMSNorm(d_model, eps=norm_eps)

    def forward(self, x: torch.Tensor):
        """Return the block output; same shape as ``x``."""
        # 1) causal (masked) self-attention
        attended = self.self_att(self.norm(x), mask=True)
        x = x + self.dropout(attended)

        # 2) second attention pass without the causal mask.
        # ``mask`` used to be passed to self.norm by mistake, which raised a
        # TypeError; it belongs to the attention call.
        # NOTE(review): with mask=False every position can attend to later
        # positions -- confirm this is intended for next-token training.
        attended = self.self_att(self.norm(x), mask=False)
        x = x + self.dropout(attended)

        # 3) position-wise feed-forward
        fed = self.ffwd(self.norm(x))
        return x + self.dropout(fed)

class Transformer(nn.Module):
    """Language model over k-mer token ids built from a stack of DecoderBlocks.

    Width, depth, head count, dropout, norm epsilon and context length are
    read from the module-level hyperparameters (``d_model``, ``n_layers``,
    ``n_head``, ``dropout``, ``norm_eps``, ``block_size``).

    Args:
        vocab_size: number of distinct token ids (embedding rows and logits).
    """

    def __init__(self, vocab_size: int):
        super().__init__()
        self.block_size = block_size
        self.token_embeddings = nn.Embedding(vocab_size, d_model)
        blocks = [
            DecoderBlock(n_head=n_head, d_model=d_model, dropout=dropout, norm_eps=norm_eps, block_size=block_size)
            for _ in range(n_layers)
        ]
        self.decoder = nn.Sequential(*blocks)
        self.norm_final = RMSNorm(d_model, eps=norm_eps)
        self.linear_final = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero Linear biases."""
        if not isinstance(module, (nn.Linear, nn.Embedding)):
            return
        torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
        if isinstance(module, nn.Linear) and module.bias is not None:
            torch.nn.init.zeros_(module.bias.data)

    def forward(self, idx, targets=None):
        """Compute logits and, when ``targets`` is given, the cross-entropy loss.

        Returns:
            ``(logits, loss)``. Without targets, logits are (B, T, vocab) and
            loss is ``None``. With targets, logits are flattened to
            (B*T, vocab) and loss is a scalar tensor.
        """
        B, T = idx.shape
        hidden = self.decoder(self.token_embeddings(idx))
        logits = self.linear_final(self.norm_final(hidden))

        if targets is None:
            return logits, None

        B, T, C = logits.shape
        flat_logits = logits.view(B * T, C)
        flat_targets = targets.view(B * T)
        return flat_logits, F.cross_entropy(flat_logits, flat_targets)

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """Sample ``max_new_tokens`` ids and append them to ``idx`` of shape (B, T).

        Switches the model to eval mode. Only the last ``block_size`` tokens
        are fed to the model at each step. ``top_k`` restricts sampling to the
        k highest-scoring tokens.
        """
        self.eval()
        for _ in range(max_new_tokens):
            # Crop the context to the window the model supports.
            if idx.size(1) > self.block_size:
                context = idx[:, -self.block_size:]
            else:
                context = idx
            logits, _ = self(context)
            # Keep only the prediction for the last position.
            logits = logits[:, -1, :] / temperature

            if top_k is not None:
                keep = min(top_k, logits.size(-1))
                top_values, _ = torch.topk(logits, keep)
                # Drop everything below the k-th best score.
                logits[logits < top_values[:, [-1]]] = -float('Inf')

            probs = F.softmax(logits, dim=-1)
            next_token = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, next_token), dim=1)

        return idx

In [None]:
import timeit
# Reference point for the "trained in ..." report; it is taken before the
# model is built, so the reported time includes setup as well as training.
start_time = timeit.default_timer()

def get_batch(split):
    """Draw a random batch of (input, target) windows from one split.

    ``split == 'train'`` samples from ``train_data``; any other value samples
    from ``val_data``. Targets are the inputs shifted one token to the right.
    Both tensors are (batch_size, block_size) and are moved to ``device``.
    """
    source = val_data if split != 'train' else train_data
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s:s + block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)

@torch.no_grad()
def estimate_loss():
    """Return the mean loss over ``eval_iters`` random batches for each split.

    The model is put in eval mode for the measurement and switched back to
    train mode afterwards. The result maps 'train' and 'val' to 0-dim tensors.
    """
    model.eval()
    averages = {}
    for split in ('train', 'val'):
        split_losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            _, loss = model(X, Y)
            split_losses[k] = loss.item()
        averages[split] = split_losses.mean()
    model.train()
    return averages

# Build the model with one embedding row / logit per vocabulary entry.
vocab_size = len(token.vocab)
model = Transformer(vocab_size)
# checkpoint_path = '/content/drive/MyDrive/enigma-2.5b.pth'
# checkpoint = torch.load(checkpoint_path)
# model.load_state_dict(checkpoint)
m = model.to(device)

# no of parameters (stored in millions)
n_param = sum(map(torch.numel, m.parameters())) / 1e6
print(f"{n_param:.1f} million parameters")

# optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

# history filled in at every evaluation step of the training loop
steps, train_losses, val_losses = [], [], []

# The loop variable is named ``step`` rather than ``iter`` so the builtin
# iter() is not shadowed for the rest of the notebook session.
for step in range(max_iters):

    # Periodically (and on the final step) measure train/val loss and log it.
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

        steps.append(step)
        train_losses.append(losses['train'])
        val_losses.append(losses['val'])

    # One optimisation step on a fresh random training batch.
    xb, yb = get_batch('train')
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

In [None]:
end_time = timeit.default_timer()
# n_param is stored in millions, so convert it before labelling it "billion".
print(f"total parameters: {n_param / 1e3:.2f} billion")
print(f"trained in {((end_time - start_time)/3600):.2f}hrs")

In [None]:
# Save the trained weights to Drive as a regular PyTorch checkpoint.
model_save_name = 'consolidated_00.pth'
path = f"/content/drive/MyDrive/{model_save_name}"
torch.save(model.state_dict(), path)

# saving safe-tensors: the same state dict, written in .safetensors format
from safetensors.torch import save_file

model_save_name = 'consolidated_00.safetensors'
path = f"/content/drive/MyDrive/{model_save_name}"
save_file(model.state_dict(), path)

In [None]:
!nvidia-smi