[PyTorch 學習筆記] 2.1 DataLoader 與 DataSet
本章程式碼:
https://github.com/zhangxiann/PyTorch_Practice/blob/master/lesson2/rmb_classification/
人民幣 二分類
實現 1 元人民幣和 100 元人民幣的圖片二分類。前面講過 PyTorch 的五大模組:資料、模型、損失函式、最佳化器和迭代訓練。
資料模組又可以細分為 4 個部分:
資料收集:樣本和標籤。
資料劃分:訓練集、驗證集和測試集
資料讀取:對應於 PyTorch 的 DataLoader。其中 DataLoader 包括 Sampler 和 DataSet。Sampler 的功能是生成索引, DataSet 是根據生成的索引讀取樣本以及標籤。
資料預處理:對應於 PyTorch 的 transforms
# DataLoader 與 DataSet
torch.utils.data.DataLoader()
torch。utils。data。DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)
功能:構建可迭代的資料裝載器
dataset: Dataset 類,決定資料從哪裡讀取以及如何讀取
batchsize: 批大小
num_works:num_works: 是否多程序讀取資料
sheuffle: 每個 epoch 是否亂序
drop_last: 當樣本數不能被 batchsize 整除時,是否捨棄最後一批資料
Epoch, Iteration, Batchsize
Epoch: 所有訓練樣本都已經輸入到模型中,稱為一個 Epoch
Iteration: 一批樣本輸入到模型中,稱為一個 Iteration
Batchsize: 批大小,決定一個 iteration 有多少樣本,也決定了一個 Epoch 有多少個 Iteration
假設樣本總數有 80,設定 Batchsize 為 8,則共有
個 Iteration。這裡
。
假設樣本總數有 86,設定 Batchsize 為 8。如果
drop_last=True
則共有 10 個 Iteration;如果
drop_last=False
則共有 11 個 Iteration。
torch.utils.data.Dataset
功能:Dataset 是抽象類,所有自定義的 Dataset 都需要繼承該類,並且重寫
__getitem()__
方法和
__len__()
方法 。
__getitem()__
方法的作用是接收一個索引,返回索引對應的樣本和標籤,這是我們自己需要實現的邏輯。
__len__()
方法是返回所有樣本的數量。
資料讀取包含 3 個方面
讀取哪些資料:每個 Iteration 讀取一個 Batchsize 大小的資料,每個 Iteration 應該讀取哪些資料。
從哪裡讀取資料:如何找到硬碟中的資料,應該在哪裡設定檔案路徑引數
如何讀取資料:不同的檔案需要使用不同的讀取方法和庫。
這裡的路徑結構如下,有兩類人民幣圖片:1 元和 100 元,每一類各有 100 張圖片。
RMB_data
1
100
首先劃分資料集為訓練集、驗證集和測試集,比例為 8:1:1。
資料劃分好後的路徑構造如下:
rmb_split
train
1
100
valid
1
100
test
1
100
實現讀取資料的 Dataset,編寫一個
get_img_info()
方法,讀取每一個圖片的路徑和對應的標籤,組成一個元組,再把所有的元組作為 list 存放到
self。data_info
變數中,這裡需要注意的是標籤需要對映到 0 開始的整數:
rmb_label = {“1”: 0, “100”: 1}
。
@staticmethod
def get_img_info(data_dir):
data_info = list()
# data_dir 是訓練集、驗證集或者測試集的路徑
for root, dirs, _ in os。walk(data_dir):
# 遍歷類別
# dirs [‘1’, ‘100’]
for sub_dir in dirs:
# 檔案列表
img_names = os。listdir(os。path。join(root, sub_dir))
# 取出 jpg 結尾的檔案
img_names = list(filter(lambda x: x。endswith(‘。jpg’), img_names))
# 遍歷圖片
for i in range(len(img_names)):
img_name = img_names[i]
# 圖片的絕對路徑
path_img = os。path。join(root, sub_dir, img_name)
# 標籤,這裡需要對映為 0、1 兩個類別
label = rmb_label[sub_dir]
# 儲存在 data_info 變數中
data_info。append((path_img, int(label)))
return data_info
然後在
Dataset
的初始化函式中呼叫
get_img_info()
方法。
def __init__(self, data_dir, transform=None):
“”“
rmb面額分類任務的Dataset
:param data_dir: str, 資料集所在路徑
:param transform: torch。transform,資料預處理
”“”
# data_info儲存所有圖片路徑和標籤,在DataLoader中透過index讀取樣本
self。data_info = self。get_img_info(data_dir)
self。transform = transform
然後在
__getitem__()
方法中根據
index
讀取
self。data_info
中路徑對應的資料,並在這裡做 transform 操作,返回的是樣本和標籤。
def __getitem__(self, index):
# 透過 index 讀取樣本
path_img, label = self。data_info[index]
# 注意這裡需要 convert(‘RGB’)
img = Image。open(path_img)。convert(‘RGB’) # 0~255
if self。transform is not None:
img = self。transform(img) # 在這裡做transform,轉為tensor等等
# 返回是樣本和標籤
return img, label
在
__len__()
方法中返回
self。data_info
的長度,即為所有樣本的數量。
# 返回所有樣本的數量
def __len__(self):
return len(self。data_info)
在
train_lenet。py
中,分 5 步構建模型。
第 1 步設定資料。首先定義訓練集、驗證集、測試集的路徑,定義訓練集和測試集的
transforms
。然後構建訓練集和驗證集的
RMBDataset
物件,把對應的路徑和
transforms
傳進去。再構建
DataLoder
,設定 batch_size,其中訓練集設定
shuffle=True
,表示每個 Epoch 都打亂樣本。
# 構建MyDataset例項train_data = RMBDataset(data_dir=train_dir, transform=train_transform)valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)
# 構建DataLoder
# 其中訓練集設定 shuffle=True,表示每個 Epoch 都打亂樣本
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)
第 2 步構建模型,這裡採用經典的 Lenet 圖片分類網路。
net = LeNet(classes=2)
net。initialize_weights()
第 3 步設定損失函式,這裡使用交叉熵損失函式。
criterion = nn。CrossEntropyLoss()
第 4 步設定最佳化器。這裡採用 SGD 最佳化器。
optimizer = optim。SGD(net。parameters(), lr=LR, momentum=0。9) # 選擇最佳化器
scheduler = torch。optim。lr_scheduler。StepLR(optimizer, step_size=10, gamma=0。1) # 設定學習率下降策略
第 5 步迭代訓練模型,在每一個 epoch 裡面,需要遍歷 train_loader 取出資料,每次取得資料是一個 batchsize 大小。這裡又分為 4 步。第 1 步進行前向傳播,第 2 步進行反向傳播求導,第 3 步使用
optimizer
更新權重,第 4 步統計訓練情況。每一個 epoch 完成時都需要使用
scheduler
更新學習率,和計算驗證集的準確率、loss。
for epoch in range(MAX_EPOCH):
loss_mean = 0。
correct = 0。
total = 0。
net。train()
# 遍歷 train_loader 取資料
for i, data in enumerate(train_loader):
# forward
inputs, labels = data
outputs = net(inputs)
# backward
optimizer。zero_grad()
loss = criterion(outputs, labels)
loss。backward()
# update weights
optimizer。step()
# 統計分類情況
_, predicted = torch。max(outputs。data, 1)
total += labels。size(0)
correct += (predicted == labels)。squeeze()。sum()。numpy()
# 列印訓練資訊
loss_mean += loss。item()
train_curve。append(loss。item())
if (i+1) % log_interval == 0:
loss_mean = loss_mean / log_interval
print(“Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:。4f} Acc:{:。2%}”。format(
epoch, MAX_EPOCH, i+1, len(train_loader), loss_mean, correct / total))
loss_mean = 0。
scheduler。step() # 更新學習率
# 每個 epoch 計算驗證集得準確率和loss
。。。
。。。
我們可以看到每個 iteration,我們是從
train_loader
中取出資料的。
def __iter__(self):
if self。num_workers == 0:
return _SingleProcessDataLoaderIter(self)
else:
return _MultiProcessingDataLoaderIter(self)
這裡我們沒有設定多程序,會執行
_SingleProcessDataLoaderIter
的方法。我們以
_SingleProcessDataLoaderIter
為例。在
_SingleProcessDataLoaderIter
裡只有一個方法
_next_data()
,如下:
def _next_data(self):
index = self。_next_index() # may raise StopIteration
data = self。_dataset_fetcher。fetch(index) # may raise StopIteration
if self。_pin_memory:
data = _utils。pin_memory。pin_memory(data)
return data
在該方法中,
self。_next_index()
是獲取一個 batchsize 大小的 index 列表,程式碼如下:
def _next_index(self):
return next(self。_sampler_iter) # may raise StopIteration
其中呼叫的
sampler
類的
__iter__()
方法返回 batch_size 大小的隨機 index 列表。
def __iter__(self):
batch = []
for idx in self。sampler:
batch。append(idx)
if len(batch) == self。batch_size:
yield batch
batch = []
if len(batch) > 0 and not self。drop_last:
yield batch
然後再返回看
dataloader
的
_next_data()
方法:
def _next_data(self):
index = self。_next_index() # may raise StopIteration
data = self。_dataset_fetcher。fetch(index) # may raise StopIteration
if self。_pin_memory:
data = _utils。pin_memory。pin_memory(data)
return data
在第二行中呼叫了
self。_dataset_fetcher。fetch(index)
獲取資料。這裡會呼叫
_MapDatasetFetcher
中的
fetch()
函式:
def fetch(self, possibly_batched_index):
if self。auto_collation:
data = [self。dataset[idx] for idx in possibly_batched_index]
else:
data = self。dataset[possibly_batched_index]
return self。collate_fn(data)
這裡呼叫了
self。dataset[idx]
,這個函式會呼叫
dataset。__getitem__()
方法獲取具體的資料,所以
__getitem__()
方法是我們必須實現的。我們拿到的
data
是一個 list,每個元素是一個 tunple,每個 tunple 包括樣本和標籤。所以最後要使用
self。collate_fn(data)
把 data 轉換為兩個 list,第一個 元素 是樣本的 batch 形式,形狀為 [16, 3, 32, 32] (16 是 batch size,[3, 32, 32] 是圖片畫素);第二個元素是標籤的 batch 形式,形狀為 [16]。
所以在程式碼中,我們使用
inputs, labels = data
來接收資料。
PyTorch 資料讀取流程圖
首先在 for 迴圈中遍歷`DataLoader`,然後根據是否採用多程序,決定使用單程序或者多程序的`DataLoaderIter`。在`DataLoaderIter`裡呼叫`Sampler`生成`Index`的 list,再呼叫`DatasetFetcher`根據`index`獲取資料。在`DatasetFetcher`裡會呼叫`Dataset`的`__getitem__()`方法獲取真正的資料。這裡獲取的資料是一個 list,其中每個元素是 (img, label) 的元組,再使用 `collate_fn()`函式整理成一個 list,裡面包含兩個元素,分別是 img 和 label 的`tenser`。
下圖是我們的訓練過程的 loss 曲線:
參考資料
深度之眼 PyTorch 框架班
如果你覺得這篇文章對你有幫助,不妨點個贊,讓我有更多動力寫出好文章。