學習使用pytorch實現LeNet、AlexNet、LSTM、BiLSTM、CNN-LSTM
使用pytorhch自定義LeNet、AlexNet、BiLSTM、CNN-LSTM模型處理識別MNIST資料集中的手寫數字。完整的程式碼實現放在github
模型定義
LeNet和AlexNet就是用於處理影象的,比較好理解。
BiLSTM處理MNIST相當於把影象轉換成時序資料;28*28,可以理解為28個時間點,每個時間點的資料28維;也可以理解為一句話28個詞,每個詞向量28維。
學習實現CNN-LSTM模型是想用於影片資料處理的,一段影片可以抽樣成若干幀圖片,每幀圖片用CNN提取特徵,然後再把從若干幀圖片中提取出來的特徵輸入到LSTM。沒找到的大小合適的資料集,只能把MNIST資料集中的一張圖片強行認為是隻有一幀的一段影片。
參考Attention-Based Bidirectional Long Short-Term Memory Networks for Relation Classification 和相關實現 定義AttBiLSTM實現LSTM+Attention。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import
torch
from
torch
import
nn
import
torch。nn。functional
as
F
# 定義 單雙向LSTM 模型
class
Rnn
(
nn
。
Module
):
def
__init__
(
self
,
in_dim
,
hidden_dim
,
n_layer
,
n_class
,
bidirectional
):
super
(
Rnn
,
self
)
。
__init__
()
self
。
n_layer
=
n_layer
self
。
hidden_dim
=
hidden_dim
self
。
bidirectional
=
bidirectional
self
。
lstm
=
nn
。
LSTM
(
in_dim
,
hidden_dim
,
n_layer
,
batch_first
=
True
,
bidirectional
=
bidirectional
)
if
self
。
bidirectional
:
self
。
classifier
=
nn
。
Linear
(
hidden_dim
*
2
,
n_class
)
else
:
self
。
classifier
=
nn
。
Linear
(
hidden_dim
,
n_class
)
def
forward
(
self
,
x
):
out
,
(
hn
,
_
)
=
self
。
lstm
(
x
)
if
self
。
bidirectional
:
out
=
torch
。
hstack
((
hn
[
-
2
,
:,
:],
hn
[
-
1
,
:,
:]))
else
:
out
=
out
[:,
-
1
,
:]
out
=
self
。
classifier
(
out
)
return
out
class
Attention
(
nn
。
Module
):
def
__init__
(
self
,
rnn_size
:
int
):
super
(
Attention
,
self
)
。
__init__
()
self
。
w
=
nn
。
Linear
(
rnn_size
,
1
)
self
。
tanh
=
nn
。
Tanh
()
self
。
softmax
=
nn
。
Softmax
(
dim
=
1
)
def
forward
(
self
,
H
):
# eq。9: M = tanh(H)
M
=
self
。
tanh
(
H
)
# (batch_size, word_pad_len, rnn_size)
# eq。10: α = softmax(w^T M)
alpha
=
self
。
w
(
M
)
。
squeeze
(
2
)
# (batch_size, word_pad_len)
alpha
=
self
。
softmax
(
alpha
)
# (batch_size, word_pad_len)
# eq。11: r = H
r
=
H
*
alpha
。
unsqueeze
(
2
)
# (batch_size, word_pad_len, rnn_size)
r
=
r
。
sum
(
dim
=
1
)
# (batch_size, rnn_size)
return
r
,
alpha
class
AttBiLSTM
(
nn
。
Module
):
def
__init__
(
self
,
n_classes
:
int
,
emb_size
:
int
,
rnn_size
:
int
,
rnn_layers
:
int
,
dropout
:
float
):
super
(
AttBiLSTM
,
self
)
。
__init__
()
self
。
rnn_size
=
rnn_size
# bidirectional LSTM
self
。
BiLSTM
=
nn
。
LSTM
(
emb_size
,
rnn_size
,
num_layers
=
rnn_layers
,
bidirectional
=
True
,
batch_first
=
True
)
self
。
attention
=
Attention
(
rnn_size
)
self
。
fc
=
nn
。
Linear
(
rnn_size
,
n_classes
)
self
。
tanh
=
nn
。
Tanh
()
self
。
dropout
=
nn
。
Dropout
(
dropout
)
def
forward
(
self
,
x
):
rnn_out
,
_
=
self
。
BiLSTM
(
x
)
H
=
rnn_out
[:,
:,
:
self
。
rnn_size
]
+
rnn_out
[:,
:,
self
。
rnn_size
:]
# attention module
r
,
alphas
=
self
。
attention
(
H
)
# (batch_size, rnn_size), (batch_size, word_pad_len)
# eq。12: h* = tanh(r)
h
=
self
。
tanh
(
r
)
# (batch_size, rnn_size)
scores
=
self
。
fc
(
self
。
dropout
(
h
))
# (batch_size, n_classes)
return
scores
# 定義AlexNet
class
AlexNet
(
nn
。
Module
):
def
__init__
(
self
,
num_classes
=
10
):
super
(
AlexNet
,
self
)
。
__init__
()
self
。
features
=
nn
。
Sequential
(
nn
。
Conv2d
(
1
,
32
,
kernel_size
=
(
3
,
3
),
padding
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
kernel_size
=
2
,
stride
=
2
),
nn
。
Conv2d
(
32
,
64
,
kernel_size
=
(
5
,
5
),
stride
=
(
1
,
1
),
padding
=
(
2
,
2
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
kernel_size
=
2
,
stride
=
2
),
nn
。
Conv2d
(
64
,
128
,
kernel_size
=
(
3
,
3
),
stride
=
(
1
,
1
),
padding
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
Conv2d
(
128
,
256
,
kernel_size
=
(
3
,
3
),
stride
=
(
1
,
1
),
padding
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
Conv2d
(
256
,
256
,
kernel_size
=
(
3
,
3
),
stride
=
(
1
,
1
),
padding
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
kernel_size
=
3
,
stride
=
2
)
)
self
。
classifier
=
nn
。
Sequential
(
nn
。
Dropout
(
p
=
0。5
),
nn
。
Linear
(
256
*
3
*
3
,
1024
),
nn
。
ReLU
(
inplace
=
True
),
nn
。
Dropout
(
p
=
0。5
),
nn
。
Linear
(
1024
,
512
),
nn
。
ReLU
(
inplace
=
True
),
nn
。
Linear
(
512
,
num_classes
)
)
def
forward
(
self
,
x
):
x
=
self
。
features
(
x
)
x
=
torch
。
flatten
(
x
,
start_dim
=
1
)
x
=
self
。
classifier
(
x
)
return
x
class
LeNet
(
nn
。
Module
):
def
__init__
(
self
,
num_classes
=
10
):
super
(
LeNet
,
self
)
。
__init__
()
self
。
features
=
nn
。
Sequential
(
nn
。
Conv2d
(
1
,
16
,
kernel_size
=
(
5
,
5
),
stride
=
(
1
,
1
),
padding
=
(
2
,
2
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
2
,
2
),
nn
。
Conv2d
(
16
,
32
,
kernel_size
=
(
5
,
5
),
stride
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
2
,
2
))
self
。
classifier
=
nn
。
Sequential
(
nn
。
Linear
(
32
*
5
*
5
,
120
),
nn
。
Linear
(
120
,
84
),
nn
。
Linear
(
84
,
num_classes
))
def
forward
(
self
,
x
):
x
=
self
。
features
(
x
)
x
=
x
。
view
(
-
1
,
32
*
5
*
5
)
x
=
self
。
classifier
(
x
)
return
x
class
LeNetVariant
(
nn
。
Module
):
def
__init__
(
self
):
super
(
LeNetVariant
,
self
)
。
__init__
()
self
。
features
=
nn
。
Sequential
(
nn
。
Conv2d
(
1
,
16
,
kernel_size
=
(
5
,
5
),
stride
=
(
1
,
1
),
padding
=
(
2
,
2
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
2
,
2
),
nn
。
Conv2d
(
16
,
32
,
kernel_size
=
(
5
,
5
),
stride
=
(
1
,
1
)),
nn
。
ReLU
(
inplace
=
True
),
nn
。
MaxPool2d
(
2
,
2
))
self
。
classifier
=
nn
。
Sequential
(
nn
。
Linear
(
32
*
5
*
5
,
120
),
nn
。
Linear
(
120
,
84
))
def
forward
(
self
,
x
):
x
=
self
。
features
(
x
)
x
=
x
。
view
(
-
1
,
32
*
5
*
5
)
x
=
self
。
classifier
(
x
)
return
x
class
CNNLSTM
(
nn
。
Module
):
def
__init__
(
self
,
num_classes
=
2
):
super
(
CNNLSTM
,
self
)
。
__init__
()
self
。
cnn
=
LeNetVariant
()
self
。
lstm
=
nn
。
LSTM
(
input_size
=
84
,
hidden_size
=
128
,
num_layers
=
2
,
batch_first
=
True
)
self
。
fc1
=
nn
。
Linear
(
128
,
num_classes
)
def
forward
(
self
,
x_3d
):
cnn_output_list
=
list
()
for
t
in
range
(
x_3d
。
size
(
1
)):
cnn_output_list
。
append
(
self
。
cnn
(
x_3d
[:,
t
,
:,
:,
:]))
x
=
torch
。
stack
(
tuple
(
cnn_output_list
),
dim
=
1
)
out
,
hidden
=
self
。
lstm
(
x
)
x
=
out
[:,
-
1
,
:]
x
=
F
。
relu
(
x
)
x
=
self
。
fc1
(
x
)
return
x
訓練&驗證
這部分基本照搬pytorch官網Tutorial的Quickstart的程式碼。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import torch
def train_loop(dataloader, model, loss_fn, optimizer, which_model):
size = len(dataloader。dataset)
for batch, (X, y) in enumerate(dataloader):
if which_model == 1:
X = X。squeeze(1)
elif which_model == 2:
X = X。unsqueeze(1)
else:
pass
# Compute prediction and loss
pred = model(X)
loss = loss_fn(pred, y)
# Backpropagation
optimizer。zero_grad()
loss。backward()
optimizer。step()
if batch % 100 == 0:
loss, current = loss。item(), batch * len(X)
print(f“loss: {loss:>7f} [{current:>5d}/{size:>5d}]”)
def test_loop(dataloader, model, loss_fn, which_model):
size = len(dataloader。dataset)
num_batches = len(dataloader)
test_loss, correct = 0, 0
with torch。no_grad():
for X, y in dataloader:
if which_model == 1:
X = X。squeeze(1)
elif which_model == 2:
X = X。unsqueeze(1)
else:
pass
pred = model(X)
test_loss += loss_fn(pred, y)。item()
correct += (pred。argmax(1) == y)。type(torch。float)。sum()。item()
test_loss /= num_batches
correct /= size
print(f“Test Error: \n Accuracy: {(100 * correct):>0。1f}%, ”
f“Avg loss: {test_loss:>8f} \n”)
自定義資料集載入類
# -*- coding: utf-8 -*-
import os
from torch。utils。data。dataset import Dataset
import torch
import numpy as np
class CustomDataset(Dataset):
def __init__(self, annotations_file, sample_dir, sample_size,
tensor_shape):
self。annotations_file = annotations_file
self。labels = self。read_label_file()
self。sample_dir = sample_dir
self。sample_size = sample_size
self。tensor_shape = tensor_shape
def __len__(self):
return len(self。labels)
def __getitem__(self, idx):
sample_path = os。path。join(self。sample_dir,
self。labels[idx][0])
sample_tensor = self。transform(sample_path, self。sample_size,
self。tensor_shape)
label = self。labels[idx][1]
label = torch。tensor(self。target_transform(label))
return sample_tensor, label
def read_label_file(self):
label_list = list()
with open(self。annotations_file, ‘r’) as h:
while True:
line = h。readline()
if not line:
break
label_list。append(
line。strip(“ ”)。strip(“\r”)。strip(“\n”)。split(“,”))
return label_list
def target_transform(self, label):
label_num_mapping = {“neg”: 0, “pos”: 1}
return label_num_mapping[label]
def transform(self, sample_path, sample_size, tensor_shape):
with open(sample_path, “rb”) as h:
content = h。read()
content = np。frombuffer(content, dtype=np。uint8, offset=0)
if content。size < sample_size:
padding_len = sample_size - content。size
content = np。hstack(
(content, np。zeros(padding_len, dtype=np。uint8)))
elif content。size > sample_size:
content = content[0:sample_size]
return torch。reshape(torch。tensor(content)。type(torch。float),
tensor_shape)
感受
1、pytorch確實是很友好
2、把模型基本的數學原理搞明白,再去自定義或者修改模型就容易很多了。
參考