您當前的位置:首頁 > 書法

[PyTorch 學習筆記] 2.1 DataLoader 與 DataSet

作者:由 張賢同學 發表于 書法時間:2020-08-25

本章程式碼:

https://github.com/zhangxiann/PyTorch_Practice/blob/master/lesson2/rmb_classification/

人民幣 二分類

實現 1 元人民幣和 100 元人民幣的圖片二分類。前面講過 PyTorch 的五大模組:資料、模型、損失函式、最佳化器和迭代訓練。

資料模組又可以細分為 4 個部分:

資料收集:樣本和標籤。

資料劃分:訓練集、驗證集和測試集

資料讀取:對應於 PyTorch 的 DataLoader。其中 DataLoader 包括 Sampler 和 DataSet。Sampler 的功能是生成索引, DataSet 是根據生成的索引讀取樣本以及標籤。

資料預處理:對應於 PyTorch 的 transforms

[PyTorch 學習筆記] 2.1 DataLoader 與 DataSet

# DataLoader 與 DataSet

torch.utils.data.DataLoader()

torch。utils。data。DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)

功能:構建可迭代的資料裝載器

dataset: Dataset 類,決定資料從哪裡讀取以及如何讀取

batchsize: 批大小

num_works:num_works: 是否多程序讀取資料

sheuffle: 每個 epoch 是否亂序

drop_last: 當樣本數不能被 batchsize 整除時,是否捨棄最後一批資料

Epoch, Iteration, Batchsize

Epoch: 所有訓練樣本都已經輸入到模型中,稱為一個 Epoch

Iteration: 一批樣本輸入到模型中,稱為一個 Iteration

Batchsize: 批大小,決定一個 iteration 有多少樣本,也決定了一個 Epoch 有多少個 Iteration

假設樣本總數有 80,設定 Batchsize 為 8,則共有

80 \div 8=10

個 Iteration。這裡

1 Epoch = 10 Iteration

假設樣本總數有 86,設定 Batchsize 為 8。如果

drop_last=True

則共有 10 個 Iteration;如果

drop_last=False

則共有 11 個 Iteration。

torch.utils.data.Dataset

功能:Dataset 是抽象類,所有自定義的 Dataset 都需要繼承該類,並且重寫

__getitem()__

方法和

__len__()

方法 。

__getitem()__

方法的作用是接收一個索引,返回索引對應的樣本和標籤,這是我們自己需要實現的邏輯。

__len__()

方法是返回所有樣本的數量。

資料讀取包含 3 個方面

讀取哪些資料:每個 Iteration 讀取一個 Batchsize 大小的資料,每個 Iteration 應該讀取哪些資料。

從哪裡讀取資料:如何找到硬碟中的資料,應該在哪裡設定檔案路徑引數

如何讀取資料:不同的檔案需要使用不同的讀取方法和庫。

這裡的路徑結構如下,有兩類人民幣圖片:1 元和 100 元,每一類各有 100 張圖片。

RMB_data

1

100

首先劃分資料集為訓練集、驗證集和測試集,比例為 8:1:1。

資料劃分好後的路徑構造如下:

rmb_split

train

1

100

valid

1

100

test

1

100

實現讀取資料的 Dataset,編寫一個

get_img_info()

方法,讀取每一個圖片的路徑和對應的標籤,組成一個元組,再把所有的元組作為 list 存放到

self。data_info

變數中,這裡需要注意的是標籤需要對映到 0 開始的整數:

rmb_label = {“1”: 0, “100”: 1}

@staticmethod

def get_img_info(data_dir):

data_info = list()

# data_dir 是訓練集、驗證集或者測試集的路徑

for root, dirs, _ in os。walk(data_dir):

# 遍歷類別

# dirs [‘1’, ‘100’]

for sub_dir in dirs:

# 檔案列表

img_names = os。listdir(os。path。join(root, sub_dir))

# 取出 jpg 結尾的檔案

img_names = list(filter(lambda x: x。endswith(‘。jpg’), img_names))

