import os
import torch
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel
from .configuration_MyResnet import MyResnetConfig

# Make CUDA kernel launches synchronous so CUDA errors surface at the failing call (debugging aid)
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

"""
Define the custom ResNet model.
"""


# Residual block: two 3x3 convolutions with an identity (or 1x1-projected) skip connection
class Residual(nn.Module):
    def __init__(self, input_channels, num_channels,
                 use_1x1conv=False, strides=1):
        super().__init__()
        # First 3x3 convolution
        self.conv1 = nn.Conv2d(input_channels, num_channels,
                               kernel_size=3, padding=1, stride=strides)
        # Second 3x3 convolution
        self.conv2 = nn.Conv2d(num_channels, num_channels,
                               kernel_size=3, padding=1)
        # Optional 1x1 convolution to match the input's channel count and stride
        if use_1x1conv:
            self.conv3 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=1, stride=strides)
        else:
            self.conv3 = None
        # Batch normalization layers
        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        # First conv -> batch norm -> ReLU
        Y = F.relu(self.bn1(self.conv1(X)))
        # Second conv -> batch norm
        Y = self.bn2(self.conv2(Y))
        # If a 1x1 conv was requested, use it to match X's shape to Y's
        if self.conv3:
            X = self.conv3(X)
        # Add the (possibly projected) input to the output
        Y += X
        return F.relu(Y)  # Apply the final ReLU
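

# A minimal shape-check sketch (the sizes below are illustrative assumptions):
# without the 1x1 conv the block preserves the input shape; with
# use_1x1conv=True and strides=2 it halves the spatial size and can change
# the channel count.
if __name__ == "__main__":
    blk = Residual(3, 3)
    print(blk(torch.rand(4, 3, 6, 6)).shape)  # torch.Size([4, 3, 6, 6])
    blk = Residual(3, 6, use_1x1conv=True, strides=2)
    print(blk(torch.rand(4, 3, 6, 6)).shape)  # torch.Size([4, 6, 3, 3])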


# Stack multiple residual blocks into one stage of the network
def resnet_block(input_channels, num_channels, num_residuals,
                 first_block=False):
    """

    :param first_block: 是否为第一个块,用于确定是否需要1x1卷积

    :param input_channels: 输入通道数

    :param num_channels: 残差块的输出通道数

    :param num_residuals: 残差块的数量

    :return: 组合后的多个残差块

    """
    blk = []
    for i in range(num_residuals):
        # In every stage except the first, the first block downsamples (stride 2) and changes the channel count
        if i == 0 and not first_block:
            blk.append(Residual(input_channels, num_channels,
                                use_1x1conv=True, strides=2))
        else:
            blk.append(Residual(num_channels, num_channels))
    return blk
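

# A quick stage-level sketch (sizes are illustrative assumptions): the first
# block of a non-first stage halves the spatial size while the channel count
# grows, and the remaining blocks keep the shape.
if __name__ == "__main__":
    stage = nn.Sequential(*resnet_block(64, 128, 2))
    print(stage(torch.rand(1, 64, 56, 56)).shape)  # torch.Size([1, 128, 28, 28])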


# Build the full residual network
def net(in_channels, num_channels, num_residuals, num_classes):
    """

    :param in_channels: 输入图像的通道数

    :param num_channels: 第一个卷积层的输出通道数

    :param num_residuals: 每个阶段的残差块数量

    :param num_classes: 分类的数量

    :return: 构建的残差网络模型

    """
    # Stem: a 7x7 convolution followed by batch norm, ReLU, and 3x3 max pooling
    b1 = nn.Sequential(nn.Conv2d(in_channels, num_channels, kernel_size=7, stride=2, padding=3),
                       nn.BatchNorm2d(num_channels), nn.ReLU(),
                       nn.MaxPool2d(kernel_size=3, stride=2, padding=1))

    # Four stages of residual blocks, doubling the channels at each downsampling stage
    b2 = nn.Sequential(*resnet_block(num_channels, num_channels, num_residuals[0], first_block=True))
    b3 = nn.Sequential(*resnet_block(num_channels, num_channels * 2, num_residuals[1]))
    b4 = nn.Sequential(*resnet_block(num_channels * 2, num_channels * 4, num_residuals[2]))
    b5 = nn.Sequential(*resnet_block(num_channels * 4, num_channels * 8, num_residuals[3]))

    # Global average pooling followed by a fully connected classification layer
    resnet = nn.Sequential(b1, b2, b3, b4, b5,
                           nn.AdaptiveAvgPool2d((1, 1)),
                           nn.Flatten(), nn.Linear(num_channels * 8, num_classes))
    return resnet
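

# A minimal sanity check of the assembled network (the ResNet-18-style
# hyperparameters below are illustrative assumptions): a batch of 224x224
# grayscale images should yield one logit vector per image.
if __name__ == "__main__":
    demo_net = net(in_channels=1, num_channels=64, num_residuals=[2, 2, 2, 2], num_classes=10)
    print(demo_net(torch.rand(2, 1, 224, 224)).shape)  # torch.Size([2, 10])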


"""

把模型封装成huggingface的模型,

可以使用transformers库进行训练和推理

这里定义了两个模型类:一个用于从一批图像中提取隐藏特征(类似于 BertModel),

另一个适用于图像分类(类似于 BertForSequenceClassification)。

"""


class MyResnetModel(PreTrainedModel):
    config_class = MyResnetConfig  # configuration class associated with this model

    def __init__(self, config):
        super().__init__(config)
        # Build the network from the configuration
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    def forward(self, tensor, labels=None):
        # nn.Sequential has no forward_features, so run every stage except the
        # final Linear classifier and return the pooled, flattened features
        return self.model[:-1](tensor)


class MyResnetModelForImageClassification(PreTrainedModel):
    config_class = MyResnetConfig  # configuration class associated with this model

    def __init__(self, config):
        super().__init__(config)
        # Build the network from the configuration
        self.model = net(
            in_channels=config.in_channels,
            num_channels=config.num_channels,
            num_residuals=config.num_residuals,
            num_classes=config.num_classes
        )

    """

    你可以让模型返回任何你想要的内容,

    但是像这样返回一个字典,并在传递标签时包含loss,可以使你的模型能够在 Trainer 类中直接使用。

    只要你计划使用自己的训练循环或其他库进行训练,也可以使用其他输出格式。

    """

    def forward(self, X, y=None):
        # Forward pass: compute the logits, and the loss when labels are given
        y_hat = self.model(X)
        if y is not None:
            loss = F.cross_entropy(y_hat, y)
            return {"loss": loss, "logits": y_hat}
        return {"logits": y_hat}

    def forward_features(self, X):
        # Return the intermediate features, printing each layer's output shape along the way
        for layer in self.model:
            X = layer(X)
            print(layer.__class__.__name__, 'output shape:\t', X.shape)
        return X
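

# A minimal usage sketch. It assumes MyResnetConfig accepts the four keyword
# arguments read in __init__ above (the values here are illustrative); run it
# with `python -m <package>.modeling_MyResnet` so the relative import resolves.
if __name__ == "__main__":
    config = MyResnetConfig(
        in_channels=3,
        num_channels=64,
        num_residuals=[2, 2, 2, 2],
        num_classes=10,
    )
    model = MyResnetModelForImageClassification(config)
    images = torch.rand(2, 3, 224, 224)
    labels = torch.tensor([0, 1])
    outputs = model(images, labels)
    print(outputs["loss"], outputs["logits"].shape)  # scalar loss, torch.Size([2, 10])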