# Copyright (c) OpenMMLab. All rights reserved.
import datetime
import itertools
import os.path as osp
import tempfile
from collections import OrderedDict
from typing import Dict, List, Optional, Sequence, Union

import numpy as np
import torch
from mmengine.evaluator import BaseMetric
from mmengine.fileio import FileClient, dump, load
from mmengine.logging import MMLogger
from terminaltables import AsciiTable

from mmdet.datasets.api_wrappers import COCO, COCOeval
from mmdet.registry import METRICS
from mmdet.structures.mask import encode_mask_results
from ..functional import eval_recalls


@METRICS.register_module()
class CocoMetric(BaseMetric):
    """COCO evaluation metric.

    Evaluate AR, AP, and mAP for detection tasks including proposal/box
    detection and instance segmentation. Please refer to
    https://cocodataset.org/#detection-eval for more details.

    Args:
        ann_file (str, optional): Path to the coco format annotation file.
            If not specified, ground truth annotations from the dataset will
            be converted to coco format. Defaults to None.
        metric (str | List[str]): Metrics to be evaluated. Valid metrics
            include 'bbox', 'segm', 'proposal', and 'proposal_fast'.
            Defaults to 'bbox'.
        classwise (bool): Whether to evaluate the metric class-wise.
            Defaults to False.
        proposal_nums (Sequence[int]): Numbers of proposals to be evaluated.
            Defaults to (100, 300, 1000).
        iou_thrs (float | List[float], optional): IoU threshold to compute AP
            and AR. If not specified, IoUs from 0.5 to 0.95 will be used.
            Defaults to None.
        metric_items (List[str], optional): Metric result names to be
            recorded in the evaluation result. Defaults to None.
        format_only (bool): Format the output results without performing
            evaluation. It is useful when you want to format the result
            to a specific format and submit it to the test server.
            Defaults to False.
        outfile_prefix (str, optional): The prefix of json files. It includes
            the file path and the prefix of filename, e.g., "a/b/prefix".
            If not specified, a temp file will be created. Defaults to None.
        file_client_args (dict): Arguments to instantiate a FileClient.
            See :class:`mmengine.fileio.FileClient` for details.
            Defaults to ``dict(backend='disk')``.
        collect_device (str): Device name used for collecting results from
            different ranks during distributed training. Must be 'cpu' or
            'gpu'. Defaults to 'cpu'.
        prefix (str, optional): The prefix that will be added to the metric
            names to disambiguate homonymous metrics of different evaluators.
            If prefix is not provided in the argument, self.default_prefix
            will be used instead. Defaults to None.
        sort_categories (bool): Whether to sort categories in annotations.
            Only used for `Objects365V1Dataset`. Defaults to False.
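
    Examples:
        A minimal sketch of plugging this metric into a config for
        validation. The ``val_evaluator`` field name and the annotation
        path below are illustrative assumptions, not values taken from
        this file::

            val_evaluator = dict(
                type='CocoMetric',
                ann_file='data/coco/annotations/instances_val2017.json',
                metric=['bbox', 'segm'],
                format_only=False)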
""" | |
default_prefix: Optional[str] = 'coco' | |

    def __init__(self,
                 ann_file: Optional[str] = None,
                 metric: Union[str, List[str]] = 'bbox',
                 classwise: bool = False,
                 proposal_nums: Sequence[int] = (100, 300, 1000),
                 iou_thrs: Optional[Union[float, Sequence[float]]] = None,
                 metric_items: Optional[Sequence[str]] = None,
                 format_only: bool = False,
                 outfile_prefix: Optional[str] = None,
                 file_client_args: dict = dict(backend='disk'),
                 collect_device: str = 'cpu',
                 prefix: Optional[str] = None,
                 sort_categories: bool = False) -> None:
        super().__init__(collect_device=collect_device, prefix=prefix)
        # coco evaluation metrics
        self.metrics = metric if isinstance(metric, list) else [metric]
        allowed_metrics = ['bbox', 'segm', 'proposal', 'proposal_fast']
        for metric in self.metrics:
            if metric not in allowed_metrics:
                raise KeyError(
                    "metric should be one of 'bbox', 'segm', 'proposal', "
                    f"'proposal_fast', but got {metric}.")

        # do class wise evaluation, default False
        self.classwise = classwise

        # proposal_nums used to compute recall or precision.
        self.proposal_nums = list(proposal_nums)

        # iou_thrs used to compute recall or precision.
        if iou_thrs is None:
            iou_thrs = np.linspace(
                .5, 0.95, int(np.round((0.95 - .5) / .05)) + 1, endpoint=True)
        self.iou_thrs = iou_thrs

        self.metric_items = metric_items
        self.format_only = format_only
        if self.format_only:
            assert outfile_prefix is not None, (
                'outfile_prefix must not be None when format_only is True, '
                'otherwise the result files will be saved to a temp '
                'directory which will be cleaned up at the end.')
        self.outfile_prefix = outfile_prefix

        self.file_client_args = file_client_args
        self.file_client = FileClient(**file_client_args)

        # if ann_file is not specified,
        # initialize coco api with the converted dataset
        if ann_file is not None:
            with self.file_client.get_local_path(ann_file) as local_path:
                self._coco_api = COCO(local_path)
                if sort_categories:
                    # The 'categories' list in objects365_train.json and
                    # objects365_val.json is inconsistent, so sort the
                    # list (or dict) before calling get_cat_ids.
                    cats = self._coco_api.cats
                    sorted_cats = {i: cats[i] for i in sorted(cats)}
                    self._coco_api.cats = sorted_cats
                    categories = self._coco_api.dataset['categories']
                    sorted_categories = sorted(
                        categories, key=lambda i: i['id'])
                    self._coco_api.dataset['categories'] = sorted_categories
        else:
            self._coco_api = None

        # handle dataset lazy init
        self.cat_ids = None
        self.img_ids = None

    def fast_eval_recall(self,
                         results: List[dict],
                         proposal_nums: Sequence[int],
                         iou_thrs: Sequence[float],
                         logger: Optional[MMLogger] = None) -> np.ndarray:
        """Evaluate proposal recall with COCO's fast_eval_recall.

        Args:
            results (List[dict]): Results of the dataset.
            proposal_nums (Sequence[int]): Proposal numbers used for
                evaluation.
            iou_thrs (Sequence[float]): IoU thresholds used for evaluation.
            logger (MMLogger, optional): Logger used for logging the recall
                summary.

        Returns:
            np.ndarray: Averaged recall results.
        """
        gt_bboxes = []
        pred_bboxes = [result['bboxes'] for result in results]
        for i in range(len(self.img_ids)):
            ann_ids = self._coco_api.get_ann_ids(img_ids=self.img_ids[i])
            ann_info = self._coco_api.load_anns(ann_ids)
            if len(ann_info) == 0:
                gt_bboxes.append(np.zeros((0, 4)))
                continue
            bboxes = []
            for ann in ann_info:
                if ann.get('ignore', False) or ann['iscrowd']:
                    continue
                x1, y1, w, h = ann['bbox']
                bboxes.append([x1, y1, x1 + w, y1 + h])
            bboxes = np.array(bboxes, dtype=np.float32)
            if bboxes.shape[0] == 0:
                bboxes = np.zeros((0, 4))
            gt_bboxes.append(bboxes)

        recalls = eval_recalls(
            gt_bboxes, pred_bboxes, proposal_nums, iou_thrs, logger=logger)
        ar = recalls.mean(axis=1)
        return ar

    def xyxy2xywh(self, bbox: np.ndarray) -> list:
        """Convert ``xyxy`` style bounding boxes to ``xywh`` style for COCO
        evaluation.

        Args:
            bbox (numpy.ndarray): The bounding boxes, shape (4, ), in
                ``xyxy`` order.

        Returns:
            list[float]: The converted bounding boxes, in ``xywh`` order.
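
        Example:
            A quick sanity check with illustrative values (``__new__`` is
            used only to skip ``__init__`` for this standalone helper):

            >>> import numpy as np
            >>> m = CocoMetric.__new__(CocoMetric)
            >>> m.xyxy2xywh(np.array([10., 20., 30., 60.]))
            [10.0, 20.0, 20.0, 40.0]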
""" | |
_bbox: List = bbox.tolist() | |
return [ | |
_bbox[0], | |
_bbox[1], | |
_bbox[2] - _bbox[0], | |
_bbox[3] - _bbox[1], | |
] | |

    def results2json(self, results: Sequence[dict],
                     outfile_prefix: str) -> dict:
        """Dump the detection results to a COCO style json file.

        There are 3 types of results: proposals, bbox predictions, and mask
        predictions, and they have different data types. This method will
        automatically recognize the type and dump them to json files.

        Args:
            results (Sequence[dict]): Testing results of the
                dataset.
            outfile_prefix (str): The filename prefix of the json files. If
                the prefix is "somepath/xxx", the json files will be named
                "somepath/xxx.bbox.json", "somepath/xxx.segm.json",
                "somepath/xxx.proposal.json".

        Returns:
            dict: Possible keys are "bbox", "segm", "proposal", and
            values are corresponding filenames.
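
        Example:
            For ``outfile_prefix='somepath/xxx'`` with mask predictions
            present, the returned mapping looks like this sketch (derived
            from the dump logic below)::

                {
                    'bbox': 'somepath/xxx.bbox.json',
                    'proposal': 'somepath/xxx.bbox.json',
                    'segm': 'somepath/xxx.segm.json'
                }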
""" | |
bbox_json_results = [] | |
segm_json_results = [] if 'masks' in results[0] else None | |
for idx, result in enumerate(results): | |
image_id = result.get('img_id', idx) | |
labels = result['labels'] | |
bboxes = result['bboxes'] | |
scores = result['scores'] | |
# bbox results | |
for i, label in enumerate(labels): | |
data = dict() | |
data['image_id'] = image_id | |
data['bbox'] = self.xyxy2xywh(bboxes[i]) | |
data['score'] = float(scores[i]) | |
data['category_id'] = self.cat_ids[label] | |
bbox_json_results.append(data) | |
if segm_json_results is None: | |
continue | |
# segm results | |
masks = result['masks'] | |
mask_scores = result.get('mask_scores', scores) | |
for i, label in enumerate(labels): | |
data = dict() | |
data['image_id'] = image_id | |
data['bbox'] = self.xyxy2xywh(bboxes[i]) | |
data['score'] = float(mask_scores[i]) | |
data['category_id'] = self.cat_ids[label] | |
if isinstance(masks[i]['counts'], bytes): | |
masks[i]['counts'] = masks[i]['counts'].decode() | |
data['segmentation'] = masks[i] | |
segm_json_results.append(data) | |
result_files = dict() | |
result_files['bbox'] = f'{outfile_prefix}.bbox.json' | |
result_files['proposal'] = f'{outfile_prefix}.bbox.json' | |
dump(bbox_json_results, result_files['bbox']) | |
if segm_json_results is not None: | |
result_files['segm'] = f'{outfile_prefix}.segm.json' | |
dump(segm_json_results, result_files['segm']) | |
return result_files | |

    def gt_to_coco_json(self, gt_dicts: Sequence[dict],
                        outfile_prefix: str) -> str:
        """Convert ground truth to coco format json file.

        Args:
            gt_dicts (Sequence[dict]): Ground truth of the dataset.
            outfile_prefix (str): The filename prefix of the json files. If
                the prefix is "somepath/xxx", the json file will be named
                "somepath/xxx.gt.json".

        Returns:
            str: The filename of the json file.
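
        Example:
            Each element of ``gt_dicts`` is expected to look roughly like
            the sketch below; the keys follow the conversion code and the
            values are illustrative::

                dict(
                    img_id=0,
                    width=640,
                    height=480,
                    anns=[
                        dict(
                            bbox_label=0,
                            bbox=[10., 20., 110., 220.],  # xyxy
                            ignore_flag=0)
                    ])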
""" | |
categories = [ | |
dict(id=id, name=name) | |
for id, name in enumerate(self.dataset_meta['classes']) | |
] | |
image_infos = [] | |
annotations = [] | |
for idx, gt_dict in enumerate(gt_dicts): | |
img_id = gt_dict.get('img_id', idx) | |
image_info = dict( | |
id=img_id, | |
width=gt_dict['width'], | |
height=gt_dict['height'], | |
file_name='') | |
image_infos.append(image_info) | |
for ann in gt_dict['anns']: | |
label = ann['bbox_label'] | |
bbox = ann['bbox'] | |
coco_bbox = [ | |
bbox[0], | |
bbox[1], | |
bbox[2] - bbox[0], | |
bbox[3] - bbox[1], | |
] | |
annotation = dict( | |
id=len(annotations) + | |
1, # coco api requires id starts with 1 | |
image_id=img_id, | |
bbox=coco_bbox, | |
iscrowd=ann.get('ignore_flag', 0), | |
category_id=int(label), | |
area=coco_bbox[2] * coco_bbox[3]) | |
if ann.get('mask', None): | |
mask = ann['mask'] | |
# area = mask_util.area(mask) | |
if isinstance(mask, dict) and isinstance( | |
mask['counts'], bytes): | |
mask['counts'] = mask['counts'].decode() | |
annotation['segmentation'] = mask | |
# annotation['area'] = float(area) | |
annotations.append(annotation) | |
info = dict( | |
date_created=str(datetime.datetime.now()), | |
description='Coco json file converted by mmdet CocoMetric.') | |
coco_json = dict( | |
info=info, | |
images=image_infos, | |
categories=categories, | |
licenses=None, | |
) | |
if len(annotations) > 0: | |
coco_json['annotations'] = annotations | |
converted_json_path = f'{outfile_prefix}.gt.json' | |
dump(coco_json, converted_json_path) | |
return converted_json_path | |

    # TODO: data_batch is no longer needed, consider adjusting the
    # parameter position
    def process(self, data_batch: dict, data_samples: Sequence[dict]) -> None:
        """Process one batch of data samples and predictions. The processed
        results should be stored in ``self.results``, which will be used to
        compute the metrics when all batches have been processed.

        Args:
            data_batch (dict): A batch of data from the dataloader.
            data_samples (Sequence[dict]): A batch of data samples that
                contain annotations and predictions.
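
        Example:
            A single data sample is expected to carry at least the fields
            read below; the tensor shapes are illustrative only::

                data_sample = dict(
                    img_id=0,
                    ori_shape=(480, 640),  # (height, width)
                    pred_instances=dict(
                        bboxes=torch.rand(5, 4),
                        scores=torch.rand(5),
                        labels=torch.randint(0, 80, (5, ))),
                    instances=[...])  # gt, needed when ann_file is None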
""" | |
for data_sample in data_samples: | |
result = dict() | |
pred = data_sample['pred_instances'] | |
result['img_id'] = data_sample['img_id'] | |
result['bboxes'] = pred['bboxes'].cpu().numpy() | |
result['scores'] = pred['scores'].cpu().numpy() | |
result['labels'] = pred['labels'].cpu().numpy() | |
# encode mask to RLE | |
if 'masks' in pred: | |
result['masks'] = encode_mask_results( | |
pred['masks'].detach().cpu().numpy()) if isinstance( | |
pred['masks'], torch.Tensor) else pred['masks'] | |
# some detectors use different scores for bbox and mask | |
if 'mask_scores' in pred: | |
result['mask_scores'] = pred['mask_scores'].cpu().numpy() | |
# parse gt | |
gt = dict() | |
gt['width'] = data_sample['ori_shape'][1] | |
gt['height'] = data_sample['ori_shape'][0] | |
gt['img_id'] = data_sample['img_id'] | |
if self._coco_api is None: | |
# TODO: Need to refactor to support LoadAnnotations | |
assert 'instances' in data_sample, \ | |
'ground truth is required for evaluation when ' \ | |
'`ann_file` is not provided' | |
gt['anns'] = data_sample['instances'] | |
# add converted result to the results list | |
self.results.append((gt, result)) | |

    def compute_metrics(self, results: list) -> Dict[str, float]:
        """Compute the metrics from processed results.

        Args:
            results (list): The processed results of each batch.

        Returns:
            Dict[str, float]: The computed metrics. The keys are the names of
            the metrics, and the values are corresponding results.
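
        Example:
            With ``metric='bbox'`` and default ``metric_items``, the keys of
            the returned dict follow the sketch below; the values are
            placeholders, not real results::

                {
                    'bbox_mAP': ..., 'bbox_mAP_50': ..., 'bbox_mAP_75': ...,
                    'bbox_mAP_s': ..., 'bbox_mAP_m': ..., 'bbox_mAP_l': ...
                }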
""" | |
logger: MMLogger = MMLogger.get_current_instance() | |
# split gt and prediction list | |
gts, preds = zip(*results) | |
tmp_dir = None | |
if self.outfile_prefix is None: | |
tmp_dir = tempfile.TemporaryDirectory() | |
outfile_prefix = osp.join(tmp_dir.name, 'results') | |
else: | |
outfile_prefix = self.outfile_prefix | |
if self._coco_api is None: | |
# use converted gt json file to initialize coco api | |
logger.info('Converting ground truth to coco format...') | |
coco_json_path = self.gt_to_coco_json( | |
gt_dicts=gts, outfile_prefix=outfile_prefix) | |
self._coco_api = COCO(coco_json_path) | |
# handle lazy init | |
if self.cat_ids is None: | |
self.cat_ids = self._coco_api.get_cat_ids( | |
cat_names=self.dataset_meta['classes']) | |
if self.img_ids is None: | |
self.img_ids = self._coco_api.get_img_ids() | |
# convert predictions to coco format and dump to json file | |
result_files = self.results2json(preds, outfile_prefix) | |
eval_results = OrderedDict() | |
if self.format_only: | |
logger.info('results are saved in ' | |
f'{osp.dirname(outfile_prefix)}') | |
return eval_results | |
for metric in self.metrics: | |
logger.info(f'Evaluating {metric}...') | |
# TODO: May refactor fast_eval_recall to an independent metric? | |
# fast eval recall | |
if metric == 'proposal_fast': | |
ar = self.fast_eval_recall( | |
preds, self.proposal_nums, self.iou_thrs, logger=logger) | |
log_msg = [] | |
for i, num in enumerate(self.proposal_nums): | |
eval_results[f'AR@{num}'] = ar[i] | |
log_msg.append(f'\nAR@{num}\t{ar[i]:.4f}') | |
log_msg = ''.join(log_msg) | |
logger.info(log_msg) | |
continue | |
# evaluate proposal, bbox and segm | |
iou_type = 'bbox' if metric == 'proposal' else metric | |
if metric not in result_files: | |
raise KeyError(f'{metric} is not in results') | |
try: | |
predictions = load(result_files[metric]) | |
if iou_type == 'segm': | |
# Refer to https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocotools/coco.py#L331 # noqa | |
# When evaluating mask AP, if the results contain bbox, | |
# cocoapi will use the box area instead of the mask area | |
# for calculating the instance area. Though the overall AP | |
# is not affected, this leads to different | |
# small/medium/large mask AP results. | |
for x in predictions: | |
x.pop('bbox') | |
coco_dt = self._coco_api.loadRes(predictions) | |
except IndexError: | |
logger.error( | |
'The testing results of the whole dataset is empty.') | |
break | |
coco_eval = COCOeval(self._coco_api, coco_dt, iou_type) | |
coco_eval.params.catIds = self.cat_ids | |
coco_eval.params.imgIds = self.img_ids | |
coco_eval.params.maxDets = list(self.proposal_nums) | |
coco_eval.params.iouThrs = self.iou_thrs | |
# mapping of cocoEval.stats | |
coco_metric_names = { | |
'mAP': 0, | |
'mAP_50': 1, | |
'mAP_75': 2, | |
'mAP_s': 3, | |
'mAP_m': 4, | |
'mAP_l': 5, | |
'AR@100': 6, | |
'AR@300': 7, | |
'AR@1000': 8, | |
'AR_s@1000': 9, | |
'AR_m@1000': 10, | |
'AR_l@1000': 11 | |
} | |
metric_items = self.metric_items | |
if metric_items is not None: | |
for metric_item in metric_items: | |
if metric_item not in coco_metric_names: | |
raise KeyError( | |
f'metric item "{metric_item}" is not supported') | |
if metric == 'proposal': | |
coco_eval.params.useCats = 0 | |
coco_eval.evaluate() | |
coco_eval.accumulate() | |
coco_eval.summarize() | |
if metric_items is None: | |
metric_items = [ | |
'AR@100', 'AR@300', 'AR@1000', 'AR_s@1000', | |
'AR_m@1000', 'AR_l@1000' | |
] | |
for item in metric_items: | |
val = float( | |
f'{coco_eval.stats[coco_metric_names[item]]:.3f}') | |
eval_results[item] = val | |
else: | |
coco_eval.evaluate() | |
coco_eval.accumulate() | |
coco_eval.summarize() | |
if self.classwise: # Compute per-category AP | |
# Compute per-category AP | |
# from https://github.com/facebookresearch/detectron2/ | |
precisions = coco_eval.eval['precision'] | |
# precision: (iou, recall, cls, area range, max dets) | |
assert len(self.cat_ids) == precisions.shape[2] | |
results_per_category = [] | |
for idx, cat_id in enumerate(self.cat_ids): | |
# area range index 0: all area ranges | |
# max dets index -1: typically 100 per image | |
nm = self._coco_api.loadCats(cat_id)[0] | |
precision = precisions[:, :, idx, 0, -1] | |
precision = precision[precision > -1] | |
if precision.size: | |
ap = np.mean(precision) | |
else: | |
ap = float('nan') | |
results_per_category.append( | |
(f'{nm["name"]}', f'{round(ap, 3)}')) | |
eval_results[f'{nm["name"]}_precision'] = round(ap, 3) | |
num_columns = min(6, len(results_per_category) * 2) | |
results_flatten = list( | |
itertools.chain(*results_per_category)) | |
headers = ['category', 'AP'] * (num_columns // 2) | |
results_2d = itertools.zip_longest(*[ | |
results_flatten[i::num_columns] | |
for i in range(num_columns) | |
]) | |
table_data = [headers] | |
table_data += [result for result in results_2d] | |
table = AsciiTable(table_data) | |
logger.info('\n' + table.table) | |
if metric_items is None: | |
metric_items = [ | |
'mAP', 'mAP_50', 'mAP_75', 'mAP_s', 'mAP_m', 'mAP_l' | |
] | |
for metric_item in metric_items: | |
key = f'{metric}_{metric_item}' | |
val = coco_eval.stats[coco_metric_names[metric_item]] | |
eval_results[key] = float(f'{round(val, 3)}') | |
ap = coco_eval.stats[:6] | |
logger.info(f'{metric}_mAP_copypaste: {ap[0]:.3f} ' | |
f'{ap[1]:.3f} {ap[2]:.3f} {ap[3]:.3f} ' | |
f'{ap[4]:.3f} {ap[5]:.3f}') | |
if tmp_dir is not None: | |
tmp_dir.cleanup() | |
return eval_results | |