# 遍歷圖片

for i in range(len(img_names)):

img_name = img_names[i]

# 圖片的絕對路徑

path_img = os。path。join(root, sub_dir, img_name)

# 標籤,這裡需要對映為 0、1 兩個類別

label = rmb_label[sub_dir]

# 儲存在 data_info 變數中

data_info。append((path_img, int(label)))

return data_info

然後在

Dataset

的初始化函式中呼叫

get_img_info()

方法。

def __init__(self, data_dir, transform=None):

“”“

rmb面額分類任務的Dataset

:param data_dir: str, 資料集所在路徑

:param transform: torch。transform,資料預處理

”“”

# data_info儲存所有圖片路徑和標籤,在DataLoader中透過index讀取樣本

self。data_info = self。get_img_info(data_dir)

self。transform = transform

然後在

__getitem__()

方法中根據

index

讀取

self。data_info

中路徑對應的資料,並在這裡做 transform 操作,返回的是樣本和標籤。

def __getitem__(self, index):

# 透過 index 讀取樣本

path_img, label = self。data_info[index]

# 注意這裡需要 convert(‘RGB’)

img = Image。open(path_img)。convert(‘RGB’) # 0~255

if self。transform is not None:

img = self。transform(img) # 在這裡做transform,轉為tensor等等

# 返回是樣本和標籤

return img, label

__len__()

方法中返回

self。data_info

的長度,即為所有樣本的數量。

# 返回所有樣本的數量

def __len__(self):

return len(self。data_info)

train_lenet。py

中,分 5 步構建模型。

第 1 步設定資料。首先定義訓練集、驗證集、測試集的路徑,定義訓練集和測試集的

transforms

。然後構建訓練集和驗證集的

RMBDataset

物件,把對應的路徑和

transforms

傳進去。再構建

DataLoder

,設定 batch_size,其中訓練集設定

shuffle=True

,表示每個 Epoch 都打亂樣本。

# 構建MyDataset例項train_data = RMBDataset(data_dir=train_dir, transform=train_transform)valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)

# 構建DataLoder

# 其中訓練集設定 shuffle=True,表示每個 Epoch 都打亂樣本

train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)

valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)

第 2 步構建模型,這裡採用經典的 Lenet 圖片分類網路。

net = LeNet(classes=2)

net。initialize_weights()

第 3 步設定損失函式,這裡使用交叉熵損失函式。

criterion = nn。CrossEntropyLoss()

第 4 步設定最佳化器。這裡採用 SGD 最佳化器。

optimizer = optim。SGD(net。parameters(), lr=LR, momentum=0。9) # 選擇最佳化器

scheduler = torch。optim。lr_scheduler。StepLR(optimizer, step_size=10, gamma=0。1) # 設定學習率下降策略

第 5 步迭代訓練模型,在每一個 epoch 裡面,需要遍歷 train_loader 取出資料,每次取得資料是一個 batchsize 大小。這裡又分為 4 步。第 1 步進行前向傳播,第 2 步進行反向傳播求導,第 3 步使用

optimizer

更新權重,第 4 步統計訓練情況。每一個 epoch 完成時都需要使用

scheduler

更新學習率,和計算驗證集的準確率、loss。

for epoch in range(MAX_EPOCH):

loss_mean = 0。

correct = 0。

total = 0。

net。train()

# 遍歷 train_loader 取資料

for i, data in enumerate(train_loader):

# forward

inputs, labels = data

outputs = net(inputs)

# backward

optimizer。zero_grad()

loss = criterion(outputs, labels)

loss。backward()

# update weights

optimizer。step()

# 統計分類情況

_, predicted = torch。max(outputs。data, 1)

total += labels。size(0)

correct += (predicted == labels)。squeeze()。sum()。numpy()

# 列印訓練資訊

loss_mean += loss。item()

train_curve。append(loss。item())

if (i+1) % log_interval == 0:

loss_mean = loss_mean / log_interval

