神经网络学习笔记(10) – 迁移学习
迁移学习是指重用预先训练好的神经网络,固定那些低层(靠近输入)的权值,而训练高层(靠近输出)的权值,以解决类似的问题。这意味着无需自己重建网络的结构,并能节省训练的时间。至于何为类似的问题没有精确的说法,如图像识别问题、文本翻译问题可分别自成一类。比如,一个用来翻译英语到法语的模型,可以重用于印度语到西班牙语的翻译模型,尽管印度语到西班牙语的训练数据远不如英语到法语的。
PyTorch预置了一些著名的CNN模型。如Alexnet, VGG, ResNet, Inception等。
迁移学习的4种场景
根据迁移学习的训练集数据的多少,和迁移模型与原模型的应用相似程度等,可分为以下4种场景。
场景1
迁移学习的新数据集较少,与原模型应用相似。此时加载预训练参数,只修改最后的全连接层(如分类问题,要修改最后的类别数)。在Pytorch中,加载模型时pretrained=True,并替换fc层。只训练fc层的weight。
场景2
迁移学习的新数据集较少,与原模型应用不相似。此时加载预训练参数,调整全连接层,固定低层,训练高层。(见下方代码Alternatvie 2)
场景3
迁移学习的新数据集充足,与原模型应用不相似。此时不加载预训练参数。调整全连接层,训练模型的所有参数。在Pytorch中,加载模型时pretrained=False,或默认留空。
场景4
迁移学习的新数据集充足,与原模型应用相似。此时加载预训练参数,调整全连接层,训练所有参数。
下面的例子里,使用了模型resnet18实现图像分类。使用的数据集为CIFAR10。这里替换了最后的全连接层,只开启全连接层的梯度计算,其余均使用预训练的权值配置。因此能进行高效的训练!
在train_model函数中,我们看到了3重循环。最里面的2层循环使用PyTorch的DataLoader进行分批的模型训练和评估。由于训练集使用了DataLoader的shuffle,每次训练数据是随机的,再由num_epochs次的评估结果中找到最优的模型。
import copy
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets,transforms
#根据https://pytorch.org/vision/stable/models.html, PyTorch的预训练模型均对输入的图像有要求。
#1)图像尺寸为(3 x H x W),其中H与W为至少224。
#2)像素值要被转化到[0,1]区间。
#3)采用以下固定的均值和标准差进行标准化。
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
train_transform = transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean = mean, std = std)
])
test_transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=mean, std=std)
])
data_dir = 'datasets/cifar10/train'
batch_size = 8
num_workers = 2
trainset = datasets.CIFAR10(root=data_dir,
train=True,
download=True,
transform=train_transform)
testset = datasets.CIFAR10(root=data_dir,
train=False,
download=True,
transform=test_transform)
trainloader = torch.utils.data.DataLoader(trainset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers)
testloader = torch.utils.data.DataLoader(testset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers)
dataloaders = {
'train': trainloader,
'test': testloader
}
dataset_sizes = { 'train': len(trainloader), 'test': len(testloader) } #{'train': 6250, 'test': 1250}
class_names = trainset.classes #['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
#数据加载结束,开始定义训练过程
from torchvision import models
model = models.resnet18(pretrained=True)#加载预训练的模型resnet18
#Alternative:也可从URL加载指定的预训练参数
#torch.utils.model_zoo.load_url("https://download.pytorch.org/models/resnet18-5c106cde.pth", model_dir="./models")
#resnet18.load_state_dict(torch.load("./models/resnet18-5c106cde.pth"))
#先禁掉所有模型参数的梯度计算,固定所有权值。
for param in model.parameters():
param.requires_grad = False
'''
#Alternatvie 2: 只固定前面几层。
count = 0
for child in model.children():
count+=1
if count < 7:
for param in child.parameters():
param.requires_grad = False
'''
#fc即最后的全连接层。保持其输入特征数不变,将输出替换为10个(即class_names的个数)。添加的新层默认是开启梯度计算的。
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)
#GPU加速
device = torch.device("cpu")
if torch.cuda.is_available():
device = torch.device("cuda")
model.to(device)
#分类问题用CrossEntropyLoss作为Loss function
criterion = nn.CrossEntropyLoss()
#使用SGD作为optimizer
optimizer_ft = optim.SGD( model.parameters(), lr=0.001, momentum=0.9)
#使用lr_scheduler衰减optimizer的Learning rate。在下面的例子每过7个epoch,衰减为原来的0.1倍。
exp_lr_scheduler = lr_scheduler.StepLR( optimizer_ft, step_size=7, gamma=0.1)
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
best_acc = 0.0 #在num_epochs次迭代中,找最好的model,best_acc用以计算每次迭代模型的准确率
best_model_wts = copy.deepcopy(model.state_dict())#深copy一个字典表示的模型的副本
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
step = 0
for phase in ['train', 'test']:
if phase == 'train':
scheduler.step()
model.train(True)# Set model to training mode
else:
model.train(False)# Set model to evaluate mode
running_loss = 0.0
running_corrects = 0
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
_, preds = torch.max(outputs.data, 1)#1表示在哪个维度计算最大值。第0维为batch维。第1维为单个样本各类别的概率值。
loss = criterion(outputs, labels)
step += 1
if step % 500 == 0:
print('Epoch: {} Loss: {:.4f}, Step: {}'.format(epoch, loss.item(), step))
# backward + optimize only if in training phase
if phase == 'train':
loss.backward()
optimizer.step()
# statistics
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
epoch_loss = running_loss / dataset_sizes[phase]
epoch_acc = running_corrects.double()/ (dataset_sizes[phase]*batch_size)
print('{} Loss: {:.4f} Acc: {:.4f} '.format(
phase, epoch_loss, epoch_acc))
if phase == 'test' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
print('Training complete')
print('Best test Acc: {:4f}'.format(best_acc))
# load best model weights
model.load_state_dict(best_model_wts)
return model
model = train_model(model, criterion, optimizer_ft,
exp_lr_scheduler)
#测试结果展示
def imshow(inp, title):
inp = inp.cpu().numpy().transpose((1, 2, 0))
inp = std * inp + mean
inp = np.clip(inp, 0, 1)
plt.imshow(inp)
plt.title(title)
plt.pause(5)
with torch.no_grad():
inputs, labels = iter(dataloaders['test']).next()
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
for j in range(len(inputs)):
inp = inputs.data[j]
imshow(inp, 'predicted:' + class_names[preds[j]])