训练是一个不断迭代的过程,每一次迭代,都会计算输出,计算输出的损失(和真实值的差距),收集损失相对参数的导数,然后使用梯度下降优化这些参数。
import torch
from torch import nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import os
# 数据源来自: https://www.kaggle.com/datasets/muniryadi/cat-vs-rabbit
train_path = './cat_or_rabbit/train-cat-rabbit'
test_path = './cat_or_rabbit/test-images'
val_path = './cat_or_rabbit/val-cat-rabbit'
# ToTensor 对输入做归一化处理,将资料范围变成[0,1]之间
# 正规化是机器学习常用的资料前处理,将资料范围变成[-1,1]之间
normalize=transforms.Normalize(mean=[.5,.5,.5],std=[.5,.5,.5])
transform=transforms.Compose([
transforms.RandomCrop(224), # 随即裁剪
transforms.RandomHorizontalFlip(), # 水平翻转
transforms.ToTensor(), # tensor 格式
normalize
])
# 建立資料集
train_dataset = datasets.ImageFolder(train_path, transform = transform)
val_dataset = datasets.ImageFolder(val_path, transform = transform)
test_dataset = datasets.ImageFolder(test_path, transform = transform)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=True)
device = (
"cuda"
if torch.cuda.is_available()
else "mps"
if torch.backends.mps.is_available()
else "cpu"
)
print(f"Using {device} device")
class NeuralNetwork(nn.Module):
def __init__(self, img_size):
super().__init__()
self.flatten = nn.Flatten()
self.linear_relu_stack = nn.Sequential(
nn.Linear(img_size, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, 10),
)
def forward(self, x):
x = self.flatten(x)
logits = self.linear_relu_stack(x)
return logits
model = NeuralNetwork(3*224*224).to(device)
print(model)
learning_rate = 1e-3
batch_size = 64
epochs = 10
# 初始化loss function
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
def train_loop(dataloader, model, loss_fn, optimizer):
size = len(dataloader.dataset)
model.train()
for batch, (X, y) in enumerate(dataloader):
# 將資料讀取到GPU中
X, y = X.to(device), y.to(device)
# 運算出結果並計算loss
pred = model(X)
loss = loss_fn(pred, y)
# 反向傳播
loss.backward()
optimizer.step()
optimizer.zero_grad()
if batch % 100 == 0:
print(batch)
loss, current = loss.item(), (batch + 1) * len(X)
print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def test_loop(dataloader, model, loss_fn):
model.eval()
size = len(dataloader.dataset)
num_batches = len(dataloader)
test_loss, correct = 0, 0
# 驗證或測試時記得加入 torch.no_grad() 讓神經網路不要更新
with torch.no_grad():
for X, y in dataloader:
X, y = X.to(device), y.to(device)
pred = model(X)
test_loss += loss_fn(pred, y).item()
correct += (pred.argmax(1) == y).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)
epochs = 10
for t in range(epochs):
print(f"Epoch {t+1}\n-------------------------------")
train_loop(train_dataloader, model, loss_fn, optimizer)
test_loop(val_dataloader, model, loss_fn)
print("Done!")
# 保存为内部字典
torch.save(model.state_dict(), 'cat_vs_rabbit_cls_v1.pth')
# 重新载入的时候,要先建立和原来一样的模型结构
reload_model = NeuralNetwork(3*224*224)
reload_model.load_state_dict(torch.load('cat_vs_rabbit_cls_v1.pth'))
reload_model.to(device)
reload_model.eval()
# 验证
epochs = 1
for t in range(epochs):
test_loop(val_dataloader, reload_model)
实际工作中,我们很少会从同训练一个模型,这样的工作量太大了,往往会拿一个已经训练好的模型,在其基础上进行微调。预训练模型已经有了比较强的泛化能力,我们只需要在我们感兴趣的数据集上进行微调,就可以得到比较好的效果。
1.对ConvNet进行微调(Finetuning):与随机初始化不同,我们使用预训练的网络来初始化网络,例如那些在Imagenet 数据集上训练过的网络。其馀的训练过程与平常步骤一样。
2.ConvNet作为固定特徵提取器:在这裡,我们会冻结网络的所有权重,除了最后的全连接层之外。这个最后的全连接层会被一个新的、带有随机权重的层所取代,并且只训练这一层。
import torchvision.models as models
import torchvision
# torchvision.models下面有很多已经训练好的模型,我们可以直接加载
mobilenet_v3_model = models.mobilenet_v3_small(pretrained=True)
print(mobilenet_v3_model)
# 针对新模型参数做优化
optimizer = torch.optim.SGD(mobilenet_v3_model.parameters(), lr=learning_rate)
# 修改一下输出参数,因为我们的输出类别是2个
num_classes = 2
mobilenet_v3_model.classifier = nn.Sequential(
nn.Linear(576, 1024),
nn.ReLU(),
nn.Linear(1024, num_classes)
)
mobilenet_v3_model.to(device)