從零開始的機器學習實戰(十六):強化學習
本章是《Hands-On Machine Learning with Scikit-Learn & TensorFlow》的最後一章
強化學習(RL)是近些年深度學習最令人激動的領域之一,雖然早在1950s強化學習就有了研究,但沒有搞出什麼大新聞,但在2013年有了革命性的成果,Deepmind的研究者用RL成功設計了一個玩雅達利遊戲的專案,僅僅輸入遊戲畫素而不需要了解先驗知識,2016年又設計了可以打敗圍棋冠軍李世石的系統,這在歷史上是從未接近過的目標!關鍵在於他們將深度學習運用到強化學習領域,結果卻超越了他們最瘋狂的設想。現在RL依舊是最活躍的領域,Deepmind也被谷歌在2014年以超過五億美元收購。
學習最佳化獎勵
在強化學習中,智慧體在環境中觀察並做出決策,作為回報,它們獲得獎勵(你可以認為正獎勵代表愉快,負獎勵代表痛苦),它的目標就是讓愉快最大,痛苦最小。
這是一個相當廣泛的設定,可以適用於各種各樣的任務。下面有幾個例子
智慧體可以是控制一個機械狗的程式。環境就是真實的世界,智慧體透過許多的感測器例如攝像機或者感測器來觀察,它可以透過給電機發送訊號來行動。到達目的正獎勵,浪費時間或者走錯方向或摔倒了就得到負獎勵。
智慧體可以是控制 MS。Pac-Man 的程式(雅達利遊戲模擬),行為是 9 個操縱桿位(上下左右中間等等),觀察是螢幕,回報就是遊戲點數。
智慧體也可以是棋盤遊戲的程式例如:圍棋。
智慧體調整到目標溫度以節能時會得到正獎勵,當人們需要自己去調節溫度時它會得到負獎勵,所以智慧體必須學會預見人們的需要。
智慧體觀測股票市場價格以實時決定買賣。獎勵的依據顯然為掙錢或者賠錢。
也可以沒有正獎勵,比如智慧體在迷宮內每時每刻都受到負獎勵,因此要儘快走出去
策略搜尋
策略是智慧體用來決定如何行動的,比如,策略是一個神經網路,觀測時輸入,行為是輸出,如圖:
策略可以是任何演算法,甚至不必是確定的。比如,你訓練一個智慧吸塵器,獎勵是30分鐘內的吸塵數量,它的策略是以p的機率直走,以1-p的機率轉彎,角度在-r到r之間隨機選擇。策略是隨機策略。
我們怎麼來訓練呢,我們要選取兩個策略引數,P和R,我們需要進行策略搜尋,當然可以嘗試多次隨機組合,但是在通常引數空間太大的情況下,隨機的選擇就像大海撈針。比較有效的方法有:
遺傳演算法。如隨機創造100個第一代的策略基因,隨後殺死 80 個糟糕的策略,讓 20 個倖存策略繁衍 4 代。後代只是它父輩基因的複製加上一些隨機變異,迭代下去找到好的策略。
最佳化技術,透過評估獎勵關於策略引數的梯度(
策略梯度
(PG)),跟隨梯度向更高的獎勵(梯度上升)調整這些引數。如,吸塵器機器人,你可以稍微增加機率P並評估這是否增加了機器人在 30 分鐘內拾起的灰塵的量;如果確實增加了,就相對應增加
P
,否則減少
P
。
我們將使用 Tensorflow 來實現 PG 演算法,但是我們需要首先智慧體創造一個生存的環境,所以現在是介紹 OpenAI 的時候了。
OpenAI 的介紹
強化學習的一個挑戰是,要訓練智慧體,就要首先有一個環境,比如你要智慧體學習翫雅達利遊戲,你要有一個雅達利遊戲的模擬,你要訓練一個行走的機器人,環境可以是真實世界,但這樣通常有很多限制:跌倒了你不能簡單的撤銷,也不能加速訓練即使你計算力強大,同時訓練1000個機器人價值昂貴,簡而言之,訓練在現實世界中是困難和緩慢的,所以你通常需要一個模擬環境,至少需要引導訓練。
OpenAI gym 是一個工具包,它提供各種各樣的模擬環境(雅達利遊戲,2D/3D物理環境),來幫助你訓練RL演算法程式。
$ pip install ——upgrade gym #pip安裝
然後建立一個環境,例中是 CartPole 環境。這是一個 2D 模擬,其中推車可以被左右加速,以平衡放置在它上面的平衡杆(如圖):需要使用
reset()
初始化,會返回第一個觀察結果,觀測的結果包含四個浮點的 1D Numpy 向量:這些浮點數代表推車的水平位置(0 為中心)、其速度、杆的角度(0 維垂直)及其角速度。
>>>
import
gym
>>>
env
=
gym
。
make
(
“CartPole-v0”
)
[
2016
-
10
-
14
16
:
03
:
23
,
199
]
Making
new
env
:
MsPacman
-
v0
>>>
obs
=
env
。
reset
()
>>>
obs
array
([
-
0。03799846
,
-
0。03288115
,
0。02337094
,
0。00720711
])
>>>
env
。
render
()
#render()方法顯示如圖所示的環境
來詢問環境什麼動作是可能的:
>>>
env
。
action_space
Discrete
(
2
)
表示可能的動作是整數 0 和 1,表示向左(0)或右(1)的加速。其他環境可能有更多的動作。因為杆子向右傾斜,讓我們向右加速推車:
>>> action = 1 # accelerate right
>>> obs, reward, done, info = env。step(action)
>>> obs
array([-0。03865608, 0。16189797, 0。02351508, -0。27801135])
>>> reward
1。0
>>> done
False
>>> info
{}
step()
表示執行給定的動作並返回四個值:
obs
:新的觀測,小車現在正在向右走(
obs[1]>0
)。平衡杆仍然向右傾斜(
obs[2]>0
),但是他的角速度現在為負(
obs[3]<0
),所以它在下一步後可能會向左傾斜。
reward
:無論做什麼,每一步都會得到 1。0 獎勵,所以遊戲的目標就是儘可能長的執行。
done
:當遊戲結束時這個值會為
True
。當平衡杆傾斜太多,之後,必須重新設定環境
info
:提供額外的除錯資訊。這些資料不應該用於訓練(這是作弊)。
我們編寫一個簡單的策略,當杆向左傾斜時向左加速,當杆向右傾斜時向右加速:
def basic_policy(obs):
angle = obs[2]
return 0 if angle < 0 else 1
totals = []
for episode in range(500):
episode_rewards = 0
obs = env。reset()
for step in range(1000): # 最多1000 步,我們不想讓它永遠執行下去
action = basic_policy(obs)
obs, reward, done, info = env。step(action)
episode_rewards += reward
if done:
break
totals。append(episode_rewards)
>>> import numpy as np
>>> np。mean(totals), np。std(totals), np。min(totals), np。max(totals)
(42。125999999999998, 9。1237121830974033, 24。0, 68。0)
我們發現最多隻有68步,這不理想。從模擬中可以觀察到推車越來越強烈地左右擺動,直到平衡杆傾斜太多。
神經網路策略
讓我們建立一個神經網路策略:和之前的一樣,把觀測作為輸入,動作作為輸出。在CartPole裡,只需要一個神經元來決定左或右,如圖,輸出代表動作0(左)的機率
假設輸出是0。7,我們將有0。7機率向左,0。3機率向右。這可能讓你感到奇怪?為什麼不選擇機率高的呢?
實際上,我們需要使智慧體在
探索新的行為
和
利用那些已知可行的行動
之間找到正確的平衡。比如你去餐館就餐,如果每次都點最愛吃的,就會錯過機會探索一些新出的菜品,即使它們更好吃。
這個CartPole 問題是簡單的,它無噪聲也觀測到了全部狀態。因此無需考慮過去,但是在其他環境下,你可能需要考慮過去。比如,有一些隱藏狀態,那麼你也需要考慮過去,例如環境僅有推車的位置,你要考慮先前的觀測,以便估計當前的速度;或者觀測是有噪聲的的,通常你想用過去的觀察來估計最可能的當前狀態。
import tensorflow as tf
from tensorflow。contrib。layers import fully_connected
# 1。 宣告神經網路結構
n_inputs = 4 # == env。observation_space。shape[0]
n_hidden = 4 # 這只是個簡單的測試,不需要過多的隱藏層
n_outputs = 1 # 只輸出向左加速的機率
initializer = tf。contrib。layers。variance_scaling_initializer()
# 2。 建立神經網路
X = tf。placeholder(tf。float32, shape=[None, n_inputs])
hidden = fully_connected(X, n_hidden, activation_fn=tf。nn。elu,weights_initializer=initializer)
# 隱層啟用函式使用指數線性函式
logits = fully_connected(hidden, n_outputs, activation_fn=None,weights_initializer=initializer)
outputs = tf。nn。sigmoid(logits) #如果多個狀態,可能需要softmax
# 3。 在機率基礎上隨機選擇動作
p_left_and_right = tf。concat(axis=1, values=[outputs, 1 - outputs])
#如果softmax輸出全部機率,此處可不必
action = tf。multinomial(tf。log(p_left_and_right), num_samples=1) #按機率隨機選擇一個
init = tf。global_variables_initializer()
評價行為:信用分配問題
如果我們知道每步最好的行為,訓練很容易的,我們只要最小化目標和輸出的交叉熵就可以了。但是強化學習獎勵常常是稀疏和延遲的,假設100步杆子沒有倒下來,我們不知道哪些步是獲得好結果的關鍵,如果某次杆子倒下去了,最後一步也不該負全責,可能之前某次已經出了大的偏差。這被稱為信用分配問題:當智慧體得到獎勵時,很難知道哪些行為應該被信任(或責備)。就像一隻小狗在行為良好後的幾小時得到獎勵,它會明白它為什麼獲得回報嗎?
一個通常的方法是,評價一個策略基於未來的步驟獎勵的總和,通常會用一個衰減率r,如圖所示,假設r=0。8,第3步獲得-50的獎勵,那麼依次按0。8向前遞推,第二部的獎勵是-40,第一步是-22。如果衰減率接近0,那麼未來的獎勵就無足輕重,如果衰減率接近1,那麼未來和現在一樣重要。典型的衰減率通常為是 0。95 或 0。99。如果衰減率為 0。95,那麼未來 13 步的獎勵大約是即時獎勵的一半(
),而當衰減率為 0。99,未來 69 步的獎勵是即時獎勵的一半。在 CartPole 環境下,行為具有相對短期的影響,因此選擇 0。95 的衰減率是合理的。
當然,如果好的動作後面跟著壞的動作,好的動作也會受到牽連得到低分,但是我們訓練的足夠久的話,好動作平均會比壞的動作得分要高,因此我們要訓練多輪,並對每個動作的得分進行標準化。
策略梯度
正如前面所討論的,策略梯度(PG)演算法沿著高回報的梯度來最佳化策略引數。一種流行的 PG 演算法,稱為增強演算法,在 1992 由 Ronald Williams 提出。這是一個常見的變體:
首先,讓神經網路策略玩幾次遊戲,並在每一步計算梯度,這使得智慧體更可能選擇行為,但不應用這些梯度。
執行幾次後,計算每個動作的得分(如上)。
每個梯度向量乘以相應的動作得分:分數為正的(好的動作),應用較早計算的梯度,增大未來選擇的機率。但是,分數是負的(壞動作),應用負梯度來減小可能。
最後,計算所有得到的梯度向量的平均值,並使用它來執行梯度下降步驟。
下面是TF的實現
在執行階段,演算法將執行策略,並在每個步驟中評估這些梯度張量並存儲它們的值。在多次執行之後,它如先前所解釋的調整這些梯度(即,透過動作分數乘以它們並使它們歸一化),並計算調整後的梯度的平均值。接下來,需要將結果梯度反饋到最佳化器,以便它可以執行最佳化步驟。這意味著對於每一個梯度向量我們需要一個佔位符。
n_inputs
=
4
n_hidden
=
4
n_outputs
=
1
initializer
=
tf
。
contrib
。
layers
。
variance_scaling_initializer
()
learning_rate
=
0。01
X
=
tf
。
placeholder
(
tf
。
float32
,
shape
=
[
None
,
n_inputs
])
hidden
=
fully_connected
(
X
,
n_hidden
,
activation_fn
=
tf
。
nn
。
elu
,
weights_initializer
=
initializer
)
logits
=
fully_connected
(
hidden
,
n_outputs
,
activation_fn
=
None
,
weights_initializer
=
initializer
)
outputs
=
tf
。
nn
。
sigmoid
(
logits
)
p_left_and_right
=
tf
。
concat
(
axis
=
1
,
values
=
[
outputs
,
1
-
outputs
])
action
=
tf
。
multinomial
(
tf
。
log
(
p_left_and_right
),
num_samples
=
1
)
y
=
1。
-
tf
。
to_float
(
action
)
cross_entropy
=
tf
。
nn
。
sigmoid_cross_entropy_with_logits
(
labels
=
y
,
logits
=
logits
)
optimizer
=
tf
。
train
。
AdamOptimizer
(
learning_rate
)
grads_and_vars
=
optimizer
。
compute_gradients
(
cross_entropy
)
#呼叫最佳化器的compute_gradients()方法,而不是minimize()方法。這是因為我們想要在使用它們之前調整梯度
gradients
=
[
grad
for
grad
,
variable
in
grads_and_vars
]
gradient_placeholders
=
[]
grads_and_vars_feed
=
[]
for
grad
,
variable
in
grads_and_vars
:
gradient_placeholder
=
tf
。
placeholder
(
tf
。
float32
,
shape
=
grad
。
get_shape
())
gradient_placeholders
。
append
(
gradient_placeholder
)
grads_and_vars_feed
。
append
((
gradient_placeholder
,
variable
))
training_op
=
optimizer
。
apply_gradients
(
grads_and_vars_feed
)
#呼叫最佳化器的apply_gradients()函式,該函式接受梯度向量/變數對的列表。
#我們不給它原始的梯度向量,而是給它一個包含更新梯度的列表
init
=
tf
。
global_variables_initializer
()
saver
=
tf
。
train
。
Saver
()
計算總折扣獎勵,給予原始獎勵,以及歸一化多次迴圈的結果
def
discount_rewards
(
rewards
,
discount_rate
):
discounted_rewards
=
np
。
empty
(
len
(
rewards
))
cumulative_rewards
=
0
for
step
in
reversed
(
range
(
len
(
rewards
))):
cumulative_rewards
=
rewards
[
step
]
+
cumulative_rewards
*
discount_rate
discounted_rewards
[
step
]
=
cumulative_rewards
return
discounted_rewards
def
discount_and_normalize_rewards
(
all_rewards
,
discount_rate
):
#歸一化
all_discounted_rewards
=
[
discount_rewards
(
rewards
)
for
rewards
in
all_rewards
]
flat_rewards
=
np
。
concatenate
(
all_discounted_rewards
)
reward_mean
=
flat_rewards
。
mean
()
reward_std
=
flat_rewards
。
std
()
return
[(
discounted_rewards
-
reward_mean
)
/
reward_std
for
discounted_rewards
in
all_discounted_rewards
]
n_iterations
=
250
# 訓練迭代次數
n_max_steps
=
1000
# 每一次的最大步長
n_games_per_update
=
10
# 每迭代十次訓練一次策略網路
save_iterations
=
10
# 每十次迭代儲存模型
discount_rate
=
0。95
with
tf
。
Session
()
as
sess
:
init
。
run
()
for
iteration
in
range
(
n_iterations
):
all_rewards
=
[]
#每一次的所有獎勵
all_gradients
=
[]
#每一次的所有梯度
for
game
in
range
(
n_games_per_update
):
current_rewards
=
[]
#當前步的所有獎勵
current_gradients
=
[]
#當前步的所有梯度
obs
=
env
。
reset
()
for
step
in
range
(
n_max_steps
):
action_val
,
gradients_val
=
sess
。
run
([
action
,
gradients
],
feed_dict
=
{
X
:
obs
。
reshape
(
1
,
n_inputs
)})
# 一個obs
obs
,
reward
,
done
,
info
=
env
。
step
(
action_val
[
0
][
0
])
current_rewards
。
append
(
reward
)
current_gradients
。
append
(
gradients_val
)
if
done
:
break
all_rewards
。
append
(
current_rewards
)
all_gradients
。
append
(
current_gradients
)
# 此時我們每10次執行一次策略,即使用迭代10次的結果來最佳化當前的策略。
all_rewards
=
discount_and_normalize_rewards
(
all_rewards
)
feed_dict
=
{}
for
var_index
,
grad_placeholder
in
enumerate
(
gradient_placeholders
):
# 將梯度與行為分數相乘,並計算平均值
mean_gradients
=
np
。
mean
(
[
reward
*
all_gradients
[
game_index
][
step
][
var_index
]
for
game_index
,
rewards
in
enumerate
(
all_rewards
)
for
step
,
reward
in
enumerate
(
rewards
)],
axis
=
0
)
feed_dict
[
grad_placeholder
]
=
mean_gradients
sess
。
run
(
training_op
,
feed_dict
=
feed_dict
)
if
iteration
%
save_iterations
==
0
:
saver
。
save
(
sess
,
“。/my_policy_net_pg。ckpt”
)
在運行了這 10 次之後,我們使用
discount_and_normalize_rewards()
函式計算動作得分;我們遍歷每個可訓練變數,在所有次數和所有步驟中,透過其相應的動作分數來乘以每個梯度向量;並且我們計算結果的平均值。最後,我們執行訓練操作,給它提供平均梯度(對每個可訓練變數提供一個)。我們繼續每 10 個訓練次數儲存一次模型。
儘管它相對簡單,但是該演算法是非常強大的。你可以用它來解決更難的問題,而不僅僅是平衡一輛手推車上的平衡杆。事實上,AlphaGo 是基於類似的 PG 演算法(加上蒙特卡羅樹搜尋,這超出了本書的範圍)。
馬爾可夫決策過程
在20世紀初,Markov 研究了沒有記憶的隨機過程,稱為
馬爾可夫鏈
。它具有固定數量的狀態,並且在每個步驟中隨機地從一個狀態演化到另一個狀態。它從狀態
S
演變為狀態
S‘
的機率是固定的,它只依賴於
(S, S’)
對,而不是依賴於過去的狀態(沒有記憶)。
下圖展示了一個例子。從狀態
S0
開始,下一步有 70% 的機率保持不變。但最終必然離開那個狀態,並且永遠不會回來,因為沒有其他狀態回到
S0
。如果它進入狀態
S1
,那麼它很可能會進入狀態
S2
(90% 的機率),然後立即回到狀態
S1
(以 100% 的機率)。它可以在這兩個狀態之間交替多次,但最終它會落入狀態
S3
並永遠留在那裡(這是一個終端狀態)。馬爾可夫鏈在不同學科有著很多不同的應用。
馬爾可夫決策過程(MDP)最初在 20 世紀 50 年代由 Richard Bellman 描述的。它類似於馬爾可夫鏈,但有一個不同:狀態轉移中,一個智慧體可以選擇幾種可能的動作中的一個,並且轉移機率取決於所選擇的動作。此外,一些狀態轉移返回一些獎勵(正或負),智慧體的目標是找到一個策略,隨著時間的推移將最大限度地提高獎勵。
如圖,智慧體從
S0
開始,有
A0
、
A1
或
A2
三個動作可以選擇。可以選擇a1呆在原地不動,但是,如果它選擇動作A0,它有 70% 的機率獲得 10 獎勵,並保持在狀態S0。在狀態
S0
中,清楚地知道
A0
是最好的選擇,在狀態S2中,智慧體別無選擇,只能採取行動
A1
,但是在狀態
S1
中,智慧體否應該保持不動(
A0
)或透過火(
A2
),這是不明確的。
Bellman 找到了一種估計任何狀態
S
的最佳狀態值的方法,他提出了
V(s)
,它是智慧體在其採取最佳行為達到狀態
s
後所有衰減未來獎勵的總和的平均期望。
這引出了一種演算法,可以精確估計每個可能狀態的最優狀態值:
首先將所有狀態值估計初始化為零
然後用數值迭代演算法迭代更新它們(見公式 16-2)
給定足夠的時間,這些估計保證收斂到最優狀態值,對應於最優策略。
該演算法是動態規劃的一個例子,它將了一個複雜的問題變為可處理的子問題,迭代地處理
但這仍不能明確應該讓智慧體採取什麼動作,Bellman 發現了一種非常類似的演算法來估計
最優狀態-動作值
(
state-action values
),通常稱為
Q 值
。狀態行動
(S, A)
對的最優 Q 值,記為
Q(s, a)
,是智慧體在到達狀態
S
,然後選擇動作
A
之後平均衰減未來獎勵的期望的總和。但是在它看到這個動作的結果之前,假設它在該動作之後的動作是最優的。
當智慧體處於狀態
S
時,它應該選擇具有最高 Q 值的動作
可以寫一個簡單的實現:
nan=np。nan # 代表不可能的動作
T = np。array([ # shape=[s, a, s‘]
[[0。7, 0。3, 0。0], [1。0, 0。0, 0。0], [0。8, 0。2, 0。0]],
[[0。0, 1。0, 0。0], [nan, nan, nan], [0。0, 0。0, 1。0]],
[[nan, nan, nan], [0。8, 0。1, 0。1], [nan, nan, nan]], ])
R = np。array([ # shape=[s, a, s’]
[[10。, 0。0, 0。0], [0。0, 0。0, 0。0], [0。0, 0。0, 0。0]],
[[10。, 0。0, 0。0], [nan, nan, nan], [0。0, 0。0, -50。]],
[[nan, nan, nan], [40。, 0。0, 0。0], [nan, nan, nan]], ])
possible_actions = [[0, 1, 2], [0, 2], [1]]
Q = np。full((3, 3), -np。inf) # -inf 對應著不可能的動作
for state, actions in enumerate(possible_actions):
Q[state, actions] = 0。0 # 對所有可能的動作初始化為0。0
learning_rate = 0。01
discount_rate = 0。95
n_iterations = 100
for iteration in range(n_iterations):
Q_prev = Q。copy()
for s in range(3):
for a in possible_actions[s]:
Q[s, a] = np。sum([T[s, a, sp] * (R[s, a, sp] + discount_rate * np。max(Q_prev[sp]))
for sp in range(3)])
>>> Q
array([[ 21。89498982, 20。80024033, 16。86353093],
[ 1。11669335, -inf, 1。17573546],
[ -inf, 53。86946068, -inf]])
>>> np。argmax(Q, axis=1) # 每一狀態的最優動作
array([0, 2, 1])
這給我們這個 MDP 的最佳策略, 0。95 的衰減率時:狀態
S1
選擇透過火焰!而衰減率降低到 0。9,最好的動作變成保持不變。因為如果你認為現在比未來更重要,那麼未來獎勵的前景是不值得立刻經歷痛苦(透過火)。
時間差分學習與 Q 學習
離散動作的強化學習常常都是馬爾可夫決策,但是問題在於智慧體最開始不知道轉移的可能性(T)和獎勵(R)。要想知道獎勵,至少要經過一次,要知道轉移的機率,要經過多次來估計得到。
時間差分學習
(TD 學習)演算法與數值迭代演算法非常類似,智慧體使用探索策略,如隨機探索 MDP,並且隨著它的發展,TD 學習演算法基於實際觀察到的轉換和獎勵來更新狀態值的估計。
a
是學習率(例如 0。01)
類似地,此時的Q 學習演算法是
TD 學習與隨機梯度下降有許多相似之處,特別是它一次處理一個樣本的行為。就像 SGD 一樣,只有當你逐漸降低學習速率時,它才能真正收斂
下面是實現
import
numpy。random
as
rnd
learning_rate0
=
0。05
learning_rate_decay
=
0。1
n_iterations
=
20000
s
=
0
# 在狀態 0開始
Q
=
np
。
full
((
3
,
3
),
-
np
。
inf
)
# -inf 對應著不可能的動作
for
state
,
actions
in
enumerate
(
possible_actions
):
Q
[
state
,
actions
]
=
0。0
# 對於所有可能的動作初始化為 0。0
for
iteration
in
range
(
n_iterations
):
a
=
rnd
。
choice
(
possible_actions
[
s
])
# 隨機選擇動作
sp
=
rnd
。
choice
(
range
(
3
),
p
=
T
[
s
,
a
])
# 使用 T[s, a] 挑選下一狀態
reward
=
R
[
s
,
a
,
sp
]
learning_rate
=
learning_rate0
/
(
1
+
iteration
*
learning_rate_decay
)
Q
[
s
,
a
]
=
learning_rate
*
Q
[
s
,
a
]
+
(
1
-
learning_rate
)
*
(
reward
+
discount_rate
*
np
。
max
(
Q
[
sp
]))
s
=
sp
# 移動至下一狀態
給定足夠的迭代,該演算法將收斂到最優 Q 值。這被稱為離線策略演算法,因為正在訓練的策略不是正在執行的策略。
策略探索
只有對策略探索清楚,Q學習才會起作用。單純隨機的探索需要花很長的時間,一個更好的方法是:
ε 貪婪策略
,它以機率
ε
隨機地探索或以機率為
1-ε
選擇具有最高 Q 值的動作。ε 貪婪策略的優點在於,與完全隨機策略相比,將花費越來越多的時間來探索環境中有趣的部分,因為 Q 值估計越來越好,同時仍花費一些時間訪問 MDP 的未知區域。我們常常讓
ε
初始的時候為很高的值(如1),然後逐漸減小它(如下降到 0。05)。
另一種方法是鼓勵探索策略來嘗試它以前沒有嘗試過的行動。這可以被實現為附加於 Q 值估計的獎金
近似Q學習
Q 學習的主要問題是,它不能很好地擴充套件到具有許多狀態和動作的大(甚至中等)的 MDP,因為狀態數量非常多,你絕對無法追蹤每一個 Q 值的估計值。
解決方案是找到一個函式,使用可管理數量的引數來近似 Q 值。這被稱為近似 Q 學習。之前人們一直在手工提取Q值,但是 DeepMind 表明使用深度神經網路可以工作得更好,特別是對於複雜的問題。它不需要任何特徵工程。用於估計 Q 值的 DNN 被稱為深度 Q 網路(DQN),並且使用近似 Q 學習的 DQN 被稱為深度 Q 學習。
學習去使用深度 Q 學習來玩 Ms。Pac-Man
首先要安裝一些環境
$ brew install cmake boost boost-python sdl2 swig wget
$ pip3 install ——upgrade ‘gym[all]’
你可以建立一個吃豆人小姐的環境
>>> env = gym。make(“MsPacman-v0”)
>>> obs = env。reset()
>>> obs。shape # [長,寬,通道]
(210, 160, 3)
>>> env。action_space
Discrete(9)
有九個離散動作可用,它對應於操縱桿的九個可能位置(左、右、上、下、中、左上等)。
觀察結果是 Atari 螢幕的截圖(上圖左),表示為 3D Numpy 矩陣。這些影象有點大,所以我們將建立一個小的預處理函式,將影象裁剪並縮小到88×80畫素,將其轉換成灰度,並提高 Ms。Pac-Man 的對比度。這將減少 DQN 所需的計算量,並加快培訓練。
mspacman_color = np。array([210, 164, 74])。mean()
def preprocess_observation(obs):
img = obs[1:176:2, ::2] # 裁剪
img = img。mean(axis=2) # 灰度化
img[img==mspacman_color] = 0 # 提升對比度
img = (img - 128) / 128 - 1 # 正則化為-1到1。
return img。reshape(88, 80, 1)
接下來,讓我們建立 DQN。它可以只取一個狀態動作對
(S,A)
作為輸入,並輸出相應的 Q 值
Q(s,a)
的估計值,
我們將使用的訓練演算法需要兩個具有相同架構(但不同引數)的 DQN:一個將在訓練期間用於驅動 Ms。Pac-Man(the
actor
,行動者),另一個將觀看行動者並從其試驗和錯誤中學習(the
critic
,評判者)。每隔一定時間,我們把評判者網路複製給行動者網路。因為我們需要兩個相同的 DQN,所以我們將建立一個
q_network()
函式來構建它們
from
tensorflow。contrib。layers
import
convolution2d
,
fully_connected
input_height
=
88
input_width
=
80
input_channels
=
1
conv_n_maps
=
[
32
,
64
,
64
]
conv_kernel_sizes
=
[(
8
,
8
),
(
4
,
4
),
(
3
,
3
)]
conv_strides
=
[
4
,
2
,
1
]
conv_paddings
=
[
“SAME”
]
*
3
conv_activation
=
[
tf
。
nn
。
relu
]
*
3
n_hidden_in
=
64
*
11
*
10
# conv3 有 64 個 11x10 對映
each
n_hidden
=
512
hidden_activation
=
tf
。
nn
。
relu
n_outputs
=
env
。
action_space
。
n
# 9個離散動作
initializer
=
tf
。
contrib
。
layers
。
variance_scaling_initializer
()
def
q_network
(
X_state
,
scope
):
prev_layer
=
X_state
conv_layers
=
[]
with
tf
。
variable_scope
(
scope
)
as
scope
:
for
n_maps
,
kernel_size
,
stride
,
padding
,
activation
in
zip
(
conv_n_maps
,
conv_kernel_sizes
,
conv_strides
,
conv_paddings
,
conv_activation
):
prev_layer
=
convolution2d
(
prev_layer
,
num_outputs
=
n_maps
,
kernel_size
=
kernel_size
,
stride
=
stride
,
padding
=
padding
,
activation_fn
=
activation
,
weights_initializer
=
initializer
)
conv_layers
。
append
(
prev_layer
)
last_conv_layer_flat
=
tf
。
reshape
(
prev_layer
,
shape
=
[
-
1
,
n_hidden_in
])
hidden
=
fully_connected
(
last_conv_layer_flat
,
n_hidden
,
activation_fn
=
hidden_activation
,
weights_initializer
=
initializer
)
outputs
=
fully_connected
(
hidden
,
n_outputs
,
activation_fn
=
None
,
weights_initializer
=
initializer
)
trainable_vars
=
tf
。
get_collection
(
tf
。
GraphKeys
。
TRAINABLE_VARIABLES
,
scope
=
scope
。
name
)
trainable_vars_by_name
=
{
var
。
name
[
len
(
scope
。
name
):]:
var
for
var
in
trainable_vars
}
return
outputs
,
trainable_vars_by_name
其中,
trainable_vars_by_name
字典收集了所有 DQN 的可訓練變數。當我們建立操作以將評論家 DQN 複製到行動者 DQN 時,這將是有用的。字典的鍵是變數的名稱,去掉與範圍名稱相對應的字首的一部分。看起來像這樣:
>>>
trainable_vars_by_name
{
‘/Conv/biases:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
at
0x121cf7b50
>
,
‘/Conv/weights:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/Conv_1/biases:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/Conv_1/weights:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/Conv_2/biases:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/Conv_2/weights:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/fully_connected/biases:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/fully_connected/weights:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/fully_connected_1/biases:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
,
‘/fully_connected_1/weights:0’
:
<
tensorflow
。
python
。
ops
。
variables
。
Variable
。。。>
}
輸入佔位符,以及複製評論家 DQN 給行動者 DQN 的操作:
X_state = tf。placeholder(tf。float32,
shape=[None, input_height, input_width,input_channels])
actor_q_values, actor_vars = q_network(X_state, scope=“q_networks/actor”)
critic_q_values, critic_vars = q_network(X_state, scope=“q_networks/critic”)
copy_ops = [actor_var。assign(critic_vars[var_name])
for var_name, actor_var in actor_vars。items()]
copy_critic_to_actor = tf。group(*copy_ops) #tf。group()函式將所有賦值操作分組到一個方便的操作中
行動者 DQN 可以用來扮演 Ms。Pac-Man(最初非常糟糕)。正如前面所討論的,你希望它足夠深入地探究遊戲,所以通常情況下你想將它用 ε 貪婪策略或另一種探索策略相結合。
評論家將試圖使其預測的 Q 值去匹配行動者透過其經驗的遊戲估計的 Q 值。具體的說
讓行動者玩一段時間,把所有的經驗儲存在回放記憶儲存器中。
從回放儲存器中取樣一批記憶,並且我們將估計這些儲存器中的 Q 值。
將使用監督學習技術訓練評論家 DQN 去預測這些 Q 值。每隔幾個訓練週期,我們會把評論家 DQN 複製到行動者 DQN。
回放記憶是可選的,但強烈推薦使它存在。沒有它,你會訓練評論家 DQN 使用連續的經驗,這可能是相關的。這將引入大量的偏差並且減慢訓練演算法的收斂性。透過使用回放記憶,我們確保饋送到訓練演算法的儲存器可以是不相關的。
新增評論家 DQN 的訓練操作:
X_action
=
tf
。
placeholder
(
tf
。
int32
,
shape
=
[
None
])
q_value
=
tf
。
reduce_sum
(
critic_q_values
*
tf
。
one_hot
(
X_action
,
n_outputs
),
axis
=
1
,
keep_dims
=
True
)
#選擇的動作相對應的 Q 值
假設目標Q值將透過佔位符饋入。我們還建立了一個不可訓練的變數
global_step
。最佳化器的
minimize()
操作將負責增加它。另外,我們建立了
init
操作和
Saver
::
y
=
tf
。
placeholder
(
tf
。
float32
,
shape
=
[
None
,
1
])
cost
=
tf
。
reduce_mean
(
tf
。
square
(
y
-
q_value
))
global_step
=
tf
。
Variable
(
0
,
trainable
=
False
,
name
=
‘global_step’
)
optimizer
=
tf
。
train
。
AdamOptimizer
(
learning_rate
)
training_op
=
optimizer
。
minimize
(
cost
,
global_step
=
global_step
)
init
=
tf
。
global_variables_initializer
()
saver
=
tf
。
train
。
Saver
()
編寫一個小函式來隨機地從回放記憶中取樣一批處理:
from
collections
import
deque
replay_memory_size
=
10000
replay_memory
=
deque
([],
maxlen
=
replay_memory_size
)
def
sample_memories
(
batch_size
):
indices
=
rnd
。
permutation
(
len
(
replay_memory
))[:
batch_size
]
cols
=
[[],
[],
[],
[],
[]]
# state, action, reward, next_state, continue
for
idx
in
indices
:
memory
=
replay_memory
[
idx
]
for
col
,
value
in
zip
(
cols
,
memory
):
col
。
append
(
value
)
cols
=
[
np
。
array
(
col
)
for
col
in
cols
]
return
(
cols
[
0
],
cols
[
1
],
cols
[
2
]
。
reshape
(
-
1
,
1
),
cols
[
3
],
cols
[
4
]
。
reshape
(
-
1
,
1
))
使用 ε 貪婪策略,並在 50000 個訓練步驟中逐步將
ε
從 1 降低到 0。05。
eps_min
=
0。05
eps_max
=
1。0
eps_decay_steps
=
50000
def
epsilon_greedy
(
q_values
,
step
):
epsilon
=
max
(
eps_min
,
eps_max
-
(
eps_max
-
eps_min
)
*
step
/
eps_decay_steps
)
if
rnd
。
rand
()
<
epsilon
:
return
rnd
。
randint
(
n_outputs
)
# 隨機動作
else
:
return
np
。
argmax
(
q_values
)
# 最優動作
初始化變數:
n_steps = 100000 # 總的訓練步長
training_start = 1000 # 在遊戲1000次迭代後開始訓練
training_interval = 3 # 每3次迭代訓練一次
save_steps = 50 # 每50訓練步長儲存模型
copy_steps = 25 # 每25訓練步長後複製評論家Q值到行動者
discount_rate = 0。95
skip_start = 90 # 跳過遊戲開始(只是等待時間)
batch_size = 50
iteration = 0 # 遊戲迭代
checkpoint_path = “。/my_dqn。ckpt”
done = True # env 需要被重置
開始訓練:
with tf。Session() as sess:
if os。path。isfile(checkpoint_path):
saver。restore(sess, checkpoint_path)
else:
init。run()
while True:
step = global_step。eval()
if step >= n_steps:
break
iteration += 1
if done: # 遊戲結束,重來
obs = env。reset()
for skip in range(skip_start): # 跳過遊戲開頭
obs, reward, done, info = env。step(0)
state = preprocess_observation(obs)
# 行動者評估要幹什麼
q_values = actor_q_values。eval(feed_dict={X_state: [state]})
action = epsilon_greedy(q_values, step)
# 行動者開始玩遊戲
obs, reward, done, info = env。step(action)
next_state = preprocess_observation(obs)
# 讓我們記下來剛才發生了啥
replay_memory。append((state, action, reward, next_state, 1。0 - done)) state = next_state
if iteration < training_start or iteration % training_interval != 0: continue
# 評論家學習
X_state_val, X_action_val, rewards, X_next_state_val, continues = ( sample_memories(batch_size))
next_q_values = actor_q_values。eval( feed_dict={X_state: X_next_state_val})
max_next_q_values = np。max(next_q_values, axis=1, keepdims=True)
y_val = rewards + continues * discount_rate * max_next_q_values
training_op。run(feed_dict={X_state: X_state_val,X_action: X_action_val, y: y_val})
# 複製評論家Q值到行動者
if step % copy_steps == 0:
copy_critic_to_actor。run()
# 儲存模型
if step % save_steps == 0:
saver。save(sess, checkpoint_path)
不幸的是,訓練過程是非常緩慢的:如果你使用你的膝上型電腦進行訓練的話,想讓 Ms。 Pac-Man 變好一點點你得花好幾天。一種解決方案是將盡可能多的先驗知識注入到模型中(例如,透過預處理、獎勵等),也可以嘗試透過首先訓練它來模仿基本策略來引導模型。
《Hands-On Machine Learning with Scikit-Learn & TensorFlow》的學習就告一段落了,可以訪問ageron/handson-ml GITHUB 專案獲取更多內容
上一篇:資然泉讓你想起媽媽的味道
下一篇:這輩子,總會用到地圖的