Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
import copy | |
import math | |
from functools import partial | |
import torch | |
import torch.nn as nn | |
import torch.utils.checkpoint as cp | |
from mmcv.cnn.bricks import ConvModule, DropPath | |
from mmengine.model import BaseModule, Sequential | |
from mmcls.models.backbones.base_backbone import BaseBackbone | |
from mmcls.models.utils import InvertedResidual, SELayer, make_divisible | |
from mmcls.registry import MODELS | |
class EdgeResidual(BaseModule): | |
"""Edge Residual Block. | |
Args: | |
in_channels (int): The input channels of this module. | |
out_channels (int): The output channels of this module. | |
mid_channels (int): The input channels of the second convolution. | |
kernel_size (int): The kernel size of the first convolution. | |
Defaults to 3. | |
stride (int): The stride of the first convolution. Defaults to 1. | |
se_cfg (dict, optional): Config dict for se layer. Defaults to None, | |
which means no se layer. | |
with_residual (bool): Use residual connection. Defaults to True. | |
conv_cfg (dict, optional): Config dict for convolution layer. | |
Defaults to None, which means using conv2d. | |
norm_cfg (dict): Config dict for normalization layer. | |
Defaults to ``dict(type='BN')``. | |
act_cfg (dict): Config dict for activation layer. | |
Defaults to ``dict(type='ReLU')``. | |
drop_path_rate (float): stochastic depth rate. Defaults to 0. | |
with_cp (bool): Use checkpoint or not. Using checkpoint will save some | |
memory while slowing down the training speed. Defaults to False. | |
init_cfg (dict | list[dict], optional): Initialization config dict. | |
""" | |
def __init__(self, | |
in_channels, | |
out_channels, | |
mid_channels, | |
kernel_size=3, | |
stride=1, | |
se_cfg=None, | |
with_residual=True, | |
conv_cfg=None, | |
norm_cfg=dict(type='BN'), | |
act_cfg=dict(type='ReLU'), | |
drop_path_rate=0., | |
with_cp=False, | |
init_cfg=None): | |
super(EdgeResidual, self).__init__(init_cfg=init_cfg) | |
assert stride in [1, 2] | |
self.with_cp = with_cp | |
self.drop_path = DropPath( | |
drop_path_rate) if drop_path_rate > 0 else nn.Identity() | |
self.with_se = se_cfg is not None | |
self.with_residual = ( | |
stride == 1 and in_channels == out_channels and with_residual) | |
if self.with_se: | |
assert isinstance(se_cfg, dict) | |
self.conv1 = ConvModule( | |
in_channels=in_channels, | |
out_channels=mid_channels, | |
kernel_size=kernel_size, | |
stride=stride, | |
padding=kernel_size // 2, | |
conv_cfg=conv_cfg, | |
norm_cfg=norm_cfg, | |
act_cfg=act_cfg) | |
if self.with_se: | |
self.se = SELayer(**se_cfg) | |
self.conv2 = ConvModule( | |
in_channels=mid_channels, | |
out_channels=out_channels, | |
kernel_size=1, | |
stride=1, | |
padding=0, | |
conv_cfg=None, | |
norm_cfg=norm_cfg, | |
act_cfg=None) | |
def forward(self, x): | |
def _inner_forward(x): | |
out = x | |
out = self.conv1(out) | |
if self.with_se: | |
out = self.se(out) | |
out = self.conv2(out) | |
if self.with_residual: | |
return x + self.drop_path(out) | |
else: | |
return out | |
if self.with_cp and x.requires_grad: | |
out = cp.checkpoint(_inner_forward, x) | |
else: | |
out = _inner_forward(x) | |
return out | |
def model_scaling(layer_setting, arch_setting): | |
"""Scaling operation to the layer's parameters according to the | |
arch_setting.""" | |
# scale width | |
new_layer_setting = copy.deepcopy(layer_setting) | |
for layer_cfg in new_layer_setting: | |
for block_cfg in layer_cfg: | |
block_cfg[1] = make_divisible(block_cfg[1] * arch_setting[0], 8) | |
# scale depth | |
split_layer_setting = [new_layer_setting[0]] | |
for layer_cfg in new_layer_setting[1:-1]: | |
tmp_index = [0] | |
for i in range(len(layer_cfg) - 1): | |
if layer_cfg[i + 1][1] != layer_cfg[i][1]: | |
tmp_index.append(i + 1) | |
tmp_index.append(len(layer_cfg)) | |
for i in range(len(tmp_index) - 1): | |
split_layer_setting.append(layer_cfg[tmp_index[i]:tmp_index[i + | |
1]]) | |
split_layer_setting.append(new_layer_setting[-1]) | |
num_of_layers = [len(layer_cfg) for layer_cfg in split_layer_setting[1:-1]] | |
new_layers = [ | |
int(math.ceil(arch_setting[1] * num)) for num in num_of_layers | |
] | |
merge_layer_setting = [split_layer_setting[0]] | |
for i, layer_cfg in enumerate(split_layer_setting[1:-1]): | |
if new_layers[i] <= num_of_layers[i]: | |
tmp_layer_cfg = layer_cfg[:new_layers[i]] | |
else: | |
tmp_layer_cfg = copy.deepcopy(layer_cfg) + [layer_cfg[-1]] * ( | |
new_layers[i] - num_of_layers[i]) | |
if tmp_layer_cfg[0][3] == 1 and i != 0: | |
merge_layer_setting[-1] += tmp_layer_cfg.copy() | |
else: | |
merge_layer_setting.append(tmp_layer_cfg.copy()) | |
merge_layer_setting.append(split_layer_setting[-1]) | |
return merge_layer_setting | |
class EfficientNet(BaseBackbone): | |
"""EfficientNet backbone. | |
Args: | |
arch (str): Architecture of efficientnet. Defaults to b0. | |
out_indices (Sequence[int]): Output from which stages. | |
Defaults to (6, ). | |
frozen_stages (int): Stages to be frozen (all param fixed). | |
Defaults to 0, which means not freezing any parameters. | |
conv_cfg (dict): Config dict for convolution layer. | |
Defaults to None, which means using conv2d. | |
norm_cfg (dict): Config dict for normalization layer. | |
Defaults to dict(type='BN'). | |
act_cfg (dict): Config dict for activation layer. | |
Defaults to dict(type='Swish'). | |
norm_eval (bool): Whether to set norm layers to eval mode, namely, | |
freeze running stats (mean and var). Note: Effect on Batch Norm | |
and its variants only. Defaults to False. | |
with_cp (bool): Use checkpoint or not. Using checkpoint will save some | |
memory while slowing down the training speed. Defaults to False. | |
""" | |
# Parameters to build layers. | |
# 'b' represents the architecture of normal EfficientNet family includes | |
# 'b0', 'b1', 'b2', 'b3', 'b4', 'b5', 'b6', 'b7', 'b8'. | |
# 'e' represents the architecture of EfficientNet-EdgeTPU including 'es', | |
# 'em', 'el'. | |
# 6 parameters are needed to construct a layer, From left to right: | |
# - kernel_size: The kernel size of the block | |
# - out_channel: The number of out_channels of the block | |
# - se_ratio: The sequeeze ratio of SELayer. | |
# - stride: The stride of the block | |
# - expand_ratio: The expand_ratio of the mid_channels | |
# - block_type: -1: Not a block, 0: InvertedResidual, 1: EdgeResidual | |
layer_settings = { | |
'b': [[[3, 32, 0, 2, 0, -1]], | |
[[3, 16, 4, 1, 1, 0]], | |
[[3, 24, 4, 2, 6, 0], | |
[3, 24, 4, 1, 6, 0]], | |
[[5, 40, 4, 2, 6, 0], | |
[5, 40, 4, 1, 6, 0]], | |
[[3, 80, 4, 2, 6, 0], | |
[3, 80, 4, 1, 6, 0], | |
[3, 80, 4, 1, 6, 0], | |
[5, 112, 4, 1, 6, 0], | |
[5, 112, 4, 1, 6, 0], | |
[5, 112, 4, 1, 6, 0]], | |
[[5, 192, 4, 2, 6, 0], | |
[5, 192, 4, 1, 6, 0], | |
[5, 192, 4, 1, 6, 0], | |
[5, 192, 4, 1, 6, 0], | |
[3, 320, 4, 1, 6, 0]], | |
[[1, 1280, 0, 1, 0, -1]] | |
], | |
'e': [[[3, 32, 0, 2, 0, -1]], | |
[[3, 24, 0, 1, 3, 1]], | |
[[3, 32, 0, 2, 8, 1], | |
[3, 32, 0, 1, 8, 1]], | |
[[3, 48, 0, 2, 8, 1], | |
[3, 48, 0, 1, 8, 1], | |
[3, 48, 0, 1, 8, 1], | |
[3, 48, 0, 1, 8, 1]], | |
[[5, 96, 0, 2, 8, 0], | |
[5, 96, 0, 1, 8, 0], | |
[5, 96, 0, 1, 8, 0], | |
[5, 96, 0, 1, 8, 0], | |
[5, 96, 0, 1, 8, 0], | |
[5, 144, 0, 1, 8, 0], | |
[5, 144, 0, 1, 8, 0], | |
[5, 144, 0, 1, 8, 0], | |
[5, 144, 0, 1, 8, 0]], | |
[[5, 192, 0, 2, 8, 0], | |
[5, 192, 0, 1, 8, 0]], | |
[[1, 1280, 0, 1, 0, -1]] | |
] | |
} # yapf: disable | |
# Parameters to build different kinds of architecture. | |
# From left to right: scaling factor for width, scaling factor for depth, | |
# resolution. | |
arch_settings = { | |
'b0': (1.0, 1.0, 224), | |
'b1': (1.0, 1.1, 240), | |
'b2': (1.1, 1.2, 260), | |
'b3': (1.2, 1.4, 300), | |
'b4': (1.4, 1.8, 380), | |
'b5': (1.6, 2.2, 456), | |
'b6': (1.8, 2.6, 528), | |
'b7': (2.0, 3.1, 600), | |
'b8': (2.2, 3.6, 672), | |
'l2': (4.3, 5.3, 800), | |
'es': (1.0, 1.0, 224), | |
'em': (1.0, 1.1, 240), | |
'el': (1.2, 1.4, 300) | |
} | |
def __init__(self, | |
arch='b0', | |
drop_path_rate=0., | |
out_indices=(6, ), | |
frozen_stages=0, | |
conv_cfg=dict(type='Conv2dAdaptivePadding'), | |
norm_cfg=dict(type='BN', eps=1e-3), | |
act_cfg=dict(type='Swish'), | |
norm_eval=False, | |
with_cp=False, | |
init_cfg=[ | |
dict(type='Kaiming', layer='Conv2d'), | |
dict( | |
type='Constant', | |
layer=['_BatchNorm', 'GroupNorm'], | |
val=1) | |
]): | |
super(EfficientNet, self).__init__(init_cfg) | |
assert arch in self.arch_settings, \ | |
f'"{arch}" is not one of the arch_settings ' \ | |
f'({", ".join(self.arch_settings.keys())})' | |
self.arch_setting = self.arch_settings[arch] | |
# layer_settings of arch='l2' is 'b' | |
self.layer_setting = self.layer_settings['b' if arch == | |
'l2' else arch[:1]] | |
for index in out_indices: | |
if index not in range(0, len(self.layer_setting)): | |
raise ValueError('the item in out_indices must in ' | |
f'range(0, {len(self.layer_setting)}). ' | |
f'But received {index}') | |
if frozen_stages not in range(len(self.layer_setting) + 1): | |
raise ValueError('frozen_stages must be in range(0, ' | |
f'{len(self.layer_setting) + 1}). ' | |
f'But received {frozen_stages}') | |
self.drop_path_rate = drop_path_rate | |
self.out_indices = out_indices | |
self.frozen_stages = frozen_stages | |
self.conv_cfg = conv_cfg | |
self.norm_cfg = norm_cfg | |
self.act_cfg = act_cfg | |
self.norm_eval = norm_eval | |
self.with_cp = with_cp | |
self.layer_setting = model_scaling(self.layer_setting, | |
self.arch_setting) | |
block_cfg_0 = self.layer_setting[0][0] | |
block_cfg_last = self.layer_setting[-1][0] | |
self.in_channels = make_divisible(block_cfg_0[1], 8) | |
self.out_channels = block_cfg_last[1] | |
self.layers = nn.ModuleList() | |
self.layers.append( | |
ConvModule( | |
in_channels=3, | |
out_channels=self.in_channels, | |
kernel_size=block_cfg_0[0], | |
stride=block_cfg_0[3], | |
padding=block_cfg_0[0] // 2, | |
conv_cfg=self.conv_cfg, | |
norm_cfg=self.norm_cfg, | |
act_cfg=self.act_cfg)) | |
self.make_layer() | |
self.layers.append( | |
ConvModule( | |
in_channels=self.in_channels, | |
out_channels=self.out_channels, | |
kernel_size=block_cfg_last[0], | |
stride=block_cfg_last[3], | |
padding=block_cfg_last[0] // 2, | |
conv_cfg=self.conv_cfg, | |
norm_cfg=self.norm_cfg, | |
act_cfg=self.act_cfg)) | |
def make_layer(self): | |
# Without the first and the final conv block. | |
layer_setting = self.layer_setting[1:-1] | |
total_num_blocks = sum([len(x) for x in layer_setting]) | |
block_idx = 0 | |
dpr = [ | |
x.item() | |
for x in torch.linspace(0, self.drop_path_rate, total_num_blocks) | |
] # stochastic depth decay rule | |
for layer_cfg in layer_setting: | |
layer = [] | |
for i, block_cfg in enumerate(layer_cfg): | |
(kernel_size, out_channels, se_ratio, stride, expand_ratio, | |
block_type) = block_cfg | |
mid_channels = int(self.in_channels * expand_ratio) | |
out_channels = make_divisible(out_channels, 8) | |
if se_ratio <= 0: | |
se_cfg = None | |
else: | |
se_cfg = dict( | |
channels=mid_channels, | |
ratio=expand_ratio * se_ratio, | |
divisor=1, | |
act_cfg=(self.act_cfg, dict(type='Sigmoid'))) | |
if block_type == 1: # edge tpu | |
if i > 0 and expand_ratio == 3: | |
with_residual = False | |
expand_ratio = 4 | |
else: | |
with_residual = True | |
mid_channels = int(self.in_channels * expand_ratio) | |
if se_cfg is not None: | |
se_cfg = dict( | |
channels=mid_channels, | |
ratio=se_ratio * expand_ratio, | |
divisor=1, | |
act_cfg=(self.act_cfg, dict(type='Sigmoid'))) | |
block = partial(EdgeResidual, with_residual=with_residual) | |
else: | |
block = InvertedResidual | |
layer.append( | |
block( | |
in_channels=self.in_channels, | |
out_channels=out_channels, | |
mid_channels=mid_channels, | |
kernel_size=kernel_size, | |
stride=stride, | |
se_cfg=se_cfg, | |
conv_cfg=self.conv_cfg, | |
norm_cfg=self.norm_cfg, | |
act_cfg=self.act_cfg, | |
drop_path_rate=dpr[block_idx], | |
with_cp=self.with_cp)) | |
self.in_channels = out_channels | |
block_idx += 1 | |
self.layers.append(Sequential(*layer)) | |
def forward(self, x): | |
outs = [] | |
for i, layer in enumerate(self.layers): | |
x = layer(x) | |
if i in self.out_indices: | |
outs.append(x) | |
return tuple(outs) | |
def _freeze_stages(self): | |
for i in range(self.frozen_stages): | |
m = self.layers[i] | |
m.eval() | |
for param in m.parameters(): | |
param.requires_grad = False | |
def train(self, mode=True): | |
super(EfficientNet, self).train(mode) | |
self._freeze_stages() | |
if mode and self.norm_eval: | |
for m in self.modules(): | |
if isinstance(m, nn.BatchNorm2d): | |
m.eval() | |