神经网络学习笔记(10) – 迁移学习

迁移学习是指重用预先训练好的神经网络,固定那些低层(靠近输入)的权值,而训练高层(靠近输出)的权值,以解决类似的问题。这意味着无需自己重建网络的结构,并能节省训练的时间。至于何为类似的问题没有精确的说法,如图像识别问题、文本翻译问题可分别自成一类。比如,一个用来翻译英语到法语的模型,可以重用于印度语到西班牙语的翻译模型,尽管印度语到西班牙语的训练数据远不如英语到法语的。

PyTorch预置了一些著名的CNN模型。如Alexnet, VGG, ResNet, Inception等。

迁移学习的4种场景

根据迁移学习的训练集数据的多少,和迁移模型与原模型的应用相似程度等,可分为以下4种场景。

场景1

迁移学习的新数据集较少,与原模型应用相似。此时加载预训练参数,只修改最后的全连接层(如分类问题,要修改最后的类别数)。在Pytorch中,加载模型时pretrained=True,并替换fc层。只训练fc层的weight。

场景2

迁移学习的新数据集较少,与原模型应用不相似。此时加载预训练参数,调整全连接层,固定低层,训练高层。(见下方代码Alternatvie 2)

场景3

迁移学习的新数据集充足,与原模型应用不相似。此时不加载预训练参数。调整全连接层,训练模型的所有参数。在Pytorch中,加载模型时pretrained=False,或默认留空。

场景4

迁移学习的新数据集充足,与原模型应用相似。此时加载预训练参数,调整全连接层,训练所有参数。

下面的例子里,使用了模型resnet18实现图像分类。使用的数据集为CIFAR10。这里替换了最后的全连接层,只开启全连接层的梯度计算,其余均使用预训练的权值配置。因此能进行高效的训练!

在train_model函数中,我们看到了3重循环。最里面的2层循环使用PyTorch的DataLoader进行分批的模型训练和评估。由于训练集使用了DataLoader的shuffle,每次训练数据是随机的,再由num_epochs次的评估结果中找到最优的模型。


import copy
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler

import torchvision
from torchvision import datasets,transforms 

#根据https://pytorch.org/vision/stable/models.html, PyTorch的预训练模型均对输入的图像有要求。
#1)图像尺寸为(3 x H x W),其中H与W为至少224。
#2)像素值要被转化到[0,1]区间。
#3)采用以下固定的均值和标准差进行标准化。
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean = mean, std = std)
])
test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std)
])

data_dir = 'datasets/cifar10/train'
batch_size = 8
num_workers = 2
trainset = datasets.CIFAR10(root=data_dir,
                            train=True,
                            download=True,
                            transform=train_transform)
testset = datasets.CIFAR10(root=data_dir,
                           train=False,
                           download=True,
                           transform=test_transform)
trainloader = torch.utils.data.DataLoader(trainset,
                                          batch_size=batch_size,
                                          shuffle=True,
                                          num_workers=num_workers)
testloader = torch.utils.data.DataLoader(testset,
                                         batch_size=batch_size,
                                         shuffle=False,
                                         num_workers=num_workers)
dataloaders = {
    'train': trainloader,
    'test': testloader
}
dataset_sizes = { 'train': len(trainloader), 'test': len(testloader) } #{'train': 6250, 'test': 1250}
class_names = trainset.classes #['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

#数据加载结束,开始定义训练过程
from torchvision import models
model = models.resnet18(pretrained=True)#加载预训练的模型resnet18

#Alternative:也可从URL加载指定的预训练参数
#torch.utils.model_zoo.load_url("https://download.pytorch.org/models/resnet18-5c106cde.pth", model_dir="./models")
#resnet18.load_state_dict(torch.load("./models/resnet18-5c106cde.pth"))

#先禁掉所有模型参数的梯度计算,固定所有权值。
for param in model.parameters():
    param.requires_grad = False
'''
#Alternatvie 2: 只固定前面几层。
count = 0 
for child in model.children():
    count+=1
    if count < 7:
        for param in child.parameters():
            param.requires_grad = False
'''
#fc即最后的全连接层。保持其输入特征数不变,将输出替换为10个(即class_names的个数)。添加的新层默认是开启梯度计算的。
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 10)

#GPU加速
device = torch.device("cpu")
if torch.cuda.is_available():
    device = torch.device("cuda")
model.to(device)

#分类问题用CrossEntropyLoss作为Loss function
criterion        = nn.CrossEntropyLoss()

#使用SGD作为optimizer
optimizer_ft     = optim.SGD( model.parameters(), lr=0.001, momentum=0.9)

#使用lr_scheduler衰减optimizer的Learning rate。在下面的例子每过7个epoch,衰减为原来的0.1倍。
exp_lr_scheduler = lr_scheduler.StepLR( optimizer_ft, step_size=7, gamma=0.1)

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    
    best_acc = 0.0 #在num_epochs次迭代中,找最好的model,best_acc用以计算每次迭代模型的准确率
    best_model_wts = copy.deepcopy(model.state_dict())#深copy一个字典表示的模型的副本
    

    for epoch in range(num_epochs):
        
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)
        step = 0
        
        for phase in ['train', 'test']:
            
            if phase == 'train':
                scheduler.step()
                model.train(True)# Set model to training mode  
            else:
                model.train(False)# Set model to evaluate mode  

            running_loss = 0.0
            running_corrects = 0


            for inputs, labels in dataloaders[phase]:

                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()

                outputs = model(inputs)
                _, preds = torch.max(outputs.data, 1)#1表示在哪个维度计算最大值。第0维为batch维。第1维为单个样本各类别的概率值。
                loss = criterion(outputs, labels)
                
                step += 1
                if step % 500 == 0:
                    print('Epoch: {} Loss: {:.4f},  Step: {}'.format(epoch, loss.item(), step))
                
                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double()/ (dataset_sizes[phase]*batch_size)

            print('{} Loss: {:.4f} Acc: {:.4f} '.format(
                phase, epoch_loss, epoch_acc))

            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    
    print('Training complete')
    print('Best test Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    
    return model

model = train_model(model, criterion, optimizer_ft,
                         exp_lr_scheduler)

#测试结果展示
def imshow(inp, title):
    inp = inp.cpu().numpy().transpose((1, 2, 0))
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    
    plt.imshow(inp)
    plt.title(title)
    plt.pause(5)
with torch.no_grad():
    
    inputs, labels = iter(dataloaders['test']).next()
    inputs, labels = inputs.to(device), labels.to(device)
    
    outputs = model(inputs)
    _, preds = torch.max(outputs, 1)
    
    for j in range(len(inputs)):
        inp = inputs.data[j]
        imshow(inp, 'predicted:' + class_names[preds[j]])