# Copyright (c) OpenMMLab. All rights reserved. import math import warnings import torch import torch.nn as nn from mmcv.cnn.bricks.registry import ( TRANSFORMER_LAYER, TRANSFORMER_LAYER_SEQUENCE, ) from mmcv.cnn.bricks.transformer import ( BaseTransformerLayer, TransformerLayerSequence, build_transformer_layer_sequence, ) from mmcv.runner.base_module import BaseModule # from mmcv.utils import to_2tuple from torch.nn.init import normal_ # from mmdet.models.utils.builder import TRANSFORMER from .builder import TRANSFORMER # import torch.nn.functional as F from mmcv.cnn import ( # build_activation_layer,; build_conv_layer, build_norm_layer, xavier_init, ) # from typing import Sequence try: from mmcv.ops.multi_scale_deform_attn import MultiScaleDeformableAttention except ImportError: warnings.warn( '`MultiScaleDeformableAttention` in MMCV has been moved to ' '`mmcv.ops.multi_scale_deform_attn`, please update your MMCV') from mmcv.cnn.bricks.transformer import MultiScaleDeformableAttention def inverse_sigmoid(x, eps=1e-5): """Inverse function of sigmoid. Args: x (Tensor): The tensor to do the inverse. eps (float): EPS avoid numerical overflow. Defaults 1e-5. Returns: Tensor: The x has passed the inverse function of sigmoid, has same shape with input. """ x = x.clamp(min=0, max=1) x1 = x.clamp(min=eps) x2 = (1 - x).clamp(min=eps) return torch.log(x1 / x2) @TRANSFORMER_LAYER.register_module() class DetrTransformerDecoderLayer(BaseTransformerLayer): """Implements decoder layer in DETR transformer. Args: attn_cfgs (list[`mmcv.ConfigDict`] | list[dict] | dict )): Configs for self_attention or cross_attention, the order should be consistent with it in `operation_order`. If it is a dict, it would be expand to the number of attention in `operation_order`. feedforward_channels (int): The hidden dimension for FFNs. ffn_dropout (float): Probability of an element to be zeroed in ffn. Default 0.0. operation_order (tuple[str]): The execution order of operation in transformer. Such as ('self_attn', 'norm', 'ffn', 'norm'). Default:None act_cfg (dict): The activation config for FFNs. Default: `LN` norm_cfg (dict): Config dict for normalization layer. Default: `LN`. ffn_num_fcs (int): The number of fully-connected layers in FFNs. Default:2. """ def __init__(self, attn_cfgs, feedforward_channels, ffn_dropout=0.0, operation_order=None, act_cfg=dict(type='ReLU', inplace=True), norm_cfg=dict(type='LN'), ffn_num_fcs=2, **kwargs): super(DetrTransformerDecoderLayer, self).__init__(attn_cfgs=attn_cfgs, feedforward_channels=feedforward_channels, ffn_dropout=ffn_dropout, operation_order=operation_order, act_cfg=act_cfg, norm_cfg=norm_cfg, ffn_num_fcs=ffn_num_fcs, **kwargs) assert len(operation_order) == 6 assert set(operation_order) == set( ['self_attn', 'norm', 'cross_attn', 'ffn']) @TRANSFORMER_LAYER_SEQUENCE.register_module() class DetrTransformerEncoder(TransformerLayerSequence): """TransformerEncoder of DETR. Args: post_norm_cfg (dict): Config of last normalization layer. Default: `LN`. Only used when `self.pre_norm` is `True` """ def __init__(self, *args, post_norm_cfg=dict(type='LN'), **kwargs): super(DetrTransformerEncoder, self).__init__(*args, **kwargs) if post_norm_cfg is not None: self.post_norm = build_norm_layer( post_norm_cfg, self.embed_dims)[1] if self.pre_norm else None else: assert not self.pre_norm, f'Use prenorm in ' \ f'{self.__class__.__name__},' \ f'Please specify post_norm_cfg' self.post_norm = None def forward(self, *args, **kwargs): """Forward function for `TransformerCoder`. Returns: Tensor: forwarded results with shape [num_query, bs, embed_dims]. """ x = super(DetrTransformerEncoder, self).forward(*args, **kwargs) if self.post_norm is not None: x = self.post_norm(x) return x @TRANSFORMER_LAYER_SEQUENCE.register_module() class DetrTransformerDecoder(TransformerLayerSequence): """Implements the decoder in DETR transformer. Args: return_intermediate (bool): Whether to return intermediate outputs. post_norm_cfg (dict): Config of last normalization layer. Default: `LN`. """ def __init__(self, *args, post_norm_cfg=dict(type='LN'), return_intermediate=False, **kwargs): super(DetrTransformerDecoder, self).__init__(*args, **kwargs) self.return_intermediate = return_intermediate if post_norm_cfg is not None: self.post_norm = build_norm_layer(post_norm_cfg, self.embed_dims)[1] else: self.post_norm = None def forward(self, query, *args, **kwargs): """Forward function for `TransformerDecoder`. Args: query (Tensor): Input query with shape `(num_query, bs, embed_dims)`. Returns: Tensor: Results with shape [1, num_query, bs, embed_dims] when return_intermediate is `False`, otherwise it has shape [num_layers, num_query, bs, embed_dims]. """ if not self.return_intermediate: x = super().forward(query, *args, **kwargs) if self.post_norm: x = self.post_norm(x)[None] return x intermediate = [] for layer in self.layers: query = layer(query, *args, **kwargs) if self.return_intermediate: if self.post_norm is not None: intermediate.append(self.post_norm(query)) else: intermediate.append(query) return torch.stack(intermediate) @TRANSFORMER_LAYER_SEQUENCE.register_module() class DeformableDetrTransformerDecoder(TransformerLayerSequence): """Implements the decoder in DETR transformer. Args: return_intermediate (bool): Whether to return intermediate outputs. coder_norm_cfg (dict): Config of last normalization layer. Default: `LN`. """ def __init__(self, *args, return_intermediate=False, **kwargs): super(DeformableDetrTransformerDecoder, self).__init__(*args, **kwargs) self.return_intermediate = return_intermediate def forward(self, query, *args, reference_points=None, valid_ratios=None, reg_branches=None, **kwargs): """Forward function for `TransformerDecoder`. Args: query (Tensor): Input query with shape `(num_query, bs, embed_dims)`. reference_points (Tensor): The reference points of offset. has shape (bs, num_query, 4) when as_two_stage, otherwise has shape ((bs, num_query, 2). valid_ratios (Tensor): The radios of valid points on the feature map, has shape (bs, num_levels, 2) reg_branch: (obj:`nn.ModuleList`): Used for refining the regression results. Only would be passed when with_box_refine is True, otherwise would be passed a `None`. Returns: Tensor: Results with shape [1, num_query, bs, embed_dims] when return_intermediate is `False`, otherwise it has shape [num_layers, num_query, bs, embed_dims]. """ output = query intermediate = [] intermediate_reference_points = [] for lid, layer in enumerate(self.layers): if reference_points.shape[-1] == 4: reference_points_input = reference_points[:, :, None] * \ torch.cat([valid_ratios, valid_ratios], -1)[:, None] else: assert reference_points.shape[-1] == 2 reference_points_input = reference_points[:, :, None] * \ valid_ratios[:, None] output = layer(output, *args, reference_points=reference_points_input, **kwargs) output = output.permute(1, 0, 2) if reg_branches is not None: tmp = reg_branches[lid](output) if reference_points.shape[-1] == 4: new_reference_points = tmp + inverse_sigmoid( reference_points) new_reference_points = new_reference_points.sigmoid() else: assert reference_points.shape[-1] == 2 new_reference_points = tmp new_reference_points[..., :2] = tmp[ ..., :2] + inverse_sigmoid(reference_points) new_reference_points = new_reference_points.sigmoid() reference_points = new_reference_points.detach() output = output.permute(1, 0, 2) if self.return_intermediate: intermediate.append(output) intermediate_reference_points.append(reference_points) if self.return_intermediate: return torch.stack(intermediate), torch.stack( intermediate_reference_points) return output, reference_points @TRANSFORMER.register_module() class Transformer(BaseModule): """Implements the DETR transformer. Following the official DETR implementation, this module copy-paste from torch.nn.Transformer with modifications: * positional encodings are passed in MultiheadAttention * extra LN at the end of encoder is removed * decoder returns a stack of activations from all decoding layers See `paper: End-to-End Object Detection with Transformers `_ for details. Args: encoder (`mmcv.ConfigDict` | Dict): Config of TransformerEncoder. Defaults to None. decoder ((`mmcv.ConfigDict` | Dict)): Config of TransformerDecoder. Defaults to None init_cfg (obj:`mmcv.ConfigDict`): The Config for initialization. Defaults to None. """ def __init__(self, encoder=None, decoder=None, init_cfg=None): super(Transformer, self).__init__(init_cfg=init_cfg) self.encoder = build_transformer_layer_sequence(encoder) self.decoder = build_transformer_layer_sequence(decoder) self.embed_dims = self.encoder.embed_dims def init_weights(self): # follow the official DETR to init parameters for m in self.modules(): if hasattr(m, 'weight') and m.weight.dim() > 1: xavier_init(m, distribution='uniform') self._is_init = True def forward(self, x, mask, query_embed, pos_embed): """Forward function for `Transformer`. Args: x (Tensor): Input query with shape [bs, c, h, w] where c = embed_dims. mask (Tensor): The key_padding_mask used for encoder and decoder, with shape [bs, h, w]. query_embed (Tensor): The query embedding for decoder, with shape [num_query, c]. pos_embed (Tensor): The positional encoding for encoder and decoder, with the same shape as `x`. Returns: tuple[Tensor]: results of decoder containing the following tensor. - out_dec: Output from decoder. If return_intermediate_dec \ is True output has shape [num_dec_layers, bs, num_query, embed_dims], else has shape [1, bs, \ num_query, embed_dims]. - memory: Output results from encoder, with shape \ [bs, embed_dims, h, w]. """ bs, c, h, w = x.shape # use `view` instead of `flatten` for dynamically exporting to ONNX x = x.view(bs, c, -1).permute(2, 0, 1) # [bs, c, h, w] -> [h*w, bs, c] pos_embed = pos_embed.view(bs, c, -1).permute(2, 0, 1) query_embed = query_embed.unsqueeze(1).repeat( 1, bs, 1) # [num_query, dim] -> [num_query, bs, dim] mask = mask.view(bs, -1) # [bs, h, w] -> [bs, h*w] memory = self.encoder(query=x, key=None, value=None, query_pos=pos_embed, query_key_padding_mask=mask) target = torch.zeros_like(query_embed) # out_dec: [num_layers, num_query, bs, dim] out_dec = self.decoder(query=target, key=memory, value=memory, key_pos=pos_embed, query_pos=query_embed, key_padding_mask=mask) out_dec = out_dec.transpose(1, 2) memory = memory.permute(1, 2, 0).reshape(bs, c, h, w) return out_dec, memory @TRANSFORMER.register_module() class DeformableDetrTransformer(Transformer): """Implements the DeformableDETR transformer. Args: as_two_stage (bool): Generate query from encoder features. Default: False. num_feature_levels (int): Number of feature maps from FPN: Default: 4. two_stage_num_proposals (int): Number of proposals when set `as_two_stage` as True. Default: 300. """ def __init__(self, as_two_stage=False, num_feature_levels=4, two_stage_num_proposals=300, **kwargs): super(DeformableDetrTransformer, self).__init__(**kwargs) self.as_two_stage = as_two_stage self.num_feature_levels = num_feature_levels self.two_stage_num_proposals = two_stage_num_proposals self.embed_dims = self.encoder.embed_dims self.init_layers() def init_layers(self): """Initialize layers of the DeformableDetrTransformer.""" self.level_embeds = nn.Parameter( torch.Tensor(self.num_feature_levels, self.embed_dims)) if self.as_two_stage: self.enc_output = nn.Linear(self.embed_dims, self.embed_dims) self.enc_output_norm = nn.LayerNorm(self.embed_dims) self.pos_trans = nn.Linear(self.embed_dims * 2, self.embed_dims * 2) self.pos_trans_norm = nn.LayerNorm(self.embed_dims * 2) else: self.reference_points = nn.Linear(self.embed_dims, 2) def init_weights(self): """Initialize the transformer weights.""" for p in self.parameters(): if p.dim() > 1: nn.init.xavier_uniform_(p) for m in self.modules(): if isinstance(m, MultiScaleDeformableAttention): m.init_weights() if not self.as_two_stage: xavier_init(self.reference_points, distribution='uniform', bias=0.) normal_(self.level_embeds) def gen_encoder_output_proposals(self, memory, memory_padding_mask, spatial_shapes): """Generate proposals from encoded memory. Args: memory (Tensor) : The output of encoder, has shape (bs, num_key, embed_dim). num_key is equal the number of points on feature map from all level. memory_padding_mask (Tensor): Padding mask for memory. has shape (bs, num_key). spatial_shapes (Tensor): The shape of all feature maps. has shape (num_level, 2). Returns: tuple: A tuple of feature map and bbox prediction. - output_memory (Tensor): The input of decoder, \ has shape (bs, num_key, embed_dim). num_key is \ equal the number of points on feature map from \ all levels. - output_proposals (Tensor): The normalized proposal \ after a inverse sigmoid, has shape \ (bs, num_keys, 4). """ N, S, C = memory.shape proposals = [] _cur = 0 for lvl, (H, W) in enumerate(spatial_shapes): mask_flatten_ = memory_padding_mask[:, _cur:(_cur + H * W)].view( N, H, W, 1) valid_H = torch.sum(~mask_flatten_[:, :, 0, 0], 1) valid_W = torch.sum(~mask_flatten_[:, 0, :, 0], 1) grid_y, grid_x = torch.meshgrid( torch.linspace(0, H - 1, H, dtype=torch.float32, device=memory.device), torch.linspace(0, W - 1, W, dtype=torch.float32, device=memory.device)) grid = torch.cat([grid_x.unsqueeze(-1), grid_y.unsqueeze(-1)], -1) scale = torch.cat([valid_W.unsqueeze(-1), valid_H.unsqueeze(-1)], 1).view(N, 1, 1, 2) grid = (grid.unsqueeze(0).expand(N, -1, -1, -1) + 0.5) / scale wh = torch.ones_like(grid) * 0.05 * (2.0**lvl) proposal = torch.cat((grid, wh), -1).view(N, -1, 4) proposals.append(proposal) _cur += (H * W) output_proposals = torch.cat(proposals, 1) output_proposals_valid = ((output_proposals > 0.01) & (output_proposals < 0.99)).all(-1, keepdim=True) output_proposals = torch.log(output_proposals / (1 - output_proposals)) output_proposals = output_proposals.masked_fill( memory_padding_mask.unsqueeze(-1), float('inf')) output_proposals = output_proposals.masked_fill( ~output_proposals_valid, float('inf')) output_memory = memory output_memory = output_memory.masked_fill( memory_padding_mask.unsqueeze(-1), float(0)) output_memory = output_memory.masked_fill(~output_proposals_valid, float(0)) output_memory = self.enc_output_norm(self.enc_output(output_memory)) return output_memory, output_proposals @staticmethod def get_reference_points(spatial_shapes, valid_ratios, device): """Get the reference points used in decoder. Args: spatial_shapes (Tensor): The shape of all feature maps, has shape (num_level, 2). valid_ratios (Tensor): The radios of valid points on the feature map, has shape (bs, num_levels, 2) device (obj:`device`): The device where reference_points should be. Returns: Tensor: reference points used in decoder, has \ shape (bs, num_keys, num_levels, 2). """ reference_points_list = [] for lvl, (H, W) in enumerate(spatial_shapes): # TODO check this 0.5 ref_y, ref_x = torch.meshgrid( torch.linspace(0.5, H - 0.5, H, dtype=torch.float32, device=device), torch.linspace(0.5, W - 0.5, W, dtype=torch.float32, device=device)) ref_y = ref_y.reshape(-1)[None] / (valid_ratios[:, None, lvl, 1] * H) ref_x = ref_x.reshape(-1)[None] / (valid_ratios[:, None, lvl, 0] * W) ref = torch.stack((ref_x, ref_y), -1) reference_points_list.append(ref) reference_points = torch.cat(reference_points_list, 1) reference_points = reference_points[:, :, None] * valid_ratios[:, None] return reference_points def get_valid_ratio(self, mask): """Get the valid radios of feature maps of all level.""" _, H, W = mask.shape valid_H = torch.sum(~mask[:, :, 0], 1) valid_W = torch.sum(~mask[:, 0, :], 1) valid_ratio_h = valid_H.float() / H valid_ratio_w = valid_W.float() / W valid_ratio = torch.stack([valid_ratio_w, valid_ratio_h], -1) return valid_ratio def get_proposal_pos_embed(self, proposals, num_pos_feats=128, temperature=10000): """Get the position embedding of proposal.""" scale = 2 * math.pi dim_t = torch.arange(num_pos_feats, dtype=torch.float32, device=proposals.device) dim_t = temperature**(2 * (dim_t // 2) / num_pos_feats) # N, L, 4 proposals = proposals.sigmoid() * scale # N, L, 4, 128 pos = proposals[:, :, :, None] / dim_t # N, L, 4, 64, 2 pos = torch.stack((pos[:, :, :, 0::2].sin(), pos[:, :, :, 1::2].cos()), dim=4).flatten(2) return pos def forward(self, mlvl_feats, mlvl_masks, query_embed, mlvl_pos_embeds, reg_branches=None, cls_branches=None, smpl_branches=None, **kwargs): """Forward function for `Transformer`. Args: mlvl_feats (list(Tensor)): Input queries from different level. Each element has shape [bs, embed_dims, h, w]. mlvl_masks (list(Tensor)): The key_padding_mask from different level used for encoder and decoder, each element has shape [bs, h, w]. query_embed (Tensor): The query embedding for decoder, with shape [num_query, c]. mlvl_pos_embeds (list(Tensor)): The positional encoding of feats from different level, has the shape [bs, embed_dims, h, w]. reg_branches (obj:`nn.ModuleList`): Regression heads for feature maps from each decoder layer. Only would be passed when `with_box_refine` is True. Default to None. cls_branches (obj:`nn.ModuleList`): Classification heads for feature maps from each decoder layer. Only would be passed when `as_two_stage` is True. Default to None. Returns: tuple[Tensor]: results of decoder containing the following tensor. - inter_states: Outputs from decoder. If return_intermediate_dec is True output has shape \ (num_dec_layers, bs, num_query, embed_dims), else has \ shape (1, bs, num_query, embed_dims). - init_reference_out: The initial value of reference \ points, has shape (bs, num_queries, 4). - inter_references_out: The internal value of reference \ points in decoder, has shape \ (num_dec_layers, bs,num_query, embed_dims) - enc_outputs_class: The classification score of \ proposals generated from \ encoder's feature maps, has shape \ (batch, h*w, num_classes). \ Only would be returned when `as_two_stage` is True, \ otherwise None. - enc_outputs_coord_unact: The regression results \ generated from encoder's feature maps., has shape \ (batch, h*w, 4). Only would \ be returned when `as_two_stage` is True, \ otherwise None. """ assert self.as_two_stage or query_embed is not None feat_flatten = [] mask_flatten = [] lvl_pos_embed_flatten = [] spatial_shapes = [] for lvl, (feat, mask, pos_embed) in enumerate( zip(mlvl_feats, mlvl_masks, mlvl_pos_embeds)): bs, c, h, w = feat.shape spatial_shape = (h, w) spatial_shapes.append(spatial_shape) feat = feat.flatten(2).transpose(1, 2) mask = mask.flatten(1) pos_embed = pos_embed.flatten(2).transpose(1, 2) lvl_pos_embed = pos_embed + self.level_embeds[lvl].view(1, 1, -1) lvl_pos_embed_flatten.append(lvl_pos_embed) feat_flatten.append(feat) mask_flatten.append(mask) feat_flatten = torch.cat(feat_flatten, 1) mask_flatten = torch.cat(mask_flatten, 1) lvl_pos_embed_flatten = torch.cat(lvl_pos_embed_flatten, 1) spatial_shapes = torch.as_tensor(spatial_shapes, dtype=torch.long, device=feat_flatten.device) level_start_index = torch.cat((spatial_shapes.new_zeros( (1, )), spatial_shapes.prod(1).cumsum(0)[:-1])) valid_ratios = torch.stack( [self.get_valid_ratio(m) for m in mlvl_masks], 1) reference_points = \ self.get_reference_points(spatial_shapes, valid_ratios, device=feat.device) feat_flatten = feat_flatten.permute(1, 0, 2) # (H*W, bs, embed_dims) lvl_pos_embed_flatten = lvl_pos_embed_flatten.permute( 1, 0, 2) # (H*W, bs, embed_dims) memory = self.encoder(query=feat_flatten, key=None, value=None, query_pos=lvl_pos_embed_flatten, query_key_padding_mask=mask_flatten, spatial_shapes=spatial_shapes, reference_points=reference_points, level_start_index=level_start_index, valid_ratios=valid_ratios, **kwargs) memory = memory.permute(1, 0, 2) bs, _, c = memory.shape if self.as_two_stage: output_memory, output_proposals = \ self.gen_encoder_output_proposals( memory, mask_flatten, spatial_shapes) enc_outputs_class = cls_branches[self.decoder.num_layers]( output_memory) enc_outputs_coord_unact = \ reg_branches[ self.decoder.num_layers](output_memory) + output_proposals topk = self.two_stage_num_proposals # We only use the first channel in enc_outputs_class as foreground, # the other (num_classes - 1) channels are actually not used. # Its targets are set to be 0s, which indicates the first # class (foreground) because we use [0, num_classes - 1] to # indicate class labels, background class is indicated by # num_classes (similar convention in RPN). # See https://github.com/open-mmlab/mmdetection/blob/master/mmdet/models/dense_heads/deformable_detr_head.py#L241 # noqa # This follows the official implementation of Deformable DETR. topk_proposals = torch.topk(enc_outputs_class[..., 0], topk, dim=1)[1] topk_coords_unact = torch.gather( enc_outputs_coord_unact, 1, topk_proposals.unsqueeze(-1).repeat(1, 1, 4)) topk_coords_unact = topk_coords_unact.detach() reference_points = topk_coords_unact.sigmoid() init_reference_out = reference_points pos_trans_out = self.pos_trans_norm( self.pos_trans(self.get_proposal_pos_embed(topk_coords_unact))) query_pos, query = torch.split(pos_trans_out, c, dim=2) else: query_pos, query = torch.split(query_embed, c, dim=1) query_pos = query_pos.unsqueeze(0).expand(bs, -1, -1) query = query.unsqueeze(0).expand(bs, -1, -1) reference_points = self.reference_points(query_pos).sigmoid() init_reference_out = reference_points # decoder query = query.permute(1, 0, 2) memory = memory.permute(1, 0, 2) query_pos = query_pos.permute(1, 0, 2) inter_states, inter_references = self.decoder( query=query, key=None, value=memory, query_pos=query_pos, key_padding_mask=mask_flatten, reference_points=reference_points, spatial_shapes=spatial_shapes, level_start_index=level_start_index, valid_ratios=valid_ratios, reg_branches=reg_branches, smpl_branches=smpl_branches, **kwargs) inter_references_out = inter_references if self.as_two_stage: return inter_states, init_reference_out,\ inter_references_out, enc_outputs_class,\ enc_outputs_coord_unact return inter_states, init_reference_out, \ inter_references_out, None, None