MyResnet_resnet18 / modeling_MyResnet.py
linsa11's picture
Upload model
fb22435 verified
import os
import torch
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel
from .configuration_MyResnet import MyResnetConfig
# 设置CUDA异常阻塞,用于调试CUDA相关问题
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
"""
定义自己的模型
"""
# 定义残差块
class Residual(nn.Module):
def __init__(self, input_channels, num_channels,
use_1x1conv=False, strides=1):
super().__init__()
# 第一个3x3卷积层
self.conv1 = nn.Conv2d(input_channels, num_channels,
kernel_size=3, padding=1, stride=strides)
# 第二个3x3卷积层
self.conv2 = nn.Conv2d(num_channels, num_channels,
kernel_size=3, padding=1)
# 可选的1x1卷积层,用于调整输入的通道数
if use_1x1conv:
self.conv3 = nn.Conv2d(input_channels, num_channels,
kernel_size=1, stride=strides)
else:
self.conv3 = None
# 批量归一化层
self.bn1 = nn.BatchNorm2d(num_channels)
self.bn2 = nn.BatchNorm2d(num_channels)
def forward(self, X):
# 第一个卷积 -> 批量归一化 -> ReLU激活
Y = F.relu(self.bn1(self.conv1(X)))
# 第二个卷积 -> 批量归一化
Y = self.bn2(self.conv2(Y))
# 如果使用1x1卷积,调整输入的通道数
if self.conv3:
X = self.conv3(X)
# 将输入与输出相加
Y += X
return F.relu(Y) # 返回激活后的结果
# 组合多个残差块
def resnet_block(input_channels, num_channels, num_residuals,
first_block=False):
"""
:param first_block: 是否为第一个块,用于确定是否需要1x1卷积
:param input_channels: 输入通道数
:param num_channels: 残差块的输出通道数
:param num_residuals: 残差块的数量
:return: 组合后的多个残差块
"""
blk = []
for i in range(num_residuals):
# 第一个残差块需要降维
if i == 0 and not first_block:
blk.append(Residual(input_channels, num_channels,
use_1x1conv=True, strides=2))
else:
blk.append(Residual(num_channels, num_channels))
return blk
# 定义残差网络
def net(in_channels, num_channels, num_residuals, num_classes):
"""
:param in_channels: 输入图像的通道数
:param num_channels: 第一个卷积层的输出通道数
:param num_residuals: 每个阶段的残差块数量
:param num_classes: 分类的数量
:return: 构建的残差网络模型
"""
# 首先是一个7x7卷积层,接着是批量归一化、ReLU激活和3x3最大池化
b1 = nn.Sequential(nn.Conv2d(in_channels, num_channels, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64), nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1))
# 构建多个残差块
b2 = nn.Sequential(*resnet_block(64, num_channels, num_residuals[0], first_block=True))
b3 = nn.Sequential(*resnet_block(num_channels, num_channels * 2, num_residuals[1]))
b4 = nn.Sequential(*resnet_block(num_channels * 2, num_channels * 4, num_residuals[2]))
b5 = nn.Sequential(*resnet_block(num_channels * 4, num_channels * 8, num_residuals[3]))
# 全局平均池化后,连接一个全连接层进行分类
resnet = nn.Sequential(b1, b2, b3, b4, b5,
nn.AdaptiveAvgPool2d((1, 1)),
nn.Flatten(), nn.Linear(num_channels * 8, num_classes))
return resnet
"""
把模型封装成huggingface的模型,
可以使用transformers库进行训练和推理
这里定义了两个模型类:一个用于从一批图像中提取隐藏特征(类似于 BertModel),
另一个适用于图像分类(类似于 BertForSequenceClassification)。
"""
class MyResnetModel(PreTrainedModel):
config_class = MyResnetConfig # 指定配置类
def __init__(self, config):
super().__init__(config)
# 根据配置初始化模型
self.model = net(
in_channels=config.in_channels,
num_channels=config.num_channels,
num_residuals=config.num_residuals,
num_classes=config.num_classes
)
def forward(self, tensor, labels=None):
return self.model.forward_features(tensor) # 返回特征
class MyResnetModelForImageClassification(PreTrainedModel):
config_class = MyResnetConfig # 指定配置类
def __init__(self, config):
super().__init__(config)
# 根据配置初始化模型
self.model = net(
in_channels=config.in_channels,
num_channels=config.num_channels,
num_residuals=config.num_residuals,
num_classes=config.num_classes
)
"""
你可以让模型返回任何你想要的内容,
但是像这样返回一个字典,并在传递标签时包含loss,可以使你的模型能够在 Trainer 类中直接使用。
只要你计划使用自己的训练循环或其他库进行训练,也可以使用其他输出格式。
"""
def forward(self, X, y):
# 前向传播,计算模型输出
# print(y)
y_hat = self.model(X)
if y is not None:
# 计算损失
loss = torch.nn.functional.cross_entropy(y_hat, y)
return {"loss": loss, "logits": y_hat} # 返回损失和输出
return {"logits": y_hat}
def forward_features(self, X):
# 返回特征
for layer in self.model:
X = layer(X)
print(layer.__class__.__name__, 'output shape:\t', X.shape)