In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import torch

# importing the data
file_path = '/content/drive/MyDrive/train2.txt'
with open(file_path, 'r', encoding='utf-8') as file:
 dna_seq = file.read()
file.close()

print(f"{(len(dna_seq)/1e6):.2f} million letters")

485.52 million letters


In [3]:
class PerCharTokenizer:
 """
 Args:
 - chars (list): all bases along with special tokens represented as characters
 - vocab_size (int): size of vocabulary

 Working:
 - vocab contains all the bases and ['P', 'M', 'U'] as padding, mask and unknown token
 - encode(): iterates over each character a time and the looks up for the position in vocab
 and returns it's position as integer
 - decode(): takes input of a list of integers and returns the specific item from vocab
 """
 def __init__(self):
 super().__init__()
 self.chars = ['\n', 'A', 'T', 'G', 'C', 'P', 'M', 'U', ' ']
 self.vocab_size = len(self.chars)
 self.string_to_index = {ch: i for i, ch in enumerate(self.chars)}
 self.index_to_string = {i: ch for i, ch in enumerate(self.chars)}

 def encode(self, string):
 encoded = []
 for char in string:
 if char in self.string_to_index:
 encoded.append(self.string_to_index[char])
 else:
 special_index = len(self.string_to_index)
 self.string_to_index[char] = special_index
 self.index_to_string[special_index] = char
 encoded.append(special_index)
 return encoded

 def decode(self, integer):
 decoded = []
 for i in integer:
 if i in self.index_to_string:
 decoded.append(self.index_to_string[i])
 else:
 continue
 return ''.join(decoded)

In [4]:
token = PerCharTokenizer()
data = torch.tensor(token.encode(dna_seq), dtype=torch.long)

# Train and test splits
n = int(0.9*len(data)) # first 90% will be train, rest val
train_data = data[:n]
val_data = data[n:]
print(f"train data {(len(train_data)/1e6):.2f}million, val data {(len(val_data)/1e6):.2f}million")

train data 436.97million, val data 48.55million


In [5]:
# hyperparams
batch_size = 10
block_size = 512
max_iters = 5000
eval_interval = 100
learning_rate = 3e-4
eval_iters = 100
d_model = 384
n_layers = 12
n_head = 12
dropout = 0.25
norm_eps = 1e-4

In [6]:
import math
import torch.nn as nn
from torch.nn import functional as F

device = 'cuda' if torch.cuda.is_available() else 'cpu'

class AttentionHead(nn.Module):
 """
 initialize a single head of self attention.

 Args:
 - d_model (int): dimensionality of the model's hidden layers
 - head_size (int): dimensionality of each attention head
 - dropout (float): dropout probability
 - block_size (int): the maximum sequence length for positional encoding
 """
 def __init__(self, d_model, head_size, dropout, block_size):
 super().__init__()
 self.key = nn.Linear(d_model, head_size, bias=True)
 self.query = nn.Linear(d_model, head_size, bias=True)
 self.value = nn.Linear(d_model, head_size, bias=False)
 self.dropout = nn.Dropout(dropout)
 self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

 self.rel_pos_emb = nn.Parameter(torch.randn(block_size, block_size, head_size))

 def forward(self, x, mask=False):
 """
 forward pass of a single attention head.

 Args:
 - x (Tensor): input tensor.
 - mask (bool): flag indicating whether to apply masking
 Returns:
 - out (Tensor): output tensor after self attention
 """
 B, T, C = x.shape
 key = self.key(x)
 query = self.query(x)
 scores = torch.matmul(query, key.transpose(-2, -1)) / (key.shape[-1] ** -0.5)

 rel_pos_scores = torch.einsum('btc,tvc->btv', query, self.rel_pos_emb[:T, :T])
 scores += rel_pos_scores

 if mask:
 scores = scores.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
 weights = F.softmax(scores, dim=-1)
 weights = self.dropout(weights)

 value = self.value(x)
 out = torch.matmul(weights, value)
 return out

