# Hugging Face Space file view -- Space status: Runtime error.
# File size: 9,077 bytes. Commits shown in the file view: f513a95, 0700577.
from transformers import RobertaTokenizer, T5Config, T5EncoderModel
from statement_t5 import StatementT5
import torch
import pickle
import numpy as np
import onnxruntime
def to_numpy(tensor):
    """Convert a torch tensor into a NumPy array for the ONNX Runtime model.

    Tensors that track gradients are detached before conversion; every
    tensor is copied to the CPU before ``.numpy()`` is called.
    """
    if tensor.requires_grad:
        tensor = tensor.detach()
    return tensor.cpu().numpy()
def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability predictions.

    The tokenizer, model configuration and model weights are loaded from
    disk on every call.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys whose values are ``torch.Tensor`` objects
        "batch_func_pred" stores the function-level vulnerability prediction
        (argmax over the last axis of "batch_func_pred_prob"), where 0 means
        non-vulnerable and 1 means vulnerable
        "batch_func_pred_prob" stores the function-level output returned by the model
        "batch_statement_pred" stores the statement-level vulnerability prediction:
        1 where "batch_statement_pred_prob" is greater than 0.5, otherwise 0
        "batch_statement_pred_prob" stores the statement-level output returned by the model
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/statement_t5_tokenizer")
    # load model
    config = T5Config.from_pretrained("./utils/t5_config.json")
    model = T5EncoderModel(config=config)
    model = StatementT5(model, tokenizer, device=DEVICE)
    output_dir = "./models/statement_t5_model.bin"
    model.load_state_dict(torch.load(output_dir, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # The tokenized batch is created on the CPU; place it on the same device
    # as the model so CUDA inference does not mix devices. This is a no-op
    # when running on the CPU.
    input_ids = input_ids.to(DEVICE)
    statement_mask = statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids, statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds, "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds, "batch_statement_pred_prob": statement_probs}
def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
    """Tokenize each function statement by statement into fixed-size tensors.

    Every function is split on newlines, empty lines are dropped, and at most
    ``max_statements`` statements are kept. Each statement is encoded to
    ``max_statement_length`` token ids (truncated or padded by the tokenizer),
    and functions with fewer statements are filled up with all-padding
    statements.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    max_statements : int
        Number of statements kept (and padded to) per function.
    max_statement_length : int
        Number of token ids per statement.
    tokenizer
        Object exposing ``pad_token_id`` and an ``encode`` method accepting
        the ``truncation``, ``max_length``, ``padding`` and
        ``add_special_tokens`` keyword arguments.

    Returns
    -------
    :obj:`tuple`
        ``(input_ids, statement_mask)`` as ``torch.Tensor`` objects. For a
        non-empty batch, ``input_ids`` has one row of ``max_statements``
        statements per function and ``statement_mask`` holds 0 for every
        statement equal to the all-padding statement and 1 otherwise.
    """
    # Size the padding statement from the argument (not a literal 20) so that
    # padded rows always match the length of the encoded statements.
    padding_statement = [tokenizer.pad_token_id] * max_statement_length
    batch_input_ids = []
    batch_statement_mask = []
    for func in code:
        statements = [line for line in func.split("\n") if line != ""][:max_statements]
        input_ids = [
            tokenizer.encode(str(statement),
                             truncation=True,
                             max_length=max_statement_length,
                             padding='max_length',
                             add_special_tokens=False)
            for statement in statements
        ]
        # fill the remaining statement slots with padding statements
        input_ids.extend(padding_statement for _ in range(max_statements - len(input_ids)))
        # a statement counts as padding when it equals the all-pad statement
        statement_mask = [0 if ids == padding_statement else 1 for ids in input_ids]
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)
def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions. An empty list returns empty predictions
        without loading the tokenizer or the model.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "cwe_id", "cwe_id_prob", "cwe_type", "cwe_type_prob"
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores of CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract types predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores of CWE abstract types predictions [0.9, 0.7, ...]
    """
    if not code:
        return {"cwe_id": [], "cwe_id_prob": [], "cwe_type": [], "cwe_type_prob": []}
    max_length = 512
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # NOTE: pickle.load can execute arbitrary code; label_map.pkl must remain
    # a trusted, project-controlled file.
    with open("./utils/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    tokenizer.add_tokens(["<cls_type>"])
    tokenizer.cls_type_token = "<cls_type>"
    model_input = []
    for c in code:
        # reserve three positions for the <cls>, <cls_type> and <sep> tokens
        code_tokens = tokenizer.tokenize(str(c))[:max_length - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        input_ids += [tokenizer.pad_token_id] * (max_length - len(input_ids))
        model_input.append(input_ids)
    # ONNX Runtime consumes NumPy arrays, so build the int64 batch directly
    # instead of creating a (possibly CUDA) torch tensor and copying it back.
    model_input = np.asarray(model_input, dtype=np.int64)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: model_input}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # batch_cwe_id (1D list with shape of [batch size]): [idx_1, idx_2, ..., idx_n]
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # score of the predicted CWE-ID for every sample in the batch
    batch_cwe_id_pred_prob = [row[idx].item() for row, idx in zip(cwe_id_prob, batch_cwe_id)]
    # batch_cwe_type (1D list with shape of [batch size]): [idx_1, idx_2, ..., idx_n]
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # score of the predicted CWE-Type for every sample in the batch
    batch_cwe_type_pred_prob = [row[idx].item() for row, idx in zip(cwe_type_prob, batch_cwe_type)]
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}
def _severity_class(score: float) -> str:
    """Map one predicted severity score to its severity class label."""
    if score == 0:
        return "None"
    if score < 4:
        return "Low"
    if score < 7:
        return "Medium"
    if score < 9:
        return "High"
    return "Critical"


def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions. An empty list returns empty predictions
        without loading the tokenizer or the model.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with two keys, "batch_sev_score", "batch_sev_class"
        "batch_sev_score" stores a list of severity score prediction: [1.0, 5.0, 9.0 ...]
        "batch_sev_class" stores a list of severity class based on predicted severity score ["Medium", "Critical"...]
        A score of exactly 0 maps to "None", below 4 to "Low", below 7 to
        "Medium", below 9 to "High" and anything else to "Critical".
    """
    if not code:
        return {"batch_sev_score": [], "batch_sev_class": []}
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    model_input = tokenizer(code, truncation=True, max_length=512, padding='max_length',
                            return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    # the first model output holds the scores; flatten it to a plain list
    batch_sev_score = cvss_score[0].flatten().tolist()
    batch_sev_class = [_severity_class(score) for score in batch_sev_score]
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}
if __name__ == "__main__":
    import pandas as pd

    # Run the vulnerability predictor over the test CSV one function at a
    # time and print the function-level prediction for each.
    test_frame = pd.read_csv("./data/processed_test.csv")
    for func_source in test_frame["func_before"].tolist():
        prediction = predict_vul_lines([func_source])
        print(prediction["batch_func_pred"][0])