# Copyright (c) OpenMMLab. All rights reserved. import copy from abc import ABCMeta, abstractmethod from inspect import signature from typing import List, Optional, Tuple import torch from mmcv.ops import batched_nms from mmengine.config import ConfigDict from mmengine.model import BaseModule, constant_init from mmengine.structures import InstanceData from torch import Tensor from mmdet.structures import SampleList from mmdet.structures.bbox import (cat_boxes, get_box_tensor, get_box_wh, scale_boxes) from mmdet.utils import InstanceList, OptMultiConfig from ..test_time_augs import merge_aug_results from ..utils import (filter_scores_and_topk, select_single_mlvl, unpack_gt_instances) class BaseDenseHead(BaseModule, metaclass=ABCMeta): """Base class for DenseHeads. 1. The ``init_weights`` method is used to initialize densehead's model parameters. After detector initialization, ``init_weights`` is triggered when ``detector.init_weights()`` is called externally. 2. The ``loss`` method is used to calculate the loss of densehead, which includes two steps: (1) the densehead model performs forward propagation to obtain the feature maps (2) The ``loss_by_feat`` method is called based on the feature maps to calculate the loss. .. code:: text loss(): forward() -> loss_by_feat() 3. The ``predict`` method is used to predict detection results, which includes two steps: (1) the densehead model performs forward propagation to obtain the feature maps (2) The ``predict_by_feat`` method is called based on the feature maps to predict detection results including post-processing. .. code:: text predict(): forward() -> predict_by_feat() 4. The ``loss_and_predict`` method is used to return loss and detection results at the same time. It will call densehead's ``forward``, ``loss_by_feat`` and ``predict_by_feat`` methods in order. If one-stage is used as RPN, the densehead needs to return both losses and predictions. This predictions is used as the proposal of roihead. .. code:: text loss_and_predict(): forward() -> loss_by_feat() -> predict_by_feat() """ def __init__(self, init_cfg: OptMultiConfig = None) -> None: super().__init__(init_cfg=init_cfg) # `_raw_positive_infos` will be used in `get_positive_infos`, which # can get positive information. self._raw_positive_infos = dict() def init_weights(self) -> None: """Initialize the weights.""" super().init_weights() # avoid init_cfg overwrite the initialization of `conv_offset` for m in self.modules(): # DeformConv2dPack, ModulatedDeformConv2dPack if hasattr(m, 'conv_offset'): constant_init(m.conv_offset, 0) def get_positive_infos(self) -> InstanceList: """Get positive information from sampling results. Returns: list[:obj:`InstanceData`]: Positive information of each image, usually including positive bboxes, positive labels, positive priors, etc. """ if len(self._raw_positive_infos) == 0: return None sampling_results = self._raw_positive_infos.get( 'sampling_results', None) assert sampling_results is not None positive_infos = [] for sampling_result in enumerate(sampling_results): pos_info = InstanceData() pos_info.bboxes = sampling_result.pos_gt_bboxes pos_info.labels = sampling_result.pos_gt_labels pos_info.priors = sampling_result.pos_priors pos_info.pos_assigned_gt_inds = \ sampling_result.pos_assigned_gt_inds pos_info.pos_inds = sampling_result.pos_inds positive_infos.append(pos_info) return positive_infos def loss(self, x: Tuple[Tensor], batch_data_samples: SampleList) -> dict: """Perform forward propagation and loss calculation of the detection head on the features of the upstream network. Args: x (tuple[Tensor]): Features from the upstream network, each is a 4D-tensor. batch_data_samples (List[:obj:`DetDataSample`]): The Data Samples. It usually includes information such as `gt_instance`, `gt_panoptic_seg` and `gt_sem_seg`. Returns: dict: A dictionary of loss components. """ outs = self(x) outputs = unpack_gt_instances(batch_data_samples) (batch_gt_instances, batch_gt_instances_ignore, batch_img_metas) = outputs loss_inputs = outs + (batch_gt_instances, batch_img_metas, batch_gt_instances_ignore) losses = self.loss_by_feat(*loss_inputs) return losses @abstractmethod def loss_by_feat(self, **kwargs) -> dict: """Calculate the loss based on the features extracted by the detection head.""" pass def loss_and_predict( self, x: Tuple[Tensor], batch_data_samples: SampleList, proposal_cfg: Optional[ConfigDict] = None ) -> Tuple[dict, InstanceList]: """Perform forward propagation of the head, then calculate loss and predictions from the features and data samples. Args: x (tuple[Tensor]): Features from FPN. batch_data_samples (list[:obj:`DetDataSample`]): Each item contains the meta information of each image and corresponding annotations. proposal_cfg (ConfigDict, optional): Test / postprocessing configuration, if None, test_cfg would be used. Defaults to None. Returns: tuple: the return value is a tuple contains: - losses: (dict[str, Tensor]): A dictionary of loss components. - predictions (list[:obj:`InstanceData`]): Detection results of each image after the post process. """ outputs = unpack_gt_instances(batch_data_samples) (batch_gt_instances, batch_gt_instances_ignore, batch_img_metas) = outputs outs = self(x) loss_inputs = outs + (batch_gt_instances, batch_img_metas, batch_gt_instances_ignore) losses = self.loss_by_feat(*loss_inputs) predictions = self.predict_by_feat( *outs, batch_img_metas=batch_img_metas, cfg=proposal_cfg) return losses, predictions def predict(self, x: Tuple[Tensor], batch_data_samples: SampleList, rescale: bool = False) -> InstanceList: """Perform forward propagation of the detection head and predict detection results on the features of the upstream network. Args: x (tuple[Tensor]): Multi-level features from the upstream network, each is a 4D-tensor. batch_data_samples (List[:obj:`DetDataSample`]): The Data Samples. It usually includes information such as `gt_instance`, `gt_panoptic_seg` and `gt_sem_seg`. rescale (bool, optional): Whether to rescale the results. Defaults to False. Returns: list[obj:`InstanceData`]: Detection results of each image after the post process. """ batch_img_metas = [ data_samples.metainfo for data_samples in batch_data_samples ] outs = self(x) predictions = self.predict_by_feat( *outs, batch_img_metas=batch_img_metas, rescale=rescale) return predictions def predict_by_feat(self, cls_scores: List[Tensor], bbox_preds: List[Tensor], score_factors: Optional[List[Tensor]] = None, batch_img_metas: Optional[List[dict]] = None, cfg: Optional[ConfigDict] = None, rescale: bool = False, with_nms: bool = True) -> InstanceList: """Transform a batch of output features extracted from the head into bbox results. Note: When score_factors is not None, the cls_scores are usually multiplied by it then obtain the real score used in NMS, such as CenterNess in FCOS, IoU branch in ATSS. Args: cls_scores (list[Tensor]): Classification scores for all scale levels, each is a 4D-tensor, has shape (batch_size, num_priors * num_classes, H, W). bbox_preds (list[Tensor]): Box energies / deltas for all scale levels, each is a 4D-tensor, has shape (batch_size, num_priors * 4, H, W). score_factors (list[Tensor], optional): Score factor for all scale level, each is a 4D-tensor, has shape (batch_size, num_priors * 1, H, W). Defaults to None. batch_img_metas (list[dict], Optional): Batch image meta info. Defaults to None. cfg (ConfigDict, optional): Test / postprocessing configuration, if None, test_cfg would be used. Defaults to None. rescale (bool): If True, return boxes in original image space. Defaults to False. with_nms (bool): If True, do nms before return boxes. Defaults to True. Returns: list[:obj:`InstanceData`]: Object detection results of each image after the post process. Each item usually contains following keys. - scores (Tensor): Classification scores, has a shape (num_instance, ) - labels (Tensor): Labels of bboxes, has a shape (num_instances, ). - bboxes (Tensor): Has a shape (num_instances, 4), the last dimension 4 arrange as (x1, y1, x2, y2). """ assert len(cls_scores) == len(bbox_preds) if score_factors is None: # e.g. Retina, FreeAnchor, Foveabox, etc. with_score_factors = False else: # e.g. FCOS, PAA, ATSS, AutoAssign, etc. with_score_factors = True assert len(cls_scores) == len(score_factors) num_levels = len(cls_scores) featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)] mlvl_priors = self.prior_generator.grid_priors( featmap_sizes, dtype=cls_scores[0].dtype, device=cls_scores[0].device) result_list = [] for img_id in range(len(batch_img_metas)): img_meta = batch_img_metas[img_id] cls_score_list = select_single_mlvl( cls_scores, img_id, detach=True) bbox_pred_list = select_single_mlvl( bbox_preds, img_id, detach=True) if with_score_factors: score_factor_list = select_single_mlvl( score_factors, img_id, detach=True) else: score_factor_list = [None for _ in range(num_levels)] results = self._predict_by_feat_single( cls_score_list=cls_score_list, bbox_pred_list=bbox_pred_list, score_factor_list=score_factor_list, mlvl_priors=mlvl_priors, img_meta=img_meta, cfg=cfg, rescale=rescale, with_nms=with_nms) result_list.append(results) return result_list def _predict_by_feat_single(self, cls_score_list: List[Tensor], bbox_pred_list: List[Tensor], score_factor_list: List[Tensor], mlvl_priors: List[Tensor], img_meta: dict, cfg: ConfigDict, rescale: bool = False, with_nms: bool = True) -> InstanceData: """Transform a single image's features extracted from the head into bbox results. Args: cls_score_list (list[Tensor]): Box scores from all scale levels of a single image, each item has shape (num_priors * num_classes, H, W). bbox_pred_list (list[Tensor]): Box energies / deltas from all scale levels of a single image, each item has shape (num_priors * 4, H, W). score_factor_list (list[Tensor]): Score factor from all scale levels of a single image, each item has shape (num_priors * 1, H, W). mlvl_priors (list[Tensor]): Each element in the list is the priors of a single level in feature pyramid. In all anchor-based methods, it has shape (num_priors, 4). In all anchor-free methods, it has shape (num_priors, 2) when `with_stride=True`, otherwise it still has shape (num_priors, 4). img_meta (dict): Image meta info. cfg (mmengine.Config): Test / postprocessing configuration, if None, test_cfg would be used. rescale (bool): If True, return boxes in original image space. Defaults to False. with_nms (bool): If True, do nms before return boxes. Defaults to True. Returns: :obj:`InstanceData`: Detection results of each image after the post process. Each item usually contains following keys. - scores (Tensor): Classification scores, has a shape (num_instance, ) - labels (Tensor): Labels of bboxes, has a shape (num_instances, ). - bboxes (Tensor): Has a shape (num_instances, 4), the last dimension 4 arrange as (x1, y1, x2, y2). """ if score_factor_list[0] is None: # e.g. Retina, FreeAnchor, etc. with_score_factors = False else: # e.g. FCOS, PAA, ATSS, etc. with_score_factors = True cfg = self.test_cfg if cfg is None else cfg cfg = copy.deepcopy(cfg) img_shape = img_meta['img_shape'] nms_pre = cfg.get('nms_pre', -1) mlvl_bbox_preds = [] mlvl_valid_priors = [] mlvl_scores = [] mlvl_labels = [] if with_score_factors: mlvl_score_factors = [] else: mlvl_score_factors = None for level_idx, (cls_score, bbox_pred, score_factor, priors) in \ enumerate(zip(cls_score_list, bbox_pred_list, score_factor_list, mlvl_priors)): assert cls_score.size()[-2:] == bbox_pred.size()[-2:] dim = self.bbox_coder.encode_size bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, dim) if with_score_factors: score_factor = score_factor.permute(1, 2, 0).reshape(-1).sigmoid() cls_score = cls_score.permute(1, 2, 0).reshape(-1, self.cls_out_channels) if self.use_sigmoid_cls: scores = cls_score.sigmoid() else: # remind that we set FG labels to [0, num_class-1] # since mmdet v2.0 # BG cat_id: num_class scores = cls_score.softmax(-1)[:, :-1] # After https://github.com/open-mmlab/mmdetection/pull/6268/, # this operation keeps fewer bboxes under the same `nms_pre`. # There is no difference in performance for most models. If you # find a slight drop in performance, you can set a larger # `nms_pre` than before. score_thr = cfg.get('score_thr', 0) results = filter_scores_and_topk( scores, score_thr, nms_pre, dict(bbox_pred=bbox_pred, priors=priors)) scores, labels, keep_idxs, filtered_results = results bbox_pred = filtered_results['bbox_pred'] priors = filtered_results['priors'] if with_score_factors: score_factor = score_factor[keep_idxs] mlvl_bbox_preds.append(bbox_pred) mlvl_valid_priors.append(priors) mlvl_scores.append(scores) mlvl_labels.append(labels) if with_score_factors: mlvl_score_factors.append(score_factor) bbox_pred = torch.cat(mlvl_bbox_preds) priors = cat_boxes(mlvl_valid_priors) bboxes = self.bbox_coder.decode(priors, bbox_pred, max_shape=img_shape) results = InstanceData() results.bboxes = bboxes results.scores = torch.cat(mlvl_scores) results.labels = torch.cat(mlvl_labels) if with_score_factors: results.score_factors = torch.cat(mlvl_score_factors) return self._bbox_post_process( results=results, cfg=cfg, rescale=rescale, with_nms=with_nms, img_meta=img_meta) def _bbox_post_process(self, results: InstanceData, cfg: ConfigDict, rescale: bool = False, with_nms: bool = True, img_meta: Optional[dict] = None) -> InstanceData: """bbox post-processing method. The boxes would be rescaled to the original image scale and do the nms operation. Usually `with_nms` is False is used for aug test. Args: results (:obj:`InstaceData`): Detection instance results, each item has shape (num_bboxes, ). cfg (ConfigDict): Test / postprocessing configuration, if None, test_cfg would be used. rescale (bool): If True, return boxes in original image space. Default to False. with_nms (bool): If True, do nms before return boxes. Default to True. img_meta (dict, optional): Image meta info. Defaults to None. Returns: :obj:`InstanceData`: Detection results of each image after the post process. Each item usually contains following keys. - scores (Tensor): Classification scores, has a shape (num_instance, ) - labels (Tensor): Labels of bboxes, has a shape (num_instances, ). - bboxes (Tensor): Has a shape (num_instances, 4), the last dimension 4 arrange as (x1, y1, x2, y2). """ if rescale: assert img_meta.get('scale_factor') is not None scale_factor = [1 / s for s in img_meta['scale_factor']] results.bboxes = scale_boxes(results.bboxes, scale_factor) if hasattr(results, 'score_factors'): # TODOļ¼š Add sqrt operation in order to be consistent with # the paper. score_factors = results.pop('score_factors') results.scores = results.scores * score_factors # filter small size bboxes if cfg.get('min_bbox_size', -1) >= 0: w, h = get_box_wh(results.bboxes) valid_mask = (w > cfg.min_bbox_size) & (h > cfg.min_bbox_size) if not valid_mask.all(): results = results[valid_mask] # TODO: deal with `with_nms` and `nms_cfg=None` in test_cfg if with_nms and results.bboxes.numel() > 0: bboxes = get_box_tensor(results.bboxes) det_bboxes, keep_idxs = batched_nms(bboxes, results.scores, results.labels, cfg.nms) results = results[keep_idxs] # some nms would reweight the score, such as softnms results.scores = det_bboxes[:, -1] results = results[:cfg.max_per_img] return results def aug_test(self, aug_batch_feats, aug_batch_img_metas, rescale=False, with_ori_nms=False, **kwargs): """Test function with test time augmentation. Args: aug_batch_feats (list[tuple[Tensor]]): The outer list indicates test-time augmentations and inner tuple indicate the multi-level feats from FPN, each Tensor should have a shape (B, C, H, W), aug_batch_img_metas (list[list[dict]]): Meta information of images under the different test-time augs (multiscale, flip, etc.). The outer list indicate the rescale (bool, optional): Whether to rescale the results. Defaults to False. with_ori_nms (bool): Whether execute the nms in original head. Defaults to False. It will be `True` when the head is adopted as `rpn_head`. Returns: list(obj:`InstanceData`): Detection results of the input images. Each item usually contains\ following keys. - scores (Tensor): Classification scores, has a shape (num_instance,) - labels (Tensor): Labels of bboxes, has a shape (num_instances,). - bboxes (Tensor): Has a shape (num_instances, 4), the last dimension 4 arrange as (x1, y1, x2, y2). """ # TODO: remove this for detr and deformdetr sig_of_get_results = signature(self.get_results) get_results_args = [ p.name for p in sig_of_get_results.parameters.values() ] get_results_single_sig = signature(self._get_results_single) get_results_single_sig_args = [ p.name for p in get_results_single_sig.parameters.values() ] assert ('with_nms' in get_results_args) and \ ('with_nms' in get_results_single_sig_args), \ f'{self.__class__.__name__}' \ 'does not support test-time augmentation ' num_imgs = len(aug_batch_img_metas[0]) aug_batch_results = [] for x, img_metas in zip(aug_batch_feats, aug_batch_img_metas): outs = self.forward(x) batch_instance_results = self.get_results( *outs, img_metas=img_metas, cfg=self.test_cfg, rescale=False, with_nms=with_ori_nms, **kwargs) aug_batch_results.append(batch_instance_results) # after merging, bboxes will be rescaled to the original image batch_results = merge_aug_results(aug_batch_results, aug_batch_img_metas) final_results = [] for img_id in range(num_imgs): results = batch_results[img_id] det_bboxes, keep_idxs = batched_nms(results.bboxes, results.scores, results.labels, self.test_cfg.nms) results = results[keep_idxs] # some nms operation may reweight the score such as softnms results.scores = det_bboxes[:, -1] results = results[:self.test_cfg.max_per_img] if rescale: # all results have been mapped to the original scale # in `merge_aug_results`, so just pass pass else: # map to the first aug image scale scale_factor = results.bboxes.new_tensor( aug_batch_img_metas[0][img_id]['scale_factor']) results.bboxes = \ results.bboxes * scale_factor final_results.append(results) return final_results