class MultiHeadAttention(nn.Module):
 """
 initialize a multi-head attention module.

 Args:
 - d_model (int): dimensionality of the model's hidden layers
 - n_head (int): no of attention heads
 - dropout (float): dropout probability
 - block_size (int): context length
 """
 def __init__(self, d_model, n_head, dropout, block_size):
 head_size = d_model // n_head
 super().__init__()
 self.heads = nn.ModuleList([AttentionHead(d_model=d_model, dropout=dropout, head_size=head_size, block_size=block_size) for _ in range(n_head)])
 self.proj = nn.Linear(n_head * head_size, d_model)
 self.dropout = nn.Dropout(dropout)

 def forward(self, x, mask):
 """
 forward pass of the multi-head attention module

 Args:
 - x (Tensor): input tensor
 - mask (bool): flag indicating whether to apply masking

 Returns:
 - out (Tensor): output tensor after multi-head attention

 """
 out = torch.cat([h(x, mask=mask) for h in self.heads], dim=-1)
 out = self.dropout(self.proj(out))
 return out

class FeedForward(nn.Module):
 """
 initialize a feedforward network module

 Args:
 - d_model (int): the dimensionality of the model's hidden layers
 - dropout (float): dropout probability

 """
 def __init__(self, d_model, dropout):
 super().__init__()
 self.net = nn.Sequential(
 nn.Linear(d_model, 5*d_model),
 nn.GELU(),
 nn.Linear(5*d_model, d_model),
 nn.Dropout(dropout)
 )

 def forward(self, x):
 """
 forward pass of the feedforward network module

 Args:
 - x (Tensor): input tensor

 Returns:
 - out (Tensor): output tensor after passing through the feedforward network
 """
 return self.net(x)

class EncoderNetwork(nn.Module):
 """
 initialize an encoder network module

 Args:
 - d_model (int): dimensionality of the model's hidden layers
 - n_head (int): no of attention heads in multi-head attention layers
 - norm_eps (float): epsilon value for layer normalization
 - dropout (float): dropout probability
 - block_size (int): the maximum sequence length for positional encoding
 """
 def __init__(self, d_model, n_head, norm_eps, dropout, block_size):
 super().__init__()
 self.s_att = MultiHeadAttention(n_head=n_head, d_model=d_model, dropout=dropout, block_size=block_size)
 self.ffwd = FeedForward(d_model, dropout)
 self.dropout = nn.Dropout(dropout)
 self.norm1 = nn.LayerNorm(d_model, eps=norm_eps)
 self.norm2 = nn.LayerNorm(d_model, eps=norm_eps)

 def forward(self, src):
 """
 forward pass of the encoder network module.

 Args:
 - src (Tensor): input tensor representing source data

 Returns:
 - src (Tensor): output tensor after passing through the encoder network
 """
 src2 = self.s_att(src, mask=False)
 src = src + self.dropout(src2)
 src = self.norm1(src)

 src2 = self.ffwd(src)
 src = src + self.dropout(src2)
 src = self.norm2(src)

 return src

class DecoderNetwork(nn.Module):
 """
 initialize a decoder network module

 Args:
 - d_model (int): dimensionality of the model's hidden layers
 - n_head (int): no of attention heads in multi-head attention layers
 - norm_eps (float): epsilon value for layer normalization
 - dropout (float): dropout probability
 - block_size (int): the maximum sequence length for positional encoding
 """
 def __init__(self, d_model, n_head, norm_eps, dropout, block_size):
 super().__init__()
 self.s_att = MultiHeadAttention(n_head=n_head, d_model=d_model, dropout=dropout, block_size=block_size)
 self.ffwd = FeedForward(d_model, dropout)
 self.dropout = nn.Dropout(dropout)
 self.norm1 = nn.LayerNorm(d_model, eps=norm_eps)
 self.norm2 = nn.LayerNorm(d_model, eps=norm_eps)

 def forward(self, src, att):
 """
 forward pass of the decoder network module.

 Args:
 - src (Tensor): input tensor, same as the encoder's inputs
 - trg (Tensor): encoder's attention matrix

 Returns:
 - src_f (Tensor): final output tensor
 """
 src2 = self.s_att(src, mask=True)
 src = src + self.dropout(src2)
 src = src + self.norm1(src)

 att = src + att
 att2 = self.s_att(att, mask=False)
 att2 = att + self.dropout(att2)
 trg = att2 + self.norm1(att2)

 src_f2 = self.ffwd(self.norm2(trg))
 src_f = src_f2 + self.dropout(src_f2)
 src_f = self.norm2(src_f)

 return src_f

