|
import argparse |
|
import numpy as np |
|
import pyBigWig,os |
|
from zipfile import ZipFile |
|
import zipfile |
|
import shutil |
|
import torch |
|
from pretrain.model import build_epd_model |
|
from pretrain.track.model import build_track_model |
|
from cage.model import build_cage_model |
|
from cop.micro_model import build_microc_model |
|
from cop.hic_model import build_hic_model |
|
from einops import rearrange |
|
import gradio |
|
|
|
|
|
|
|
|
|
def parser_args(): |
|
""" |
|
Hyperparameters for the pre-training model |
|
""" |
|
|
|
parser = argparse.ArgumentParser(add_help = False) |
|
parser.add_argument('--num_class', default=245, type=int,help='the number of epigenomic features to be predicted') |
|
parser.add_argument('--seq_length', default=1600, type=int,help='the length of input sequences') |
|
parser.add_argument('--nheads', default=4, type=int) |
|
parser.add_argument('--hidden_dim', default=512, type=int) |
|
parser.add_argument('--dim_feedforward', default=1024, type=int) |
|
parser.add_argument('--enc_layers', default=1, type=int) |
|
parser.add_argument('--dec_layers', default=2, type=int) |
|
parser.add_argument('--dropout', default=0.2, type=float) |
|
args, unknown = parser.parse_known_args() |
|
return args,parser |
|
def get_args(): |
|
args,_ = parser_args() |
|
return args,_ |
|
|
|
def parser_args_epi(parent_parser): |
|
""" |
|
Hyperparameters for the downstream model to predict 1kb-resolution CAGE-seq |
|
""" |
|
parser=argparse.ArgumentParser(parents=[parent_parser],add_help = False) |
|
parser.add_argument('--bins', type=int, default=500) |
|
parser.add_argument('--crop', type=int, default=10) |
|
parser.add_argument('--embed_dim', default=768, type=int) |
|
parser.add_argument('--return_embed', default=False, action='store_true') |
|
args, unknown = parser.parse_known_args() |
|
return args |
|
|
|
def parser_args_cage(parent_parser): |
|
""" |
|
Hyperparameters for the downstream model to predict 1kb-resolution CAGE-seq |
|
""" |
|
parser=argparse.ArgumentParser(parents=[parent_parser],add_help = False) |
|
parser.add_argument('--bins', type=int, default=500) |
|
parser.add_argument('--crop', type=int, default=10) |
|
parser.add_argument('--embed_dim', default=768, type=int) |
|
parser.add_argument('--return_embed', default=True, action='store_false') |
|
args, unknown = parser.parse_known_args() |
|
return args |
|
|
|
def parser_args_hic(parent_parser): |
|
""" |
|
Hyperparameters for the downstream model to predict 5kb-resolution Hi-C and ChIA-PET |
|
""" |
|
parser=argparse.ArgumentParser(parents=[parent_parser],add_help = False) |
|
parser.add_argument('--bins', type=int, default=200) |
|
parser.add_argument('--crop', type=int, default=4) |
|
parser.add_argument('--embed_dim', default=256, type=int) |
|
args, unknown = parser.parse_known_args() |
|
return args |
|
|
|
def parser_args_microc(parent_parser): |
|
""" |
|
Hyperparameters for the downstream model to predict 1kb-resolution Micro-C |
|
""" |
|
parser=argparse.ArgumentParser(parents=[parent_parser],add_help = False) |
|
parser.add_argument('--bins', type=int, default=500) |
|
parser.add_argument('--crop', type=int, default=10) |
|
parser.add_argument('--embed_dim', default=768, type=int) |
|
parser.add_argument('--return_embed', default=True, action='store_false') |
|
args, unknown = parser.parse_known_args() |
|
return args |
|
|
|
|
|
|
|
|
|
def check_region(chrom,start,end,ref_genome,region_len): |
|
start,end=int(start),int(end) |
|
if end-start != region_len: |
|
if region_len==500000: |
|
raise gradio.Error("Please enter a 500kb region!") |
|
else: |
|
raise gradio.Error("Please enter a 1Mb region!") |
|
if start<300 or end > ref_genome.shape[1]-300: |
|
raise gradio.Error("The start of input region should be greater than 300 and " |
|
"the end of the region should be less than %s!"%(ref_genome.shape[1]-300)) |
|
return int(chrom),start,end |
|
|
|
def generate_input(start,end,ref_genome,atac_seq): |
|
|
|
pad_left=np.expand_dims(np.vstack((ref_genome[:,start-300:start],atac_seq[:,start-300:start])),0) |
|
pad_right=np.expand_dims(np.vstack((ref_genome[:,end:end+300],atac_seq[:,end:end+300])),0) |
|
center=np.vstack((ref_genome[:,start:end],atac_seq[:,start:end])) |
|
center=rearrange(center,'n (b l)-> b n l',l=1000) |
|
dmatrix = np.concatenate((pad_left, center[:, :, -300:]), axis=0)[:-1, :, :] |
|
umatrix = np.concatenate((center[:, :, :300], pad_right), axis=0)[1:, :, :] |
|
return np.concatenate((dmatrix, center, umatrix), axis=2) |
|
|
|
|
|
def search_tf(tf): |
|
with open('data/epigenomes.txt', 'r') as f: |
|
epigenomes = f.read().splitlines() |
|
tf_idx= epigenomes.index(tf) |
|
return tf_idx |
|
|
|
|
|
|
|
def predict_epb( |
|
model_path, |
|
region, ref_genome,atac_seq, |
|
device, |
|
cop_type |
|
): |
|
args, parser = get_args() |
|
|
|
pretrain_model = build_epd_model(args) |
|
pretrain_model.load_state_dict(torch.load(model_path,map_location=torch.device(device))) |
|
pretrain_model.eval() |
|
pretrain_model.to(device) |
|
start,end=region |
|
inputs=generate_input(start,end,ref_genome,atac_seq) |
|
inputs=torch.tensor(inputs).float().to(device) |
|
with torch.no_grad(): |
|
pred_epi=torch.sigmoid(pretrain_model(inputs)).detach().cpu().numpy() |
|
if cop_type == 'Micro-C': |
|
return pred_epi[10:-10,:] |
|
else: |
|
return pred_epi[20:-20,:] |
|
|
|
|
|
def predict_epis( |
|
model_path, |
|
region, ref_genome,atac_seq, |
|
device, |
|
cop_type |
|
): |
|
args, parser = get_args() |
|
epi_args = parser_args_epi(parser) |
|
pretrain_model = build_track_model(epi_args) |
|
pretrain_model.load_state_dict(torch.load(model_path,map_location=torch.device(device))) |
|
pretrain_model.eval() |
|
pretrain_model.to(device) |
|
inputs=[] |
|
start,end=region |
|
if cop_type == 'Micro-C': |
|
inputs.append(generate_input(start,end,ref_genome,atac_seq)) |
|
else: |
|
for loc in range(start+20000,end-20000,480000): |
|
inputs.append(generate_input(loc-10000,loc+490000,ref_genome,atac_seq)) |
|
inputs=np.stack(inputs) |
|
inputs=torch.tensor(inputs).float().to(device) |
|
pred_epi=[] |
|
with torch.no_grad(): |
|
for i in range(inputs.shape[0]): |
|
pred_epi.append(pretrain_model(inputs[i:i+1]).detach().cpu().numpy()) |
|
|
|
out_epi = rearrange(np.vstack(pred_epi), 'i j k -> (i j) k') |
|
return out_epi |
|
|
|
def predict_cage( |
|
model_path, |
|
region, ref_genome, atac_seq, |
|
device, |
|
cop_type |
|
): |
|
args, parser = get_args() |
|
cage_args = parser_args_cage(parser) |
|
cage_model=build_cage_model(cage_args) |
|
cage_model.load_state_dict(torch.load(model_path,map_location=torch.device(device))) |
|
cage_model.eval() |
|
cage_model.to(device) |
|
inputs = [] |
|
start, end = region |
|
if cop_type == 'Micro-C': |
|
inputs.append(generate_input(start, end, ref_genome, atac_seq)) |
|
else: |
|
for loc in range(start + 20000, end - 20000, 480000): |
|
inputs.append(generate_input(loc - 10000, loc + 490000, ref_genome, atac_seq)) |
|
inputs = np.stack(inputs) |
|
inputs = torch.tensor(inputs).float().to(device) |
|
pred_cage = [] |
|
with torch.no_grad(): |
|
for i in range(inputs.shape[0]): |
|
pred_cage.append(cage_model(inputs[i:i + 1]).detach().cpu().numpy().squeeze()) |
|
return np.concatenate(pred_cage) |
|
|
|
def arraytouptri(arrays,args): |
|
effective_lens=args.bins-2*args.crop |
|
triu_tup = np.triu_indices(effective_lens) |
|
temp=np.zeros((effective_lens,effective_lens)) |
|
temp[triu_tup]=arrays |
|
return temp |
|
def complete_mat(mat): |
|
temp = mat.copy() |
|
np.fill_diagonal(temp,0) |
|
mat= mat+temp.T |
|
return mat |
|
|
|
|
|
def predict_hic( |
|
model_path, |
|
region, ref_genome,atac_seq, |
|
device |
|
): |
|
args, parser = get_args() |
|
hic_args = parser_args_hic(parser) |
|
hic_model = build_hic_model(hic_args) |
|
hic_model.load_state_dict(torch.load(model_path,map_location=torch.device(device))) |
|
hic_model.eval() |
|
hic_model.to(device) |
|
start,end=region |
|
inputs=np.stack([generate_input(start,end,ref_genome,atac_seq)]) |
|
inputs=torch.tensor(inputs).float().to(device) |
|
with torch.no_grad(): |
|
temp=hic_model(inputs).detach().cpu().numpy().squeeze() |
|
return np.stack([complete_mat(arraytouptri(temp[:,i], hic_args)) for i in range(temp.shape[-1])]) |
|
|
|
|
|
def predict_microc( |
|
model_path, |
|
region, ref_genome,atac_seq, |
|
device |
|
): |
|
args, parser = get_args() |
|
microc_args = parser_args_microc(parser) |
|
microc_model = build_microc_model(microc_args) |
|
microc_model.load_state_dict(torch.load(model_path,map_location=torch.device(device))) |
|
microc_model.eval() |
|
microc_model.to(device) |
|
start,end=region |
|
inputs=np.stack([generate_input(start,end,ref_genome,atac_seq)]) |
|
inputs=torch.tensor(inputs).float().to(device) |
|
with torch.no_grad(): |
|
temp=microc_model(inputs).detach().cpu().numpy().squeeze() |
|
return complete_mat(arraytouptri(temp, microc_args)) |
|
|
|
|
|
def filetobrowser(out_epis,out_cages,out_cop,chrom,start,end,file_id): |
|
with open('data/epigenomes.txt', 'r') as f: |
|
epigenomes = f.read().splitlines() |
|
|
|
files_to_zip = file_id |
|
if os.path.exists(files_to_zip): |
|
shutil.rmtree(files_to_zip) |
|
os.mkdir(files_to_zip) |
|
|
|
hdr=[] |
|
with open('data/chrom_size_hg38.txt', 'r') as f: |
|
for line in f: |
|
tmp=line.strip().split('\t') |
|
hdr.append((tmp[0],int(tmp[1]))) |
|
|
|
|
|
for i in range(out_epis.shape[1]): |
|
bwfile = pyBigWig.open(os.path.join(files_to_zip,"%s.bigWig"%epigenomes[i]), 'w') |
|
bwfile.addHeader(hdr) |
|
bwfile.addEntries(['chr' + str(chrom)]*out_epis.shape[0],[loc for loc in range(start,end,1000)], |
|
ends=[loc+1000 for loc in range(start,end,1000)],values=out_epis[:,i].tolist()) |
|
bwfile.close() |
|
bwfile = pyBigWig.open(os.path.join(files_to_zip,"cage.bigWig"),'w') |
|
bwfile.addHeader(hdr) |
|
|
|
bwfile.addEntries(['chr' + str(chrom)] * out_cages.shape[0], [loc for loc in range(start, end, 1000)], |
|
ends=[loc + 1000 for loc in range(start, end, 1000)], values=out_cages.tolist()) |
|
bwfile.close() |
|
cop_lines=[] |
|
|
|
interval=1000 if out_cop.shape[-1]==480 else 5000 |
|
if out_cop.shape[-1]==480: |
|
for bin1 in range(out_cop.shape[-1]): |
|
for bin2 in range(bin1,out_cop.shape[-1],1): |
|
|
|
|
|
|
|
tmp = ['0', 'chr' + str(chrom), str(start + bin1 * interval), '0', '0', 'chr' + str(chrom), |
|
str(start + bin2 * interval), '1', str(np.around(out_cop[bin1, bin2], 2))] |
|
cop_lines.append('\t'.join(tmp)+'\n') |
|
with open(os.path.join(files_to_zip,"microc.bedpe"),'w') as f: |
|
f.writelines(cop_lines) |
|
else: |
|
types=['CTCF_ChIA-PET','POLR2_ChIA-PET','Hi-C'] |
|
for i in range(len(types)): |
|
for bin1 in range(out_cop.shape[-1]): |
|
for bin2 in range(bin1, out_cop.shape[-1], 1): |
|
tmp=['0','chr' + str(chrom), str(start + bin1 * interval),'0','0','chr' +str(chrom),str(start + bin2 * interval),'1',str(np.around(out_cop[i,bin1, bin2], 2))] |
|
cop_lines.append('\t'.join(tmp) + '\n') |
|
with open(os.path.join(files_to_zip,"%s.bedpe"%types[i]), 'w') as f: |
|
f.writelines(cop_lines) |
|
|
|
out_zipfile = ZipFile("results/formatted_%s.zip" % file_id, "w", zipfile.ZIP_DEFLATED) |
|
for file_to_zip in os.listdir(files_to_zip): |
|
file_to_zip_full_path = os.path.join(files_to_zip, file_to_zip) |
|
out_zipfile.write(filename=file_to_zip_full_path, arcname=file_to_zip) |
|
|
|
out_zipfile.close() |
|
shutil.rmtree(files_to_zip) |
|
return "results/formatted_%s.zip"%file_id |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|