print(“Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:。4f} Acc:{:。2%}”。format(

epoch, MAX_EPOCH, i+1, len(train_loader), loss_mean, correct / total))

loss_mean = 0。

scheduler。step() # 更新學習率

# 每個 epoch 計算驗證集得準確率和loss

。。。

。。。

我們可以看到每個 iteration,我們是從

train_loader

中取出資料的。

def __iter__(self):

if self。num_workers == 0:

return _SingleProcessDataLoaderIter(self)

else:

return _MultiProcessingDataLoaderIter(self)

這裡我們沒有設定多程序,會執行

_SingleProcessDataLoaderIter

的方法。我們以

_SingleProcessDataLoaderIter

為例。在

_SingleProcessDataLoaderIter

裡只有一個方法

_next_data()

,如下:

def _next_data(self):

index = self。_next_index() # may raise StopIteration

data = self。_dataset_fetcher。fetch(index) # may raise StopIteration

if self。_pin_memory:

data = _utils。pin_memory。pin_memory(data)

return data

在該方法中,

self。_next_index()

是獲取一個 batchsize 大小的 index 列表,程式碼如下:

def _next_index(self):

return next(self。_sampler_iter) # may raise StopIteration

其中呼叫的

sampler

類的

__iter__()

方法返回 batch_size 大小的隨機 index 列表。

def __iter__(self):

batch = []

for idx in self。sampler:

batch。append(idx)

if len(batch) == self。batch_size:

yield batch

batch = []

if len(batch) > 0 and not self。drop_last:

yield batch

然後再返回看

dataloader

_next_data()

方法:

def _next_data(self):

index = self。_next_index() # may raise StopIteration

data = self。_dataset_fetcher。fetch(index) # may raise StopIteration

if self。_pin_memory:

data = _utils。pin_memory。pin_memory(data)

return data

在第二行中呼叫了

self。_dataset_fetcher。fetch(index)

獲取資料。這裡會呼叫

_MapDatasetFetcher

中的

fetch()

函式:

def fetch(self, possibly_batched_index):

if self。auto_collation:

data = [self。dataset[idx] for idx in possibly_batched_index]

else:

data = self。dataset[possibly_batched_index]

return self。collate_fn(data)

這裡呼叫了

self。dataset[idx]

,這個函式會呼叫

dataset。__getitem__()

方法獲取具體的資料,所以

__getitem__()

方法是我們必須實現的。我們拿到的

data

是一個 list,每個元素是一個 tunple,每個 tunple 包括樣本和標籤。所以最後要使用

self。collate_fn(data)

把 data 轉換為兩個 list,第一個 元素 是樣本的 batch 形式,形狀為 [16, 3, 32, 32] (16 是 batch size,[3, 32, 32] 是圖片畫素);第二個元素是標籤的 batch 形式,形狀為 [16]。

所以在程式碼中,我們使用

inputs, labels = data

來接收資料。

PyTorch 資料讀取流程圖

[PyTorch 學習筆記] 2.1 DataLoader 與 DataSet

首先在 for 迴圈中遍歷`DataLoader`,然後根據是否採用多程序,決定使用單程序或者多程序的`DataLoaderIter`。在`DataLoaderIter`裡呼叫`Sampler`生成`Index`的 list,再呼叫`DatasetFetcher`根據`index`獲取資料。在`DatasetFetcher`裡會呼叫`Dataset`的`__getitem__()`方法獲取真正的資料。這裡獲取的資料是一個 list,其中每個元素是 (img, label) 的元組,再使用 `collate_fn()`函式整理成一個 list,裡面包含兩個元素,分別是 img 和 label 的`tenser`。

下圖是我們的訓練過程的 loss 曲線:

[PyTorch 學習筆記] 2.1 DataLoader 與 DataSet

參考資料

深度之眼 PyTorch 框架班

如果你覺得這篇文章對你有幫助,不妨點個贊,讓我有更多動力寫出好文章。

標簽: data  self  __  img  index