class Transformer(nn.Module):
 """
 initialize a Transformer model

 Args:
 - vocab_size (int): size of the vocabulary
 - d_model (int): dimensionality of the model's hidden layers
 - block_size (int): maximum sequence length for positional encoding/context length
 - n_layers (int): number of encoder and decoder layers in the Transformer
 - n_head (int): number of attention heads in multi-head attention layers
 - norm_eps (float): epsilon value for layer normalization
 - dropout (float): dropout probability
 """
 def __init__(self, vocab_size):
 super().__init__()
 self.block_size = block_size
 self.toked_model = nn.Embedding(vocab_size, d_model)
 self.pos_encod = nn.Embedding(block_size, d_model)
 self.enc_layer = nn.ModuleList([EncoderNetwork(n_head=n_head, norm_eps=norm_eps, block_size=block_size, dropout=dropout, d_model=d_model) for _ in range(n_layers)])
 self.dec_layer = nn.ModuleList([DecoderNetwork(n_head=n_head, norm_eps=norm_eps, block_size=block_size, dropout=dropout, d_model=d_model) for _ in range(n_layers)])

 self.norm_final = nn.LayerNorm(d_model)
 self.linear_final = nn.Linear(d_model, vocab_size)
 self.dropout = nn.Dropout(dropout)
 self.apply(self._init_weights)

 def _init_weights(self, module):
 """
 initialize weights of linear and embedding layers

 Args:
 - module (nn.Module): the module to initialize weights for
 """
 if isinstance(module, nn.Linear):
 torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
 if module.bias is not None:
 torch.nn.init.zeros_(module.bias.data)
 elif isinstance(module, nn.Embedding):
 torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

 def forward(self, idx, targets=None):
 """
 forward pass of the transformer model

 Args:
 - idx (Tensor): input tensor representing token indices
 - targets (Tensor): target tensor for computing loss during training

 Returns:
 - logits (Tensor): output logits from the final linear layer
 - loss (Tensor): optional. computed cross-entropy loss if targets are provided, else None
 """
 B, T = idx.shape

 toked_model = self.toked_model(idx)
 pos_encod = self.pos_encod(torch.arange(T, device=device))
 x = toked_model + pos_encod

 for layer in self.enc_layer:
 x_out = layer(x)

 for layer in self.dec_layer:
 x_final = layer(x, x_out)

 x_final = self.norm_final(x_final)
 logits = self.linear_final(x_final)

 if targets is None:
 loss = None

 else:
 B, T, C = logits.shape
 logits = logits.view(B*T, C)
 targets = targets.view(B*T)
 loss = F.cross_entropy(logits, targets)

 return logits, loss
 def generate(self, idx, max_new_tokens, temperature=1.0, top_k=0):
 """
 generate new tokens using the trained model

 Args:
 - idx (Tensor): input tensor representing initial token indices
 - max_new_tokens (int): max no of new tokens to generate
 - temperature (float): softmax temperature for sampling
 - top_k (int): no of top tokens to consider in sampling

 Returns:
 - generated_tokens (list): list of generated token indices
 """
 generated_tokens = []

 for _ in range(max_new_tokens):
 idx_cond = idx[:, -self.block_size:]
 logits, _ = self(idx_cond)
 logits = logits[:, -1, :]

 scaled_logits = logits / temperature
 if top_k > 0:
 scaled_logits = self._top_k_filtering(scaled_logits, top_k)

 probs = F.softmax(scaled_logits, dim=-1)
 sampled_idx = torch.multinomial(probs, num_samples=1)
 generated_tokens.append(sampled_idx.item())
 idx = torch.cat((idx, sampled_idx), dim=1)

 return generated_tokens

 def generate_masked_tokens(self, idx, masked_indices, temperature=1.0, top_k=0):
 """
 Generate predictions for masked tokens using the trained model.

 Args:
 - idx (Tensor): input tensor representing token indices
 - masked_indices (Tensor): tensor of indices indicating masked positions
 - temperature (float): softmax temperature for sampling
 - top_k (int): no of top tokens to consider in sampling

 Returns:
 - predicted_tokens (Tensor): tensor of predicted token indices
 """
 B, T = idx.shape

 toked_model = self.toked_model(idx)
 pos_encod = self.pos_encod(torch.arange(T, device=device))
 x = toked_model + pos_encod

 for layer in self.enc_layer:
 x_out = layer(x)

 for layer in self.dec_layer:
 x_final = layer(x, x_out)

 x_masked = x_final.clone()
 x_masked[masked_indices] = self.toked_model(torch.tensor([6], device=device))

 x_masked = self.norm_final(x_masked)
 logits = self.linear_final(x_masked)

 masked_logits = logits[masked_indices].view(-1, logits.size(-1))
 scaled_logits = masked_logits / temperature
 if top_k > 0:
 scaled_logits = self._top_k_filtering(scaled_logits, top_k)

 probs = F.softmax(scaled_logits, dim=-1)
 predicted_indices = torch.argmax(probs, dim=-1)

 return predicted_indices

 def _top_k_filtering(self, logits, top_k):
 """
 filter logits to keep only the top-k tokens

 Args:
 - logits (Tensor): input tensor representing unscaled logits
 - top_k (int): no of top tokens to keep

 Returns:
 - filtered_logits (Tensor): filtered logits with only top-k tokens remaining
 """
 values, indices = torch.topk(logits, top_k, dim=-1)
 min_value = values[:, -1].unsqueeze(-1).expand_as(logits)
 filtered_logits = torch.where(logits < min_value, torch.ones_like(logits) * -float('inf'), logits)

 return filtered_logits

