定時器的實現方案
定時器的實現方案:紅黑樹和多級時間輪
1、定時器的使用場景
定時器用於執行定時任務,各種場景都需要用到,比如
心跳檢測keep-alive
倒計時
遊戲技能CD等
各種延時處理等
2、定時器的觸發方式
對於服務端來說,驅動服務端業務邏輯的事件包括網路事件、定時事件、以及訊號事件;通常網路事件和定時事件會進行協同處理。
定時器觸發形式,根據網路事件和定時事件是否在同一執行緒中處理,分為:
一個執行緒中處理,協同處理
不同執行緒中處理,定時任務在單獨的執行緒中處理
2.1、同一執行緒處理
協同處理通常於定時任務比較少的場景。對於多執行緒而言,同一執行緒處理會引起事件處理的不均衡。當一個執行緒擁有較多定時任務時,會影響其對網路事件處理的效率。
為什麼網路事件和定時事件可以協同處理?
reactor 是基於事件的網路模型,對 io 的處理是同步的,對事件的處理是非同步的,定時任務是事件,對定時任務的處理也是非同步的。
如何進行協同處理?
以 io 多路複用作為定時器驅動,“阻塞”地收集就緒事件,timeout 引數用於設定定時。資料結構通常選擇紅黑樹、跳錶、最小堆等來實現定時器,後文會有詳細的解釋。
// 網路事件和定時事件在一個執行緒中處理,協同處理
while (!quit) {
// 最近定時任務的觸發時間 = 最近定時任務新增時設定的觸發時間 - 當前時間
int timeout = get_nearest_timer() - now();
if (timeout < 0) timeout = -1;
// 最近定時任務的觸發時間作為 timeout 引數,timetout 定時任務觸發
// 1、若沒有網路事件,先去處理定時任務
// 2、若收到網路事件,先處理網路事件,再處理定時任務
int nevent = epoll_wait(epfd, ev, nev, timeout);
for (int i = 0; i < nevent; i++) {
// 。。。 處理網路事件
}
// 輪詢處理定時事件
update_timer();
}
基於協同處理的開源框架
單 reactor:redis
多 reactornginx、memcached
2.2、不同執行緒中處理
定時任務在單獨的執行緒中檢測,通常用於處理大量定時任務。
以 usleep(time)作為定時器驅動,time 引數用於設定定時,要小於最小時間精度。
// 網路事件和定時事件在不同執行緒中處理
void * thread_timer(void *thread_param) {
init_timer();
while (!quit) {
update_timer();
sleep(t);
}
clear_timer();
return NULL;
}
pthread_create(&pid, NULL, thread_timer, &thread_param);
資料結構通常選擇時間輪,加鎖粒度比較小(對1個格子加鎖)。時間輪只負責檢測,透過訊號或者插入執行佇列讓其他執行緒執行。
3、定時器設計
3.1、介面設計
所有定時器都要實現的介面
// 初始化定時器
void init_timer();
// 定時器的新增,新增任務結點,基於此可以做多次觸發的介面,經過 expire 時間觸發cb
Node* add_timer(int expire, callback cb);
// 定時器的刪除,刪除定時任務
bool del_timer(Node* node);
// 定時器的更新,到期任務的處理
void update_timer();
在協同處理的方案中,需要額外新增介面,來查詢最近定時任務的觸發時間。
// 返回最近定時任務的觸發時間,用於協同處理
Node* find_nearest_timer();
3.2、資料結構設計
本質:按照定時任務的優先順序進行組織任務的執行順序。
組織方式
按照時間順序組織,要求資料結構有序,能快速查詢最近觸發的定時任務。在實現中,關鍵是要考慮相同時間觸發的定時任務如何處理。
紅黑樹(絕對有序): nginx
跳錶(絕對有序):redis未來引入
最小堆(相對有序): libevent, libev, go
按照執行順序組織:時間輪
4、紅黑樹
紅黑樹中序有序,查詢效率高,按照時間順序組織定時任務(考慮相同觸發時間如何組織),用於協同處理的方式,但加鎖粒度大(對整個紅黑樹加鎖)。
4.1、定時器的驅動
首先,選擇定時器驅動的方式,這裡選擇 epoll 來實現,透過引數 timeout 設定定時。
while (true) {
// 最近任務的觸發時間介面:TimeToSlee,作為 timeout 引數
int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());
for (int i = 0; i < n; i++) {
/* 處理網路事件 */
}
// 處理定時事件
while(timer->CheckTimer());
}
4.2、資料結構設計
C++ 中提供了map set等容器。為了簡單,我們這裡選擇 set來儲存定時器任務
set
在設計定時器任務結點時,有一個關鍵的問題,
相同觸發時間的定時任務的如何處理?
我們不能將觸發時間 expire 作為紅黑樹的 key 值。舉個栗子,A 事件到來時 tick=15,15 後執行,expire=30;B 事件到來時 tick=20, 10 後執行,expire=30。兩者的觸發時間相同,將在 tick=30 後觸發,這樣無法對事件排序。為避免上述情況的出現,在觸發時間相同時,我們根據
插入的先後順序
來決定事件的執行順序,先插入的先執行,放在紅黑樹左側。後插入後執行,放在紅黑樹右側。我們使用 id 屬性來描述事件到來的先後順序。
這裡,我們選擇 expire和 id 來唯一標識一個定時結點:
// 定義定時結點的基類,儲存唯一標識的元素
struct TimerNodeBase {
time_t expire; // 任務觸發時間(到期時間)
int64_t id; // 用來描述插入先後順序,int64_t,能記錄5000多年
};
// 定時結點,包含定時任務等
struct TimerNode : public TimerNodeBase {
// 定時器任務回撥函式
// 函式物件複製代價高,在容器內複製構造後不會再去移動
using Callback = std::function
Callback func;
// 建構函式,容器內部就地複製構造呼叫一次,此後不會再去呼叫
TimerNode(int64_t id, time_t expire, Callback func) : func(func) {
this->expire = expire;
this->id = id;
}
};
函式物件作為類,佔用大量的空間,複製控制和移動代價高,因此,我們拆分成基類和派生類,基類儲存標識,用於複製控制和移動;子類儲存函式物件等,在容器內就地構造後,不再賦複製控制和移動。
同樣在用函式物件實現比較函式,採用基類引用比較,體現多型特性,減少複製移動。
// 按觸發時間的先後順序對結點進行排序
// 基類引用,多型特性
bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {
// 先比較觸發時間
if (lhd。expire < rhd。expire)
return true;
else if (lhd。expire > rhd。expire)
return false;
// 觸發時間相同,比較插入的先後順序
// 比較id大小,先插入的結點id小,先執行
return lhd。id < rhd。id;
}
在 timer 類的介面實現中 find 函式利用了 C++14 的新特性,利用等價 key 比較,不需要構造 key 物件進行比較,也是同樣的原理。
4.3、timer 介面實現
初始化定時器
獲取當前時間介面,用於計算觸發時間
steady_clock:系統啟動到當前時間,用於計算程式執行時間
system_clock:時間戳,可以修改
high_resolution_clock:高精度版本的steady_clock
// 獲取當前時間
static time_t GetTick() {
auto sc = chrono::time_point_cast
auto temp = chrono::duration_cast
return temp。count();
}
定時器的新增
// 引數: msec 任務觸發時間間隔,func 任務執行的回撥函式
TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {
time_t expire = GetTick() + msec;
// emplace 容器內就地構造,避免複製構造和移動構造,
auto ele = timermap。emplace(GenID(), expire, func);
return static_cast
}
定時器的刪除
bool DelTimer(TimerNodeBase &node) {
// C++14的新特性:只需傳遞等價 key 比較,無需建立 key 物件比較,
// 代替子類結點,避免函式物件複製控制和移動
auto iter = timermap。find(node);
// 若存在,則刪除該結點
if (iter != timermap。end()) {
timermap。erase(iter);
return true;
}
return false;
}
定時器的更新
bool CheckTimer() {
auto iter = timermap。begin();
if (iter != timermap。end() && iter->expire <= GetTick()) {
// 定時任務被觸發,則執行對應的定時任務
iter->func(*iter);
// 刪除執行完畢的定時任務
timermap。erase(iter);
return true;
}
return false;
}
返回最近定時任務的觸發時間,用於一個執行緒協同處理,返回值作為 timeout 引數
time_t TimeToSleep() {
auto iter = timermap。begin();
if (iter == timermap。end()) {
return -1;
}
// 最近任務的觸發時間 = 最近任務初始設定的觸發時間 - 當前時間
time_t diss = iter->expire - GetTick();
// 最近要觸發的任務時間 > 0,繼續等待;= 0,立即處理任務
return diss > 0 ? diss : 0;
}
4。4、程式碼實現
#include
#include
#include
#include
#include
#include
using namespace std;
// 定時結點的基類,儲存唯一標識的元素,輕量級,用於比較
struct TimerNodeBase {
time_t expire; // 任務觸發時間
int64_t id; // 用來描述插入先後順序,int64_t,能記錄5000多年
};
// 定時結點,包含定時任務等
struct TimerNode : public TimerNodeBase {
// 定時器任務回撥函式
// 函式物件複製代價高,在容器內複製構造後不會再去移動
using Callback = std::function
Callback func;
// 建構函式,容器內部就地複製構造呼叫一次,此後不會再去呼叫
TimerNode(int64_t id, time_t expire, Callback func) : func(func) {
this->expire = expire;
this->id = id;
}
};
// 根據觸發時間對結點進行排序
// 基類引用,多型特性,基類代替timerNode結點,避免複製構造子類
bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {
// 先比較觸發時間
if (lhd。expire < rhd。expire)
return true;
else if (lhd。expire > rhd。expire)
return false;
// 觸發時間相同,比較插入的先後順序
// 比較id大小,先插入的結點id小,先執行
return lhd。id < rhd。id;
}
// 定時器類的實現
class Timer {
public:
// 獲取當前時間
static time_t GetTick() {
// 獲取系統時間戳,系統啟動到當前的時間
auto sc = chrono::time_point_cast
// 獲取到時間戳的時間段
auto temp = chrono::duration_cast
return temp。count();
}
// 2、新增定時任務
// 引數: msec 任務觸發時間間隔,func 任務執行的回撥函式
TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {
time_t expire = GetTick() + msec;
// emplace 容器內就地構造,避免複製構造和移動構造,
// 此後不再移動 TimerNode 結點(函式物件記憶體佔用多)
auto ele = timermap。emplace(GenID(), expire, func);
return static_cast
}
// 3、刪除/取消定時任務
bool DelTimer(TimerNodeBase &node) {
// C++14的新特性:只需傳遞等價 key 比較,無需建立 key 物件比較,
// 代替子類結點,避免函式物件複製控制和移動
auto iter = timermap。find(node);
// 若存在,則刪除該結點
if (iter != timermap。end()) {
timermap。erase(iter);
return true;
}
return false;
}
// 4、檢測定時任務是否被觸發,觸發則執行定時任務
bool CheckTimer() {
auto iter = timermap。begin();
if (iter != timermap。end() && iter->expire <= GetTick()) {
// 定時任務被觸發,則執行對應的定時任務
iter->func(*iter);
// 刪除執行完畢的定時任務
timermap。erase(iter);
return true;
}
return false;
}
// 5、返回最近定時任務觸發時間,作為timeout的引數
time_t TimeToSleep() {
auto iter = timermap。begin();
if (iter == timermap。end()) {
return -1;
}
// 最近任務的觸發時間 = 最近任務初始設定的觸發時間 - 當前時間
time_t diss = iter->expire - GetTick();
// 最近要觸發的任務時間 > 0,繼續等待;= 0,立即處理任務
return diss > 0 ? diss : 0;
}
private:
// 產生 id 的方法
static int64_t GenID() {
return gid++;
}
static int64_t gid;
// 利用 set 排序快速查詢要到期的任務
set
};
int64_t Timer::gid = 0;
int main() {
// 定時器驅動
int epfd = epoll_create(1);
// 建立定時器
unique_ptr
int i = 0;
timer->AddTimer(1000, [&](const TimerNode &node) {
cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;
});
timer->AddTimer(1000, [&](const TimerNode &node) {
cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;
});
timer->AddTimer(3000, [&](const TimerNode &node) {
cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;
});
auto node = timer->AddTimer(2100, [&](const TimerNode &node) {
cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;
});
timer->DelTimer(node);
cout << “now time:” << Timer::GetTick() << endl;
epoll_event ev[64] = {0};
while (true) {
// 最近任務的觸發時間介面:TimeToSlee,作為 timeout 引數
int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());
for (int i = 0; i < n; i++) {
/*。。。 處理網路事件 。。。*/
}
// 處理定時事件
while(timer->CheckTimer());
}
return 0;
}
5、多級時間輪
時間輪 timewheel 是一個環形結構,使用 hash + list 實現,類似時鐘。時鐘上的格子(槽位 slot)代表一段時間,slot大小表示時間精度,slot的數量表示時間範圍。每個格子指向一條定時器 list,儲存該格子上所有到期的任務。同一格子的任務觸發時間相同,用 list 解決了 hash 衝突。錶盤上的指標隨時間一格一格轉動 tick,當指標指向一個格子,執行格子對應 list 中的所有到期的任務。任務透過取模決定應該放入哪個格子。
簡單時間輪
如果任務的時間跨度很大,數量也多,傳統的單輪時間輪會造成任務的 round 很大,單個格子的任務 list 很長,並會維持很長一段時間。這時可將時間輪按時間粒度分級
多級時間輪
多級時間輪,低一級輪轉動一圈,高一級輪轉動一格,本質就是計數進位制。秒針在每個定時週期移動一個格子,當秒針轉一圈後,分鐘轉動一個格子。秒針輪指標主動轉動,而分鐘輪時針輪無法主動轉動,只能等待低階輪進位才能轉動,同時自動把即將到期的定時器任務遷移到低一級輪子裡。
根據輪子的型別,可以分為主動輪和從動輪。
主動輪
:當刻度指標指向當前槽的時候,槽內的任務被順序執行。
從動輪
:當對應輪的刻度指標指向當前槽位的時候,槽內的任務鏈依次向低階輪(序號較高的輪)轉移,從動輪沒有執行任務許可權,只是對任務進行記錄與快取。
為什麼時間輪分成多個層級?
減少空間佔用。若採用單級時間輪,則需要 12 * 60 * 60 個 slot ,若採用上述三級結構,則需要空間大小 60 + 60 + 12 個slot,極大地減少了空間佔用。
只需要關注最近要觸發的定時任務(主動輪),按照任務觸發的優先順序組織任務,減少對任務的檢測
多級時間輪定時器的應用場景很多,比如 linux 核心,skynet,kafka,netty等
以 skynet 為例,skynet 作為單 reacto r模型,適用於 cpu 密集型的場景。timer 由 timer 執行緒管理,當有定時任務時將任務派發給其他執行緒執行。下圖即為多執行緒環境下 skynet 時間輪執行圖。
skynet 執行原理圖
接下來,對 skynet 原始碼做了簡化改動,介紹多級時間輪的實現方式。
5.1、定時器的驅動
首先,選擇定時器驅動的方式,這裡選擇 usleep 來實現
時間精度:usleep gettime兩個介面確定,10ms
時間範圍:uint32_t time
// 檢測定時器,每過1/4時間精度執行一次
// 原因是 dispatch 分發任務花費時間,影響精度
void expire_timer(void) {
// 獲取當前系統執行時間,不受使用者的影響
uint64_t cp = gettime();
// 當前系統啟動時間與定時器記錄的系統啟動時間不相等
if (cp != TI->current_point) {
// 獲取上述兩者的差值
uint32_t diff = (uint32_t)(cp - TI->current_point);
// 更新定時器記錄的系統執行時間
TI->current_point = cp;
// 更新timer的執行時間
TI->current += diff;
// 更新定時器的時間(time的值),並執行對應的過期任務
int i;
for (i=0; i // 每執行一次timer_update,其內部的函式 // timer_shift: time+1,time代表從timer啟動後至今一共經歷了多少次tick // timer_execute: 執行near中的定時器 timer_update(TI); } } } // timer 執行緒中,每過1/4時間精度,即2。5ms,執行一次定時器的檢測 while (!ctx。quit) { expire_timer(); usleep(2500); } 5.2、資料結構設計 定時器的設計 指標陣列 以 skynet、linux 核心定時器的多級時間輪為例,定義了5個連結串列陣列,每個數組裡包含多個定時器連結串列,near 陣列大小為28,其餘陣列大小為26,表示的時間範圍 28+6+6+6+6 = 232。 多級指標陣列 time 指標 我們僅關注儲存最近要觸發事件的 near 陣列,其他陣列指標可以透過計算獲得,所以只需要一個時間指標 time 即可。該指標記錄 timer 自啟動到現在的 tick 數,指向時間輪上的一個槽 slot。當 tick 在低一級移動一圈,需要將高一級輪子中的定時任務重新對映。 在 skynet 中 32 位無符號整數 time 就是該指標,不同位數分別對應陣列near[256] 和t[4][64],每過 10 ms 增加一次。當 time 溢位時,32位無符號迴圈,再次從0開始計數。 time 指標 定時器的定義如下: typedef struct timer { link_list_t near[TIME_NEAR]; // 最低階的時間輪,主動輪 link_list_t t[4][TIME_LEVEL]; // 其他層級的時間輪,從動輪 struct spinlock lock; // 自旋鎖,O(1) uint32_t time; // tick 指標,當前時間片 uint64_t current; // timer執行時間,時間精度10ms uint64_t current_point; // 系統執行時間,時間精度10ms }s_timer_t; 任務結點的設計 任務結點使用連結串列儲存,連結串列中儲存同一時間觸發的任務結點 struct timer_node { struct timer_node *next; // 相同過期時間的待執行的下一個任務 uint32_t expire; // 任務過期時間 handler_pt callback; // 任務回撥函式 uint8_t cancel; // 刪除任務的標記,取消任務的執行 int id; // 執行該任務的執行緒 id }; 5.3、timer 介面實現 初始化定時器 // 初始化定時器 void init_timer(void) { TI = timer_create_timer(); // 建立定時器 TI->current_point = gettime(); // 獲取系統當前執行時間 } // 建立定時器 s_timer_t* timer_create_timer() { s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t)); memset(r, 0, sizeof(*r)); int i, j; // 建立主動輪,最低階時間輪 for (i = 0; i < TIME_NEAR; ++i) { link_clear(&r->near[i]); } // 建立從動輪,高層級時間輪 for (i = 0; i < 4; ++i) { for (j = 0;j < TIME_LEVEL; ++j) { link_clear(&r->t[i][j]); } } // 初始化自旋鎖 spinlock_init(&r->lock); r->current = 0; return r; } // 獲取系統當前執行時間,時間精度10ms uint64_t gettime() { uint64_t t; #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER) struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); // CLOCK_MONOTONIC t = (uint64_t)ti。tv_sec * 1000; t += ti。tv_nsec / 1000000; #else struct timeval tv; gettimeofday(&tv, NULL); t = (uint64_t)tv。tv_sec * 100; t += tv。tv_usec / 10000; #endif return t; } 定時器的新增 計算定時器到期時間 expire 和當前定時器啟動時間 time 的差值,記為 msec 根據 msec 的值判斷結點應該新增到定時器指標陣列的哪一層 // 新增任務結點到定時器中 // 根據 msec 判斷結點應該放入時間輪的層級 void add_node(s_timer_t *T, timer_node_t *node) { uint32_t time = node->expire; // 過期時間 uint32_t current_time=T->time; // 當前時間 uint32_t msec = time - current_time; // 剩餘時間 //根據 expire-time 的差值將結點放入相應的層級 //[0, 2^8) if (msec < TIME_NEAR) { link(&T->near[time&TIME_NEAR_MASK],node); } //[2^8, 2^14) else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) { link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); } //[2^14, 2^20) else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) { link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^20, 2^26) else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) { link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^26, 2^32) else { link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } } // 新增定時任務 timer_node_t* add_timer(int time, handler_pt func, int threadid) { timer_node_t *node = (timer_node_t *)malloc(sizeof(*node)); spinlock_lock(&TI->lock); // 設定定時任務結點的屬性 node->expire = time + TI->time; // 新增觸發時間 = 觸發時間間隔 + 當前時間 node->callback = func; // 新增任務回撥函式 node->id = threadid; // 新增執行該任務的執行緒id // 判斷是否需要立即執行任務 if (time <= 0) { spinlock_unlock(&TI->lock); node->callback(node); free(node); return NULL; } // 新增任務結點到定時器中 add_node(TI, node); spinlock_unlock(&TI->lock); return node; } 定時器的刪除 由於結點位置可能發生變化(重新對映),不能找到任務結點的位置,無法刪除。 在結點中新增一個 cancel 欄位,任務觸發碰到該標記則不執行任務,之後統一釋放空間。 void del_timer(timer_node_t *node) { node->cancel = 1; } 定時器的更新 主要包括對到期任務的處理和對從動輪任務(time高24位對應的連結串列)的重新對映。 void timer_update(s_timer_t *T) { spinlock_lock(&T->lock); // 執行任務 timer_execute(T); // time+1,並判斷是否進行重新對映 timer_shift(T); // 若發生重新對映,若time的指向有任務,則需要執行 timer_execute(T); spinlock_unlock(&T->lock); } 到期任務的處理 在每次tick事件中,定時器以當前 tick 值的低8位作為索引,取出 near 陣列中對應的 list,list 裡面包含了所有在該 tick 到期的定時器列表。 // 執行任務 // 以time的低8位對應的near陣列的索引,取出該位置對應的list void timer_execute(s_timer_t *T) { // 取出time低8位對應的索引值 int idx = T->time & TIME_NEAR_MASK; // 如果低8位值對應的near陣列元素有連結串列,則取出 while (T->near[idx]。head。next) { // 取出對應的定時器list timer_node_t *current = link_clear(&T->near[idx]); spinlock_unlock(&T->lock); // 將連結串列各結點的任務派發出去 dispatch_list(current); spinlock_lock(&T->lock); } } // 任務派發 void dispatch_list(timer_node_t *current) { do { timer_node_t *temp = current; current=current->next; // cancel 標記為0,執行任務回撥函式;否則,不執行任務回撥 if (temp->cancel == 0) temp->callback(temp); free(temp); } while (current); } 重新對映 為什麼要對結點重新對映 只有主動輪的結點要執行,從動輪只是儲存結點,主動輪結點執行完後,需要從動輪補充。 重新對映的流程 確定重新對映的位置 取出該指標指向槽位的所有任務,time = tick0 + time - tick 再次新增結點到時間輪中 如何確定重新對映的位置 檢查time是否溢位,如果溢位則將t[3][0]這個連結串列取出並依次將該連結串列中的節點新增(即實現該連結串列的移動操作),如果time未溢位,則進行下一步。 檢查 time 的第1-8位是否溢位產生進位,沒有則結束,有則說明 time 恰走完 near 陣列。繼續判斷檢查第9-14位是否溢位。有則進行下一步,沒有則說明是t[0]陣列移動。此時將 time 的第9-14位的值作為索引 idx,取出 t[0][idx] 中的 list,並將 list 中的結點依次重新新增到定時器中。而這些結點一定會新增到 near 陣列中,實現了 t[0]向 near的遷移。 檢查 time 的第9-14位是否溢位產生進位,沒有則結束,有則說明 time 恰走完t[0]陣列。繼續判斷檢查第15-20位是否溢位,有則進行下一步,沒有則說明是t[1]陣列移動。此時將 time 的第15-20位作為索引 idx,取出t[1][idx]中的 list,將 list 中的結點依次重新新增到定時器中,而這些結點一定會新增到 t[0] 陣列中,實現了 t[1]向 t[0]的遷移。 …… 後續所有操作以此類推,時間輪由低向高逐級判斷,因為高階輪溢位,低階一定溢位,這樣實現了多級定時器逐級遷移的過程 // 重新對映,判斷是否需要重新對映 // 時間片time自加1,將高24位對應的4個6位的陣列中的各個元素的連結串列往低位移 void timer_shift(s_timer_t *T) { int mask = TIME_NEAR; // 時間片+1 uint32_t ct = ++T->time; // 時間片溢位,無符號整數,迴圈,time重置0 if (ct == 0) { // 將對應的t[3][0]連結串列取出,重新移動到定時器中 move_list(T, 3, 0); } else { // ct右移8位,進入到從動輪 uint32_t time = ct >> TIME_NEAR_SHIFT; // 第 i 層時間輪 int i = 0; // 判斷是否需要重新對映? // 即迴圈判斷當前層級對應的數位是否全0,即溢位產生進位 while ((ct & (mask-1))==0) { // 取當前層級的索引值 int idx = time & TIME_LEVEL_MASK; // idx=0 說明當前層級溢位,繼續迴圈判斷直至當前層級不溢位 if (idx != 0) { // 將對應的t[i][idx]連結串列取出,依次移動到定時器中 move_list(T, i, idx); break; } mask <<= TIME_LEVEL_SHIFT; // mask 右移 time >>= TIME_LEVEL_SHIFT; // time 左移 ++i; // 時間輪層級增加 } } } 5.4、程式碼實現 // timer_wheel。h #ifndef _MARK_TIMEWHEEL_ #define _MARK_TIMEWHEEL_ #include #define TIME_NEAR_SHIFT 8 #define TIME_NEAR (1 << TIME_NEAR_SHIFT) #define TIME_LEVEL_SHIFT 6 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT) #define TIME_NEAR_MASK (TIME_NEAR-1) #define TIME_LEVEL_MASK (TIME_LEVEL-1) typedef struct timer_node timer_node_t; typedef void (*handler_pt) (struct timer_node *node); // 任務結點 struct timer_node { struct timer_node *next; // 相同過期時間的待執行的下一個任務 uint32_t expire; // 任務過期時間 handler_pt callback; // 任務回撥函式 uint8_t cancel; // 刪除任務,遇到該標記則取消任務的執行 int id; // 此時攜帶引數 }; timer_node_t* add_timer(int time, handler_pt func, int threadid); void expire_timer(void); void del_timer(timer_node_t* node); void init_timer(void); void clear_timer(); #endif // timer_wheel。c #include “spinlock。h” #include “timewheel。h” #include #include #include #if defined(__APPLE__) #include #include #include #include #else #include #endif typedef struct link_list { timer_node_t head; timer_node_t *tail; }link_list_t; // 定時器的資料結構 typedef struct timer { link_list_t near[TIME_NEAR]; // 最低階的時間輪,主動輪 link_list_t t[4][TIME_LEVEL]; // 其他層級的時間輪,從動輪 struct spinlock lock; // 自旋鎖,O(1) uint32_t time; // tick 指標,當前時間片 uint64_t current; // timer執行時間,精度10ms uint64_t current_point; // 系統執行時間,精度10ms }s_timer_t; static s_timer_t * TI = NULL; // 清空連結串列 // 並返回指向連結串列的第一個結點的指標 timer_node_t* link_clear(link_list_t *list) { // 指向頭指標的下一個位置 timer_node_t * ret = list->head。next; // 頭結點斷鏈 list->head。next = 0; // 尾指標指向頭結點 list->tail = &(list->head); return ret; } // 尾插法 void link(link_list_t *list, timer_node_t *node) { list->tail->next = node; list->tail = node; node->next=0; } // 新增任務結點到定時器中 // 根據 time 判斷結點應該放入時間輪的層級 void add_node(s_timer_t *T, timer_node_t *node) { uint32_t time = node->expire; // 過期時間 uint32_t current_time=T->time; // 當前時間 uint32_t msec = time - current_time; // 剩餘時間 //根據 expire-time 的差值將結點放入相應的層級 //[0, 2^8) if (msec < TIME_NEAR) { link(&T->near[time&TIME_NEAR_MASK],node); } //[2^8, 2^14) else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) { link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node); } //[2^14, 2^20) else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) { link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^20, 2^26) else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) { link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } //[2^26, 2^32) else { link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } } // 新增定時任務 timer_node_t* add_timer(int time, handler_pt func, int threadid) { timer_node_t *node = (timer_node_t *)malloc(sizeof(*node)); spinlock_lock(&TI->lock); // 設定定時任務結點的屬性 node->expire = time + TI->time; // 新增任務觸發時間 expire = time + tick node->callback = func; // 新增任務回撥函式 node->id = threadid; // 新增執行該任務的執行緒id // 判斷是否需要立即執行任務 if (time <= 0) { spinlock_unlock(&TI->lock); node->callback(node); free(node); return NULL; } // 新增任務結點到定時器中 add_node(TI, node); spinlock_unlock(&TI->lock); return node; } // 移動連結串列 // 第level層第idx個位置的連結串列結點重新新增到定時器T中 void move_list(s_timer_t *T, int level, int idx) { timer_node_t *current = link_clear(&T->t[level][idx]); while (current) { timer_node_t *temp = current->next; add_node(T, current); current = temp; } } // 重新對映,判斷是否需要重新對映 // 時間片time自加1,將高24位對應的4個6位的陣列中的各個元素的連結串列往低位移 void timer_shift(s_timer_t *T) { int mask = TIME_NEAR; // 時間片+1 uint32_t ct = ++T->time; // 時間片溢位,無符號整數,迴圈,time重置0 if (ct == 0) { // 將對應的t[3][0]連結串列取出,重新移動到定時器中 move_list(T, 3, 0); } else { // ct右移8位,進入到從動輪 uint32_t time = ct >> TIME_NEAR_SHIFT; // 第 i 層時間輪 int i = 0; // 判斷是否需要重新對映? // 即迴圈判斷當前層級對應的數位是否全0,即溢位產生進位 while ((ct & (mask-1))==0) { // 取當前層級的索引值 int idx = time & TIME_LEVEL_MASK; // idx=0 說明當前層級溢位,繼續迴圈判斷直至當前層級不溢位 if (idx != 0) { // 將對應的t[i][idx]連結串列取出,依次移動到定時器中 move_list(T, i, idx); break; } mask <<= TIME_LEVEL_SHIFT; // mask 右移 time >>= TIME_LEVEL_SHIFT; // time 左移 ++i; // 時間輪層級增加 } } } // 任務派發給其他執行緒執行 void dispatch_list(timer_node_t *current) { do { timer_node_t *temp = current; current=current->next; // cancel 標記為0,執行任務回撥函式 if (temp->cancel == 0) temp->callback(temp); free(temp); } while (current); } // 執行任務 // 以time的低8位對應的near陣列的索引,取出該位置對應的list void timer_execute(s_timer_t *T) { // 取出time低8位對應的值 int idx = T->time & TIME_NEAR_MASK; // 如果低8位值對應的near陣列元素有連結串列,則取出 while (T->near[idx]。head。next) { // 取出對應的定時器list timer_node_t *current = link_clear(&T->near[idx]); spinlock_unlock(&T->lock); // 將連結串列各結點的任務派發出去 dispatch_list(current); spinlock_lock(&T->lock); } } // 定時器更新 void timer_update(s_timer_t *T) { spinlock_lock(&T->lock); // 執行任務 timer_execute(T); /// time+1,並判斷是否進行重新對映 timer_shift(T); // 若發生重新對映,若time的指向有任務,則需要執行 timer_execute(T); spinlock_unlock(&T->lock); } // 刪除定時器任務 void del_timer(timer_node_t *node) { node->cancel = 1; } // 建立定時器 s_timer_t * timer_create_timer() { s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t)); memset(r, 0, sizeof(*r)); int i, j; // 建立主動輪,最低階時間輪 for (i = 0; i < TIME_NEAR; ++i) { link_clear(&r->near[i]); } // 建立從動輪,高層級時間輪 for (i = 0; i < 4; ++i) { for (j = 0;j < TIME_LEVEL; ++j) { link_clear(&r->t[i][j]); } } // 初始化自旋鎖 spinlock_init(&r->lock); r->current = 0; return r; } // 獲取當前時間,時間精度10ms uint64_t gettime() { uint64_t t; #if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER) struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti);// CLOCK_MONOTONIC,從系統啟動這一刻起開始計時,不受系統時間被使用者改變的影響 t = (uint64_t)ti。tv_sec * 1000; t += ti。tv_nsec / 1000000; #else struct timeval tv; gettimeofday(&tv, NULL); t = (uint64_t)tv。tv_sec * 100; t += tv。tv_usec / 10000; #endif return t; } // 檢測定時器,時間精度10ms,每過1/4時間精度2。5ms執行1次 // 原因是dispatch分發任務花費時間,影響精度 void expire_timer(void) { // 獲取當前系統執行時間,不受系統時間被使用者的影響 uint64_t cp = gettime(); // 當前系統啟動時間與定時器記錄的系統啟動時間不相等 if (cp != TI->current_point) { // 獲取上述兩者的差值 uint32_t diff = (uint32_t)(cp - TI->current_point); // 更新定時器記錄的系統執行時間 TI->current_point = cp; // 更新timer的執行時間 TI->current += diff; // 更新定時器的時間(time的值),並執行對應的過期任務 int i; for (i=0; i // 每執行一次timer_update,其內部的函式 // timer_shift: time+1,time代表從timer啟動後至今一共經歷了多少次tick // timer_execute: 執行near中的定時器 timer_update(TI); } } } // 初始化定時器 void init_timer(void) { TI = timer_create_timer(); // 建立定時器 TI->current_point = gettime(); // 獲取當前時間 } void clear_timer() { int i,j; for (i=0;i link_list_t * list = &TI->near[i]; timer_node_t* current = list->head。next; while(current) { timer_node_t * temp = current; current = current->next; free(temp); } link_clear(&TI->near[i]); } for (i=0;i<4;i++) { for (j=0;j link_list_t * list = &TI->t[i][j]; timer_node_t* current = list->head。next; while (current) { timer_node_t * temp = current; current = current->next; free(temp); } link_clear(&TI->t[i][j]); } } } // tw-timer。c // gcc tw-timer。c timewheel。c -o tw -I。/ -lpthread #include #include #include #include #include #include “timewheel。h” struct context { int quit; int thread; }; struct thread_param { struct context *ctx; int id; }; static struct context ctx = {0}; void do_timer(timer_node_t *node) { printf(“do_timer expired:%d - thread-id:%d\n”, node->expire, node->id); add_timer(100, do_timer, node->id); } void do_clock(timer_node_t *node) { static int time; time ++; printf(“——-time = %d ——-\n”, time); add_timer(100, do_clock, node->id); } void* thread_worker(void *p) { struct thread_param *tp = p; int id = tp->id; struct context *ctx = tp->ctx; int expire = rand() % 200; add_timer(expire, do_timer, id); while (!ctx->quit) { usleep(1000); } printf(“thread_worker:%d exit!\n”, id); return NULL; } void do_quit(timer_node_t * node) { ctx。quit = 1; } int main() { srand(time(NULL)); ctx。thread = 2; pthread_t pid[ctx。thread]; init_timer(); add_timer(6000, do_quit, 100); add_timer(0, do_clock, 100); struct thread_param task_thread_p[ctx。thread]; int i; for (i = 0; i < ctx。thread; i++) { task_thread_p[i]。id = i; task_thread_p[i]。ctx = &ctx; if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) { fprintf(stderr, “create thread failed\n”); exit(1); } } while (!ctx。quit) { expire_timer(); usleep(2500); // 2。5ms } clear_timer(); for (i = 0; i < ctx。thread; i++) { pthread_join(pid[i], NULL); } printf(“all thread is closed\n”); return 0; } 6、參考資料 skynet 原始碼閱讀筆記 —— skynet 中的定時器機制 Skynet定時器原理