import ast import contextlib import json import math import platform import warnings import zipfile from collections import OrderedDict, namedtuple from copy import copy from pathlib import Path from urllib.parse import urlparse from typing import Optional import cv2 import numpy as np import pandas as pd import requests import torch import torch.nn as nn from IPython.display import display from PIL import Image from torch.cuda import amp from utils import TryExcept from utils.dataloaders import exif_transpose, letterbox from utils.general import (LOGGER, ROOT, Profile, check_requirements, check_suffix, check_version, colorstr, increment_path, is_notebook, make_divisible, non_max_suppression, scale_boxes, xywh2xyxy, xyxy2xywh, yaml_load) from utils.plots import Annotator, colors, save_one_box from utils.torch_utils import copy_attr, smart_inference_mode def autopad(k, p=None, d=1): # kernel, padding, dilation # Pad to 'same' shape outputs if d > 1: k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size if p is None: p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad return p class Conv(nn.Module): # Standard convolution with args(ch_in, ch_out, kernel, stride, padding, groups, dilation, activation) default_act = nn.SiLU() # default activation def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True): super().__init__() self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False) self.bn = nn.BatchNorm2d(c2) self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity() def forward(self, x): return self.act(self.bn(self.conv(x))) def forward_fuse(self, x): return self.act(self.conv(x)) class AConv(nn.Module): def __init__(self, c1, c2): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() self.cv1 = Conv(c1, c2, 3, 2, 1) def forward(self, x): x = torch.nn.functional.avg_pool2d(x, 2, 1, 0, False, True) return self.cv1(x) class ADown(nn.Module): def __init__(self, c1, c2): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() self.c = c2 // 2 self.cv1 = Conv(c1 // 2, self.c, 3, 2, 1) self.cv2 = Conv(c1 // 2, self.c, 1, 1, 0) def forward(self, x): x = torch.nn.functional.avg_pool2d(x, 2, 1, 0, False, True) x1,x2 = x.chunk(2, 1) x1 = self.cv1(x1) x2 = torch.nn.functional.max_pool2d(x2, 3, 2, 1) x2 = self.cv2(x2) return torch.cat((x1, x2), 1) class RepConvN(nn.Module): """RepConv is a basic rep-style block, including training and deploy status This code is based on https://github.com/DingXiaoH/RepVGG/blob/main/repvgg.py """ default_act = nn.SiLU() # default activation def __init__(self, c1, c2, k=3, s=1, p=1, g=1, d=1, act=True, bn=False, deploy=False): super().__init__() assert k == 3 and p == 1 self.g = g self.c1 = c1 self.c2 = c2 self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity() self.bn = None self.conv1 = Conv(c1, c2, k, s, p=p, g=g, act=False) self.conv2 = Conv(c1, c2, 1, s, p=(p - k // 2), g=g, act=False) def forward_fuse(self, x): """Forward process""" return self.act(self.conv(x)) def forward(self, x): """Forward process""" id_out = 0 if self.bn is None else self.bn(x) return self.act(self.conv1(x) + self.conv2(x) + id_out) def get_equivalent_kernel_bias(self): kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1) kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2) kernelid, biasid = self._fuse_bn_tensor(self.bn) return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1) + kernelid, bias3x3 + bias1x1 + biasid def _avg_to_3x3_tensor(self, avgp): channels = self.c1 groups = self.g kernel_size = avgp.kernel_size input_dim = channels // groups k = torch.zeros((channels, input_dim, kernel_size, kernel_size)) k[np.arange(channels), np.tile(np.arange(input_dim), groups), :, :] = 1.0 / kernel_size ** 2 return k def _pad_1x1_to_3x3_tensor(self, kernel1x1): if kernel1x1 is None: return 0 else: return torch.nn.functional.pad(kernel1x1, [1, 1, 1, 1]) def _fuse_bn_tensor(self, branch): if branch is None: return 0, 0 if isinstance(branch, Conv): kernel = branch.conv.weight running_mean = branch.bn.running_mean running_var = branch.bn.running_var gamma = branch.bn.weight beta = branch.bn.bias eps = branch.bn.eps elif isinstance(branch, nn.BatchNorm2d): if not hasattr(self, 'id_tensor'): input_dim = self.c1 // self.g kernel_value = np.zeros((self.c1, input_dim, 3, 3), dtype=np.float32) for i in range(self.c1): kernel_value[i, i % input_dim, 1, 1] = 1 self.id_tensor = torch.from_numpy(kernel_value).to(branch.weight.device) kernel = self.id_tensor running_mean = branch.running_mean running_var = branch.running_var gamma = branch.weight beta = branch.bias eps = branch.eps std = (running_var + eps).sqrt() t = (gamma / std).reshape(-1, 1, 1, 1) return kernel * t, beta - running_mean * gamma / std def fuse_convs(self): if hasattr(self, 'conv'): return kernel, bias = self.get_equivalent_kernel_bias() self.conv = nn.Conv2d(in_channels=self.conv1.conv.in_channels, out_channels=self.conv1.conv.out_channels, kernel_size=self.conv1.conv.kernel_size, stride=self.conv1.conv.stride, padding=self.conv1.conv.padding, dilation=self.conv1.conv.dilation, groups=self.conv1.conv.groups, bias=True).requires_grad_(False) self.conv.weight.data = kernel self.conv.bias.data = bias for para in self.parameters(): para.detach_() self.__delattr__('conv1') self.__delattr__('conv2') if hasattr(self, 'nm'): self.__delattr__('nm') if hasattr(self, 'bn'): self.__delattr__('bn') if hasattr(self, 'id_tensor'): self.__delattr__('id_tensor') class SP(nn.Module): def __init__(self, k=3, s=1): super(SP, self).__init__() self.m = nn.MaxPool2d(kernel_size=k, stride=s, padding=k // 2) def forward(self, x): return self.m(x) class MP(nn.Module): # Max pooling def __init__(self, k=2): super(MP, self).__init__() self.m = nn.MaxPool2d(kernel_size=k, stride=k) def forward(self, x): return self.m(x) class ConvTranspose(nn.Module): # Convolution transpose 2d layer default_act = nn.SiLU() # default activation def __init__(self, c1, c2, k=2, s=2, p=0, bn=True, act=True): super().__init__() self.conv_transpose = nn.ConvTranspose2d(c1, c2, k, s, p, bias=not bn) self.bn = nn.BatchNorm2d(c2) if bn else nn.Identity() self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity() def forward(self, x): return self.act(self.bn(self.conv_transpose(x))) class DWConv(Conv): # Depth-wise convolution def __init__(self, c1, c2, k=1, s=1, d=1, act=True): # ch_in, ch_out, kernel, stride, dilation, activation super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), d=d, act=act) class DWConvTranspose2d(nn.ConvTranspose2d): # Depth-wise transpose convolution def __init__(self, c1, c2, k=1, s=1, p1=0, p2=0): # ch_in, ch_out, kernel, stride, padding, padding_out super().__init__(c1, c2, k, s, p1, p2, groups=math.gcd(c1, c2)) class DFL(nn.Module): # DFL module def __init__(self, c1=17): super().__init__() self.conv = nn.Conv2d(c1, 1, 1, bias=False).requires_grad_(False) self.conv.weight.data[:] = nn.Parameter(torch.arange(c1, dtype=torch.float).view(1, c1, 1, 1)) # / 120.0 self.c1 = c1 # self.bn = nn.BatchNorm2d(4) def forward(self, x): b, c, a = x.shape # batch, channels, anchors return self.conv(x.view(b, 4, self.c1, a).transpose(2, 1).softmax(1)).view(b, 4, a) # return self.conv(x.view(b, self.c1, 4, a).softmax(1)).view(b, 4, a) class BottleneckBase(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, k=(1, 3), e=0.5): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, k[0], 1) self.cv2 = Conv(c_, c2, k[1], 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class RBottleneckBase(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 1), e=0.5): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, k[0], 1) self.cv2 = Conv(c_, c2, k[1], 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class RepNRBottleneckBase(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 1), e=0.5): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = RepConvN(c1, c_, k[0], 1) self.cv2 = Conv(c_, c2, k[1], 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class Bottleneck(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 3), e=0.5): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, k[0], 1) self.cv2 = Conv(c_, c2, k[1], 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class RepNBottleneck(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, k=(3, 3), e=0.5): # ch_in, ch_out, shortcut, kernels, groups, expand super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = RepConvN(c1, c_, k[0], 1) self.cv2 = Conv(c_, c2, k[1], 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class Res(nn.Module): # ResNet bottleneck def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): # ch_in, ch_out, shortcut, groups, expansion super(Res, self).__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_, c_, 3, 1, g=g) self.cv3 = Conv(c_, c2, 1, 1) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv3(self.cv2(self.cv1(x))) if self.add else self.cv3(self.cv2(self.cv1(x))) class RepNRes(nn.Module): # ResNet bottleneck def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): # ch_in, ch_out, shortcut, groups, expansion super(RepNRes, self).__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = RepConvN(c_, c_, 3, 1, g=g) self.cv3 = Conv(c_, c2, 1, 1) self.add = shortcut and c1 == c2 def forward(self, x): return x + self.cv3(self.cv2(self.cv1(x))) if self.add else self.cv3(self.cv2(self.cv1(x))) class BottleneckCSP(nn.Module): # CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False) self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False) self.cv4 = Conv(2 * c_, c2, 1, 1) self.bn = nn.BatchNorm2d(2 * c_) # applied to cat(cv2, cv3) self.act = nn.SiLU() self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): y1 = self.cv3(self.m(self.cv1(x))) y2 = self.cv2(x) return self.cv4(self.act(self.bn(torch.cat((y1, y2), 1)))) class CSP(nn.Module): # CSP Bottleneck with 3 convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2) self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1)) class RepNCSP(nn.Module): # CSP Bottleneck with 3 convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2) self.m = nn.Sequential(*(RepNBottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1)) class CSPBase(nn.Module): # CSP Bottleneck with 3 convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2) self.m = nn.Sequential(*(BottleneckBase(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1)) class SPP(nn.Module): # Spatial Pyramid Pooling (SPP) layer https://arxiv.org/abs/1406.4729 def __init__(self, c1, c2, k=(5, 9, 13)): super().__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1) self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k]) def forward(self, x): x = self.cv1(x) with warnings.catch_warnings(): warnings.simplefilter('ignore') # suppress torch 1.9.0 max_pool2d() warning return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1)) class ASPP(torch.nn.Module): def __init__(self, in_channels, out_channels): super().__init__() kernel_sizes = [1, 3, 3, 1] dilations = [1, 3, 6, 1] paddings = [0, 3, 6, 0] self.aspp = torch.nn.ModuleList() for aspp_idx in range(len(kernel_sizes)): conv = torch.nn.Conv2d( in_channels, out_channels, kernel_size=kernel_sizes[aspp_idx], stride=1, dilation=dilations[aspp_idx], padding=paddings[aspp_idx], bias=True) self.aspp.append(conv) self.gap = torch.nn.AdaptiveAvgPool2d(1) self.aspp_num = len(kernel_sizes) for m in self.modules(): if isinstance(m, torch.nn.Conv2d): n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels m.weight.data.normal_(0, math.sqrt(2. / n)) m.bias.data.fill_(0) def forward(self, x): avg_x = self.gap(x) out = [] for aspp_idx in range(self.aspp_num): inp = avg_x if (aspp_idx == self.aspp_num - 1) else x out.append(F.relu_(self.aspp[aspp_idx](inp))) out[-1] = out[-1].expand_as(out[-2]) out = torch.cat(out, dim=1) return out class SPPCSPC(nn.Module): # CSP SPP https://github.com/WongKinYiu/CrossStagePartialNetworks def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5, k=(5, 9, 13)): super(SPPCSPC, self).__init__() c_ = int(2 * c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(c_, c_, 3, 1) self.cv4 = Conv(c_, c_, 1, 1) self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k]) self.cv5 = Conv(4 * c_, c_, 1, 1) self.cv6 = Conv(c_, c_, 3, 1) self.cv7 = Conv(2 * c_, c2, 1, 1) def forward(self, x): x1 = self.cv4(self.cv3(self.cv1(x))) y1 = self.cv6(self.cv5(torch.cat([x1] + [m(x1) for m in self.m], 1))) y2 = self.cv2(x) return self.cv7(torch.cat((y1, y2), dim=1)) class SPPF(nn.Module): # Spatial Pyramid Pooling - Fast (SPPF) layer by Glenn Jocher def __init__(self, c1, c2, k=5): # equivalent to SPP(k=(5, 9, 13)) super().__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * 4, c2, 1, 1) self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2) # self.m = SoftPool2d(kernel_size=k, stride=1, padding=k // 2) def forward(self, x): x = self.cv1(x) with warnings.catch_warnings(): warnings.simplefilter('ignore') # suppress torch 1.9.0 max_pool2d() warning y1 = self.m(x) y2 = self.m(y1) return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1)) import torch.nn.functional as F from torch.nn.modules.utils import _pair class ReOrg(nn.Module): # yolo def __init__(self): super(ReOrg, self).__init__() def forward(self, x): # x(b,c,w,h) -> y(b,4c,w/2,h/2) return torch.cat([x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]], 1) class Contract(nn.Module): # Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40) def __init__(self, gain=2): super().__init__() self.gain = gain def forward(self, x): b, c, h, w = x.size() # assert (h / s == 0) and (W / s == 0), 'Indivisible gain' s = self.gain x = x.view(b, c, h // s, s, w // s, s) # x(1,64,40,2,40,2) x = x.permute(0, 3, 5, 1, 2, 4).contiguous() # x(1,2,2,64,40,40) return x.view(b, c * s * s, h // s, w // s) # x(1,256,40,40) class Expand(nn.Module): # Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160) def __init__(self, gain=2): super().__init__() self.gain = gain def forward(self, x): b, c, h, w = x.size() # assert C / s ** 2 == 0, 'Indivisible gain' s = self.gain x = x.view(b, s, s, c // s ** 2, h, w) # x(1,2,2,16,80,80) x = x.permute(0, 3, 4, 1, 5, 2).contiguous() # x(1,16,80,2,80,2) return x.view(b, c // s ** 2, h * s, w * s) # x(1,16,160,160) class Concat(nn.Module): # Concatenate a list of tensors along dimension def __init__(self, dimension=1): super().__init__() self.d = dimension def forward(self, x): return torch.cat(x, self.d) class Shortcut(nn.Module): def __init__(self, dimension=0): super(Shortcut, self).__init__() self.d = dimension def forward(self, x): return x[0]+x[1] class Silence(nn.Module): def __init__(self): super(Silence, self).__init__() def forward(self, x): return x ##### GELAN ##### class SPPELAN(nn.Module): # spp-elan def __init__(self, c1, c2, c3): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() self.c = c3 self.cv1 = Conv(c1, c3, 1, 1) self.cv2 = SP(5) self.cv3 = SP(5) self.cv4 = SP(5) self.cv5 = Conv(4*c3, c2, 1, 1) def forward(self, x): y = [self.cv1(x)] y.extend(m(y[-1]) for m in [self.cv2, self.cv3, self.cv4]) return self.cv5(torch.cat(y, 1)) class RepNCSPELAN4(nn.Module): # csp-elan def __init__(self, c1, c2, c3, c4, c5=1): # ch_in, ch_out, number, shortcut, groups, expansion super().__init__() self.c = c3//2 self.cv1 = Conv(c1, c3, 1, 1) self.cv2 = nn.Sequential(RepNCSP(c3//2, c4, c5), Conv(c4, c4, 3, 1)) self.cv3 = nn.Sequential(RepNCSP(c4, c4, c5), Conv(c4, c4, 3, 1)) self.cv4 = Conv(c3+(2*c4), c2, 1, 1) def forward(self, x): y = list(self.cv1(x).chunk(2, 1)) y.extend((m(y[-1])) for m in [self.cv2, self.cv3]) return self.cv4(torch.cat(y, 1)) def forward_split(self, x): y = list(self.cv1(x).split((self.c, self.c), 1)) y.extend(m(y[-1]) for m in [self.cv2, self.cv3]) return self.cv4(torch.cat(y, 1)) ################# ##### YOLOR ##### class ImplicitA(nn.Module): def __init__(self, channel): super(ImplicitA, self).__init__() self.channel = channel self.implicit = nn.Parameter(torch.zeros(1, channel, 1, 1)) nn.init.normal_(self.implicit, std=.02) def forward(self, x): return self.implicit + x class ImplicitM(nn.Module): def __init__(self, channel): super(ImplicitM, self).__init__() self.channel = channel self.implicit = nn.Parameter(torch.ones(1, channel, 1, 1)) nn.init.normal_(self.implicit, mean=1., std=.02) def forward(self, x): return self.implicit * x ################# ##### CBNet ##### class CBLinear(nn.Module): def __init__(self, c1, c2s, k=1, s=1, p=None, g=1): # ch_in, ch_outs, kernel, stride, padding, groups super(CBLinear, self).__init__() self.c2s = c2s self.conv = nn.Conv2d(c1, sum(c2s), k, s, autopad(k, p), groups=g, bias=True) def forward(self, x): outs = self.conv(x).split(self.c2s, dim=1) return outs class CBFuse(nn.Module): def __init__(self, idx): super(CBFuse, self).__init__() self.idx = idx def forward(self, xs): target_size = xs[-1].shape[2:] res = [F.interpolate(x[self.idx[i]], size=target_size, mode='nearest') for i, x in enumerate(xs[:-1])] out = torch.sum(torch.stack(res + xs[-1:]), dim=0) return out ################# class DetectMultiBackend(nn.Module): # YOLO MultiBackend class for python inference on various backends def __init__(self, weights='yolo.pt', device=torch.device('cpu'), dnn=False, data=None, fp16=False, fuse=True): # Usage: # PyTorch: weights = *.pt # TorchScript: *.torchscript # ONNX Runtime: *.onnx # ONNX OpenCV DNN: *.onnx --dnn # OpenVINO: *_openvino_model # CoreML: *.mlmodel # TensorRT: *.engine # TensorFlow SavedModel: *_saved_model # TensorFlow GraphDef: *.pb # TensorFlow Lite: *.tflite # TensorFlow Edge TPU: *_edgetpu.tflite # PaddlePaddle: *_paddle_model from models.experimental import attempt_download, attempt_load # scoped to avoid circular import super().__init__() w = str(weights[0] if isinstance(weights, list) else weights) pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle, triton = self._model_type(w) fp16 &= pt or jit or onnx or engine # FP16 nhwc = coreml or saved_model or pb or tflite or edgetpu # BHWC formats (vs torch BCWH) stride = 32 # default stride cuda = torch.cuda.is_available() and device.type != 'cpu' # use CUDA if not (pt or triton): w = attempt_download(w) # download if not local if pt: # PyTorch model = attempt_load(weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse) stride = max(int(model.stride.max()), 32) # model stride names = model.module.names if hasattr(model, 'module') else model.names # get class names model.half() if fp16 else model.float() self.model = model # explicitly assign for to(), cpu(), cuda(), half() elif jit: # TorchScript LOGGER.info(f'Loading {w} for TorchScript inference...') extra_files = {'config.txt': ''} # model metadata model = torch.jit.load(w, _extra_files=extra_files, map_location=device) model.half() if fp16 else model.float() if extra_files['config.txt']: # load metadata dict d = json.loads(extra_files['config.txt'], object_hook=lambda d: {int(k) if k.isdigit() else k: v for k, v in d.items()}) stride, names = int(d['stride']), d['names'] elif dnn: # ONNX OpenCV DNN LOGGER.info(f'Loading {w} for ONNX OpenCV DNN inference...') check_requirements('opencv-python>=4.5.4') net = cv2.dnn.readNetFromONNX(w) elif onnx: # ONNX Runtime LOGGER.info(f'Loading {w} for ONNX Runtime inference...') check_requirements(('onnx', 'onnxruntime-gpu' if cuda else 'onnxruntime')) import onnxruntime providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda else ['CPUExecutionProvider'] session = onnxruntime.InferenceSession(w, providers=providers) output_names = [x.name for x in session.get_outputs()] meta = session.get_modelmeta().custom_metadata_map # metadata if 'stride' in meta: stride, names = int(meta['stride']), eval(meta['names']) elif xml: # OpenVINO LOGGER.info(f'Loading {w} for OpenVINO inference...') check_requirements('openvino') # requires openvino-dev: https://pypi.org/project/openvino-dev/ from openvino.runtime import Core, Layout, get_batch ie = Core() if not Path(w).is_file(): # if not *.xml w = next(Path(w).glob('*.xml')) # get *.xml file from *_openvino_model dir network = ie.read_model(model=w, weights=Path(w).with_suffix('.bin')) if network.get_parameters()[0].get_layout().empty: network.get_parameters()[0].set_layout(Layout("NCHW")) batch_dim = get_batch(network) if batch_dim.is_static: batch_size = batch_dim.get_length() executable_network = ie.compile_model(network, device_name="CPU") # device_name="MYRIAD" for Intel NCS2 stride, names = self._load_metadata(Path(w).with_suffix('.yaml')) # load metadata elif engine: # TensorRT LOGGER.info(f'Loading {w} for TensorRT inference...') import tensorrt as trt # https://developer.nvidia.com/nvidia-tensorrt-download check_version(trt.__version__, '7.0.0', hard=True) # require tensorrt>=7.0.0 if device.type == 'cpu': device = torch.device('cuda:0') Binding = namedtuple('Binding', ('name', 'dtype', 'shape', 'data', 'ptr')) logger = trt.Logger(trt.Logger.INFO) with open(w, 'rb') as f, trt.Runtime(logger) as runtime: model = runtime.deserialize_cuda_engine(f.read()) context = model.create_execution_context() bindings = OrderedDict() output_names = [] fp16 = False # default updated below dynamic = False for i in range(model.num_bindings): name = model.get_binding_name(i) dtype = trt.nptype(model.get_binding_dtype(i)) if model.binding_is_input(i): if -1 in tuple(model.get_binding_shape(i)): # dynamic dynamic = True context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[2])) if dtype == np.float16: fp16 = True else: # output output_names.append(name) shape = tuple(context.get_binding_shape(i)) im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device) bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr())) binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items()) batch_size = bindings['images'].shape[0] # if dynamic, this is instead max batch size elif coreml: # CoreML LOGGER.info(f'Loading {w} for CoreML inference...') import coremltools as ct model = ct.models.MLModel(w) elif saved_model: # TF SavedModel LOGGER.info(f'Loading {w} for TensorFlow SavedModel inference...') import tensorflow as tf keras = False # assume TF1 saved_model model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w) elif pb: # GraphDef https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt LOGGER.info(f'Loading {w} for TensorFlow GraphDef inference...') import tensorflow as tf def wrap_frozen_graph(gd, inputs, outputs): x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), []) # wrapped ge = x.graph.as_graph_element return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs)) def gd_outputs(gd): name_list, input_list = [], [] for node in gd.node: # tensorflow.core.framework.node_def_pb2.NodeDef name_list.append(node.name) input_list.extend(node.input) return sorted(f'{x}:0' for x in list(set(name_list) - set(input_list)) if not x.startswith('NoOp')) gd = tf.Graph().as_graph_def() # TF GraphDef with open(w, 'rb') as f: gd.ParseFromString(f.read()) frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd)) elif tflite or edgetpu: # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python try: # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu from tflite_runtime.interpreter import Interpreter, load_delegate except ImportError: import tensorflow as tf Interpreter, load_delegate = tf.lite.Interpreter, tf.lite.experimental.load_delegate, if edgetpu: # TF Edge TPU https://coral.ai/software/#edgetpu-runtime LOGGER.info(f'Loading {w} for TensorFlow Lite Edge TPU inference...') delegate = { 'Linux': 'libedgetpu.so.1', 'Darwin': 'libedgetpu.1.dylib', 'Windows': 'edgetpu.dll'}[platform.system()] interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)]) else: # TFLite LOGGER.info(f'Loading {w} for TensorFlow Lite inference...') interpreter = Interpreter(model_path=w) # load TFLite model interpreter.allocate_tensors() # allocate input_details = interpreter.get_input_details() # inputs output_details = interpreter.get_output_details() # outputs # load metadata with contextlib.suppress(zipfile.BadZipFile): with zipfile.ZipFile(w, "r") as model: meta_file = model.namelist()[0] meta = ast.literal_eval(model.read(meta_file).decode("utf-8")) stride, names = int(meta['stride']), meta['names'] elif tfjs: # TF.js raise NotImplementedError('ERROR: YOLO TF.js inference is not supported') elif paddle: # PaddlePaddle LOGGER.info(f'Loading {w} for PaddlePaddle inference...') check_requirements('paddlepaddle-gpu' if cuda else 'paddlepaddle') import paddle.inference as pdi if not Path(w).is_file(): # if not *.pdmodel w = next(Path(w).rglob('*.pdmodel')) # get *.pdmodel file from *_paddle_model dir weights = Path(w).with_suffix('.pdiparams') config = pdi.Config(str(w), str(weights)) if cuda: config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0) predictor = pdi.create_predictor(config) input_handle = predictor.get_input_handle(predictor.get_input_names()[0]) output_names = predictor.get_output_names() elif triton: # NVIDIA Triton Inference Server LOGGER.info(f'Using {w} as Triton Inference Server...') check_requirements('tritonclient[all]') from utils.triton import TritonRemoteModel model = TritonRemoteModel(url=w) nhwc = model.runtime.startswith("tensorflow") else: raise NotImplementedError(f'ERROR: {w} is not a supported format') # class names if 'names' not in locals(): names = yaml_load(data)['names'] if data else {i: f'class{i}' for i in range(999)} if names[0] == 'n01440764' and len(names) == 1000: # ImageNet names = yaml_load(ROOT / 'data/ImageNet.yaml')['names'] # human-readable names self.__dict__.update(locals()) # assign all variables to self def forward(self, im, augment=False, visualize=False): # YOLO MultiBackend inference b, ch, h, w = im.shape # batch, channel, height, width if self.fp16 and im.dtype != torch.float16: im = im.half() # to FP16 if self.nhwc: im = im.permute(0, 2, 3, 1) # torch BCHW to numpy BHWC shape(1,320,192,3) if self.pt: # PyTorch y = self.model(im, augment=augment, visualize=visualize) if augment or visualize else self.model(im) elif self.jit: # TorchScript y = self.model(im) elif self.dnn: # ONNX OpenCV DNN im = im.cpu().numpy() # torch to numpy self.net.setInput(im) y = self.net.forward() elif self.onnx: # ONNX Runtime im = im.cpu().numpy() # torch to numpy y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im}) elif self.xml: # OpenVINO im = im.cpu().numpy() # FP32 y = list(self.executable_network([im]).values()) elif self.engine: # TensorRT if self.dynamic and im.shape != self.bindings['images'].shape: i = self.model.get_binding_index('images') self.context.set_binding_shape(i, im.shape) # reshape if dynamic self.bindings['images'] = self.bindings['images']._replace(shape=im.shape) for name in self.output_names: i = self.model.get_binding_index(name) self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i))) s = self.bindings['images'].shape assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}" self.binding_addrs['images'] = int(im.data_ptr()) self.context.execute_v2(list(self.binding_addrs.values())) y = [self.bindings[x].data for x in sorted(self.output_names)] elif self.coreml: # CoreML im = im.cpu().numpy() im = Image.fromarray((im[0] * 255).astype('uint8')) # im = im.resize((192, 320), Image.ANTIALIAS) y = self.model.predict({'image': im}) # coordinates are xywh normalized if 'confidence' in y: box = xywh2xyxy(y['coordinates'] * [[w, h, w, h]]) # xyxy pixels conf, cls = y['confidence'].max(1), y['confidence'].argmax(1).astype(np.float) y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1) else: y = list(reversed(y.values())) # reversed for segmentation models (pred, proto) elif self.paddle: # PaddlePaddle im = im.cpu().numpy().astype(np.float32) self.input_handle.copy_from_cpu(im) self.predictor.run() y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names] elif self.triton: # NVIDIA Triton Inference Server y = self.model(im) else: # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU) im = im.cpu().numpy() if self.saved_model: # SavedModel y = self.model(im, training=False) if self.keras else self.model(im) elif self.pb: # GraphDef y = self.frozen_func(x=self.tf.constant(im)) else: # Lite or Edge TPU input = self.input_details[0] int8 = input['dtype'] == np.uint8 # is TFLite quantized uint8 model if int8: scale, zero_point = input['quantization'] im = (im / scale + zero_point).astype(np.uint8) # de-scale self.interpreter.set_tensor(input['index'], im) self.interpreter.invoke() y = [] for output in self.output_details: x = self.interpreter.get_tensor(output['index']) if int8: scale, zero_point = output['quantization'] x = (x.astype(np.float32) - zero_point) * scale # re-scale y.append(x) y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y] y[0][..., :4] *= [w, h, w, h] # xywh normalized to pixels if isinstance(y, (list, tuple)): return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y] else: return self.from_numpy(y) def from_numpy(self, x): return torch.from_numpy(x).to(self.device) if isinstance(x, np.ndarray) else x def warmup(self, imgsz=(1, 3, 640, 640)): # Warmup model by running inference once warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton if any(warmup_types) and (self.device.type != 'cpu' or self.triton): im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device) # input for _ in range(2 if self.jit else 1): # self.forward(im) # warmup @staticmethod def _model_type(p='path/to/model.pt'): # Return model type from model path, i.e. path='path/to/model.onnx' -> type=onnx # types = [pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle] from export import export_formats from utils.downloads import is_url sf = list(export_formats().Suffix) # export suffixes if not is_url(p, check=False): check_suffix(p, sf) # checks url = urlparse(p) # if url may be Triton inference server types = [s in Path(p).name for s in sf] types[8] &= not types[9] # tflite &= not edgetpu triton = not any(types) and all([any(s in url.scheme for s in ["http", "grpc"]), url.netloc]) return types + [triton] @staticmethod def _load_metadata(f=Path('path/to/meta.yaml')): # Load metadata from meta.yaml if it exists if f.exists(): d = yaml_load(f) return d['stride'], d['names'] # assign stride, names return None, None class AutoShape(nn.Module): # YOLO input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS conf = 0.25 # NMS confidence threshold iou = 0.45 # NMS IoU threshold agnostic = False # NMS class-agnostic multi_label = False # NMS multiple labels per box classes = None # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs max_det = 1000 # maximum number of detections per image amp = False # Automatic Mixed Precision (AMP) inference def __init__(self, model, verbose=True): super().__init__() if verbose: LOGGER.info('Adding AutoShape... ') copy_attr(self, model, include=('yaml', 'nc', 'hyp', 'names', 'stride', 'abc'), exclude=()) # copy attributes self.dmb = isinstance(model, DetectMultiBackend) # DetectMultiBackend() instance self.pt = not self.dmb or model.pt # PyTorch model self.model = model.eval() if self.pt: m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect() m.inplace = False # Detect.inplace=False for safe multithread inference m.export = True # do not output loss values def _apply(self, fn): # Apply to(), cpu(), cuda(), half() to model tensors that are not parameters or registered buffers self = super()._apply(fn) from models.yolo import Detect, Segment if self.pt: m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect() if isinstance(m, (Detect, Segment)): for k in 'stride', 'anchor_grid', 'stride_grid', 'grid': x = getattr(m, k) setattr(m, k, list(map(fn, x))) if isinstance(x, (list, tuple)) else setattr(m, k, fn(x)) return self @smart_inference_mode() def forward(self, ims, size=640, augment=False, profile=False): # Inference from various sources. For size(height=640, width=1280), RGB images example inputs are: # file: ims = 'data/images/zidane.jpg' # str or PosixPath # URI: = 'https://ultralytics.com/images/zidane.jpg' # OpenCV: = cv2.imread('image.jpg')[:,:,::-1] # HWC BGR to RGB x(640,1280,3) # PIL: = Image.open('image.jpg') or ImageGrab.grab() # HWC x(640,1280,3) # numpy: = np.zeros((640,1280,3)) # HWC # torch: = torch.zeros(16,3,320,640) # BCHW (scaled to size=640, 0-1 values) # multiple: = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...] # list of images dt = (Profile(), Profile(), Profile()) with dt[0]: if isinstance(size, int): # expand size = (size, size) p = next(self.model.parameters()) if self.pt else torch.empty(1, device=self.model.device) # param autocast = self.amp and (p.device.type != 'cpu') # Automatic Mixed Precision (AMP) inference if isinstance(ims, torch.Tensor): # torch with amp.autocast(autocast): return self.model(ims.to(p.device).type_as(p), augment=augment) # inference # Pre-process n, ims = (len(ims), list(ims)) if isinstance(ims, (list, tuple)) else (1, [ims]) # number, list of images shape0, shape1, files = [], [], [] # image and inference shapes, filenames for i, im in enumerate(ims): f = f'image{i}' # filename if isinstance(im, (str, Path)): # filename or uri im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith('http') else im), im im = np.asarray(exif_transpose(im)) elif isinstance(im, Image.Image): # PIL Image im, f = np.asarray(exif_transpose(im)), getattr(im, 'filename', f) or f files.append(Path(f).with_suffix('.jpg').name) if im.shape[0] < 5: # image in CHW im = im.transpose((1, 2, 0)) # reverse dataloader .transpose(2, 0, 1) im = im[..., :3] if im.ndim == 3 else cv2.cvtColor(im, cv2.COLOR_GRAY2BGR) # enforce 3ch input s = im.shape[:2] # HWC shape0.append(s) # image shape g = max(size) / max(s) # gain shape1.append([int(y * g) for y in s]) ims[i] = im if im.data.contiguous else np.ascontiguousarray(im) # update shape1 = [make_divisible(x, self.stride) for x in np.array(shape1).max(0)] # inf shape x = [letterbox(im, shape1, auto=False)[0] for im in ims] # pad x = np.ascontiguousarray(np.array(x).transpose((0, 3, 1, 2))) # stack and BHWC to BCHW x = torch.from_numpy(x).to(p.device).type_as(p) / 255 # uint8 to fp16/32 with amp.autocast(autocast): # Inference with dt[1]: y = self.model(x, augment=augment) # forward # Post-process with dt[2]: y = non_max_suppression(y if self.dmb else y[0], self.conf, self.iou, self.classes, self.agnostic, self.multi_label, max_det=self.max_det) # NMS for i in range(n): scale_boxes(shape1, y[i][:, :4], shape0[i]) return Detections(ims, y, files, dt, self.names, x.shape) class Detections: # YOLO detections class for inference results def __init__(self, ims, pred, files, times=(0, 0, 0), names=None, shape=None): super().__init__() d = pred[0].device # device gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in ims] # normalizations self.ims = ims # list of images as numpy arrays self.pred = pred # list of tensors pred[0] = (xyxy, conf, cls) self.names = names # class names self.files = files # image filenames self.times = times # profiling times self.xyxy = pred # xyxy pixels self.xywh = [xyxy2xywh(x) for x in pred] # xywh pixels self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)] # xyxy normalized self.xywhn = [x / g for x, g in zip(self.xywh, gn)] # xywh normalized self.n = len(self.pred) # number of images (batch size) self.t = tuple(x.t / self.n * 1E3 for x in times) # timestamps (ms) self.s = tuple(shape) # inference BCHW shape def _run(self, pprint=False, show=False, save=False, crop=False, render=False, labels=True, save_dir=Path('')): s, crops = '', [] for i, (im, pred) in enumerate(zip(self.ims, self.pred)): s += f'\nimage {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} ' # string if pred.shape[0]: for c in pred[:, -1].unique(): n = (pred[:, -1] == c).sum() # detections per class s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, " # add to string s = s.rstrip(', ') if show or save or render or crop: annotator = Annotator(im, example=str(self.names)) for *box, conf, cls in reversed(pred): # xyxy, confidence, class label = f'{self.names[int(cls)]} {conf:.2f}' if crop: file = save_dir / 'crops' / self.names[int(cls)] / self.files[i] if save else None crops.append({ 'box': box, 'conf': conf, 'cls': cls, 'label': label, 'im': save_one_box(box, im, file=file, save=save)}) else: # all others annotator.box_label(box, label if labels else '', color=colors(cls)) im = annotator.im else: s += '(no detections)' im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im # from np if show: display(im) if is_notebook() else im.show(self.files[i]) if save: f = self.files[i] im.save(save_dir / f) # save if i == self.n - 1: LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}") if render: self.ims[i] = np.asarray(im) if pprint: s = s.lstrip('\n') return f'{s}\nSpeed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {self.s}' % self.t if crop: if save: LOGGER.info(f'Saved results to {save_dir}\n') return crops @TryExcept('Showing images is not supported in this environment') def show(self, labels=True): self._run(show=True, labels=labels) # show results def save(self, labels=True, save_dir='runs/detect/exp', exist_ok=False): save_dir = increment_path(save_dir, exist_ok, mkdir=True) # increment save_dir self._run(save=True, labels=labels, save_dir=save_dir) # save results def crop(self, save=True, save_dir='runs/detect/exp', exist_ok=False): save_dir = increment_path(save_dir, exist_ok, mkdir=True) if save else None return self._run(crop=True, save=save, save_dir=save_dir) # crop results def render(self, labels=True): self._run(render=True, labels=labels) # render results return self.ims def pandas(self): # return detections as pandas DataFrames, i.e. print(results.pandas().xyxy[0]) new = copy(self) # return copy ca = 'xmin', 'ymin', 'xmax', 'ymax', 'confidence', 'class', 'name' # xyxy columns cb = 'xcenter', 'ycenter', 'width', 'height', 'confidence', 'class', 'name' # xywh columns for k, c in zip(['xyxy', 'xyxyn', 'xywh', 'xywhn'], [ca, ca, cb, cb]): a = [[x[:5] + [int(x[5]), self.names[int(x[5])]] for x in x.tolist()] for x in getattr(self, k)] # update setattr(new, k, [pd.DataFrame(x, columns=c) for x in a]) return new def tolist(self): # return a list of Detections objects, i.e. 'for result in results.tolist():' r = range(self.n) # iterable x = [Detections([self.ims[i]], [self.pred[i]], [self.files[i]], self.times, self.names, self.s) for i in r] # for d in x: # for k in ['ims', 'pred', 'xyxy', 'xyxyn', 'xywh', 'xywhn']: # setattr(d, k, getattr(d, k)[0]) # pop out of list return x def print(self): LOGGER.info(self.__str__()) def __len__(self): # override len(results) return self.n def __str__(self): # override print(results) return self._run(pprint=True) # print results def __repr__(self): return f'YOLO {self.__class__} instance\n' + self.__str__() class Proto(nn.Module): # YOLO mask Proto module for segmentation models def __init__(self, c1, c_=256, c2=32): # ch_in, number of protos, number of masks super().__init__() self.cv1 = Conv(c1, c_, k=3) self.upsample = nn.Upsample(scale_factor=2, mode='nearest') self.cv2 = Conv(c_, c_, k=3) self.cv3 = Conv(c_, c2) def forward(self, x): return self.cv3(self.cv2(self.upsample(self.cv1(x)))) class Classify(nn.Module): # YOLO classification head, i.e. x(b,c1,20,20) to x(b,c2) def __init__(self, c1, c2, k=1, s=1, p=None, g=1): # ch_in, ch_out, kernel, stride, padding, groups super().__init__() c_ = 1280 # efficientnet_b0 size self.conv = Conv(c1, c_, k, s, autopad(k, p), g) self.pool = nn.AdaptiveAvgPool2d(1) # to x(b,c_,1,1) self.drop = nn.Dropout(p=0.0, inplace=True) self.linear = nn.Linear(c_, c2) # to x(b,c2) def forward(self, x): if isinstance(x, list): x = torch.cat(x, 1) return self.linear(self.drop(self.pool(self.conv(x)).flatten(1)))