# Copyright (c) OpenMMLab. All rights reserved.
from typing import List, Optional, Sequence, Union
import mmengine
import numpy as np
import torch
import torch.nn.functional as F
from mmengine.evaluator import BaseMetric
from mmcls.registry import METRICS
def to_tensor(value):
"""Convert value to torch.Tensor."""
if isinstance(value, np.ndarray):
value = torch.from_numpy(value)
elif isinstance(value, Sequence) and not mmengine.is_str(value):
value = torch.tensor(value)
elif not isinstance(value, torch.Tensor):
        raise TypeError(f'{type(value)} is not a supported type.')
return value
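# A minimal usage sketch of `to_tensor` (hedged; the exact printed values
# depend on dtype promotion):
#
#   to_tensor(np.array([1, 2]))   # tensor([1, 2]), via torch.from_numpy
#   to_tensor([0.5, 0.25])        # tensor([0.5000, 0.2500])
#   to_tensor(torch.ones(3))      # returned unchanged
#   to_tensor('not-a-tensor')     # raises TypeError (str is rejected)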
def _precision_recall_f1_support(pred_positive, gt_positive, average):
"""calculate base classification task metrics, such as precision, recall,
f1_score, support."""
average_options = ['micro', 'macro', None]
    assert average in average_options, 'Invalid `average` argument, ' \
        f'please specify from {average_options}.'
    # Ignore -1 targets, e.g. difficult samples that should be excluded
    # from the evaluation results.
    # This only matters for multi-label calculation and does not affect
    # single-label behavior.
ignored_index = gt_positive == -1
pred_positive[ignored_index] = 0
gt_positive[ignored_index] = 0
class_correct = (pred_positive & gt_positive)
if average == 'micro':
tp_sum = class_correct.sum()
pred_sum = pred_positive.sum()
gt_sum = gt_positive.sum()
else:
tp_sum = class_correct.sum(0)
pred_sum = pred_positive.sum(0)
gt_sum = gt_positive.sum(0)
precision = tp_sum / torch.clamp(pred_sum, min=1).float() * 100
recall = tp_sum / torch.clamp(gt_sum, min=1).float() * 100
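    # Clamp the denominator with float32 eps so that classes with zero
    # precision and recall get an f1-score of 0 instead of NaN.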
f1_score = 2 * precision * recall / torch.clamp(
precision + recall, min=torch.finfo(torch.float32).eps)
if average in ['macro', 'micro']:
precision = precision.mean(0)
recall = recall.mean(0)
f1_score = f1_score.mean(0)
support = gt_sum.sum(0)
else:
support = gt_sum
return precision, recall, f1_score, support
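# A hedged worked example of the helper above, assuming a 4-sample,
# 2-class one-hot encoding:
#
#   pred_positive = torch.tensor([[1, 0], [1, 0], [0, 1], [0, 1]])
#   gt_positive = torch.tensor([[1, 0], [0, 1], [0, 1], [1, 0]])
#
# Per class, tp_sum = [1, 1], pred_sum = [2, 2] and gt_sum = [2, 2], so
# `average=None` gives precision = recall = [50., 50.], and both 'macro'
# and 'micro' reduce to 50. here (they diverge once the class counts are
# imbalanced).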
@METRICS.register_module()
class Accuracy(BaseMetric):
r"""Accuracy evaluation metric.
For either binary classification or multi-class classification, the
accuracy is the fraction of correct predictions in all predictions:
.. math::
\text{Accuracy} = \frac{N_{\text{correct}}}{N_{\text{all}}}
Args:
        topk (int | Sequence[int]): If the ground truth label matches one of
            the best **k** predictions, the sample will be regarded as a
            positive prediction. If the parameter is a tuple, the accuracy
            for every k will be calculated and output together. Defaults
            to 1.
        thrs (Sequence[float | None] | float | None): If a float, predictions
            with score lower than the threshold will be regarded as negative
            predictions. If None, no threshold is applied. If the parameter
            is a tuple, the accuracy on every threshold will be calculated
            and output together. Defaults to 0.
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
prefix (str, optional): The prefix that will be added in the metric
names to disambiguate homonymous metrics of different evaluators.
If prefix is not provided in the argument, self.default_prefix
will be used instead. Defaults to None.
Examples:
>>> import torch
>>> from mmcls.evaluation import Accuracy
>>> # -------------------- The Basic Usage --------------------
>>> y_pred = [0, 2, 1, 3]
>>> y_true = [0, 1, 2, 3]
>>> Accuracy.calculate(y_pred, y_true)
tensor([50.])
>>> # Calculate the top1 and top5 accuracy.
>>> y_score = torch.rand((1000, 10))
>>> y_true = torch.zeros((1000, ))
>>> Accuracy.calculate(y_score, y_true, topk=(1, 5))
[[tensor([9.9000])], [tensor([51.5000])]]
>>>
        >>> # ------------------- Use with Evaluator -------------------
>>> from mmcls.structures import ClsDataSample
>>> from mmengine.evaluator import Evaluator
>>> data_samples = [
... ClsDataSample().set_gt_label(0).set_pred_score(torch.rand(10))
... for i in range(1000)
... ]
>>> evaluator = Evaluator(metrics=Accuracy(topk=(1, 5)))
>>> evaluator.process(data_samples)
>>> evaluator.evaluate(1000)
{
'accuracy/top1': 9.300000190734863,
'accuracy/top5': 51.20000076293945
}
"""
default_prefix: Optional[str] = 'accuracy'
def __init__(self,
topk: Union[int, Sequence[int]] = (1, ),
thrs: Union[float, Sequence[Union[float, None]], None] = 0.,
collect_device: str = 'cpu',
prefix: Optional[str] = None) -> None:
super().__init__(collect_device=collect_device, prefix=prefix)
if isinstance(topk, int):
self.topk = (topk, )
else:
self.topk = tuple(topk)
if isinstance(thrs, float) or thrs is None:
self.thrs = (thrs, )
else:
self.thrs = tuple(thrs)
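        # Both arguments are normalized to tuples above; e.g. (a hedged
        # sketch) Accuracy(topk=1, thrs=None) stores self.topk = (1, ) and
        # self.thrs = (None, ).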
def process(self, data_batch, data_samples: Sequence[dict]):
"""Process one batch of data samples.
        The processed results should be stored in ``self.results``, which will
        be used to compute the metrics when all batches have been processed.
Args:
data_batch: A batch of data from the dataloader.
data_samples (Sequence[dict]): A batch of outputs from the model.
"""
for data_sample in data_samples:
result = dict()
pred_label = data_sample['pred_label']
gt_label = data_sample['gt_label']
if 'score' in pred_label:
result['pred_score'] = pred_label['score'].cpu()
else:
result['pred_label'] = pred_label['label'].cpu()
result['gt_label'] = gt_label['label'].cpu()
# Save the result to `self.results`.
self.results.append(result)
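        # A hedged sketch of one stored result for a 10-class task:
        #   {'pred_score': <tensor of shape (10, )>, 'gt_label': <tensor>}
        # when the model provides scores, or
        #   {'pred_label': <tensor>, 'gt_label': <tensor>}
        # when it only provides predicted labels.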
def compute_metrics(self, results: List):
"""Compute the metrics from processed results.
Args:
            results (list): The processed results of each batch.
Returns:
Dict: The computed metrics. The keys are the names of the metrics,
and the values are corresponding results.
"""
# NOTICE: don't access `self.results` from the method.
metrics = {}
# concat
target = torch.cat([res['gt_label'] for res in results])
if 'pred_score' in results[0]:
pred = torch.stack([res['pred_score'] for res in results])
try:
acc = self.calculate(pred, target, self.topk, self.thrs)
except ValueError as e:
# If the topk is invalid.
raise ValueError(
str(e) + ' Please check the `val_evaluator` and '
'`test_evaluator` fields in your config file.')
multi_thrs = len(self.thrs) > 1
for i, k in enumerate(self.topk):
for j, thr in enumerate(self.thrs):
name = f'top{k}'
if multi_thrs:
name += '_no-thr' if thr is None else f'_thr-{thr:.2f}'
metrics[name] = acc[i][j].item()
else:
            # If only the label is present in `pred_label`.
pred = torch.cat([res['pred_label'] for res in results])
acc = self.calculate(pred, target, self.topk, self.thrs)
metrics['top1'] = acc.item()
return metrics
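    # A hedged sketch of the resulting keys: with topk=(1, 5) and a single
    # threshold the dict holds 'top1' and 'top5'; with thrs=(0., None) the
    # names gain suffixes, e.g. 'top1_thr-0.00' and 'top1_no-thr'.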
@staticmethod
def calculate(
pred: Union[torch.Tensor, np.ndarray, Sequence],
target: Union[torch.Tensor, np.ndarray, Sequence],
topk: Sequence[int] = (1, ),
thrs: Sequence[Union[float, None]] = (0., ),
) -> Union[torch.Tensor, List[List[torch.Tensor]]]:
"""Calculate the accuracy.
Args:
pred (torch.Tensor | np.ndarray | Sequence): The prediction
results. It can be labels (N, ), or scores of every
class (N, C).
target (torch.Tensor | np.ndarray | Sequence): The target of
each prediction with shape (N, ).
            topk (Sequence[int]): If the ground truth label matches one of
                the best **k** predictions, the sample will be regarded as
                a positive prediction. Defaults to (1, ).
            thrs (Sequence[float | None]): Predictions with scores under
                the thresholds are considered negative. It's only used
                when ``pred`` is scores. None means no thresholds.
                Defaults to (0., ).
Returns:
torch.Tensor | List[List[torch.Tensor]]: Accuracy.
            - torch.Tensor: If the ``pred`` is a sequence of labels instead
              of scores (number of dimensions is 1). Only return a top-1
              accuracy tensor, and ignore the arguments ``topk`` and
              ``thrs``.
            - List[List[torch.Tensor]]: If the ``pred`` is a sequence of
              scores (number of dimensions is 2). Return the accuracy on
              every ``topk`` and ``thrs`` combination; the first dim is
              ``topk``, the second dim is ``thrs``.
"""
pred = to_tensor(pred)
target = to_tensor(target).to(torch.int64)
num = pred.size(0)
assert pred.size(0) == target.size(0), \
f"The size of pred ({pred.size(0)}) doesn't match "\
f'the target ({target.size(0)}).'
if pred.ndim == 1:
            # For pred label, ignore `topk` and `thrs`.
pred_label = pred.int()
correct = pred.eq(target).float().sum(0, keepdim=True)
acc = correct.mul_(100. / num)
return acc
else:
# For pred score, calculate on all topk and thresholds.
pred = pred.float()
maxk = max(topk)
if maxk > pred.size(1):
raise ValueError(
f'Top-{maxk} accuracy is unavailable since the number of '
f'categories is {pred.size(1)}.')
pred_score, pred_label = pred.topk(maxk, dim=1)
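            # `pred_label` is transposed to shape (maxk, N) so that row i
            # holds every sample's (i+1)-th best guess; comparing it with
            # the broadcast target marks which guesses are correct.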
pred_label = pred_label.t()
correct = pred_label.eq(target.view(1, -1).expand_as(pred_label))
results = []
for k in topk:
results.append([])
for thr in thrs:
# Only prediction values larger than thr are counted
# as correct
_correct = correct
if thr is not None:
_correct = _correct & (pred_score.t() > thr)
correct_k = _correct[:k].reshape(-1).float().sum(
0, keepdim=True)
acc = correct_k.mul_(100. / num)
results[-1].append(acc)
return results
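# A hedged worked example of the score branch above:
#
#   pred = torch.tensor([[0.1, 0.7, 0.2], [0.5, 0.4, 0.1]])
#   target = torch.tensor([2, 1])
#   Accuracy.calculate(pred, target, topk=(1, 2))
#
# `pred.topk(2, dim=1)` keeps labels [[1, 2], [0, 1]]; after the transpose
# `correct` is [[False, False], [True, True]], so the top-1 accuracy is
# tensor([0.]) and the top-2 accuracy is tensor([100.]).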
@METRICS.register_module()
class SingleLabelMetric(BaseMetric):
r"""A collection of precision, recall, f1-score and support for
single-label tasks.
The collection of metrics is for single-label multi-class classification.
And all these metrics are based on the confusion matrix of every category:
.. image:: ../../_static/image/confusion-matrix.png
:width: 60%
:align: center
    All metrics can be formulated using the variables above:
**Precision** is the fraction of correct predictions in all predictions:
.. math::
\text{Precision} = \frac{TP}{TP+FP}
**Recall** is the fraction of correct predictions in all targets:
.. math::
\text{Recall} = \frac{TP}{TP+FN}
**F1-score** is the harmonic mean of the precision and recall:
.. math::
\text{F1-score} = \frac{2\times\text{Recall}\times\text{Precision}}{\text{Recall}+\text{Precision}}
    **Support** is the number of ground-truth samples in each category:
    .. math::
        \text{Support} = TP + FN
Args:
        thrs (Sequence[float | None] | float | None): If a float, predictions
            with score lower than the threshold will be regarded as the
            negative prediction. If None, only the top-1 prediction will be
            regarded as the positive prediction. If the parameter is a
            tuple, the metrics on all thresholds will be calculated and
            output together. Defaults to 0.
items (Sequence[str]): The detailed metric items to evaluate, select
from "precision", "recall", "f1-score" and "support".
Defaults to ``('precision', 'recall', 'f1-score')``.
average (str | None): How to calculate the final metrics from the
confusion matrix of every category. It supports three modes:
- `"macro"`: Calculate metrics for each category, and calculate
the mean value over all categories.
- `"micro"`: Average the confusion matrix over all categories and
calculate metrics on the mean confusion matrix.
- `None`: Calculate metrics of every category and output directly.
Defaults to "macro".
num_classes (int, optional): The number of classes. Defaults to None.
collect_device (str): Device name used for collecting results from
different ranks during distributed training. Must be 'cpu' or
'gpu'. Defaults to 'cpu'.
prefix (str, optional): The prefix that will be added in the metric
names to disambiguate homonymous metrics of different evaluators.
If prefix is not provided in the argument, self.default_prefix
will be used instead. Defaults to None.
Examples:
>>> import torch
>>> from mmcls.evaluation import SingleLabelMetric
>>> # -------------------- The Basic Usage --------------------
>>> y_pred = [0, 1, 1, 3]
>>> y_true = [0, 2, 1, 3]
>>> # Output precision, recall, f1-score and support.
>>> SingleLabelMetric.calculate(y_pred, y_true, num_classes=4)
(tensor(62.5000), tensor(75.), tensor(66.6667), tensor(4))
>>> # Calculate with different thresholds.
>>> y_score = torch.rand((1000, 10))
>>> y_true = torch.zeros((1000, ))
>>> SingleLabelMetric.calculate(y_score, y_true, thrs=(0., 0.9))
[(tensor(10.), tensor(0.9500), tensor(1.7352), tensor(1000)),
(tensor(10.), tensor(0.5500), tensor(1.0427), tensor(1000))]
>>>
        >>> # ------------------- Use with Evaluator -------------------
>>> from mmcls.structures import ClsDataSample
>>> from mmengine.evaluator import Evaluator
>>> data_samples = [
... ClsDataSample().set_gt_label(i%5).set_pred_score(torch.rand(5))
... for i in range(1000)
... ]
>>> evaluator = Evaluator(metrics=SingleLabelMetric())
>>> evaluator.process(data_samples)
>>> evaluator.evaluate(1000)
{'single-label/precision': 19.650691986083984,
'single-label/recall': 19.600000381469727,
'single-label/f1-score': 19.619548797607422}
>>> # Evaluate on each class
>>> evaluator = Evaluator(metrics=SingleLabelMetric(average=None))
>>> evaluator.process(data_samples)
>>> evaluator.evaluate(1000)
{
'single-label/precision_classwise': [21.1, 18.7, 17.8, 19.4, 16.1],
'single-label/recall_classwise': [18.5, 18.5, 17.0, 20.0, 18.0],
'single-label/f1-score_classwise': [19.7, 18.6, 17.1, 19.7, 17.0]
}
""" # noqa: E501
default_prefix: Optional[str] = 'single-label'
def __init__(self,
thrs: Union[float, Sequence[Union[float, None]], None] = 0.,
items: Sequence[str] = ('precision', 'recall', 'f1-score'),
average: Optional[str] = 'macro',
num_classes: Optional[int] = None,
collect_device: str = 'cpu',
prefix: Optional[str] = None) -> None:
super().__init__(collect_device=collect_device, prefix=prefix)
if isinstance(thrs, float) or thrs is None:
self.thrs = (thrs, )
else:
self.thrs = tuple(thrs)
for item in items:
assert item in ['precision', 'recall', 'f1-score', 'support'], \
f'The metric {item} is not supported by `SingleLabelMetric`,' \
                ' please specify from "precision", "recall", "f1-score" and ' \
'"support".'
self.items = tuple(items)
self.average = average
self.num_classes = num_classes
def process(self, data_batch, data_samples: Sequence[dict]):
"""Process one batch of data samples.
        The processed results should be stored in ``self.results``, which will
        be used to compute the metrics when all batches have been processed.
Args:
data_batch: A batch of data from the dataloader.
data_samples (Sequence[dict]): A batch of outputs from the model.
"""
for data_sample in data_samples:
result = dict()
pred_label = data_sample['pred_label']
gt_label = data_sample['gt_label']
if 'score' in pred_label:
result['pred_score'] = pred_label['score'].cpu()
else:
num_classes = self.num_classes or data_sample.get(
'num_classes')
assert num_classes is not None, \
'The `num_classes` must be specified if `pred_label` has '\
'only `label`.'
result['pred_label'] = pred_label['label'].cpu()
result['num_classes'] = num_classes
result['gt_label'] = gt_label['label'].cpu()
# Save the result to `self.results`.
self.results.append(result)
def compute_metrics(self, results: List):
"""Compute the metrics from processed results.
Args:
results (list): The processed results of each batch.
Returns:
Dict: The computed metrics. The keys are the names of the metrics,
and the values are corresponding results.
"""
        # NOTICE: don't access `self.results` from the method. `self.results`
        # is a list of results from multiple batches, while the input
        # `results` are the collected results.
metrics = {}
def pack_results(precision, recall, f1_score, support):
single_metrics = {}
if 'precision' in self.items:
single_metrics['precision'] = precision
if 'recall' in self.items:
single_metrics['recall'] = recall
if 'f1-score' in self.items:
single_metrics['f1-score'] = f1_score
if 'support' in self.items:
single_metrics['support'] = support
return single_metrics
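        # `pack_results` above filters the full 4-tuple down to the
        # requested `self.items`; with the default items it yields a dict
        # with 'precision', 'recall' and 'f1-score' keys (a sketch;
        # 'support' is included only when requested).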
# concat
target = torch.cat([res['gt_label'] for res in results])
if 'pred_score' in results[0]:
pred = torch.stack([res['pred_score'] for res in results])
metrics_list = self.calculate(
pred, target, thrs=self.thrs, average=self.average)
multi_thrs = len(self.thrs) > 1
for i, thr in enumerate(self.thrs):
if multi_thrs:
suffix = '_no-thr' if thr is None else f'_thr-{thr:.2f}'
else:
suffix = ''
for k, v in pack_results(*metrics_list[i]).items():
metrics[k + suffix] = v
else:
            # If only the label is present in `pred_label`.
pred = torch.cat([res['pred_label'] for res in results])
res = self.calculate(
pred,
target,
average=self.average,
num_classes=results[0]['num_classes'])
metrics = pack_results(*res)
result_metrics = dict()
for k, v in metrics.items():
if self.average is None:
result_metrics[k + '_classwise'] = v.cpu().detach().tolist()
elif self.average == 'micro':
result_metrics[k + f'_{self.average}'] = v.item()
else:
result_metrics[k] = v.item()
return result_metrics
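    # A hedged sketch of the final key naming: average='macro' keeps the
    # plain keys ('precision', 'recall', ...), average='micro' appends a
    # '_micro' suffix, and average=None appends '_classwise' and reports
    # per-class lists as values.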
@staticmethod
def calculate(
pred: Union[torch.Tensor, np.ndarray, Sequence],
target: Union[torch.Tensor, np.ndarray, Sequence],
thrs: Sequence[Union[float, None]] = (0., ),
average: Optional[str] = 'macro',
num_classes: Optional[int] = None,
    ) -> Union[tuple, List[tuple]]:
"""Calculate the precision, recall, f1-score and support.
Args:
pred (torch.Tensor | np.ndarray | Sequence): The prediction
results. It can be labels (N, ), or scores of every
class (N, C).
target (torch.Tensor | np.ndarray | Sequence): The target of
each prediction with shape (N, ).
thrs (Sequence[float | None]): Predictions with scores under
the thresholds are considered negative. It's only used
when ``pred`` is scores. None means no thresholds.
Defaults to (0., ).
average (str | None): How to calculate the final metrics from
the confusion matrix of every category. It supports three
modes:
- `"macro"`: Calculate metrics for each category, and calculate
the mean value over all categories.
- `"micro"`: Average the confusion matrix over all categories
and calculate metrics on the mean confusion matrix.
- `None`: Calculate metrics of every category and output
directly.
Defaults to "macro".
            num_classes (int, optional): The number of classes. If the
                ``pred`` is labels instead of scores, this argument is
                required. Defaults to None.
        Returns:
            Tuple: The tuple contains precision, recall and f1-score.
            And the type of each item is:
            - torch.Tensor: If the ``pred`` is a sequence of labels instead
              of scores (number of dimensions is 1). Only returns a tensor
              for each metric. The shape is (1, ) if ``average`` is not
              None, and (C, ) if ``average`` is None.
            - List[torch.Tensor]: If the ``pred`` is a sequence of scores
              (number of dimensions is 2). Return the metrics on every
              ``thrs``. The shape of each tensor is (1, ) if ``average`` is
              not None, and (C, ) if ``average`` is None.
"""
average_options = ['micro', 'macro', None]
        assert average in average_options, 'Invalid `average` argument, ' \
            f'please specify from {average_options}.'
pred = to_tensor(pred)
target = to_tensor(target).to(torch.int64)
assert pred.size(0) == target.size(0), \
f"The size of pred ({pred.size(0)}) doesn't match "\
f'the target ({target.size(0)}).'
if pred.ndim == 1:
            assert num_classes is not None, \
                'Please specify the `num_classes` if the `pred` is labels ' \
                'instead of scores.'
gt_positive = F.one_hot(target.flatten(), num_classes)
pred_positive = F.one_hot(pred.to(torch.int64), num_classes)
return _precision_recall_f1_support(pred_positive, gt_positive,
average)
else:
# For pred score, calculate on all thresholds.
num_classes = pred.size(1)
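            # Keep only the single best class per sample; the thresholds
            # below may still veto it as a positive prediction.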
pred_score, pred_label = torch.topk(pred, k=1)
pred_score = pred_score.flatten()
pred_label = pred_label.flatten()
gt_positive = F.one_hot(target.flatten(), num_classes)
results = []
for thr in thrs:
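                # Rebuild the one-hot matrix for every threshold, since the
                # masking below zeroes some of its rows in place.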
pred_positive = F.one_hot(pred_label, num_classes)
if thr is not None:
pred_positive[pred_score <= thr] = 0
results.append(
_precision_recall_f1_support(pred_positive, gt_positive,
average))
return results
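# A hedged end-to-end sketch of the threshold handling above:
#
#   pred = torch.tensor([[0.6, 0.4], [0.3, 0.7]])
#   target = torch.tensor([0, 0])
#
# The top-1 labels are [0, 1] with scores [0.6, 0.7]. For thr=0.65 the
# first sample's score falls below the threshold, so its one-hot row is
# zeroed and only the second (wrong) prediction remains positive.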