CNN与计算机视觉实战
计算机视觉是深度学习最成功的应用领域之一。从图像分类到目标检测,从语义分割到人脸识别,卷积神经网络(CNN)几乎统治了整个视觉领域。这篇文章会带你从卷积操作的基本原理讲起,遍历经典CNN架构的演进,最后用PyTorch实现ResNet并训练在CIFAR-10上。
卷积层:理解卷积操作
卷积层是CNN的核心。不同于全连接层每个神经元与所有输入相连,卷积层只和局部区域连接。这种局部连接和权重共享的设计,让CNN能够高效处理图像这种高维数据。
卷积操作到底在做什么
卷积操作本质上是一个滑动窗口滤波器。假设你有一个5×5的输入图像和一个3×3的卷积核(也叫滤波器)。卷积核会在图像上滑动,每到一个位置就做一次”局部区域与卷积核的对应元素相乘再求和”的操作。
你可能会问:为什么要这样做?答案是特征提取。不同的卷积核能检测不同的视觉特征:边缘、纹理、角点、形状等。比如一个检测水平边缘的卷积核,在遇到水平边缘时会产生高响应,遇到垂直边缘则响应低。
卷积核的值是模型要学习的参数。通过反向传播,CNN自动学习出对当前任务最有用的特征检测器。这就是深度学习”端到端”学习魅力的体现——你不需要手动设计特征,模型自己学出来。
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
# 手动实现卷积操作,理解其原理
def manual_conv2d(input_tensor, kernel, stride=1, padding=0):
"""
手动实现2D卷积
input_tensor: (batch, channels, height, width)
kernel: (out_channels, in_channels, kH, kW)
"""
if padding > 0:
input_tensor = F.pad(input_tensor, (padding,)*4)
batch, in_channels, H, W = input_tensor.shape
out_channels, _, kH, kW = kernel.shape
out_H = (H - kH) // stride + 1
out_W = (W - kW) // stride + 1
output = torch.zeros(batch, out_channels, out_H, out_W)
for b in range(batch):
for oc in range(out_channels):
for i in range(out_H):
for j in range(out_W):
h_start = i * stride
w_start = j * stride
# 提取局部区域
region = input_tensor[b, :, h_start:h_start+kH, w_start:w_start+kW]
# 逐通道卷积并求和
output[b, oc, i, j] = (region * kernel[oc]).sum()
return output
# 测试卷积操作
input_tensor = torch.randn(1, 1, 5, 5)
kernel = torch.randn(1, 1, 3, 3)
print("输入:")
print(input_tensor.squeeze().numpy())
print("\n卷积核:")
print(kernel.squeeze().numpy())
# PyTorch的卷积验证
conv = nn.Conv2d(1, 1, kernel_size=3, bias=False)
conv.weight.data = kernel
output = conv(input_tensor)
print("\n输出 (PyTorch Conv2d):")
print(output.squeeze().detach().numpy())步长、填充与感受野
三个关键参数控制卷积的行为:步长(stride)、填充(padding)和感受野(receptive field)。
步长控制卷积核滑动的步进大小。stride=1时每步移动1个像素,stride=2时跳过一个像素。步长大于1可以降低特征图尺寸,实现下采样。
填充是在输入边界周围添加额外的像素。常用的”same”填充保持输出尺寸与输入相同。无填充(valid)会使输出变小。padding=1常用来补偿3×3卷积导致的尺寸缩小(每层减2)。
感受野是指输出特征图上一个像素对应输入图像的区域大小。浅层的卷积核感受野小,只能看到局部细节;深层的卷积核感受野大,能看到更大的区域。堆叠多层小卷积核可以达到和大卷积核相同的感受野,但参数量更少、非线性更多。
# 对比不同步长和填充的效果
print("=" * 60)
print("步长和填充对特征图尺寸的影响")
print("=" * 60)
# 输入7x7图像,3x3卷积
input_size = 7
kernel_size = 3
for stride in [1, 2]:
for padding in [0, 1]:
output_size = (input_size + 2*padding - kernel_size) // stride + 1
print(f"输入={input_size}×{input_size}, stride={stride}, padding={padding} → 输出={output_size}×{output_size}")
# 感受野计算
print("\n" + "=" * 60)
print("感受野计算")
print("=" * 60)
def calc_receptive_field(layers, kernel_size=3):
"""计算多层卷积后的感受野"""
rf = 1
for i, layer in enumerate(layers):
rf = rf + (kernel_size - 1) * layer
print(f"Layer {i+1}: RF = {rf}")
return rf
# 3层3x3卷积的感受野
print("\n3层3x3卷积的感受野:")
calc_receptive_field([1, 1, 1])
# 等效的单层7x7卷积
print("\n等效单层7x7卷积:")
print("Layer 1: RF = 7")多通道卷积与分组卷积
真实世界的图像通常是彩色的,有RGB三个通道。每个卷积核也要有3个通道,分别和RGB做卷积,然后相加得到一个输出通道。
一个卷积层可以有多个卷积核,每个卷积核产生一个输出通道。所以卷积层的参数形状是 (out_channels, in_channels, kH, kW)。
分组卷积(Grouped Convolution)是一种节省计算量的技巧。它把输入通道分成G组,每组独立做卷积,最后再合并。这最早在AlexNet中用来解决显存问题,后来ShuffleNet将其发扬光大。
深度可分离卷积(Depthwise Separable Convolution)是分组卷积的极端情况:每组只有一个通道。它先做逐通道卷积(Depthwise),再做一个1×1卷积融合通道信息(Pointwise)。MobileNet系列就是靠这个实现了极轻量的模型。
# 多通道卷积演示
batch, in_channels, height, width = 2, 3, 32, 32
out_channels = 16
kernel_size = 3
# 标准卷积
conv_standard = nn.Conv2d(in_channels, out_channels, kernel_size)
print(f"标准卷积参数: {sum(p.numel() for p in conv_standard.parameters())}")
# 分组卷积 (groups=2)
conv_grouped = nn.Conv2d(in_channels, out_channels, kernel_size, groups=2)
print(f"分组卷积 (g=2) 参数: {sum(p.numel() for p in conv_grouped.parameters())}")
# 深度可分离卷积
conv_dw = nn.Conv2d(in_channels, in_channels, kernel_size, groups=in_channels)
conv_pw = nn.Conv2d(in_channels, out_channels, 1)
depthwise_separable = nn.Sequential(conv_dw, conv_pw)
total_params = sum(p.numel() for p in conv_dw.parameters()) + sum(p.numel() for p in conv_pw.parameters())
print(f"深度可分离卷积参数: {total_params}")
print(f"\n参数节省比例: {(1 - total_params/sum(p.numel() for p in conv_standard.parameters()))*100:.1f}%")经典CNN架构演进
理解CNN架构的演进历史,不仅能帮助你更好地设计模型,还能从前辈的创新中汲取灵感。
LeNet到AlexNet:深度学习的觉醒
1998年LeCun提出的LeNet是CNN的开山之作,但受限于当时的计算能力和数据量,它一直默默无闻。直到2012年AlexNet在ImageNet竞赛中以压倒性优势夺冠,深度学习才开始席卷计算机视觉领域。
AlexNet相比LeNet的主要改进:
- 网络更深(8层 vs 5层)
- 使用ReLU激活函数(比tanh快很多)
- Dropout正则化(防止过拟合)
- GPU并行训练
- 数据增强(随机裁剪、水平翻转)
# LeNet-5 实现
class LeNet(nn.Module):
def __init__(self, num_classes=10):
super().__init__()
self.conv1 = nn.Conv2d(3, 6, kernel_size=5) # 32x32 → 28x28
self.pool1 = nn.AvgPool2d(kernel_size=2, stride=2) # 28x28 → 14x14
self.conv2 = nn.Conv2d(6, 16, kernel_size=5) # 14x14 → 10x10
self.pool2 = nn.AvgPool2d(kernel_size=2, stride=2) # 10x10 → 5x5
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, num_classes)
def forward(self, x):
x = self.pool1(torch.relu(self.conv1(x)))
x = self.pool2(torch.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
x = self.fc3(x)
return x
# AlexNet 实现
class AlexNet(nn.Module):
def __init__(self, num_classes=1000):
super().__init__()
self.features = nn.Sequential(
# 224x224x3 → 55x55x96
nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2), # 55x55 → 27x27
# 27x27x256 → 13x13x256
nn.Conv2d(96, 256, kernel_size=5, padding=2),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2), # 27x27 → 13x13
# 13x13x384 → 13x13x384
nn.Conv2d(256, 384, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
# 13x13x384 → 13x13x384
nn.Conv2d(384, 384, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
# 13x13x384 → 13x13x256
nn.Conv2d(384, 256, kernel_size=3, padding=1),
nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=3, stride=2), # 13x13 → 6x6
)
self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
self.classifier = nn.Sequential(
nn.Dropout(0.5),
nn.Linear(256 * 6 * 6, 4096),
nn.ReLU(inplace=True),
nn.Dropout(0.5),
nn.Linear(4096, 4096),
nn.ReLU(inplace=True),
nn.Linear(4096, num_classes),
)
def forward(self, x):
x = self.features(x)
x = self.avgpool(x)
x = x.view(x.size(0), -1)
x = self.classifier(x)
return x
print("LeNet-5 参数量:", sum(p.numel() for p in LeNet().parameters()))
print("AlexNet 参数量:", sum(p.numel() for p in AlexNet().parameters()))VGG:更深的网络,更小的卷积核
2014年VGGNet提出了一个简单但深刻的观点:用多层小卷积核代替大卷积核。两个3×3卷积堆叠的有效感受野是5×5,但参数量只有25 vs 18(假设单通道)。三个3×3堆叠相当于7×7感受野。
VGG-16和VGG-19成为后续很多工作的backbone。但VGG的缺点也很明显:参数量大(140M+)、训练慢、 inference慢。
残差连接:越过了深度的墙
ResNet(Residual Network)是2015年何恺明团队的杰作,解决了”网络越深效果越差”的问题。他们提出了残差连接(skip connection):不是学习x到H(x)的映射,而是学习残差F(x) = H(x) - x。
为什么残差学习更容易?因为如果 identity mapping 已经是最优解,把F(x)push到0比学习一个新的映射容易得多。残差连接让信息可以直接流动,缓解了梯度消失问题,使得训练1000+层的网络成为可能。
# ResNet的核心:残差块
class ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
self.downsample = downsample
def forward(self, x):
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
# 残差连接
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
# 完整的ResNet-18实现
class ResNet18(nn.Module):
def __init__(self, num_classes=1000):
super().__init__()
self.in_channels = 64
self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layer1 = self._make_layer(64, 2, stride=1)
self.layer2 = self._make_layer(128, 2, stride=2)
self.layer3 = self._make_layer(256, 2, stride=2)
self.layer4 = self._make_layer(512, 2, stride=2)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512, num_classes)
# 权重初始化
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def _make_layer(self, out_channels, num_blocks, stride):
downsample = None
if stride != 1 or self.in_channels != out_channels:
downsample = nn.Sequential(
nn.Conv2d(self.in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels)
)
layers = []
layers.append(ResidualBlock(self.in_channels, out_channels, stride, downsample))
self.in_channels = out_channels
for _ in range(1, num_blocks):
layers.append(ResidualBlock(out_channels, out_channels))
return nn.Sequential(*layers)
def forward(self, x):
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
print("ResNet-18 参数量:", sum(p.numel() for p in ResNet18().parameters()))批量归一化BatchNorm
BatchNorm是深度学习中最重要的技术之一。它在每个batch上对神经元输出做归一化,使得每层的输入保持稳定的分布,大大加速了训练。
BatchNorm的工作原理
BatchNorm在训练时的行为:
- 计算当前batch的均值μ_B和方差σ²_B
- 归一化:(x - μ_B) / √(σ²_B + ε)
- 线性变换:γ * x_norm + β
γ和β是可学习的参数,让模型自己决定是否需要归一化。
BatchNorm在推理时使用全局统计量(移动平均的均值和方差),而不是当前batch的。这保证了inference的确定性。
BatchNorm在网络中的位置
关于BatchNorm的位置(BN before or after activation),有两种主流观点:
原论文方案(BN before ReLU):归一化 → 线性变换(含γβ)→ ReLU。这是原始ResNet的做法。
实践方案(BN after ReLU):卷积 → ReLU → BN。这种做法在很多现代架构中使用。
两种方案各有道理,实际效果差别不大,按照主流实现来用就好。
# BatchNorm演示
bn = nn.BatchNorm2d(num_features=64)
# 模拟训练模式
bn.train()
print("训练模式下的BatchNorm:")
for i in range(3):
x = torch.randn(32, 64, 32, 32) # batch=32, channels=64
y = bn(x)
print(f" Batch {i+1}: mean={y.mean().item():.4f}, std={y.std().item():.4f}")
# 模拟推理模式
bn.eval()
print("\n推理模式下的BatchNorm:")
with torch.no_grad():
for i in range(3):
x = torch.randn(32, 64, 32, 32)
y = bn(x)
print(f" Batch {i+1}: mean={y.mean().item():.4f}, std={y.std().item():.4f}")
print(f"\n全局统计量 - running_mean前4个: {bn.running_mean[:4]}")
print(f"全局统计量 - running_var前4个: {bn.running_var[:4]}")数据增强:扩充你的数据集
数据增强是防止过拟合、提升模型泛化能力的关键技术。它通过对训练数据做随机变换,人为增加数据的多样性。
经典数据增强
翻转(水平翻转为主,垂直翻转要谨慎)、旋转(一般±15度)、裁剪(随机裁剪到原图的0.8-1.0)、缩放、色彩抖动(亮度、对比度、饱和度、色调)、添加噪声。
from torchvision import transforms
from PIL import Image
import torchvision.datasets as datasets
# 定义训练和测试的数据增强
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224, scale=(0.08, 1.0)), # 随机裁剪并缩放到224
transforms.RandomHorizontalFlip(p=0.5), # 50%概率水平翻转
transforms.RandomRotation(15), # ±15度随机旋转
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1), # 色彩抖动
transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)), # 随机仿射变换
transforms.ToTensor(), # 转换为张量
transforms.Normalize(mean=[0.485, 0.456, 0.406], # ImageNet标准化
std=[0.229, 0.224, 0.225]),
transforms.RandomErasing(p=0.1, scale=(0.02, 0.2)), # 随机擦除(SimCLR使用)
])
test_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 演示数据增强效果
def visualize_augmentations(image_path=None):
"""可视化数据增强效果"""
# 加载一张示例图片
if image_path:
img = Image.open(image_path).convert('RGB')
else:
# 创建一张测试图片
img = Image.new('RGB', (224, 224), color=(100, 150, 200))
from PIL import ImageDraw, ImageFont
draw = ImageDraw.Draw(img)
draw.text((80, 90), "Test Image", fill='white')
fig, axes = plt.subplots(2, 5, figsize=(15, 6))
axes = axes.flatten()
for i in range(10):
if i == 0:
axes[i].imshow(img)
axes[i].set_title("Original")
else:
# 应用随机变换
img_aug = train_transform(img)
# 反标准化用于显示
mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
img_display = img_aug * std + mean
img_display = torch.clamp(img_display, 0, 1)
axes[i].imshow(img_display.permute(1, 2, 0).numpy())
axes[i].set_title(f"Augmented {i}")
axes[i].axis('off')
plt.tight_layout()
plt.savefig('data_augmentation_examples.png', dpi=150)
# visualize_augmentations()MixUp与CutMix
MixUp和CutMix是更高级的数据增强方法,在分类和检测任务上效果显著。
MixUp把两张图片按比例混合,同时混合它们的标签。比如两张图片按0.7:0.3混合,标签也按同样比例混合。这强迫模型学习更平滑的决策边界。
CutMix把图片的一块区域裁剪下来贴到另一张图片上,标签按面积比例混合。好处是模型能看到完整的物体,而不是被混合后的模糊图片。
def mixup_data(x, y, alpha=0.2):
"""MixUp数据增强"""
if alpha > 0:
lam = np.random.beta(alpha, alpha)
else:
lam = 1
batch_size = x.size(0)
index = torch.randperm(batch_size).to(x.device)
mixed_x = lam * x + (1 - lam) * x[index, :]
y_a, y_b = y, y[index]
return mixed_x, y_a, y_b, lam
def cutmix_data(x, y, alpha=1.0):
"""CutMix数据增强"""
lam = np.random.beta(alpha, alpha)
batch_size = x.size(0)
index = torch.randperm(batch_size).to(x.device)
W = x.size(2)
H = x.size(3)
cut_rat = np.sqrt(1. - lam)
cut_w = int(W * cut_rat)
cut_h = int(H * cut_rat)
cx = np.random.randint(W)
cy = np.random.randint(H)
bbx1 = np.clip(cx - cut_w // 2, 0, W)
bby1 = np.clip(cy - cut_h // 2, 0, H)
bbx2 = np.clip(cx + cut_w // 2, 0, W)
bby2 = np.clip(cy + cut_h // 2, 0, H)
x[:, :, bbx1:bbx2, bby1:bby2] = x[index, :, bbx1:bbx2, bby1:bby2]
lam = 1 - ((bbx2 - bbx1) * (bby2 - bby1) / (W * H))
y_a, y_b = y, y[index]
return x, y_a, y_b, lam
def mixup_criterion(criterion, pred, y_a, y_b, lam):
"""MixUp/CutMix的损失函数"""
return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)
# 可视化MixUp和CutMix
def visualize_mixup_cutmix():
"""可视化MixUp和CutMix效果"""
# 创建两张测试图片
img1 = Image.new('RGB', (224, 224), color=(255, 100, 100)) # 红色
img2 = Image.new('RGB', (224, 224), color=(100, 100, 255)) # 蓝色
# 转换为张量
to_tensor = transforms.ToTensor()
img1_tensor = to_tensor(img1).unsqueeze(0)
img2_tensor = to_tensor(img2).unsqueeze(0)
fig, axes = plt.subplots(2, 3, figsize=(12, 8))
# 原始图片
axes[0, 0].imshow(img1)
axes[0, 0].set_title('Image 1')
axes[0, 0].axis('off')
axes[0, 1].imshow(img2)
axes[0, 1].set_title('Image 2')
axes[0, 1].axis('off')
# MixUp效果 (lambda=0.5)
lam = 0.5
mixed = lam * img1_tensor + (1 - lam) * img2_tensor
mixed_img = transforms.ToPILImage()(mixed.squeeze().clamp(0, 1))
axes[0, 2].imshow(mixed_img)
axes[0, 2].set_title(f'MixUp (λ={lam})')
axes[0, 2].axis('off')
# CutMix效果
W, H = 224, 224
cut_w, cut_h = int(W * np.sqrt(0.6)), int(H * np.sqrt(0.6))
cx, cy = 112, 112
bbx1, bby1 = cx - cut_w // 2, cy - cut_h // 2
bbx2, bby2 = cx + cut_w // 2, cy + cut_h // 2
cutmix = img1_tensor.clone()
cutmix[:, :, bbx1:bbx2, bby1:bby2] = img2_tensor[:, :, bbx1:bbx2, bby1:bby2]
cutmix_img = transforms.ToPILImage()(cutmix.squeeze().clamp(0, 1))
axes[1, 0].imshow(cutmix_img)
axes[1, 0].set_title(f'CutMix (λ={1-(bbx2-bbx1)*(bby2-bby1)/(W*H):.2f})')
axes[1, 0].axis('off')
# 更多的MixUp例子
for i, lam in enumerate([0.3, 0.7]):
mixed = lam * img1_tensor + (1 - lam) * img2_tensor
mixed_img = transforms.ToPILImage()(mixed.squeeze().clamp(0, 1))
axes[1, i+1].imshow(mixed_img)
axes[1, i+1].set_title(f'MixUp (λ={lam})')
axes[1, i+1].axis('off')
plt.tight_layout()
plt.savefig('mixup_cutmix_examples.png', dpi=150)
visualize_mixup_cutmix()实战:ResNet在CIFAR-10上训练
现在来一个完整的实战:用PyTorch实现ResNet并在CIFAR-10上训练。
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import torchvision.models as models
import time
import copy
# 设置随机种子保证可复现性
def set_seed(seed=42):
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
set_seed(42)
# 检查GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
# CIFAR-10数据集准备
# CIFAR-10: 32x32 RGB图像,10个类别
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
trainloader = DataLoader(trainset, batch_size=128, shuffle=True, num_workers=4, pin_memory=True)
testloader = DataLoader(testset, batch_size=128, shuffle=False, num_workers=4, pin_memory=True)
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
print(f"训练集大小: {len(trainset)}")
print(f"测试集大小: {len(testset)}")
# 针对CIFAR-10优化的ResNet (ResNet-18变体,输入32x32)
class BasicBlock(nn.Module):
expansion = 1
def __init__(self, in_channels, out_channels, stride=1, downsample=None):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
stride=stride, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
stride=1, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
self.downsample = downsample
def forward(self, x):
identity = x
out = F.relu(self.bn1(self.conv1(x)))
out = self.bn2(self.conv2(out))
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = F.relu(out)
return out
class ResNetCIFAR(nn.Module):
"""适用于CIFAR-32x32输入的ResNet"""
def __init__(self, block, num_blocks, num_classes=10):
super().__init__()
self.in_channels = 64
# CIFAR-10使用的初始化:3个卷积层而不是ResNet原版的7x7
self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(64)
# 不用maxpool,直接跟上residual blocks
self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
self.fc = nn.Linear(512 * block.expansion, num_classes)
# 权重初始化
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
elif isinstance(m, nn.BatchNorm2d):
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
def _make_layer(self, block, out_channels, num_blocks, stride):
downsample = None
if stride != 1 or self.in_channels != out_channels * block.expansion:
downsample = nn.Sequential(
nn.Conv2d(self.in_channels, out_channels * block.expansion,
kernel_size=1, stride=stride, bias=False),
nn.BatchNorm2d(out_channels * block.expansion)
)
layers = []
layers.append(block(self.in_channels, out_channels, stride, downsample))
self.in_channels = out_channels * block.expansion
for _ in range(1, num_blocks):
layers.append(block(self.in_channels, out_channels))
return nn.Sequential(*layers)
def forward(self, x):
x = F.relu(self.bn1(self.conv1(x)))
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
def ResNet18CIFAR():
return ResNetCIFAR(BasicBlock, [2, 2, 2, 2])
# 创建模型
model = ResNet18CIFAR().to(device)
print(f"\nResNet-18 for CIFAR-10 参数量: {sum(p.numel() for p in model.parameters()):,}")
# 训练函数
def train_epoch(model, dataloader, criterion, optimizer, device):
model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in dataloader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, targets)
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
return running_loss / total, correct / total
def evaluate(model, dataloader, device):
model.eval()
correct = 0
total = 0
with torch.no_grad():
for inputs, targets in dataloader:
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
return correct / total
# 训练配置
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[82, 123], gamma=0.1)
# 训练循环
num_epochs = 150
best_acc = 0.0
history = {'train_loss': [], 'train_acc': [], 'test_acc': []}
print("\n开始训练...")
start_time = time.time()
for epoch in range(num_epochs):
train_loss, train_acc = train_epoch(model, trainloader, criterion, optimizer, device)
test_acc = evaluate(model, testloader, device)
history['train_loss'].append(train_loss)
history['train_acc'].append(train_acc)
history['test_acc'].append(test_acc)
scheduler.step()
if test_acc > best_acc:
best_acc = test_acc
best_model_state = copy.deepcopy(model.state_dict())
if (epoch + 1) % 10 == 0 or epoch == 0:
print(f"Epoch [{epoch+1:3d}/{num_epochs}] "
f"Loss: {train_loss:.4f} "
f"Train Acc: {train_acc:.4f} "
f"Test Acc: {test_acc:.4f} "
f"LR: {scheduler.get_last_lr()[0]:.6f}")
total_time = time.time() - start_time
print(f"\n训练完成! 耗时: {total_time/60:.1f}分钟")
print(f"最佳测试准确率: {best_acc:.4f}")
# 保存最佳模型
torch.save(best_model_state, 'resnet18_cifar10.pth')
print("模型已保存到 resnet18_cifar10.pth")
# 绘制训练曲线
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
axes[0].plot(history['train_loss'], label='Train Loss')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Training Loss')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[1].plot(history['train_acc'], label='Train Acc')
axes[1].plot(history['test_acc'], label='Test Acc')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Accuracy')
axes[1].set_title('Training vs Test Accuracy')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('training_curves.png', dpi=150)迁移学习:从预训练模型出发
从头训练大型CNN需要海量数据和计算资源。迁移学习让你站在巨人的肩膀上——使用在ImageNet等大数据集上预训练的模型,只需微调就能适应新任务。
两种迁移策略
特征提取(Feature Extraction):冻结预训练模型的大部分层,只训练最后几层(通常是分类头)。适合小数据集、计算资源有限的场景。预训练模型充当强大的特征提取器。
微调(Fine-tuning):解冻部分或全部层,用较小的学习率训练整个网络。适合有一定数据量的场景,让模型学习新任务的特定特征。
# 迁移学习实战:使用预训练的ResNet-18
print("=" * 60)
print("迁移学习实战:ImageNet预训练模型")
print("=" * 60)
# 加载预训练的ResNet-18
model_pretrained = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
# 特征提取模式:冻结所有层
for param in model_pretrained.parameters():
param.requires_grad = False
# 替换最后的分类层
num_features = model_pretrained.fc.in_features
model_pretrained.fc = nn.Linear(num_features, 10) # CIFAR-10有10类
# 只有fc层的参数是可训练的
trainable_params = sum(p.numel() for p in model_pretrained.parameters() if p.requires_grad)
total_params = sum(p.numel() for p in model_pretrained.parameters())
print(f"特征提取模式 - 可训练参数: {trainable_params:,} / {total_params:,} ({trainable_params/total_params*100:.2f}%)")
# 微调模式:解冻最后几层
for param in model_pretrained.layer4.parameters():
param.requires_grad = True
# 使用更小的学习率
optimizer_ft = optim.SGD([
{'params': model_pretrained.fc.parameters(), 'lr': 0.1},
{'params': model_pretrained.layer4.parameters(), 'lr': 0.01},
], momentum=0.9, weight_decay=5e-4)
# 学习率调度器 - 恢复预训练层用更小的学习率
def train_model(model, trainloader, testloader, optimizer, scheduler, num_epochs=20):
best_acc = 0
for epoch in range(num_epochs):
# 训练
model.train()
running_loss = 0.0
correct = 0
total = 0
for inputs, targets in trainloader:
inputs, targets = inputs.to(device), targets.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = F.cross_entropy(outputs, targets)
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
_, predicted = outputs.max(1)
total += targets.size(0)
correct += predicted.eq(targets).sum().item()
scheduler.step()
# 评估
model.eval()
test_correct = 0
test_total = 0
with torch.no_grad():
for inputs, targets in testloader:
inputs, targets = inputs.to(device), targets.to(device)
outputs = model(inputs)
_, predicted = outputs.max(1)
test_total += targets.size(0)
test_correct += predicted.eq(targets).sum().item()
train_acc = correct / total
test_acc = test_correct / test_total
if test_acc > best_acc:
best_acc = test_acc
if (epoch + 1) % 5 == 0:
print(f"Epoch {epoch+1}: Train Acc={train_acc:.4f}, Test Acc={test_acc:.4f}")
return best_acc
# 训练迁移学习模型
model_pretrained = model_pretrained.to(device)
optimizer_ft = optim.SGD(filter(lambda p: p.requires_grad, model_pretrained.parameters()),
lr=0.01, momentum=0.9, weight_decay=5e-4)
scheduler_ft = optim.lr_scheduler.StepLR(optimizer_ft, step_size=5, gamma=0.1)
print("\n训练特征提取模型...")
best_acc_ft = train_model(model_pretrained, trainloader, testloader, optimizer_ft, scheduler_ft, num_epochs=30)
print(f"特征提取最佳准确率: {best_acc_ft:.4f}")
# 对比从头训练 vs 迁移学习
print("\n" + "=" * 60)
print("对比:从头训练 vs 迁移学习")
print("=" * 60)
print(f"从头训练 (150 epochs): 最佳准确率 ~{best_acc:.4f}")
print(f"迁移学习 (30 epochs): 最佳准确率 {best_acc_ft:.4f}")
print(f"迁移学习只用了1/5的时间就达到了相近甚至更好的效果!")总结
这篇文章从卷积操作的基本原理讲起,涵盖了CNN的核心知识点。
卷积操作是CNN的基础,理解步长、填充、感受野这些概念对设计和调试模型至关重要。经典架构的演进展示了深度学习研究的脉络:从LeNet到AlexNet唤醒了这个领域,VGG证明了深度的力量,ResNet用残差连接突破了深度的瓶颈。
BatchNorm、Dropout、数据增强这些技术是训练稳定模型的必备。BatchNorm通过归一化稳定了每层的输入分布,Dropout防止过拟合,数据增强扩充了训练数据的多样性。MixUp和CutMix是更高级的增强方法,能显著提升模型泛化能力。
迁移学习是现代计算机视觉的基石。站在ImageNet预训练模型的肩膀上,你只需要很少的数据和计算资源就能获得很好的效果。特征提取适合小数据场景,微调适合有一定数据量的场景。
最后提醒一句:深度学习是实践性很强的领域,光看不动手是不行的。找一些开源数据集,自己跑一跑代码,改一改参数,感受一下不同选择带来的效果差异。祝你玩得开心!
本文为深度学习实战指南系列文章,主要涵盖CNN与计算机视觉的核心知识点和实践技巧。