from typing import Dict, List, Tuple, Any, Callable
from dataclasses import dataclass

import pandas as pd
import numpy as np
from sklearn.metrics import (
    f1_score, accuracy_score, recall_score, confusion_matrix,
    classification_report,
    r2_score, mean_squared_error
)


@dataclass
class ClassificationScores:
    """Per-class (or averaged) precision/recall/F1 scores."""
    precision: float
    recall: float
    f1: float
    support: float = None

    @classmethod
    def from_dict(cls, d: Dict[str, float]) -> "ClassificationScores":
        # sklearn's classification_report uses the key 'f1-score'; strip the
        # suffix and keep only keys that match this dataclass's fields.
        d = {k.split('-')[0]: v for k, v in d.items() if k.split('-')[0] in cls.__annotations__}
        return cls(**d)


@dataclass
class RegressionScores:
    r2: float
    mse: float
    rmse: float

    @classmethod
    def make(cls, true: np.ndarray, pred: np.ndarray) -> "RegressionScores":
        # RMSE is computed per output and averaged; this matches the old
        # mean_squared_error(..., squared=False) behaviour, which newer
        # scikit-learn releases expose as root_mean_squared_error instead.
        rmse = np.sqrt(mean_squared_error(true, pred, multioutput='raw_values')).mean()
        return cls(
            r2=r2_score(true, pred),
            mse=mean_squared_error(true, pred),
            rmse=rmse
        )

    def __add__(self, other):
        # Element-wise sum; used together with __truediv__ to average scores.
        return RegressionScores(
            r2=self.r2 + other.r2,
            mse=self.mse + other.mse,
            rmse=self.rmse + other.rmse
        )

    def __truediv__(self, other):
        return RegressionScores(
            r2=self.r2 / other,
            mse=self.mse / other,
            rmse=self.rmse / other
        )


@dataclass
class ClassificationReport:
    accuracy: float
    confusion: np.ndarray
    macro: ClassificationScores
    weighted: ClassificationScores
    labels: list
    label_scores: Dict[str, ClassificationScores]  # label -> ClassificationScores
    f1: float = None      # only for binary classification
    recall: float = None  # only for binary classification

    @classmethod
    def make_report(cls, true: np.ndarray, pred: np.ndarray) -> "ClassificationReport":
        # Use the union of labels seen in either array so the confusion matrix
        # and the per-label scores cover every class that occurs.
        class_labels = np.unique(np.concatenate([true, pred]))
        report = classification_report(true, pred, labels=class_labels, output_dict=True, zero_division=0)
        rep = cls(
            accuracy=report.pop('accuracy'),
            confusion=confusion_matrix(true, pred, labels=class_labels),
            macro=ClassificationScores.from_dict(report.pop('macro avg')),
            weighted=ClassificationScores.from_dict(report.pop('weighted avg')),
            label_scores={label: ClassificationScores.from_dict(scores) for label, scores in report.items()},
            labels=list(class_labels)
        )
        if len(class_labels) == 2:
            rep.f1 = f1_score(true, pred)
            rep.recall = recall_score(true, pred)
        return rep

    @property
    def df(self):
        # One-row summary frame: accuracy plus the macro-averaged scores.
        df_dict = {
            'Accuracy': self.accuracy,
            **{f'{score.title()} (macro)': getattr(self.macro, score)
               for score in self.macro.__annotations__ if score != 'support'},
        }
        return pd.DataFrame([df_dict])


@dataclass
class RegressionReport:
    r2: float
    rmse: float
    labels: list = None                               # only for multivariate regression
    label_scores: Dict[str, RegressionScores] = None  # only for multivariate regression

    @classmethod
    def make_report(cls, true: np.ndarray, pred: np.ndarray, labels=None) -> "RegressionReport":
        report = cls(
            r2=r2_score(true, pred),
            # Per-output RMSE averaged; equivalent to the old
            # mean_squared_error(..., squared=False).
            rmse=np.sqrt(mean_squared_error(true, pred, multioutput='raw_values')).mean()
        )
        if len(true.shape) > 1 and true.shape[1] > 1:
            # Multivariate case: also score each output column separately.
            report.labels = labels or list(range(true.shape[1]))
            report.label_scores = {label: RegressionScores.make(true[:, i], pred[:, i])
                                   for i, label in enumerate(report.labels)}
        return report

    @property
    def is_multivariate(self):
        return self.labels is not None

    @property
    def df(self):
        # One-row summary frame: averaged scores, plus per-label scores when multivariate.
        df_dict = {
            'R2 avg': self.r2,
            'RMSE avg': self.rmse,
        }
        if self.is_multivariate:
            df_dict.update({f'R2 {label}': scores.r2 for label, scores in self.label_scores.items()})
            df_dict.update({f'RMSE {label}': scores.rmse for label, scores in self.label_scores.items()})
        df = pd.DataFrame([df_dict])
        # Put the averaged RMSE first, then the remaining columns in sorted order.
        df = df.filter(items=['RMSE avg'] + sorted(df.columns.difference(['RMSE avg'])))
        # Drop the task prefix (a_, b_, c_, d_) from label-derived column names.
        df.columns = df.columns.str.replace(r'\s(a|b|c|d)_', ' ', regex=True)
        return df


@dataclass
class Results:
    # One report per task; a field stays None when that task is not evaluated.
    taska: ClassificationReport = None
    taskb: RegressionReport = None
    taskc: ClassificationReport = None
    taskd: RegressionReport = None


def absolute_results(true_df: pd.DataFrame, pred_df: pd.DataFrame, tasks='abcd'):
    """Build one report per task from ground-truth and prediction frames whose
    columns are prefixed with the task letter ('a_', 'b_', 'c_', 'd_')."""
    task_reports = {}
    for task in tasks:
        true = true_df.filter(regex=f'^{task}_').sort_index(axis=1)
        pred = pred_df.filter(regex=f'^{task}_').sort_index(axis=1)
        if len(true.columns) == 0 or len(pred.columns) == 0:
            # Task not present in one of the frames.
            task_reports['task' + task] = None
            continue
        if task in ['a', 'c']:
            # Tasks a and c are single-label classification.
            task_reports['task' + task] = ClassificationReport.make_report(
                true=true.iloc[:, 0].values,
                pred=pred.iloc[:, 0].values
            )
        else:
            # Tasks b and d are regression; task d keeps its column names as labels.
            task_reports['task' + task] = RegressionReport.make_report(
                true=true.values,
                pred=pred.values,
                labels=true.columns.tolist() if task == 'd' else None
            )
    return Results(**task_reports)
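
# Illustrative call (a sketch, not part of the original module): `absolute_results`
# expects both frames to share the task-prefixed column layout, e.g.
#
#   true_df = pd.DataFrame({'a_label': [0, 1], 'b_score': [0.2, 0.9]})
#   pred_df = pd.DataFrame({'a_label': [0, 0], 'b_score': [0.3, 0.7]})
#   results = absolute_results(true_df, pred_df, tasks='ab')
#   print(results.taska.df, results.taskb.df)
#
# The column names above are made up for illustration; only the 'a_'/'b_'/'c_'/'d_'
# prefixes matter to the filtering regex.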


def estimators_eval(estimators: List[Tuple[str, Any]], score_func: Callable[[np.ndarray, np.ndarray], float]):
    """Return a closure that fits each (name, estimator) pair and scores it with score_func."""
    def fit_eval_estimators(X_train: np.ndarray, y_train: np.ndarray, X_test: np.ndarray, y_test: np.ndarray) -> dict:
        estimator_scores = {}
        for name, estimator in estimators:
            estimator.fit(X_train, y_train)
            y_pred = estimator.predict(X_test)
            score = score_func(y_test, y_pred)  # *(1.4*((y_train>th).sum()/(len(y_train)-1))) # weighted for class imbalance
            print(f'"{name}" estimator score: {score:.4f}')
            estimator_scores[name] = score
        return estimator_scores
    return fit_eval_estimators
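
# Typical use (a sketch; the estimator names here are arbitrary examples, not
# part of the module):
#
#   fit_eval = estimators_eval([('ridge', Ridge()), ('forest', RandomForestRegressor())], r2_score)
#   scores = fit_eval(X_train, y_train, X_test, y_test)
#
# score_func can be any callable with a (y_true, y_pred) signature, such as the
# scikit-learn metrics imported above.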


def label_metrics(score_fun, y_true, y_pred):
    """Apply score_fun per output column for multivariate targets, otherwise on the flattened arrays."""
    if len(y_true.shape) > 1 and y_true.shape[1] > 1:
        return [score_fun(y_true[:, i], y_pred[:, i]) for i in range(y_true.shape[1])]
    score = score_fun(y_true.ravel(), y_pred.ravel())
    # Normalise the return type to a plain list.
    if isinstance(score, list):
        return score
    elif isinstance(score, np.ndarray):
        return score.tolist()
    else:
        return [score]


def metrics_for_estimators(estimators, score_fun, X, y_true):
    """Score each already-fitted (name, estimator) pair on X against y_true."""
    metrics = {}
    for name, estimator in estimators:
        y_pred = estimator.predict(X)
        metrics[name] = label_metrics(score_fun, y_true, y_pred)
    return metrics
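

# Minimal smoke test (an illustrative sketch, not part of the evaluation module):
# exercises the report classes and the estimator helpers on synthetic data.
# The estimator choices, sizes, and label names below are arbitrary examples.
if __name__ == "__main__":
    from sklearn.linear_model import LinearRegression, Ridge

    rng = np.random.default_rng(0)

    # Binary classification report on random labels.
    y_true_cls = rng.integers(0, 2, size=100)
    y_pred_cls = rng.integers(0, 2, size=100)
    print(ClassificationReport.make_report(y_true_cls, y_pred_cls).df)

    # Multivariate regression report with named targets (task-d style names).
    y_true_reg = rng.normal(size=(100, 2))
    y_pred_reg = y_true_reg + rng.normal(scale=0.1, size=(100, 2))
    print(RegressionReport.make_report(y_true_reg, y_pred_reg, labels=['d_x', 'd_y']).df)

    # Fit and score two regressors with the closure returned by estimators_eval,
    # then reuse the fitted estimators with metrics_for_estimators.
    X = rng.normal(size=(100, 5))
    y = X @ rng.normal(size=5) + rng.normal(scale=0.1, size=100)
    estimators = [('linear', LinearRegression()), ('ridge', Ridge())]
    fit_eval = estimators_eval(estimators, r2_score)
    fit_eval(X[:80], y[:80], X[80:], y[80:])
    print(metrics_for_estimators(estimators, r2_score, X[80:], y[80:]))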