# Copyright (c) OpenMMLab. All rights reserved.
from typing import Dict, Sequence

from mmengine.evaluator import BaseMetric

from mmpretrain.registry import METRICS


@METRICS.register_module()
class MultiTasksMetric(BaseMetric):
"""Metrics for MultiTask
Args:
task_metrics(dict): a dictionary in the keys are the names of the tasks
and the values is a list of the metric corresponds to this task
Examples:
>>> import torch
>>> from mmpretrain.evaluation import MultiTasksMetric
# -------------------- The Basic Usage --------------------
        >>> task_metrics = {
'task0': [dict(type='Accuracy', topk=(1, ))],
'task1': [dict(type='Accuracy', topk=(1, 3))]
}
        >>> pred = [{
'pred_task': {
'task0': torch.tensor([0.7, 0.0, 0.3]),
'task1': torch.tensor([0.5, 0.2, 0.3])
},
'gt_task': {
'task0': torch.tensor(0),
'task1': torch.tensor(2)
}
}, {
'pred_task': {
'task0': torch.tensor([0.0, 0.0, 1.0]),
'task1': torch.tensor([0.0, 0.0, 1.0])
},
'gt_task': {
'task0': torch.tensor(2),
'task1': torch.tensor(2)
}
}]
        >>> metric = MultiTasksMetric(task_metrics)
        >>> metric.process(None, pred)
        >>> results = metric.evaluate(2)
results = {
'task0_accuracy/top1': 100.0,
'task1_accuracy/top1': 50.0,
'task1_accuracy/top3': 100.0
}
"""
def __init__(self,
task_metrics: Dict,
collect_device: str = 'cpu') -> None:
self.task_metrics = task_metrics
        super().__init__(collect_device=collect_device)

        self._metrics = {}
for task_name in self.task_metrics.keys():
self._metrics[task_name] = []
for metric in self.task_metrics[task_name]:
self._metrics[task_name].append(METRICS.build(metric))
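        # Built-state sketch (illustrative, not executed): for the docstring's
        # example config, ``self._metrics`` ends up as
        #   {'task0': [Accuracy(topk=(1, ))],
        #    'task1': [Accuracy(topk=(1, 3))]}
        # i.e. one list of instantiated metric objects per task name.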

    def process(self, data_batch, data_samples: Sequence[dict]):
"""Process one batch of data samples.
        The processed results should be stored in ``self.results``, which will
        be used to compute the metrics when all batches have been processed.

        Args:
data_batch: A batch of data from the dataloader.
data_samples (Sequence[dict]): A batch of outputs from the model.
"""
for task_name in self.task_metrics.keys():
filtered_data_samples = []
for data_sample in data_samples:
eval_mask = data_sample[task_name]['eval_mask']
if eval_mask:
filtered_data_samples.append(data_sample[task_name])
for metric in self._metrics[task_name]:
metric.process(data_batch, filtered_data_samples)
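        # Input-format note (inferred from the indexing above, not from the
        # docstring example): each ``data_sample`` is assumed to be a dict
        # keyed by task name, and every per-task entry carries an
        # ``eval_mask`` flag, e.g.
        #   {'task0': {'eval_mask': True, ...},
        #    'task1': {'eval_mask': False, ...}}
        # Samples masked out for a task are skipped for that task's metrics.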

    def compute_metrics(self, results: list) -> dict:
        """Not meant to be called directly.

        Each sub-metric computes its own results; ``MultiTasksMetric``
        overrides ``evaluate`` and delegates to the per-task metrics instead.
        """
        raise NotImplementedError(
            'compute_metrics should not be used here directly')

    def evaluate(self, size):
"""Evaluate the model performance of the whole dataset after processing
        all batches.

        Args:
size (int): Length of the entire validation dataset. When batch
size > 1, the dataloader may pad some data samples to make
sure all ranks have the same length of dataset slice. The
``collect_results`` function will drop the padded data based on
this size.
        Returns:
            dict: Evaluation metrics dict on the val dataset. The keys are
                "{task_name}_{metric_name}", and the values are the
                corresponding results.
"""
metrics = {}
for task_name in self._metrics:
for metric in self._metrics[task_name]:
name = metric.__class__.__name__
if name == 'MultiTasksMetric' or metric.results:
results = metric.evaluate(size)
else:
results = {metric.__class__.__name__: 0}
for key in results:
name = f'{task_name}_{key}'
                    # Checking ``metrics`` (not ``results``) makes the
                    # duplicate-key guard effective: keys in ``results`` never
                    # carry the task prefix, so they could never collide.
                    # Inspired by
                    # https://github.com/open-mmlab/mmengine/blob/ed20a9cba52ceb371f7c825131636b9e2747172e/mmengine/evaluator/evaluator.py#L84-L87
                    if name in metrics:
                        raise ValueError(
                            'There are multiple metric results with the same '
                            f'metric name {name}. Please make sure all '
                            'metrics have different prefixes.')
metrics[name] = results[key]
return metrics
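
# Nesting sketch (hedged; it follows from the 'MultiTasksMetric' special case
# in ``evaluate`` above, not from documented usage): a task's metric list may
# itself contain a nested MultiTasksMetric, and the f'{task_name}_{key}'
# naming then composes the prefixes. For example,
#
#   task_metrics = {
#       'task0': [dict(type='Accuracy', topk=(1, ))],
#       'task1': [dict(type='MultiTasksMetric', task_metrics={
#           'subtask0': [dict(type='Accuracy', topk=(1, ))],
#       })],
#   }
#
# would yield result keys such as 'task0_accuracy/top1' and
# 'task1_subtask0_accuracy/top1'.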