keras や pytorch などのパッケージを使用すると、ニューラルネットワークや畳み込み演算などを容易に計算できるようになる。そのため、画像分類を行うための予測器を構築することが容易になってきた。このページではユーザー数が多く、情報量の多い pytorch を使用して物体分類用のモデルの構築方法を示していく。
この項目では、pytorch を使用して非常に単純なニューラルネットワークを構築して、画像分類を行う方法を示す。この項目で示した例は、pytorch の training a classifier ページを参照して作成した。まず、この項目で使うモジュールなどをインポートする。
import torch
import torchvision
import torchvision.transforms as transforms
# パソコンに OpenMP ランタイムが複数ある場合、異常終了するので、回避策として次の環境変数を設定する
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
ここでデータセットを用意します。とりあえず、pytorch が用意されている関数を使って、サンプルデータセットをダウンロードしてくる。このデータセットは、学習用とテスト用の両方に別れているので、ここで両方をダウンロードしてくる。データのダウンロードは torchvision.datasets.CIFAR10
関数で行う。ダウンロードした後に、torch.utils.data.DataLoader
関数で、データを pytroch にデータを認識させて、学習に使える状態にする。
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
# 訓練データ
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4, shuffle=True, num_workers=1)
# テストデータ
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=True, num_workers=1)
データの用意が終わったので、簡単な浅いニューラルネットワークを作成する。ここで作成するネットワークは、畳み込み層、プーリング層、畳み込み層、プーリング層、全結合層、全結合層のような構成にする。
import torch.nn as nn
import torch.nn.functional as F
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
# 畳み込み演算用のカーネルのサイズを定義
# 引数は(入力チャンネル数, 出力チャンネル数, カーネルサイズ)
self.conv1 = nn.Conv2d(3, 6, 5)
self.conv2 = nn.Conv2d(6, 16, 5)
# プーリング演算用のカーネルのサイズの定義
# 引数は(カーネルサイズ、移動ステップ数)
self.pool = nn.MaxPool2d(2, 2)
# 全結合層の構造を定義
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
# 画像(行列型)のデータに対して畳み込み演算とプーリング演算を 2 回繰り返す
x = F.relu(self.conv1(x))
x = self.pool(x)
x = F.relu(self.conv2(x))
x = self.pool(x)
# 行列型のデータをベクトルに整形
x = x.view(-1, 16 * 5 * 5)
# ベクトルを全結合層に代入
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
# 出力層
x = self.fc3(x)
return x
net = Net()
損失関数を定義し、最適化を行うためのアルゴリズムを定義する。多クラス分類問題の場合は、損失関数として交差エントロピーを使用するのが一般的であるから、ここで CrossEntropyLoss
を使用する。また、最適化アルゴリズムは、とりあえずもっとも基礎的なアルゴリズムである確率的勾配降下法 SGD
を使用する。
import torch.optim as optim
criterion = nn.CrossEntropyLoss()
# 上で定義したニューラルネットワークのパラメーターをすべて微分可能な形で optimizer に保存
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
以上で、データの準備、ニューラルネットワークの構造、損失関数の定義、最適化アルゴリズムの準備および設定を終えた。これで学習を行うための作業をすべて終わり、早速、for
文を使って、構築したニューラルネットワークを学習させてみます。
n_epochs = 2
for epoch in range(n_epochs):
running_loss = 0.0
# 訓練データローダーからデータを 1 batch ずつ取り出す
for i, data in enumerate(trainloader):
inputs, labels = data
# 前回の学習時に伝播してきた誤差をゼロにする
optimizer.zero_grad()
# ニューラルネットワークに学習データを代入して結果を得る
outputs = net(inputs)
# その出力と教師ラベルを比べて、両者の損失を計算する
loss = criterion(outputs, labels)
# その損失を誤差逆伝播法で偏微分可能なパラメーター全体に伝播させ、
loss.backward()
# 全パラメータを更新する(勾配に学習率をかけて更新)
optimizer.step()
# 以降、学習の進捗状況を知りたいので、1万 batch ずつ、途中進捗を出力
running_loss += loss.item()
if i % 2000 == 1999:
print('[%d, %5d] loss: %.3f' %
(epoch + 1, i + 1, running_loss / 2000))
running_loss = 0.0
print('Finished Training')
画像 2 回繰り返して学習を行なった。では、テストデータを使って検証結果をみていくことにする。pytorch では、net(images)
を実行するとパラメーターが一時的にメモリに保存され、誤差逆伝播法によるパラメーター更新を高速にしている。しかし、テスト時は、パラメーター更新を行わないので、無駄にメモリを使わないために、torch.no_grad
の制約下で、パラメーターを保持しないように指定して実行する。
n_correct = 0
n_total = 0
#
with torch.no_grad():
# テストデータを 1 batch ずつロードする
for data in testloader:
images, labels = data
# 画像をネットワークに代入して、予測値を得る
outputs = net(images)
# 出力値が確率なので、最大確率のラベルを取得
_, predicted = torch.max(outputs.data, 1)
# これまでにテストした画像の合計枚数
n_total += labels.size(0)
# これまでのテストで正解数
n_correct += (predicted == labels).sum().item()
print('Accuracy of the network on the 10000 test images: %d %%' % (
100 * n_correct / n_total))
次に各クラスごとの正解率を個別に調べてみる。
# 全クラスの種類
classes= ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
class_correct = list(0. for i in range(len(classes)))
class_total = list(0. for i in range(len(classes)))
with torch.no_grad():
for data in testloader:
images, labels = data
outputs = net(images)
_, predicted = torch.max(outputs, 1)
c = (predicted == labels).squeeze()
# テストデータをロードする時バッチサイズを 4 にしてあったので、1 枚ずつチェックしていく
for i in range(4):
label = labels[i]
class_correct[label] += c[i].item()
class_total[label] += 1
for i in range(len(classes)):
print('Accuracy of %5s : %2d %%' % (
classes[i], 100 * class_correct[i] / class_total[i]))
学習済みのネットワークを保存するとき、torch.save
関数を使用する。この際に、保存先のパスを指定する。この関数で保存されるのはネットワーク中のパラメーターなどであり、ネットワーク構造が保存されない。
torch.save(net.state_dict(), './cifar_net.pth')
そのため、次に、このネットワークをファイルから読み込んで使用する時に、まずネットワーク構造のインスタンスを一度生成してから、そのインスタンスにパラメーターをセットアップする形で読み込む。
net = Net()
net.load_state_dict(torch.load('./cifar_net.pth'))
GPU を使用して学習と検証を行う場合は、次のようにしてニューラルネットワークとデータを GPU 上に転送する必要がある。
# CUDA (GPU向けの汎用並列コンピューティングプラットフォーム) 見つかれば CUDA を使用
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# データセット
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=4, shuffle=True, num_workers=1)
# テストデータ
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=4, shuffle=True, num_workers=1)
# 学習
n_epochs = 2
net = Net()
net.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
for epoch in range(n_epochs):
for i, data in enumerate(trainloader):
inputs, labels = data[0].to(device), data[1].to(device)
optimizer.zero_grad()
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
print('Finished Training')
# テスト
n_correct = 0
n_total = 0
with torch.no_grad():
for data in testloader:
inputs, labels = data[0].to(device), data[1].to(device)
outputs = net(inputs)
_, predicted = torch.max(outputs.data, 1)
n_total += labels.size(0)
n_correct += (predicted == labels).sum().item()
print('Accuracy of the network on the 10000 test images: %d %%' % (100 * n_correct / n_total))
ニューラルネットワークを構築して、すべての重みを初期化して、空の状態で学習を始めると、学習の進み具合が遅い。これに対して、他のデータセットである程度学習を済ませたモデルを持ってきて使用すると、学習が早く進む。例えを言うならば、何もわかっていない状態でフランス語を勉強するのと、英語を習得した上でフランス語を勉強するのに似ている。
この項目は、pytorch ウェブサイトの TRANSFER LEARNING FOR COMPUTER VISION TUTORIAL ページを参照して作成した。
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
ここではハチとアリのサンプルデータセットを使用して転移学習を行う例を示す。サンプルデータセットは次の URL でダウンロードできる。ダウンロードして、展開後、Jupyter Notebook のファイルと同じ場所に置く。
!wget https://download.pytorch.org/tutorial/hymenoptera_data.zip
!unzip hymenoptera_data.zip
画像データセットの前処理の手順を定義する。このデータに含まれている画像の枚数が少ないので、訓練データに対して水増しをやってみる。テストデータに対しては、標準化のみ行う。
# 画像データの前処理を行うための手順を定義
data_transforms = {
# 訓練データ画像に対する前処理
'train': transforms.Compose([
# 入力画像の任意の位置から 224x224 に切り抜き
transforms.RandomResizedCrop(224),
# 平行反転
transforms.RandomHorizontalFlip(),
# 画像データをテンソル(多次元ベクトル)に変換
transforms.ToTensor(),
# 0-225 の数値をほぼ 0-1 の範囲に収める
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
# テストデータに対する前処理
'val': transforms.Compose([
# 入力画像を強制的に 256x256 に縮小
transforms.Resize(256),
# 256x256 の中心から 224x224 の画像を切り出す
transforms.CenterCrop(224),
# テンソルに変換
transforms.ToTensor(),
# 標準化
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
# 画像を pytorch に認識させる
data_dir = './hymenoptera_data'
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
data_transforms[x])
for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=4,
shuffle=True, num_workers=4)
for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
# GPU を使えるならば GPU を使用する
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
モデルの学習手続きを定義する。
def train_model(model, criterion, optimizer, scheduler, num_epochs=20):
since = time.time()
# 学習途中でいいモデルができるかもしれないので、これまでの学習のなかで最適なモデルの重みを保存
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
# num_epochs 回分だけ学習する
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
# 1 エポックごとに学習と評価を繰り返す
for phase in ['train', 'val']:
if phase == 'train':
model.train() # training mode
else:
model.eval() # evaluate mode
running_loss = 0.0
running_corrects = 0
# 1 バッチごと学習と評価を繰り返す、評価結果である損失をこのエポックの損失として加算する
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
# 勾配削除
optimizer.zero_grad()
# 学習時のみ行う重みを記録する
with torch.set_grad_enabled(phase == 'train'):
# 学習
outputs = model(inputs)
_, preds = torch.max(outputs, 1)
# 損失計算
loss = criterion(outputs, labels)
# 損失を逆伝播、そして重みを更新
if phase == 'train':
loss.backward()
optimizer.step()
# 損失を加算
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
if phase == 'train':
# 学習率を徐々に下げる
scheduler.step()
# このエポックの損失と精度
epoch_loss = running_loss / dataset_sizes[phase]
epoch_acc = running_corrects.double() / dataset_sizes[phase]
print('{} Loss: {:.4f} Acc: {:.4f}'.format(
phase, epoch_loss, epoch_acc))
# 精度が過去最高であれば、その重みを保存
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(
time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 学習過程で最高精度を出した重みをセットアップして、最適モデルとして返す
model.load_state_dict(best_model_wts)
return model
# 訓練済み ResNet を取得
model_ft = models.resnet18(pretrained=True)
print(model_ft)
モデル構造を確認すると、ResNet 18 は畳み込み演算とプーリング演算の連続になっていて、ニューラルネットワークとなっているのは最後の結合層 (fc): Linear(in_features=512, out_features=1000, bias=True)
となっていることが確認できる。
なお、torchsummary
パッケージを使用した出力が見やすいので、torchsummary
でモデルのアーキテクチャを出力してみるのもよい。
from torchsummary import summary
summary(model_ft.cuda(), (3, 224, 224))
# ResNet の出力層のユニット数を取得
# (ResNet は ImageNet で訓練されたので出力層が 1000 ユニット)
num_ftrs = model_ft.fc.in_features
# 出力層 100 ユニットを 2 ユニットに置き換える(つなぎ直す)
model_ft.fc = nn.Linear(num_ftrs, len(class_names))
# モデルを GPU に送る
model_ft = model_ft.to(device)
# 損失関数を定義
criterion = nn.CrossEntropyLoss()
# モデル中の微分可能なパラメーターを最適化関数にセットアップ
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
# 7 エポックごとに学習率を 0.1 倍だけ小さくするようにセットアップ
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=20)
データセットにもよるが、転移学習を利用した場合はより少ないエポック数で高い精度を達成することができる。
転移学習では、他のデータセットで学習済みのモデルに、自分たちのデータで再学習させた。この際、学習済みのモデル中のすべての重みを更新しながら再学習を行なっていた。
これに対して、学習済みのモデル中の重みのうち一部だけを再学習させることもできる。例えば 18 層からなる学習済みモデルがあるとき、第 1 層〜第 10 層までの重みを固定させ、第 11 層〜第 18 層の重みだけを再学習させる、といったことができる。何層目まで固定し、何層目から自由に動かすかは、そのモデルが昔に学習したデータセットとこれから学習しようとするデータセットの類似度と量によってチューニングする必要がある。
この項目は pytorch ウェブサイトの FINETUNING TORCHVISION MODELS を参照して作成した。
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)
学習手続きを定義する。
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25):
since = time.time()
val_acc_history = []
best_model_wts = copy.deepcopy(model.state_dict())
best_acc = 0.0
for epoch in range(num_epochs):
print('Epoch {}/{}'.format(epoch, num_epochs - 1))
print('-' * 10)
for phase in ['train', 'val']:
if phase == 'train':
model.train() # training mode
else:
model.eval() # evaluate mode
running_loss = 0.0
running_corrects = 0
# for each mini_batch
for inputs, labels in dataloaders[phase]:
inputs = inputs.to(device)
labels = labels.to(device)
optimizer.zero_grad()
with torch.set_grad_enabled(phase == 'train'):
outputs = model(inputs)
loss = criterion(outputs, labels)
_, preds = torch.max(outputs, 1)
if phase == 'train':
loss.backward()
optimizer.step()
running_loss += loss.item() * inputs.size(0)
running_corrects += torch.sum(preds == labels.data)
epoch_loss = running_loss / len(dataloaders[phase].dataset)
epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
if phase == 'val' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy(model.state_dict())
if phase == 'val':
val_acc_history.append(epoch_acc)
print()
time_elapsed = time.time() - since
print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
model.load_state_dict(best_model_wts)
return model, val_acc_history
ここで、転移学習などでよく使われる VGG16 と呼ばれる学習済みモデルを取得し fine-tuning を行う。VGG16 のアーキテクチャを出力し、その構造を確認してみる。
model_ft = models.vgg16(pretrained=True)
print(model_ft)
まずモデルの描くレイヤーへのアクセス方法を確認する。
for child_name, child in model_ft.named_children():
print(child_name)
for child_layer_i, param in enumerate(child.parameters()):
print(child_layer_i)
例えば、最後の classifier
レイヤーの最後の 3 レイヤーのみを再学習させたければ次のようにする。
#for child_name, child in model_ft.named_children():
# for child_layer_i, param in enumerate(child.parameters()):
#
# if child_name == 'classifier' and child_layer_id > 2:
# param.requires_grad = True
# else:
# param.requires_grad = False
#
今回は特徴量抽出を行うので、以下のようにする。
for param in model_ft.parameters():
param.requires_grad = False
VGG16 の出力層 (classifier)(6)
が 1000 となっているため、ここでは 2 クラスとなるように設定する。
num_classes = 2
num_ftrs = model_ft.classifier[6].in_features
model_ft.classifier[6] = nn.Linear(num_ftrs, num_classes)
input_size = 224
data_dir = "./hymenoptera_data"
batch_size = 8
data_transforms = {
'train': transforms.Compose([
transforms.RandomResizedCrop(input_size),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
'val': transforms.Compose([
transforms.Resize(input_size),
transforms.CenterCrop(input_size),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'val']}
dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'val']}
次に損失を定義し、学習すべきパラメーターのみを最適化関数に代入する。
criterion = nn.CrossEntropyLoss()
params_to_update = model_ft.parameters()
params_to_update = []
for name, param in model_ft.named_parameters():
if param.requires_grad == True:
params_to_update.append(param)
print("\t",name)
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)
GPU デバイスをセットアップし、学習を開始する。
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_ft = model_ft.to(device)
num_epochs = 10
model_ft, hist = train_model(model_ft, dataloaders_dict, criterion, optimizer_ft,
num_epochs=num_epochs)