Spaces:
Running
Running
import os | |
import gc | |
import random | |
import itertools | |
import warnings | |
import logging | |
warnings.filterwarnings('ignore') | |
logging.disable(logging.WARNING) | |
import numpy as np | |
import pandas as pd | |
from tqdm.auto import tqdm | |
import tokenizers | |
import transformers | |
from transformers import AutoTokenizer, AutoConfig, AutoModel, T5EncoderModel, get_linear_schedule_with_warmup | |
import datasets | |
from datasets import load_dataset, load_metric | |
import sentencepiece | |
import argparse | |
import torch | |
from torch.utils.data import Dataset, DataLoader | |
import torch.nn.functional as F | |
import torch.nn as nn | |
import pickle | |
import time | |
from sklearn.preprocessing import MinMaxScaler | |
from datasets.utils.logging import disable_progress_bar | |
from sklearn.metrics import mean_squared_error, r2_score | |
disable_progress_bar() | |
import streamlit as st | |
# Page header followed by the usage notes, rendered top-to-bottom in this order.
st.title('predictyield-t5')
for _usage_note in (
    '##### At this space, you can predict the yields of reactions from their inputs.',
    '##### The code expects input_data as a string or CSV file that contains an "input" column. The format of the string or contents of the column are like "REACTANT:{reactants of the reaction}REAGENT:{reagents, catalysts, or solvents of the reaction}PRODUCT:{products of the reaction}".',
    '##### If there are no reagents or catalysts, fill the blank with a space. And if there are multiple reactants, concatenate them with "."',
):
    st.markdown(_usage_note)

# Label of the free-text input box; it doubles as a worked example of the expected format.
display_text = 'input the reaction smiles (e.g. REACTANT:CC(C)n1ncnc1-c1cn2c(n1)-c1cnc(O)cc1OCC2.CCN(C(C)C)C(C)C.Cl.NC(=O)[C@@H]1C[C@H](F)CN1REAGENT: PRODUCT:O=C(NNC(=O)C(F)(F)F)C(F)(F)F)'
class CFG:
    """Namespace holding the two input widgets and the inference settings.

    The attributes are evaluated once per script run, when the class body
    executes, so the widgets are created in the order written here.
    """

    # --- user inputs (Streamlit widgets) ---
    # Optional CSV upload; stays ``None`` until a file is chosen.
    uploaded_file = st.file_uploader("Choose a CSV file")
    # Free-text reaction string, used when no CSV was uploaded.
    data = st.text_area(display_text)

    # --- model location ---
    # Hub identifier passed to ``from_pretrained`` for config and encoder.
    pretrained_model_name_or_path = 'sagawa/ZINC-t5'
    # Architecture tag; the model class checks whether it contains 't5'.
    model = 't5'
    # Directory holding the tokenizer files, ``config.pth`` and the weights.
    model_name_or_path = './'

    # --- inference settings ---
    # Length every tokenized input is padded / truncated to.
    max_len = 512
    # Batch size used for CSV inputs.
    batch_size = 5
    # Dropout probability of the regression head.
    fc_dropout = 0.1
    # Seed handed to ``seed_everything``.
    seed = 42
    # Worker processes used by the DataLoaders.
    num_workers = 1
