File size: 10,709 Bytes
f549064
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# Copyright (c) OpenMMLab. All rights reserved.
import copy
from collections import OrderedDict
from typing import List, Optional, Sequence, Union

import numpy as np
from mmengine.evaluator import BaseMetric
from mmengine.logging import MMLogger, print_log

from mmdet.registry import METRICS
from ..functional import eval_map


@METRICS.register_module()
class OpenImagesMetric(BaseMetric):
    """OpenImages evaluation metric.

    Evaluate detection mAP for OpenImages. Please refer to
    https://storage.googleapis.com/openimages/web/evaluation.html for more
    details.

    Args:
        iou_thrs (float or List[float]): IoU threshold. Defaults to 0.5.
        ioa_thrs (float or List[float]): IoA threshold. Defaults to 0.5.
        scale_ranges (List[tuple], optional): Scale ranges for evaluating
            mAP. If not specified, all bounding boxes would be included in
            evaluation. Defaults to None
        use_group_of (bool): Whether consider group of groud truth bboxes
            during evaluating. Defaults to True.
        get_supercategory (bool): Whether to get parent class of the
            current class. Default: True.
        filter_labels (bool): Whether filter unannotated classes.
            Default: True.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added in the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
    """
    default_prefix: Optional[str] = 'openimages'

    def __init__(self,
                 iou_thrs: Union[float, List[float]] = 0.5,
                 ioa_thrs: Union[float, List[float]] = 0.5,
                 scale_ranges: Optional[List[tuple]] = None,
                 use_group_of: bool = True,
                 get_supercategory: bool = True,
                 filter_labels: bool = True,
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None) -> None:
        super().__init__(collect_device=collect_device, prefix=prefix)
        self.iou_thrs = [iou_thrs] if isinstance(iou_thrs, float) else iou_thrs
        self.ioa_thrs = [ioa_thrs] if (isinstance(ioa_thrs, float)
                                       or ioa_thrs is None) else ioa_thrs
        assert isinstance(self.iou_thrs, list) and isinstance(
            self.ioa_thrs, list)
        assert len(self.iou_thrs) == len(self.ioa_thrs)

        self.scale_ranges = scale_ranges
        self.use_group_of = use_group_of
        self.get_supercategory = get_supercategory
        self.filter_labels = filter_labels

    def _get_supercategory_ann(self, instances: List[dict]) -> List[dict]:
        """Get parent classes's annotation of the corresponding class.

        Args:
            instances (List[dict]): A list of annotations of the instances.

        Returns:
            List[dict]: Annotations extended with super-category.
        """
        supercat_instances = []
        relation_matrix = self.dataset_meta['RELATION_MATRIX']
        for instance in instances:
            labels = np.where(relation_matrix[instance['bbox_label']])[0]
            for label in labels:
                if label == instance['bbox_label']:
                    continue
                new_instance = copy.deepcopy(instance)
                new_instance['bbox_label'] = label
                supercat_instances.append(new_instance)
        return supercat_instances

    def _process_predictions(self, pred_bboxes: np.ndarray,
                             pred_scores: np.ndarray, pred_labels: np.ndarray,
                             gt_instances: list,
                             image_level_labels: np.ndarray) -> tuple:
        """Process results of the corresponding class of the detection bboxes.

        Note: It will choose to do the following two processing according to
        the parameters:

        1. Whether to add parent classes of the corresponding class of the
        detection bboxes.

        2. Whether to ignore the classes that unannotated on that image.

        Args:
            pred_bboxes (np.ndarray): bboxes predicted by the model
            pred_scores (np.ndarray): scores predicted by the model
            pred_labels (np.ndarray): labels predicted by the model
            gt_instances (list): ground truth annotations
            image_level_labels (np.ndarray): human-verified image level labels

        Returns:
            tuple: Processed bboxes, scores, and labels.
        """
        processed_bboxes = copy.deepcopy(pred_bboxes)
        processed_scores = copy.deepcopy(pred_scores)
        processed_labels = copy.deepcopy(pred_labels)
        gt_labels = np.array([ins['bbox_label'] for ins in gt_instances],
                             dtype=np.int64)
        if image_level_labels is not None:
            allowed_classes = np.unique(
                np.append(gt_labels, image_level_labels))
        else:
            allowed_classes = np.unique(gt_labels)
        relation_matrix = self.dataset_meta['RELATION_MATRIX']
        pred_classes = np.unique(pred_labels)
        for pred_class in pred_classes:
            classes = np.where(relation_matrix[pred_class])[0]
            for cls in classes:
                if (cls in allowed_classes and cls != pred_class
                        and self.get_supercategory):
                    # add super-supercategory preds
                    index = np.where(pred_labels == pred_class)[0]
                    processed_scores = np.concatenate(
                        [processed_scores, pred_scores[index]])
                    processed_bboxes = np.concatenate(
                        [processed_bboxes, pred_bboxes[index]])
                    extend_labels = np.full(index.shape, cls, dtype=np.int64)
                    processed_labels = np.concatenate(
                        [processed_labels, extend_labels])
                elif cls not in allowed_classes and self.filter_labels:
                    # remove unannotated preds
                    index = np.where(processed_labels != cls)[0]
                    processed_scores = processed_scores[index]
                    processed_bboxes = processed_bboxes[index]
                    processed_labels = processed_labels[index]
        return processed_bboxes, processed_scores, processed_labels

    # TODO: data_batch is no longer needed, consider adjusting the
    #  parameter position
    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
        """
        for data_sample in data_samples:
            gt = copy.deepcopy(data_sample)
            # add super-category instances
            # TODO: Need to refactor to support LoadAnnotations
            instances = gt['instances']
            if self.get_supercategory:
                supercat_instances = self._get_supercategory_ann(instances)
                instances.extend(supercat_instances)
            gt_labels = []
            gt_bboxes = []
            is_group_ofs = []
            for ins in instances:
                gt_labels.append(ins['bbox_label'])
                gt_bboxes.append(ins['bbox'])
                is_group_ofs.append(ins['is_group_of'])
            ann = dict(
                labels=np.array(gt_labels, dtype=np.int64),
                bboxes=np.array(gt_bboxes, dtype=np.float32).reshape((-1, 4)),
                gt_is_group_ofs=np.array(is_group_ofs, dtype=bool))

            image_level_labels = gt.get('image_level_labels', None)
            pred = data_sample['pred_instances']
            pred_bboxes = pred['bboxes'].cpu().numpy()
            pred_scores = pred['scores'].cpu().numpy()
            pred_labels = pred['labels'].cpu().numpy()

            pred_bboxes, pred_scores, pred_labels = self._process_predictions(
                pred_bboxes, pred_scores, pred_labels, instances,
                image_level_labels)

            dets = []
            for label in range(len(self.dataset_meta['classes'])):
                index = np.where(pred_labels == label)[0]
                pred_bbox_scores = np.hstack(
                    [pred_bboxes[index], pred_scores[index].reshape((-1, 1))])
                dets.append(pred_bbox_scores)
            self.results.append((ann, dets))

    def compute_metrics(self, results: list) -> dict:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            dict: The computed metrics. The keys are the names of the metrics,
            and the values are corresponding results.
        """
        logger = MMLogger.get_current_instance()
        gts, preds = zip(*results)
        eval_results = OrderedDict()
        # get dataset type
        dataset_type = self.dataset_meta.get('dataset_type')
        if dataset_type not in ['oid_challenge', 'oid_v6']:
            dataset_type = 'oid_v6'
            print_log(
                'Cannot infer dataset type from the length of the'
                ' classes. Set `oid_v6` as dataset type.',
                logger='current')
        mean_aps = []
        for i, (iou_thr,
                ioa_thr) in enumerate(zip(self.iou_thrs, self.ioa_thrs)):
            if self.use_group_of:
                assert ioa_thr is not None, 'ioa_thr must have value when' \
                                            ' using group_of in evaluation.'
            print_log(f'\n{"-" * 15}iou_thr, ioa_thr: {iou_thr}, {ioa_thr}'
                      f'{"-" * 15}')
            mean_ap, _ = eval_map(
                preds,
                gts,
                scale_ranges=self.scale_ranges,
                iou_thr=iou_thr,
                ioa_thr=ioa_thr,
                dataset=dataset_type,
                logger=logger,
                use_group_of=self.use_group_of)

            mean_aps.append(mean_ap)
            eval_results[f'AP{int(iou_thr * 100):02d}'] = round(mean_ap, 3)
        eval_results['mAP'] = sum(mean_aps) / len(mean_aps)
        return eval_results