BenCzechMark / compare_significance.py
import argparse
import json
from collections import defaultdict
from typing import Sequence
import numpy as np
from scipy.stats import ttest_rel
from sklearn.metrics import roc_curve, auc
from tqdm import tqdm
# from leaderboard import SUPPORTED_METRICS
SUPPORTED_METRICS = [
"avg_mcauroc", # for classification tasks
"em", # for QA tasks
"acc", # for multichoice tasks
"rouge", # for summarization tasks
"ppl", # for language modeling tasks
]
def _get_CMs(i, probabilities, references, thresholds):
    """Build one one-vs-rest confusion matrix per ROC threshold for class `i`.

    An example counts as a positive prediction for class `i` when `probabilities[j][i]`
    reaches the threshold.
    """
    confusion_matrices = []
for threshold in thresholds[i]:
TP = 0
FP = 0
TN = 0
FN = 0
for j in range(len(probabilities)):
if probabilities[j][i] >= threshold:
if references[j] == i:
TP += 1
else:
FP += 1
else:
if references[j] == i:
FN += 1
else:
TN += 1
cm = {"TP": TP, "FP": FP, "TN": TN, "FN": FN, "threshold": threshold, "class": i}
confusion_matrices.append(cm)
return confusion_matrices
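
# A minimal, hypothetical sanity check of _get_CMs (toy values only, not benchmark data);
# it mirrors the one-vs-rest counting above for a single threshold of class 0.
def _example_get_cms():
    probabilities = [[0.9, 0.1], [0.2, 0.8], [0.6, 0.4]]
    references = [0, 1, 0]
    thresholds = {0: [0.5]}
    cm = _get_CMs(0, probabilities, references, thresholds)[0]
    # both class-0 examples score >= 0.5 -> TP=2, TN=1, FP=FN=0
    assert (cm["TP"], cm["FP"], cm["TN"], cm["FN"]) == (2, 0, 1, 0)
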
def compute_significance_accuracy(predsA, referencesA, predsB, referencesB):
    # following https://github.com/rtmdrr/testSignificanceNLP/blob/c7302d015538944364b622eb860dd9fbee6d50ec/testSignificance.py#L164C32-L165C24
    # Calculate the t-test on TWO RELATED samples of scores, a and b; for a one-sided test, halve the p-value.
scores_A = [1 if pred == ref else 0 for pred, ref in zip(predsA, referencesA)]
scores_B = [1 if pred == ref else 0 for pred, ref in zip(predsB, referencesB)]
t, p = ttest_rel(scores_A, scores_B)
# correct for one-tailed test
p_value = p / 2
delta = np.mean(scores_A) - np.mean(scores_B)
return p_value, delta
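
# Hypothetical usage sketch (toy predictions, not benchmark data): both models are scored
# against the same references, which is what makes the related-samples t-test above valid.
def _example_accuracy_significance():
    references = [1, 0, 1, 1, 0, 1]
    preds_a = [1, 0, 1, 1, 0, 0]  # 5/6 correct
    preds_b = [1, 1, 0, 1, 1, 0]  # 2/6 correct
    p_value, delta = compute_significance_accuracy(preds_a, references, preds_b, references)
    # delta is the accuracy gap A - B; a small p_value would favour model A
    assert abs(delta - 0.5) < 1e-9
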
def compute_significance_em(predsA, referencesA, predsB, referencesB):
    # TODO: exact-match significance testing
    raise NotImplementedError("Exact match is not supported yet.")
def compute_significance_rouge(predsA, referencesA, predsB, referencesB):
    # TODO: MDocekal
    # Use bootstrapping
    # https://github.com/rtmdrr/testSignificanceNLP/blob/c7302d015538944364b622eb860dd9fbee6d50ec/testSignificance.py#L89
    raise NotImplementedError("Rouge is not supported yet.")
def compute_significance_ppl(predsA, referencesA, predsB, referencesB):
    # TODO: MDocekal
    # Use bootstrapping
    # https://github.com/rtmdrr/testSignificanceNLP/blob/c7302d015538944364b622eb860dd9fbee6d50ec/testSignificance.py#L89
    raise NotImplementedError("Perplexity is not supported yet.")
def compute_significance_avg_mcauroc(probsA: Sequence[Sequence[float]], referencesA: Sequence[int],
probsB: Sequence[Sequence[float]], referencesB: Sequence[int]):
    # draw MC-AUROC posterior samples for each model
model_A_scores = get_mc_auc_samples(probsA, referencesA, Nsamples=1_000)
model_B_scores = get_mc_auc_samples(probsB, referencesB, Nsamples=1_000)
    # one-tailed test: fraction of posterior sample pairs where model A does not beat model B
p_value = ((model_A_scores[:, np.newaxis] <= model_B_scores[np.newaxis, :]).sum()
/ (len(model_A_scores) * len(model_B_scores)))
delta = np.mean(model_A_scores) - np.mean(model_B_scores)
return p_value, delta
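
# Sketch of the pairwise comparison used above, on made-up posterior samples (illustrative
# only): the p-value is the fraction of (A, B) sample pairs where A does not beat B.
def _example_pairwise_p_value():
    a = np.array([0.8, 0.9])
    b = np.array([0.7, 0.85])
    p = (a[:, np.newaxis] <= b[np.newaxis, :]).sum() / (len(a) * len(b))
    assert p == 0.25  # only the pair (0.8, 0.85) satisfies A <= B
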
def get_mc_auc_samples(probs, references, Nsamples=1_000_000):
    """Draw `Nsamples` Monte-Carlo samples of the class-averaged (one-vs-rest) AUROC by
    sampling each threshold's TPR from a Beta posterior over the observed TP/FN counts."""
    n_classes = len(probs[0])
fpr = dict()
thresholds = dict()
# compute AUC for every class
auc_scores_per_class = []
    for i in range(n_classes):
        # ROC curve for the i-th class vs. all others (one-vs-rest)
        fpr[i], _, thresholds[i] = roc_curve(y_true=[1 if x == i else 0 for x in references],
                                             y_score=[prob[i] for prob in probs])
confusion_matrices = _get_CMs(i, probs, references, thresholds)
        lam = 1.0  # <- flat (uniform) prior
        # lam = 0.5  # <- Jeffreys prior
# sample variates for every threshold
tpr_variates_for_each_fpr = []
for k in range(len(thresholds[i])):
tpr_variates_for_each_fpr.append(
                np.random.beta(confusion_matrices[k]["TP"] + lam, confusion_matrices[k]["FN"] + lam, Nsamples))
# fprs x tpr_variates
tpr_variates_for_each_fpr = np.array(tpr_variates_for_each_fpr)
# now pick 1 variate for each fpr, and compute AUC
auc_scores = []
        for tpr_variates in tqdm(tpr_variates_for_each_fpr.T,
                                 desc=f"Computing AUCs for class {i + 1}/{n_classes}"):
auc_score = auc(fpr[i], tpr_variates)
            # if np.isnan(auc_score):
            #     auc_score = 0
auc_scores.append(auc_score)
auc_scores_per_class.append(auc_scores)
auc_scores_per_class = np.array(auc_scores_per_class)
mcauc_scores = np.mean(auc_scores_per_class, axis=0)
return mcauc_scores
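
# Hypothetical call sketch (tiny toy problem, Nsamples deliberately small): the function
# returns `Nsamples` Monte-Carlo draws of the class-averaged AUROC for one model.
def _example_mc_auc_samples():
    probs = [[0.9, 0.1], [0.2, 0.8], [0.7, 0.3], [0.4, 0.6]]
    references = [0, 1, 0, 1]
    samples = get_mc_auc_samples(probs, references, Nsamples=100)
    assert samples.shape == (100,)
    assert np.all((samples >= 0.0) & (samples <= 1.0))
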
def read_json(file_path):
data = defaultdict(list)
with open(file_path, "r") as f:
fc = json.load(f)
for task, results in fc["predictions"].items():
# determine the metric
metric = None
for key in SUPPORTED_METRICS:
if key in results[0]:
metric = key
break
if metric is None:
raise ValueError(f"Unsupported metric in {file_path}")
if metric == "avg_mcauroc":
local_data = [line[metric] for line in fc["predictions"][task]]
unzipped_list = list(zip(*local_data))
golds = unzipped_list[0]
probs = unzipped_list[1]
data[task] = (golds, probs), metric
return data, fc["metadata"]
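
# Expected input layout, inferred from the reader above (keys shown for avg_mcauroc tasks;
# field values are illustrative):
# {
#   "metadata": {...},
#   "predictions": {
#     "<task_name>": [
#       {"avg_mcauroc": [<gold_label>, [<prob_class_0>, <prob_class_1>, ...]]},
#       ...
#     ]
#   }
# }
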
def check_significance_task(fileA, fileB, task, significance_level=0.05):
dataA, metadataA = read_json(fileA)
dataB, metadataB = read_json(fileB)
print("DEBUG",fileA, task, dataA[task])
decisions = dict()
metricA = dataA[task][1]
metricB = dataB[task][1]
assert metricA == metricB
    # both files must cover the same number of evaluated examples for this task
    assert len(dataA[task][0][0]) == len(dataB[task][0][0])
if metricA == "avg_mcauroc":
p_value, delta = compute_significance_avg_mcauroc(probsA=dataA[task][0][1], referencesA=dataA[task][0][0],
probsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA == "acc":
p_value, delta = compute_significance_accuracy(predsA=dataA[task][0][1], referencesA=dataA[task][0][0],
predsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA == "em":
raise NotImplementedError("Exact match is not supported yet.")
elif metricA == "rouge":
raise NotImplementedError("Rouge is not supported yet.")
elif metricA == "ppl":
raise NotImplementedError("Perplexity is not supported yet.")
else:
raise ValueError(f"Unsupported metric {metricA}")
    decisions[task] = {
        "significant": bool(p_value <= significance_level),
        "p_value": float(p_value),
        "delta": float(delta),
    }
return decisions
def check_significance(fileA, fileB, significance_level=0.05):
dataA, metadataA = read_json(fileA)
dataB, metadataB = read_json(fileB)
decisions = dict()
for task in dataA.keys():
metricA = dataA[task][1]
metricB = dataB[task][1]
assert metricA == metricB
        # both files must cover the same number of evaluated examples for this task
        assert len(dataA[task][0][0]) == len(dataB[task][0][0])
if metricA == "avg_mcauroc":
p_value, delta = compute_significance_avg_mcauroc(probsA=dataA[task][0][1], referencesA=dataA[task][0][0],
probsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA == "acc":
p_value, delta = compute_significance_accuracy(predsA=dataA[task][0][1], referencesA=dataA[task][0][0],
predsB=dataB[task][0][1], referencesB=dataB[task][0][0])
elif metricA == "em":
raise NotImplementedError("Exact match is not supported yet.")
elif metricA == "rouge":
raise NotImplementedError("Rouge is not supported yet.")
elif metricA == "ppl":
raise NotImplementedError("Perplexity is not supported yet.")
else:
raise ValueError(f"Unsupported metric {metricA}")
        decisions[task] = {
            "significant": bool(p_value <= significance_level),
            "p_value": float(p_value),
            "delta": float(delta),
        }
return decisions
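
# Hypothetical usage (file names are placeholders): compare two result files and get a
# per-task decision dict, e.g.
#   decisions = check_significance("modelA.json", "modelB.json", significance_level=0.05)
#   # -> {"<task_name>": {"significant": True, "p_value": 0.012, "delta": 0.034}, ...}
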
def main():
parser = argparse.ArgumentParser(description="One-tailed test if model A improves over model B.")
parser.add_argument("--modelA", help="ModelA JSONL file from lm harness.")
parser.add_argument("--modelB", help="ModelB JSONL file from lm harness.")
parser.add_argument("--significance_level", type=float, default=0.05, help="Significance level (e.g., 0.05)")
args = parser.parse_args()
result = check_significance(args.modelA, args.modelB, args.significance_level)
print(json.dumps(result, indent=2))
# harness already returns stderr estimate for sampling distribution
# see https://github.com/EleutherAI/lm-evaluation-harness/blob/6433bd3fe3033d302b22cdcd53af237e9039ef29/lm_eval/api/metrics.py#L213
if __name__ == "__main__":
main()
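
# Example invocation (paths are placeholders):
#   python compare_significance.py --modelA modelA.json --modelB modelB.json --significance_level 0.05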