# NOTE(review): indentation was lost in this copy of the file; presumably the
# rest of the script is nested under this button branch -- confirm against
# the original app.py.
if st.button('predict'):
    # Run on the GPU when PyTorch can see one, otherwise on the CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def seed_everything(seed=42):
    """Seed every random number generator the script relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and current CUDA
    device), exports ``PYTHONHASHSEED`` and asks cuDNN for deterministic
    kernels.

    Args:
        seed: Integer seed applied to all generators.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)
    torch.backends.cudnn.deterministic = True
# Fix the RNG state before any tokenizer or model object is created.
seed_everything(seed=CFG.seed)

# The tokenizer is read from the local model directory and attached to CFG so
# that code receiving the config can reach it as ``cfg.tokenizer``.
CFG.tokenizer = AutoTokenizer.from_pretrained(
    CFG.model_name_or_path,
    return_tensors='pt',
)
def prepare_input(cfg, text):
    """Tokenize one reaction string into fixed-length ``torch.long`` tensors.

    Args:
        cfg: Configuration object exposing ``tokenizer`` (a callable
            tokenizer) and ``max_len`` (padding / truncation length).
        text: The reaction string to encode.

    Returns:
        The mapping produced by ``cfg.tokenizer`` with every value replaced,
        in place, by a ``torch.long`` tensor.
    """
    # Read the length limit from the ``cfg`` argument (not a module-level
    # global) so the function honours whatever configuration it is given.
    inputs = cfg.tokenizer(
        text,
        add_special_tokens=True,
        max_length=cfg.max_len,
        padding='max_length',
        return_offsets_mapping=False,
        truncation=True,
        return_attention_mask=True,
    )
    # Snapshot the keys first so the mapping is not iterated while its
    # values are being reassigned.
    for key in list(inputs.keys()):
        inputs[key] = torch.tensor(inputs[key], dtype=torch.long)
    return inputs
class TestDataset(Dataset):
    """Dataset that tokenizes the ``input`` column of a DataFrame on demand."""

    def __init__(self, cfg, df):
        """Keep the config and the raw reaction strings.

        Args:
            cfg: Configuration object forwarded to ``prepare_input``.
            df: DataFrame that must contain an ``input`` column.
        """
        self.inputs = df['input'].values
        self.cfg = cfg

    def __len__(self):
        """Return the number of reactions held by the dataset."""
        return len(self.inputs)

    def __getitem__(self, item):
        """Return the tokenized form of the reaction at position ``item``."""
        reaction = self.inputs[item]
        return prepare_input(self.cfg, reaction)
class RegressionModel(nn.Module):
    """Transformer encoder with a two-layer regression head on the first token.

    The head is ``dropout -> Linear(hidden, hidden) -> dropout ->
    Linear(hidden, 1)`` and is applied to the hidden state at sequence
    position 0.
    """

    def __init__(self, cfg, config_path=None, pretrained=False):
        """Build the encoder and the regression head.

        Args:
            cfg: Configuration object exposing ``pretrained_model_name_or_path``,
                ``model``, ``tokenizer`` and ``fc_dropout``.
            config_path: Optional path to a config object saved with
                ``torch.save``. When ``None`` the config is fetched with
                ``AutoConfig.from_pretrained``.
            pretrained: For non-T5 models, load pretrained weights when true
                and build an untrained model from the config otherwise. T5
                models are always created with ``from_pretrained``.
        """
        super().__init__()
        self.cfg = cfg
        if config_path is None:
            self.config = AutoConfig.from_pretrained(
                cfg.pretrained_model_name_or_path, output_hidden_states=True
            )
        else:
            # NOTE(review): torch.load unpickles arbitrary objects -- only
            # point this at config files from a trusted source.
            self.config = torch.load(config_path)

        # All checkpoint names come from the ``cfg`` argument rather than a
        # module-level global or a hard-coded hub id, so the class follows
        # whichever configuration it is handed.
        if 't5' in cfg.model:
            self.model = T5EncoderModel.from_pretrained(cfg.pretrained_model_name_or_path)
        elif pretrained:
            self.model = AutoModel.from_pretrained(cfg.pretrained_model_name_or_path)
        else:
            self.model = AutoModel.from_config(self.config)

        # Make the embedding matrix match the tokenizer's vocabulary size.
        self.model.resize_token_embeddings(len(cfg.tokenizer))

        hidden_size = self.config.hidden_size
        self.fc_dropout1 = nn.Dropout(cfg.fc_dropout)
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc_dropout2 = nn.Dropout(cfg.fc_dropout)
        self.fc2 = nn.Linear(hidden_size, 1)

    def forward(self, inputs):
        """Return one regression value per sequence.

        Args:
            inputs: Mapping of keyword arguments forwarded to the encoder.

        Returns:
            Tensor with one row per input sequence and a single column.
        """
        outputs = self.model(**inputs)
        last_hidden_states = outputs[0]
        # Dropout is applied to the full hidden states, then only the vector
        # at sequence position 0 is kept as the sequence representation.
        first_token = self.fc_dropout1(last_hidden_states)[:, 0, :]
        output = self.fc1(first_token.view(-1, self.config.hidden_size))
        output = self.fc2(self.fc_dropout2(output))
        return output
def inference_fn(test_loader, model, device):
    """Run ``model`` over every batch of ``test_loader`` without gradients.

    Args:
        test_loader: Sized iterable yielding mappings of tensors.
        model: Module called with one batch mapping at a time.
        device: Device the model and each batch tensor are moved to.

    Returns:
        NumPy array made by concatenating the per-batch outputs in order.
    """
    model.eval()
    model.to(device)
    batch_outputs = []
    progress = tqdm(test_loader, total=len(test_loader))
    for batch in progress:
        # Move the batch onto the model's device, updating the mapping in place.
        for name, tensor in batch.items():
            batch[name] = tensor.to(device)
        with torch.no_grad():
            scores = model(batch)
        batch_outputs.append(scores.to('cpu').numpy())
    return np.concatenate(batch_outputs)
# Build the network using the config object saved next to the script ...
model = RegressionModel(
    CFG,
    config_path=CFG.model_name_or_path + '/config.pth',
    pretrained=False,
)
# ... then load the saved weights onto the CPU (inference_fn moves the model
# to the selected device later) and copy them into the network.
state = torch.load(
    CFG.model_name_or_path + '/ZINC-t5_best.pth',
    map_location=torch.device('cpu'),
)
model.load_state_dict(state)
if CFG.uploaded_file is not None:
    # Batch mode: predict a yield for every row of the uploaded CSV and offer
    # the augmented table for download.
    test_ds = pd.read_csv(CFG.uploaded_file)
    if 'input' not in test_ds.columns:
        # Without this guard the dataset constructor fails with a bare KeyError.
        st.error('The uploaded CSV file must contain an "input" column.')
    elif test_ds.empty:
        # An empty loader would make np.concatenate fail inside inference_fn.
        st.warning('The uploaded CSV file does not contain any rows.')
    else:
        test_dataset = TestDataset(CFG, test_ds)
        test_loader = DataLoader(
            test_dataset,
            batch_size=CFG.batch_size,
            shuffle=False,
            num_workers=CFG.num_workers,
            pin_memory=True,
            drop_last=False,
        )
        prediction = inference_fn(test_loader, model, device)
        # Model outputs are scaled by 100 and clamped to the 0-100 range.
        test_ds['prediction'] = prediction*100
        test_ds['prediction'] = test_ds['prediction'].clip(0, 100)
        csv = test_ds.to_csv(index=False)
        st.download_button(
            label="Download data as CSV",
            data=csv,
            file_name='output.csv',
            mime='text/csv'
        )
elif not CFG.data.strip():
    # Neither a file nor any text was supplied: ask for input rather than
    # predicting on an empty string.
    st.warning('Please upload a CSV file or enter a reaction string before predicting.')
else:
    # Single mode: wrap the text-area content in a one-row frame so it goes
    # through the same dataset / loader path as a CSV.
    test_ds = pd.DataFrame({'input': [CFG.data]})
    test_dataset = TestDataset(CFG, test_ds)
    test_loader = DataLoader(
        test_dataset,
        batch_size=1,
        shuffle=False,
        num_workers=CFG.num_workers,
        pin_memory=True,
        drop_last=False,
    )
    prediction = inference_fn(test_loader, model, device)
    # Scale by 100 and clamp to the 0-100 range, as in batch mode.
    prediction = max(min(prediction[0][0]*100, 100), 0)
    st.text('yield: ' + str(prediction))