《深潛Dubbo》· HashedWheelTimer定時輪演算法
HashedWheelTimer定時輪演算法被廣泛使用,netty、dubbo甚至是作業系統Linux中都有其身影,用於管理及維護大量Timer排程演算法。
一個HashedWheelTimer是環形結構,類似一個時鐘,分為很多槽,一個槽代表一個時間間隔,每個槽使用雙向連結串列儲存定時任務,指標週期性的跳動, 跳動到一個槽位,就執行該槽位的定時任務。
定時輪演算法示意圖
實現
定時輪的具體實現中,按照職責不同,可分為
時鐘引擎、時鐘槽、定時任務
3個主要角色,為透徹理解其實現,行文有穿插, 這一部分抹掉了具體實現語言的特性。
定時任務——HashedWheelTimeout
在具體實現中定時任務
HashedWheelTimeout
扮演著雙重角色,既是雙向連結串列的節點,同時也是實際排程任務
TimerTask
的容器, 其
由引擎在滴答執行起始時刻使用&取hash裝入對應的時鐘槽
。
關鍵屬性.
HashedWheelTimeout next,prev
:當前定時任務在連結串列中的前驅和後繼引用
TimerTask task
:實際被排程的任務
long deadline
:該時間是相對於引擎的
startTime
的,由公式
currentTime + delay - startTime
得到,時間單位一般為納秒
delay:任務在提交時給出的相對當前的滯後執行時間
currentTime:當前核心時間
int state
:定時任務當前所處狀態,
INIT0——初始態,CANCELLED1——已被取消,EXPIRED2——已過期
Note
狀態的標記是在發起
expire
或
cancel
操作的起始瞬間完成的
HashedWheelTimeout
本身支援的操作並不多,如下:
remove
:呼叫所屬時鐘槽的
remove
將自身從中移除,若尚未被裝入槽中,需要額外對所屬定時輪的
pendingTimeouts
執行
-1
處理
expire
:任務到期,驅動
TimerTask
執行
cancel
:任務被提交方取消,取消的任務被裝入定時輪的
cancelledTimeouts
佇列中,待引擎進入下一個滴答時刻呼叫其
remove
移除自身
時鐘槽——HashedWheelBucket
時鐘槽實際上就是一個用於快取和管理定時任務的雙向連結串列容器每一個節點也即一個定時任務。它持有連結串列的首尾兩個節點,由於每個節點均持有前驅和後繼的引用, 因此利用連結串列的這個特性可以完成如下操作:
addTimeout
:新增尾節點,新增任務
pollTimeout
:移除首節點,取出頭部任務
remove
:根據引用移除指定節點,被移除任務已被處理或被取消,對所屬定時輪的
pendingTimeouts
相應
-1
處理
clearTimeouts
:迴圈呼叫
pollTimeout
獲取到所有未超時或者未被取消的任務
expireTimeouts
:從首節點開始迴圈遍歷槽中所有節點,呼叫
remove
取出到期任務執行expire或直接移除remove被取消的任務,對其它正常 任務的剩餘輪數執行
-1
操作
時鐘引擎——HashedWheelTimer
時鐘引擎有節律地週期性運作,總是根據當前時鐘滴答選定對應的時鐘槽,從連結串列頭部開始迭代,對每一個任務計算出其是否屬於當前時鐘週期,屬於則取出執行, 否則便將對剩下時鐘週期數執行減一操作。
另外,引擎維持著兩個快取定時任務的阻塞佇列,其中一個用於接受外界斷斷續續地投遞進來的,另外一個則用於快取那些主動取消的,引擎需要在滴答開始期間 先行將他們裝入對應的時鐘槽或從中移除他們。
關鍵屬性.
Queue
:佇列,用於快取外界主動提交或取消的任務
int workerState
:定時輪當前所處狀態:
Note
狀態值
init0——初始態
started1——已開始執行
shutdown2——已結束執行
startTime
:當前定時輪正式開始排程任務的時間,此後所有提交的定時任務第一個任務提交的開始,引擎就開始正式執行了,均以該時間點作為起點
ticks
:滴答,由時鐘引擎維護,是步長為1的單調遞增計數,也即
ticks+=1
ticksDuration
:滴答時長,每輪詢一個特定時鐘槽代表代表走完一個滴答時長
pendingTimeouts
當前定時輪實時任務剩餘數
n
: 時鐘輪槽數為n,不一定和期望達到的槽數一致,取大於且最靠近的2的冪次方值,其計算公式為
n=2x
mask
:掩碼,
mask = n - 1
,執行
ticks & mask
便能定位到對應位置的時鐘槽,效果上相當於
ticks % (mask + 1)
,由n這個2的冪次方保證
引擎核心——Worker
時鐘引擎實際上分為對外介面和排程執行兩部分,可以想象核心就是一個引擎的心臟起搏器驅動著定時輪的執行,完成任務的排程,實現上對應一個工作執行緒,為方便 理解,先單獨闡述他們所依賴的核心狀態。
核心狀態
對於任何引擎來說,狀態機是其關鍵組成,因此狀態值的控制對其而言是至關重要,因而將這部分作為單獨的部分闡述。核心狀態有定時輪維護管理,對外提供的 介面都要藉助它實現。初始時便為init狀態,當引擎被設計成不可復活時,便不存在
init/started/shutdown → init
這樣的遷移過程。
Note
start()
init → started.
於引擎的整個生命週期而言,這個狀態的遷移過程只允許發生一次,實現中會結合
startTime
做防禦性保護,直到整個過程完成為止
started → started.
表示引擎已經被啟用,一般直接忽略,否則也會等效於什麼也不做
shutdown → started.
定時輪一般被設計為不能復活的,這種情況下該過程是不允許發生的,屬於外界呼叫方的越界行為
Note
stop()
init → shutdown.
尚未開始就進入終結狀態,一般發生在對定時輪已經完成初始化,但尚未給其提交任務或呼叫過
start()
操作。 但還有另外一種特殊的情形,在多個執行緒對同一定時輪進行操作時發生爭用,一個執行緒在另外一個執行緒剛開始進入
start()
操作時,呼叫了`stop()`
started → shutdown.
正常的引擎關閉操作,一旦進入該過程,會持續到引擎完全終止,對應到實現上就是結束Work執行緒
shutdown → shutdown.
該遷移過程沒有實際語義,一般直接跳過
外部介面
這部分內容實際上已經在核心狀態一節已經有過具體闡述
start
:用於定時輪開啟引擎,但外界不一定需要呼叫此方法啟用定時輪,因為外界每次呼叫
newTimeout()
提交任務時,定時輪都會主動呼叫該介面, 以確保引擎已經處於執行狀態。
stop
:完成定時輪引擎的關閉過程,返回未被處理的定時任務
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit)
:用於向引擎提交任務,在任務被正式加入
timeouts
佇列之前:1)定時輪 會首先呼叫
start()
確保引擎已經啟動;然後為加入的
Timeout
計算出
deadline
值。
排程執行
有了以上的分析,對定時輪的任務排程也就不難理解了,簡單而言就是週期性的執行滴答操作,對應如下幾個操作:
等待進入滴答週期
時鐘轉動,滴答週期開始:
將外界主動取消的裝載在
cancelledTimeouts
佇列的任務逐個移除
將外界提交的裝載在
timeouts
佇列的任務逐個載入對應的時鐘槽裡
根據當前
tick
定位對應時鐘槽,執行其中的定時任務
檢測引擎核心狀態是否已經被終止,若未被終止,則迴圈執行上述操作,否則往下繼續執行
將下述方式獲取到未被處理的任務加入
unprocessedTimeouts
佇列:
遍歷時鐘槽呼叫
clearTimeouts()
對
timeouts
佇列中未被加入槽中迴圈呼叫
poll()
移除最後一個滴答週期後加入到
cancelledTimeouts
佇列任務
Important
相鄰兩個滴答週期的開始時間理論上來說是等距的,但是結束時間則會隨該週期所需處理任務的數目及時長有所變化。因而引擎剩下的休眠時間需要使用如下公式獲得:
tickDuration * (tick + 1) - (currentTime - startTime)
定時輪在dubbo中的應用
實際上,定時輪演算法並不直接用於等週期性的執行某些提交任務,向其提交的任務只會到期執行一次,但具體應用中,會利用每次任務的執行,呼叫
newTimeout()
提交
Timer
所引用的當前任務,使其在若干單位時間後重新繼續執行。這樣做的好處是,如果諸如IO等耗時任務,甚至是某些原因導致的當下執行任務卡住比較 長時間,後面不會有同樣的任務不斷提交進來,而導致任務堆積至無法處理。可見這裡說的額週期性任務不是嚴格固定
每x單位時間
執行一次的任務。
Dubbo中對定時輪的應用主要體現在如下幾個方面:
失敗重試
註冊
Register
取消註冊
Unregister
訂閱
Subscribe
取消訂閱
Unsubscribe
週期任務
心跳
Heartbeat
重連
Reconnect
下線
CloseChannel
Important
定時輪用單一的執行緒去管理觸發Task的執行,Task執行期間,不能直接拋異常,否則會導致整個定時輪引擎的奔潰而使得提交的後續任務無法執行。Task的模式如下:
try
{
if
(
sthCheck
())
{
logger
。
warn
(
“Sth happended”
);
doBuz
();
}
}
catch
(
Throwable
t
)
{
logger
。
warn
(
“Exception when do sth ”
,
t
);
}
週期任務
在dubbo中每一個連線被表徵為一個Channel通道,dubbo節點間建立連線相互通訊,單個節點需要維護和多個連入節點的連線:①透過持續傳送心跳檢測以保持連線 ②對超過了一定時間段處於空閒狀態的連線進行下線處理;③對已經掉線非下線處理的Channel進行重連處理。
基本的步驟如下:
滴答執行時Task透過回撥獲得當前節點的所有連入Channel
對沒有被關閉的節點執行實際的任務操作,比如心跳
透過
volatile
的可見性保證屬性檢測當前任務是否被取消,是返回,否繼續
若定時輪是否還在執行,則使用其提供的
newTimeout()
提交一個新的Task
以下結合原始碼進行分析:
public
abstract
class
AbstractTimerTask
implements
TimerTask
{
/**
* 該屬性比較關鍵,真正執行Task操作的是定時輪所持有的執行緒,而喚起``cancel()``操作的是提交任務的其它執行緒
*/
protected
volatile
boolean
cancel
=
false
;
public
void
cancel
()
{
this
。
cancel
=
true
;
}
。。。。
//省略部分程式碼
private
void
reput
(
Timeout
timeout
,
Long
tick
)
{
if
(
timeout
==
null
||
tick
==
null
)
{
throw
new
IllegalArgumentException
();
}
if
(
cancel
)
{
return
;
}
Timer
timer
=
timeout
。
timer
();
if
(
timer
。
isStop
()
||
timeout
。
isCancelled
())
{
return
;
}
timer
。
newTimeout
(
timeout
。
task
(),
tick
,
TimeUnit
。
MILLISECONDS
);
}
@Override
public
void
run
(
Timeout
timeout
)
throws
Exception
{
Collection
<
Channel
>
c
=
channelProvider
。
getChannels
();
for
(
Channel
channel
:
c
)
{
if
(
channel
。
isClosed
())
{
continue
;
}
doTask
(
channel
);
}
reput
(
timeout
,
tick
);
}
protected
abstract
void
doTask
(
Channel
channel
);
interface
ChannelProvider
{
Collection
<
Channel
>
getChannels
();
}
}
Note
從以上程式碼可以看出,利用定時輪實現間隔時間任務的模式比較固定,如下:
//保證立馬被定時輪獲知任務已被取消
protected
volatile
boolean
cancel
=
false
;
public
void
run
(
Timeout
timeout
)
throws
Exception
{
//執行任務業務邏輯
doTask
()
//重新
reput
(
timeout
,
tick
);
}
private
void
reput
(
Timeout
timeout
,
Long
tick
)
{
if
(
cancel
)
{
return
;
}
//確認定時輪還處於執行狀態
Timer
timer
=
timeout
。
timer
();
if
(
timer
。
isStop
()
||
timeout
。
isCancelled
())
{
return
;
}
//向定時輪重新提交一個新的__Task__,指定tick單位時間後執行
timer
。
newTimeout
(
timeout
。
task
(),
tick
,
TimeUnit
。
MILLISECONDS
);
}
週期性任務中對每一個Channel所做得事情比較簡單,實際上是在滿足條件的情況呼叫channel的指定操作
心跳——
channel。send(req)
Request
req
=
new
Request
();
req
。
setVersion
(
Version
。
getProtocolVersion
());
//雙邊通訊
req
。
setTwoWay
(
true
);
//事件型別:心跳
req
。
setEvent
(
Request
。
HEARTBEAT_EVENT
);
重連——
((Client) channel)。reconnect()
下線——
channel。close()
失敗重試
網路情況的的複雜多變性,使得一件原本在單機上很輕易的事情,分散式應用中,為確保某型別的操作能發生可能需要重試多次。 除了catch到異常後進行重試和對重試次數有規定外,和上述的週期任務實現幾乎一樣。模式如下:
/**
* times of retry。
* retry task is execute in single thread so that the times is not need volatile。
*/
//重試次數並沒有被申明為volatile,原因是該變數只會被定時輪引擎中的工作執行緒所使用到,投遞任務的那個執行緒並沒有直接接觸
private
int
times
=
1
;
。。。。
protected
void
reput
(
Timeout
timeout
,
long
tick
)
{
if
(
timeout
==
null
)
{
throw
new
IllegalArgumentException
();
}
Timer
timer
=
timeout
。
timer
();
if
(
timer
。
isStop
()
||
timeout
。
isCancelled
()
||
isCancel
())
{
return
;
}
times
++;
timer
。
newTimeout
(
timeout
。
task
(),
tick
,
TimeUnit
。
MILLISECONDS
);
}
@Override
public
void
run
(
Timeout
timeout
)
throws
Exception
{
if
(
timeout
。
isCancelled
()
||
timeout
。
timer
()。
isStop
()
||
isCancel
())
{
// other thread cancel this timeout or stop the timer。
return
;
}
if
(
times
>
retryTimes
)
{
// reach the most times of retry。
logger
。
warn
(
“Final failed to execute task ”
+
taskName
+
“, url: ”
+
url
+
“, retry ”
+
retryTimes
+
“ times。”
);
return
;
}
if
(
logger
。
isInfoEnabled
())
{
logger
。
info
(
taskName
+
“ : ”
+
url
);
}
try
{
doRetry
(
url
,
registry
,
timeout
);
}
catch
(
Throwable
t
)
{
// Ignore all the exceptions and wait for the next retry
logger
。
warn
(
“Failed to execute task ”
+
taskName
+
“, url: ”
+
url
+
“, waiting for again, cause:”
+
t
。
getMessage
(),
t
);
// reput this task when catch exception。
reput
(
timeout
,
retryPeriod
);
}
}
protected
abstract
void
doRetry
(
URL
url
,
FailbackRegistry
registry
,
Timeout
timeout
);
上一篇:釣魚100豆,什麼意思?
下一篇:不想用動如何才能減肥?