# Copyright (c) OpenMMLab. All rights reserved. from typing import List, Optional, Sequence, Union import numpy as np import torch from mmengine.evaluator import BaseMetric from mmengine.logging import MMLogger from mmengine.structures import LabelData from mmcls.registry import METRICS from .single_label import _precision_recall_f1_support, to_tensor @METRICS.register_module() class MultiLabelMetric(BaseMetric): r"""A collection of precision, recall, f1-score and support for multi-label tasks. The collection of metrics is for single-label multi-class classification. And all these metrics are based on the confusion matrix of every category: .. image:: ../../_static/image/confusion-matrix.png :width: 60% :align: center All metrics can be formulated use variables above: **Precision** is the fraction of correct predictions in all predictions: .. math:: \text{Precision} = \frac{TP}{TP+FP} **Recall** is the fraction of correct predictions in all targets: .. math:: \text{Recall} = \frac{TP}{TP+FN} **F1-score** is the harmonic mean of the precision and recall: .. math:: \text{F1-score} = \frac{2\times\text{Recall}\times\text{Precision}}{\text{Recall}+\text{Precision}} **Support** is the number of samples: .. math:: \text{Support} = TP + TN + FN + FP Args: thr (float, optional): Predictions with scores under the threshold are considered as negative. If None, the ``topk`` predictions will be considered as positive. If the ``topk`` is also None, use ``thr=0.5`` as default. Defaults to None. topk (int, optional): Predictions with the k-th highest scores are considered as positive. If None, use ``thr`` to determine positive predictions. If both ``thr`` and ``topk`` are not None, use ``thr``. Defaults to None. items (Sequence[str]): The detailed metric items to evaluate, select from "precision", "recall", "f1-score" and "support". Defaults to ``('precision', 'recall', 'f1-score')``. average (str | None): How to calculate the final metrics from the confusion matrix of every category. It supports three modes: - `"macro"`: Calculate metrics for each category, and calculate the mean value over all categories. - `"micro"`: Average the confusion matrix over all categories and calculate metrics on the mean confusion matrix. - `None`: Calculate metrics of every category and output directly. Defaults to "macro". collect_device (str): Device name used for collecting results from different ranks during distributed training. Must be 'cpu' or 'gpu'. Defaults to 'cpu'. prefix (str, optional): The prefix that will be added in the metric names to disambiguate homonymous metrics of different evaluators. If prefix is not provided in the argument, self.default_prefix will be used instead. Defaults to None. Examples: >>> import torch >>> from mmcls.evaluation import MultiLabelMetric >>> # ------ The Basic Usage for category indices labels ------- >>> y_pred = [[0], [1], [0, 1], [3]] >>> y_true = [[0, 3], [0, 2], [1], [3]] >>> # Output precision, recall, f1-score and support >>> MultiLabelMetric.calculate( ... y_pred, y_true, pred_indices=True, target_indices=True, num_classes=4) (tensor(50.), tensor(50.), tensor(45.8333), tensor(6)) >>> # ----------- The Basic Usage for one-hot labels ----------- >>> y_pred = torch.tensor([[1, 1, 0, 0], ... [1, 1, 0, 0], ... [0, 0, 1, 0], ... [0, 1, 0, 0], ... [0, 1, 0, 0]]) >>> y_true = torch.Tensor([[1, 1, 0, 0], ... [0, 0, 1, 0], ... [1, 1, 1, 0], ... [1, 0, 0, 0], ... [1, 0, 0, 0]]) >>> MultiLabelMetric.calculate(y_pred, y_true) (tensor(43.7500), tensor(31.2500), tensor(33.3333), tensor(8)) >>> # --------- The Basic Usage for one-hot pred scores --------- >>> y_pred = torch.rand(y_true.size()) >>> y_pred tensor([[0.4575, 0.7335, 0.3934, 0.2572], [0.1318, 0.1004, 0.8248, 0.6448], [0.8349, 0.6294, 0.7896, 0.2061], [0.4037, 0.7308, 0.6713, 0.8374], [0.3779, 0.4836, 0.0313, 0.0067]]) >>> # Calculate with different threshold. >>> MultiLabelMetric.calculate(y_pred, y_true, thr=0.1) (tensor(42.5000), tensor(75.), tensor(53.1746), tensor(8)) >>> # Calculate with topk. >>> MultiLabelMetric.calculate(y_pred, y_true, topk=1) (tensor(62.5000), tensor(31.2500), tensor(39.1667), tensor(8)) >>> >>> # ------------------- Use with Evalutor ------------------- >>> from mmcls.structures import ClsDataSample >>> from mmengine.evaluator import Evaluator >>> data_sampels = [ ... ClsDataSample().set_pred_score(pred).set_gt_score(gt) ... for pred, gt in zip(torch.rand(1000, 5), torch.randint(0, 2, (1000, 5)))] >>> evaluator = Evaluator(metrics=MultiLabelMetric(thr=0.5)) >>> evaluator.process(data_sampels) >>> evaluator.evaluate(1000) { 'multi-label/precision': 50.72898037055408, 'multi-label/recall': 50.06836461357571, 'multi-label/f1-score': 50.384466955258475 } >>> # Evaluate on each class by using topk strategy >>> evaluator = Evaluator(metrics=MultiLabelMetric(topk=1, average=None)) >>> evaluator.process(data_sampels) >>> evaluator.evaluate(1000) { 'multi-label/precision_top1_classwise': [48.22, 50.54, 50.99, 44.18, 52.5], 'multi-label/recall_top1_classwise': [18.92, 19.22, 19.92, 20.0, 20.27], 'multi-label/f1-score_top1_classwise': [27.18, 27.85, 28.65, 27.54, 29.25] } """ # noqa: E501 default_prefix: Optional[str] = 'multi-label' def __init__(self, thr: Optional[float] = None, topk: Optional[int] = None, items: Sequence[str] = ('precision', 'recall', 'f1-score'), average: Optional[str] = 'macro', collect_device: str = 'cpu', prefix: Optional[str] = None) -> None: logger = MMLogger.get_current_instance() if thr is None and topk is None: thr = 0.5 logger.warning('Neither thr nor k is given, set thr as 0.5 by ' 'default.') elif thr is not None and topk is not None: logger.warning('Both thr and topk are given, ' 'use threshold in favor of top-k.') self.thr = thr self.topk = topk self.average = average for item in items: assert item in ['precision', 'recall', 'f1-score', 'support'], \ f'The metric {item} is not supported by `SingleLabelMetric`,' \ ' please choose from "precision", "recall", "f1-score" and ' \ '"support".' self.items = tuple(items) super().__init__(collect_device=collect_device, prefix=prefix) def process(self, data_batch, data_samples: Sequence[dict]): """Process one batch of data samples. The processed results should be stored in ``self.results``, which will be used to computed the metrics when all batches have been processed. Args: data_batch: A batch of data from the dataloader. data_samples (Sequence[dict]): A batch of outputs from the model. """ for data_sample in data_samples: result = dict() pred_label = data_sample['pred_label'] gt_label = data_sample['gt_label'] result['pred_score'] = pred_label['score'].clone() num_classes = result['pred_score'].size()[-1] if 'score' in gt_label: result['gt_score'] = gt_label['score'].clone() else: result['gt_score'] = LabelData.label_to_onehot( gt_label['label'], num_classes) # Save the result to `self.results`. self.results.append(result) def compute_metrics(self, results: List): """Compute the metrics from processed results. Args: results (list): The processed results of each batch. Returns: Dict: The computed metrics. The keys are the names of the metrics, and the values are corresponding results. """ # NOTICE: don't access `self.results` from the method. `self.results` # are a list of results from multiple batch, while the input `results` # are the collected results. metrics = {} target = torch.stack([res['gt_score'] for res in results]) pred = torch.stack([res['pred_score'] for res in results]) metric_res = self.calculate( pred, target, pred_indices=False, target_indices=False, average=self.average, thr=self.thr, topk=self.topk) def pack_results(precision, recall, f1_score, support): single_metrics = {} if 'precision' in self.items: single_metrics['precision'] = precision if 'recall' in self.items: single_metrics['recall'] = recall if 'f1-score' in self.items: single_metrics['f1-score'] = f1_score if 'support' in self.items: single_metrics['support'] = support return single_metrics if self.thr: suffix = '' if self.thr == 0.5 else f'_thr-{self.thr:.2f}' for k, v in pack_results(*metric_res).items(): metrics[k + suffix] = v else: for k, v in pack_results(*metric_res).items(): metrics[k + f'_top{self.topk}'] = v result_metrics = dict() for k, v in metrics.items(): if self.average is None: result_metrics[k + '_classwise'] = v.detach().cpu().tolist() elif self.average == 'macro': result_metrics[k] = v.item() else: result_metrics[k + f'_{self.average}'] = v.item() return result_metrics @staticmethod def calculate( pred: Union[torch.Tensor, np.ndarray, Sequence], target: Union[torch.Tensor, np.ndarray, Sequence], pred_indices: bool = False, target_indices: bool = False, average: Optional[str] = 'macro', thr: Optional[float] = None, topk: Optional[int] = None, num_classes: Optional[int] = None ) -> Union[torch.Tensor, List[torch.Tensor]]: """Calculate the precision, recall, f1-score. Args: pred (torch.Tensor | np.ndarray | Sequence): The prediction results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with shape ``(N, num_classes)`` or a sequence of index/onehot format labels. target (torch.Tensor | np.ndarray | Sequence): The prediction results. A :obj:`torch.Tensor` or :obj:`np.ndarray` with shape ``(N, num_classes)`` or a sequence of index/onehot format labels. pred_indices (bool): Whether the ``pred`` is a sequence of category index labels. If True, ``num_classes`` must be set. Defaults to False. target_indices (bool): Whether the ``target`` is a sequence of category index labels. If True, ``num_classes`` must be set. Defaults to False. average (str | None): How to calculate the final metrics from the confusion matrix of every category. It supports three modes: - `"macro"`: Calculate metrics for each category, and calculate the mean value over all categories. - `"micro"`: Average the confusion matrix over all categories and calculate metrics on the mean confusion matrix. - `None`: Calculate metrics of every category and output directly. Defaults to "macro". thr (float, optional): Predictions with scores under the thresholds are considered as negative. Defaults to None. topk (int, optional): Predictions with the k-th highest scores are considered as positive. Defaults to None. num_classes (Optional, int): The number of classes. If the ``pred`` is indices instead of onehot, this argument is required. Defaults to None. Returns: Tuple: The tuple contains precision, recall and f1-score. And the type of each item is: - torch.Tensor: A tensor for each metric. The shape is (1, ) if ``average`` is not None, and (C, ) if ``average`` is None. Notes: If both ``thr`` and ``topk`` are set, use ``thr` to determine positive predictions. If neither is set, use ``thr=0.5`` as default. """ average_options = ['micro', 'macro', None] assert average in average_options, 'Invalid `average` argument, ' \ f'please specicy from {average_options}.' def _format_label(label, is_indices): """format various label to torch.Tensor.""" if isinstance(label, np.ndarray): assert label.ndim == 2, 'The shape `pred` and `target` ' \ 'array must be (N, num_classes).' label = torch.from_numpy(label) elif isinstance(label, torch.Tensor): assert label.ndim == 2, 'The shape `pred` and `target` ' \ 'tensor must be (N, num_classes).' elif isinstance(label, Sequence): if is_indices: assert num_classes is not None, 'For index-type labels, ' \ 'please specify `num_classes`.' label = torch.stack([ LabelData.label_to_onehot( to_tensor(indices), num_classes) for indices in label ]) else: label = torch.stack( [to_tensor(onehot) for onehot in label]) else: raise TypeError( 'The `pred` and `target` must be type of torch.tensor or ' f'np.ndarray or sequence but get {type(label)}.') return label pred = _format_label(pred, pred_indices) target = _format_label(target, target_indices).long() assert pred.shape == target.shape, \ f"The size of pred ({pred.shape}) doesn't match "\ f'the target ({target.shape}).' if num_classes is not None: assert pred.size(1) == num_classes, \ f'The shape of `pred` ({pred.shape}) '\ f"doesn't match the num_classes ({num_classes})." num_classes = pred.size(1) thr = 0.5 if (thr is None and topk is None) else thr if thr is not None: # a label is predicted positive if larger than thr pos_inds = (pred >= thr).long() else: # top-k labels will be predicted positive for any example _, topk_indices = pred.topk(topk) pos_inds = torch.zeros_like(pred).scatter_(1, topk_indices, 1) pos_inds = pos_inds.long() return _precision_recall_f1_support(pos_inds, target, average) def _average_precision(pred: torch.Tensor, target: torch.Tensor) -> torch.Tensor: r"""Calculate the average precision for a single class. AP summarizes a precision-recall curve as the weighted mean of maximum precisions obtained for any r'>r, where r is the recall: .. math:: \text{AP} = \sum_n (R_n - R_{n-1}) P_n Note that no approximation is involved since the curve is piecewise constant. Args: pred (torch.Tensor): The model prediction with shape ``(N, num_classes)``. target (torch.Tensor): The target of predictions with shape ``(N, num_classes)``. Returns: torch.Tensor: average precision result. """ assert pred.shape == target.shape, \ f"The size of pred ({pred.shape}) doesn't match "\ f'the target ({target.shape}).' # a small value for division by zero errors eps = torch.finfo(torch.float32).eps # get rid of -1 target such as difficult sample # that is not wanted in evaluation results. valid_index = target > -1 pred = pred[valid_index] target = target[valid_index] # sort examples sorted_pred_inds = torch.argsort(pred, dim=0, descending=True) sorted_target = target[sorted_pred_inds] # get indexes when gt_true is positive pos_inds = sorted_target == 1 # Calculate cumulative tp case numbers tps = torch.cumsum(pos_inds, 0) total_pos = tps[-1].item() # the last of tensor may change later # Calculate cumulative tp&fp(pred_poss) case numbers pred_pos_nums = torch.arange(1, len(sorted_target) + 1).to(pred.device) pred_pos_nums[pred_pos_nums < eps] = eps tps[torch.logical_not(pos_inds)] = 0 precision = tps / pred_pos_nums.float() ap = torch.sum(precision, 0) / max(total_pos, eps) return ap @METRICS.register_module() class AveragePrecision(BaseMetric): r"""Calculate the average precision with respect of classes. AveragePrecision (AP) summarizes a precision-recall curve as the weighted mean of maximum precisions obtained for any r'>r, where r is the recall: .. math:: \text{AP} = \sum_n (R_n - R_{n-1}) P_n Note that no approximation is involved since the curve is piecewise constant. Args: average (str | None): How to calculate the final metrics from every category. It supports two modes: - `"macro"`: Calculate metrics for each category, and calculate the mean value over all categories. The result of this mode is also called **mAP**. - `None`: Calculate metrics of every category and output directly. Defaults to "macro". collect_device (str): Device name used for collecting results from different ranks during distributed training. Must be 'cpu' or 'gpu'. Defaults to 'cpu'. prefix (str, optional): The prefix that will be added in the metric names to disambiguate homonymous metrics of different evaluators. If prefix is not provided in the argument, self.default_prefix will be used instead. Defaults to None. References ---------- .. [1] `Wikipedia entry for the Average precision `_ Examples: >>> import torch >>> from mmcls.evaluation import AveragePrecision >>> # --------- The Basic Usage for one-hot pred scores --------- >>> y_pred = torch.Tensor([[0.9, 0.8, 0.3, 0.2], ... [0.1, 0.2, 0.2, 0.1], ... [0.7, 0.5, 0.9, 0.3], ... [0.8, 0.1, 0.1, 0.2]]) >>> y_true = torch.Tensor([[1, 1, 0, 0], ... [0, 1, 0, 0], ... [0, 0, 1, 0], ... [1, 0, 0, 0]]) >>> AveragePrecision.calculate(y_pred, y_true) tensor(70.833) >>> # ------------------- Use with Evalutor ------------------- >>> from mmcls.structures import ClsDataSample >>> from mmengine.evaluator import Evaluator >>> data_samples = [ ... ClsDataSample().set_pred_score(i).set_gt_score(j) ... for i, j in zip(y_pred, y_true) ... ] >>> evaluator = Evaluator(metrics=AveragePrecision()) >>> evaluator.process(data_samples) >>> evaluator.evaluate(5) {'multi-label/mAP': 70.83333587646484} >>> # Evaluate on each class >>> evaluator = Evaluator(metrics=AveragePrecision(average=None)) >>> evaluator.process(data_samples) >>> evaluator.evaluate(5) {'multi-label/AP_classwise': [100., 83.33, 100., 0.]} """ default_prefix: Optional[str] = 'multi-label' def __init__(self, average: Optional[str] = 'macro', collect_device: str = 'cpu', prefix: Optional[str] = None) -> None: super().__init__(collect_device=collect_device, prefix=prefix) self.average = average def process(self, data_batch, data_samples: Sequence[dict]): """Process one batch of data samples. The processed results should be stored in ``self.results``, which will be used to computed the metrics when all batches have been processed. Args: data_batch: A batch of data from the dataloader. data_samples (Sequence[dict]): A batch of outputs from the model. """ for data_sample in data_samples: result = dict() pred_label = data_sample['pred_label'] gt_label = data_sample['gt_label'] result['pred_score'] = pred_label['score'] num_classes = result['pred_score'].size()[-1] if 'score' in gt_label: result['gt_score'] = gt_label['score'] else: result['gt_score'] = LabelData.label_to_onehot( gt_label['label'], num_classes) # Save the result to `self.results`. self.results.append(result) def compute_metrics(self, results: List): """Compute the metrics from processed results. Args: results (list): The processed results of each batch. Returns: Dict: The computed metrics. The keys are the names of the metrics, and the values are corresponding results. """ # NOTICE: don't access `self.results` from the method. `self.results` # are a list of results from multiple batch, while the input `results` # are the collected results. # concat target = torch.stack([res['gt_score'] for res in results]) pred = torch.stack([res['pred_score'] for res in results]) ap = self.calculate(pred, target, self.average) result_metrics = dict() if self.average is None: result_metrics['AP_classwise'] = ap.detach().cpu().tolist() else: result_metrics['mAP'] = ap.item() return result_metrics @staticmethod def calculate(pred: Union[torch.Tensor, np.ndarray], target: Union[torch.Tensor, np.ndarray], average: Optional[str] = 'macro') -> torch.Tensor: r"""Calculate the average precision for a single class. Args: pred (torch.Tensor | np.ndarray): The model predictions with shape ``(N, num_classes)``. target (torch.Tensor | np.ndarray): The target of predictions with shape ``(N, num_classes)``. average (str | None): The average method. It supports two modes: - `"macro"`: Calculate metrics for each category, and calculate the mean value over all categories. The result of this mode is also called mAP. - `None`: Calculate metrics of every category and output directly. Defaults to "macro". Returns: torch.Tensor: the average precision of all classes. """ average_options = ['macro', None] assert average in average_options, 'Invalid `average` argument, ' \ f'please specicy from {average_options}.' pred = to_tensor(pred) target = to_tensor(target) assert pred.ndim == 2 and pred.shape == target.shape, \ 'Both `pred` and `target` should have shape `(N, num_classes)`.' num_classes = pred.shape[1] ap = pred.new_zeros(num_classes) for k in range(num_classes): ap[k] = _average_precision(pred[:, k], target[:, k]) if average == 'macro': return ap.mean() * 100.0 else: return ap * 100