In [7]:
import timeit

start_time = timeit.default_timer()
# data loading
def get_batch(split):

 data = train_data if split == 'train' else val_data
 ix = torch.randint(len(data) - block_size, (batch_size,))
 x = torch.stack([data[i:i+block_size] for i in ix])
 y = torch.stack([data[i+1:i+block_size+1] for i in ix])
 x, y = x.to(device), y.to(device)
 return x, y

@torch.no_grad()
def estimate_loss():
 out = {}
 model.eval()
 for split in ['train', 'val']:
 losses = torch.zeros(eval_iters)
 for k in range(eval_iters):
 X, Y = get_batch(split)
 logits, loss = model(X, Y)
 losses[k] = loss.item()
 out[split] = losses.mean()
 model.train()
 return out

vocab_size = token.vocab_size
model = Transformer(vocab_size)
# checkpoint_path = '/content/drive/MyDrive/enigma-2.5b.pth'
# checkpoint = torch.load(checkpoint_path)
# model.load_state_dict(checkpoint)
m = model.to(device)

# no of parameters
n_param = sum(p.numel() for p in m.parameters())/1e9
print(f"{n_param:.1f} billion parameters")

# optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
steps = []
train_losses = []
val_losses = []

for iter in range(max_iters):

 if iter % eval_interval == 0 or iter == max_iters - 1:
 losses = estimate_loss()
 print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

 steps.append(iter)
 train_losses.append(losses['train'])
 val_losses.append(losses['val'])

 xb, yb = get_batch('train')
 logits, loss = model(xb, yb)
 optimizer.zero_grad(set_to_none=True)
 loss.backward()
 optimizer.step()

2.5 billion parameters
step 0: train loss 2.2869, val loss 2.2884
step 100: train loss 1.3312, val loss 1.3281
step 200: train loss 1.3233, val loss 1.3181
step 300: train loss 1.3209, val loss 1.3196
step 400: train loss 1.3215, val loss 1.3203
step 500: train loss 1.1974, val loss 1.1994
step 600: train loss 0.3350, val loss 0.3365
step 700: train loss 0.0703, val loss 0.0702
step 800: train loss 0.0143, val loss 0.0143
step 900: train loss 0.0049, val loss 0.0047
step 1000: train loss 0.0041, val loss 0.0037
step 1100: train loss 0.0035, val loss 0.0036
step 1200: train loss 0.0038, val loss 0.0035
step 1300: train loss 0.0035, val loss 0.0033
step 1400: train loss 0.0035, val loss 0.0033
step 1500: train loss 0.0033, val loss 0.0033
step 1600: train loss 0.0033, val loss 0.0034
step 1700: train loss 0.0033, val loss 0.0033
step 1800: train loss 0.0033, val loss 0.0031
step 1900: train loss 0.0031, val loss 0.0031
step 2000: train loss 0.0032, val loss 0.0032


KeyboardInterrupt: 

In [8]:
end_time = timeit.default_timer()
print(f"total parameters: {n_param:.1f} billion")
print(f"trained in {((end_time - start_time)/3600):.2f}hrs")

total parameters: 2.5 billion
trained in 1.82hrs


In [10]:
model_save_name = f'enigma-{n_param:.1f}b_v1.pth'
path = f"/content/drive/MyDrive/{model_save_name}"
torch.save(model.state_dict(), path)

In [None]:
# 8-bit quantization

import torch
import torch.quantization

checkpoint_path = '/content/drive/MyDrive/enigma-2.5b.pth'
checkpoint = torch.load(checkpoint_path)
model.load_state_dict(checkpoint)
model = model.to(device)

quantized_model = torch.quantization.quantize_dynamic(
 model,
 dtype=torch.qint8
)
quantized_model_file = f'/content/drive/MyDrive/enigma-2.5b-quant.pth'
torch.save(quantized_model.state_dict(), quantized_model_file)

print("Quantized model saved successfully.")

In [None]:
# pruning

import torch
from torch import nn
from torch.utils.model_zoo import load_url
import torch.nn.utils.prune as prune

parameters_to_prune = [(model.encoder.self_attn, 'weight'), (model.encoder.linear1, 'weight')]
prune.global_unstructured(
 parameters_to_prune,
 pruning_method=prune.L1Unstructured,
 amount=0.15,
)

torch.save(model.state_dict(), 'enigma-2.5b_pruned.pth')

In [None]:
class Generator(Transformer):
 def __init__(self, vocab_size, block_size):
 super().__init__(vocab_size)
 self.vocab_size = vocab_size
 self.block_size = block_size

 def generate(self, idx, max_new_tokens, temperature=1.0, top_k=0):
 """
 generate new tokens using the trained model

 Args:
 - idx (Tensor): input tensor representing initial token indices
 - max_new_tokens (int): max no of new tokens to generate
 - temperature (float): softmax temperature for sampling
 - top_k (int): no of top tokens to consider in sampling

 Returns:
 - generated_tokens (list): list of generated token indices
 """
 generated_tokens = []

 for _ in range(max_new_tokens):
 idx_cond = idx[:, -self.block_size:]
 logits, _ = self(idx_cond)
 logits = logits[:, -1, :]

 scaled_logits = logits / temperature
 if top_k > 0:
 scaled_logits = self._top_k_filtering(scaled_logits, top_k)

 probs = F.softmax(scaled_logits, dim=-1)
 sampled_idx = torch.multinomial(probs, num_samples=1)
 generated_tokens.append(sampled_idx.item())
 idx = torch.cat((idx, sampled_idx), dim=1)

 return generated_tokens

 def generate_masked_tokens(self, idx, masked_indices, temperature=1.0, top_k=0):
 """
 Generate predictions for masked tokens using the trained model.

 Args:
 - idx (Tensor): input tensor representing token indices
 - masked_indices (Tensor): tensor of indices indicating masked positions
 - temperature (float): softmax temperature for sampling
 - top_k (int): no of top tokens to consider in sampling

 Returns:
 - predicted_tokens (Tensor): tensor of predicted token indices
 """
 B, T = idx.shape

 toked_model = self.toked_model(idx)
 pos_encod = self.pos_encod(torch.arange(T, device=device))
 x = toked_model + pos_encod

 for layer in self.enc_layer:
 x_out = layer(x)

 for layer in self.dec_layer:
 x_final = layer(x, x_out)

 x_masked = x_final.clone()
 x_masked[masked_indices] = self.toked_model(torch.tensor([6], device=device))

 x_masked = self.norm_final(x_masked)
 logits = self.linear_final(x_masked)

 masked_logits = logits[masked_indices].view(-1, logits.size(-1))
 scaled_logits = masked_logits / temperature
 if top_k > 0:
 scaled_logits = self._top_k_filtering(scaled_logits, top_k)

 probs = F.softmax(scaled_logits, dim=-1)
 predicted_indices = torch.argmax(probs, dim=-1)

 return predicted_indices

 def _top_k_filtering(self, logits, top_k):
 """
 filter logits to keep only the top-k tokens

 Args:
 - logits (Tensor): input tensor representing unscaled logits
 - top_k (int): no of top tokens to keep

 Returns:
 - filtered_logits (Tensor): filtered logits with only top-k tokens remaining
 """
 values, indices = torch.topk(logits, top_k, dim=-1)
 min_value = values[:, -1].unsqueeze(-1).expand_as(logits)
 filtered_logits = torch.where(logits < min_value, torch.ones_like(logits) * -float('inf'), logits)

 return filtered_logits

In [None]:
generator = Generator(vocab_size, block_size)

target_text = "AGTTCTGCGAT"
context = torch.tensor([token.encode(target_text)], dtype=torch.long, device=device)
generated_output = token.decode(generator.generate(context, max_new_tokens=100, temperature=0.9, top_k=5))
print(f"{target_text}{generated_output}")

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cpu and cuda:0! (when checking argument for argument index in method wrapper_CUDA__index_select)