您當前的位置:首頁 > 動漫

定時器的實現方案

作者:由 一隻名叫坂本的喵 發表于 動漫時間:2022-09-19

定時器的實現方案:紅黑樹和多級時間輪

1、定時器的使用場景

定時器用於執行定時任務,各種場景都需要用到,比如

心跳檢測keep-alive

倒計時

遊戲技能CD等

各種延時處理等

2、定時器的觸發方式

對於服務端來說,驅動服務端業務邏輯的事件包括網路事件、定時事件、以及訊號事件;通常網路事件和定時事件會進行協同處理。

定時器觸發形式,根據網路事件和定時事件是否在同一執行緒中處理,分為:

一個執行緒中處理,協同處理

不同執行緒中處理,定時任務在單獨的執行緒中處理

2.1、同一執行緒處理

協同處理通常於定時任務比較少的場景。對於多執行緒而言,同一執行緒處理會引起事件處理的不均衡。當一個執行緒擁有較多定時任務時,會影響其對網路事件處理的效率。

為什麼網路事件和定時事件可以協同處理?

reactor 是基於事件的網路模型,對 io 的處理是同步的,對事件的處理是非同步的,定時任務是事件,對定時任務的處理也是非同步的。

如何進行協同處理?

以 io 多路複用作為定時器驅動,“阻塞”地收集就緒事件,timeout 引數用於設定定時。資料結構通常選擇紅黑樹、跳錶、最小堆等來實現定時器,後文會有詳細的解釋。

// 網路事件和定時事件在一個執行緒中處理,協同處理

while (!quit) {

// 最近定時任務的觸發時間 = 最近定時任務新增時設定的觸發時間 - 當前時間

int timeout = get_nearest_timer() - now();

if (timeout < 0) timeout = -1;

// 最近定時任務的觸發時間作為 timeout 引數,timetout 定時任務觸發

// 1、若沒有網路事件,先去處理定時任務

// 2、若收到網路事件,先處理網路事件,再處理定時任務

int nevent = epoll_wait(epfd, ev, nev, timeout);

for (int i = 0; i < nevent; i++) {

// 。。。 處理網路事件

}

// 輪詢處理定時事件

update_timer();

}

基於協同處理的開源框架

單 reactor:redis

多 reactornginx、memcached

2.2、不同執行緒中處理

定時任務在單獨的執行緒中檢測,通常用於處理大量定時任務。

以 usleep(time)作為定時器驅動,time 引數用於設定定時,要小於最小時間精度。

// 網路事件和定時事件在不同執行緒中處理

void * thread_timer(void *thread_param) {

init_timer();

while (!quit) {

update_timer();

sleep(t);

}

clear_timer();

return NULL;

}

pthread_create(&pid, NULL, thread_timer, &thread_param);

資料結構通常選擇時間輪,加鎖粒度比較小(對1個格子加鎖)。時間輪只負責檢測,透過訊號或者插入執行佇列讓其他執行緒執行。

3、定時器設計

3.1、介面設計

所有定時器都要實現的介面

// 初始化定時器

void init_timer();

// 定時器的新增,新增任務結點,基於此可以做多次觸發的介面,經過 expire 時間觸發cb

Node* add_timer(int expire, callback cb);

// 定時器的刪除,刪除定時任務

bool del_timer(Node* node);

// 定時器的更新,到期任務的處理

void update_timer();

在協同處理的方案中,需要額外新增介面,來查詢最近定時任務的觸發時間。

// 返回最近定時任務的觸發時間,用於協同處理

Node* find_nearest_timer();

3.2、資料結構設計

本質:按照定時任務的優先順序進行組織任務的執行順序。

組織方式

按照時間順序組織,要求資料結構有序,能快速查詢最近觸發的定時任務。在實現中,關鍵是要考慮相同時間觸發的定時任務如何處理。

紅黑樹(絕對有序): nginx

跳錶(絕對有序):redis未來引入

最小堆(相對有序): libevent, libev, go

按照執行順序組織:時間輪

4、紅黑樹

紅黑樹中序有序,查詢效率高,按照時間順序組織定時任務(考慮相同觸發時間如何組織),用於協同處理的方式,但加鎖粒度大(對整個紅黑樹加鎖)。

4.1、定時器的驅動

首先,選擇定時器驅動的方式,這裡選擇 epoll 來實現,透過引數 timeout 設定定時。

while (true) {

// 最近任務的觸發時間介面:TimeToSlee,作為 timeout 引數

int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());

for (int i = 0; i < n; i++) {

/* 處理網路事件 */

}

// 處理定時事件

while(timer->CheckTimer());

}

4.2、資料結構設計

C++ 中提供了map set等容器。為了簡單,我們這裡選擇 set來儲存定時器任務

set> timermap;

在設計定時器任務結點時,有一個關鍵的問題,

相同觸發時間的定時任務的如何處理?

我們不能將觸發時間 expire 作為紅黑樹的 key 值。舉個栗子,A 事件到來時 tick=15,15 後執行,expire=30;B 事件到來時 tick=20, 10 後執行,expire=30。兩者的觸發時間相同,將在 tick=30 後觸發,這樣無法對事件排序。為避免上述情況的出現,在觸發時間相同時,我們根據

插入的先後順序

來決定事件的執行順序,先插入的先執行,放在紅黑樹左側。後插入後執行,放在紅黑樹右側。我們使用 id 屬性來描述事件到來的先後順序。

這裡,我們選擇 expire和 id 來唯一標識一個定時結點:

// 定義定時結點的基類,儲存唯一標識的元素

struct TimerNodeBase {

time_t expire; // 任務觸發時間(到期時間)

int64_t id; // 用來描述插入先後順序,int64_t,能記錄5000多年

};

// 定時結點,包含定時任務等

struct TimerNode : public TimerNodeBase {

// 定時器任務回撥函式

// 函式物件複製代價高,在容器內複製構造後不會再去移動

using Callback = std::function

Callback func;

// 建構函式,容器內部就地複製構造呼叫一次,此後不會再去呼叫

TimerNode(int64_t id, time_t expire, Callback func) : func(func) {

this->expire = expire;

this->id = id;

}

};

函式物件作為類,佔用大量的空間,複製控制和移動代價高,因此,我們拆分成基類和派生類,基類儲存標識,用於複製控制和移動;子類儲存函式物件等,在容器內就地構造後,不再賦複製控制和移動。

同樣在用函式物件實現比較函式,採用基類引用比較,體現多型特性,減少複製移動。

// 按觸發時間的先後順序對結點進行排序

// 基類引用,多型特性

bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {

// 先比較觸發時間

if (lhd。expire < rhd。expire)

return true;

else if (lhd。expire > rhd。expire)

return false;

// 觸發時間相同,比較插入的先後順序

// 比較id大小,先插入的結點id小,先執行

return lhd。id < rhd。id;

}

在 timer 類的介面實現中 find 函式利用了 C++14 的新特性,利用等價 key 比較,不需要構造 key 物件進行比較,也是同樣的原理。

4.3、timer 介面實現

初始化定時器

獲取當前時間介面,用於計算觸發時間

steady_clock:系統啟動到當前時間,用於計算程式執行時間

system_clock:時間戳,可以修改

high_resolution_clock:高精度版本的steady_clock

// 獲取當前時間

static time_t GetTick() {

auto sc = chrono::time_point_cast(chrono::steady_clock::now());

auto temp = chrono::duration_cast(sc。time_since_epoch());

return temp。count();

}

定時器的新增

// 引數: msec 任務觸發時間間隔,func 任務執行的回撥函式

TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {

time_t expire = GetTick() + msec;

// emplace 容器內就地構造,避免複製構造和移動構造,

auto ele = timermap。emplace(GenID(), expire, func);

return static_cast(*ele。first);

}

定時器的刪除

bool DelTimer(TimerNodeBase &node) {

// C++14的新特性:只需傳遞等價 key 比較,無需建立 key 物件比較,

// 代替子類結點,避免函式物件複製控制和移動

auto iter = timermap。find(node);

// 若存在,則刪除該結點

if (iter != timermap。end()) {

timermap。erase(iter);

return true;

}

return false;

}

定時器的更新

bool CheckTimer() {

auto iter = timermap。begin();

if (iter != timermap。end() && iter->expire <= GetTick()) {

// 定時任務被觸發,則執行對應的定時任務

iter->func(*iter);

// 刪除執行完畢的定時任務

timermap。erase(iter);

return true;

}

return false;

}

返回最近定時任務的觸發時間,用於一個執行緒協同處理,返回值作為 timeout 引數

time_t TimeToSleep() {

auto iter = timermap。begin();

if (iter == timermap。end()) {

return -1;

}

// 最近任務的觸發時間 = 最近任務初始設定的觸發時間 - 當前時間

time_t diss = iter->expire - GetTick();

// 最近要觸發的任務時間 > 0,繼續等待;= 0,立即處理任務

return diss > 0 ? diss : 0;

}

4。4、程式碼實現

#include

#include

#include

#include

#include

#include

using namespace std;

// 定時結點的基類,儲存唯一標識的元素,輕量級,用於比較

struct TimerNodeBase {

time_t expire; // 任務觸發時間

int64_t id; // 用來描述插入先後順序,int64_t,能記錄5000多年

};

// 定時結點,包含定時任務等

struct TimerNode : public TimerNodeBase {

// 定時器任務回撥函式

// 函式物件複製代價高,在容器內複製構造後不會再去移動

using Callback = std::function

Callback func;

// 建構函式,容器內部就地複製構造呼叫一次,此後不會再去呼叫

TimerNode(int64_t id, time_t expire, Callback func) : func(func) {

this->expire = expire;

this->id = id;

}

};

// 根據觸發時間對結點進行排序

// 基類引用,多型特性,基類代替timerNode結點,避免複製構造子類

bool operator < (const TimerNodeBase &lhd, const TimerNodeBase &rhd) {

// 先比較觸發時間

if (lhd。expire < rhd。expire)

return true;

else if (lhd。expire > rhd。expire)

return false;

// 觸發時間相同,比較插入的先後順序

// 比較id大小,先插入的結點id小,先執行

return lhd。id < rhd。id;

}

// 定時器類的實現

class Timer {

public:

// 獲取當前時間

static time_t GetTick() {

// 獲取系統時間戳,系統啟動到當前的時間

auto sc = chrono::time_point_cast(chrono::steady_clock::now());

// 獲取到時間戳的時間段

auto temp = chrono::duration_cast(sc。time_since_epoch());

return temp。count();

}

// 2、新增定時任務

// 引數: msec 任務觸發時間間隔,func 任務執行的回撥函式

TimerNodeBase AddTimer(time_t msec, TimerNode::Callback func) {

time_t expire = GetTick() + msec;

// emplace 容器內就地構造,避免複製構造和移動構造,

// 此後不再移動 TimerNode 結點(函式物件記憶體佔用多)

auto ele = timermap。emplace(GenID(), expire, func);

return static_cast(*ele。first);

}

// 3、刪除/取消定時任務

bool DelTimer(TimerNodeBase &node) {

// C++14的新特性:只需傳遞等價 key 比較,無需建立 key 物件比較,

// 代替子類結點,避免函式物件複製控制和移動

auto iter = timermap。find(node);

// 若存在,則刪除該結點

if (iter != timermap。end()) {

timermap。erase(iter);

return true;

}

return false;

}

// 4、檢測定時任務是否被觸發,觸發則執行定時任務

bool CheckTimer() {

auto iter = timermap。begin();

if (iter != timermap。end() && iter->expire <= GetTick()) {

// 定時任務被觸發,則執行對應的定時任務

iter->func(*iter);

// 刪除執行完畢的定時任務

timermap。erase(iter);

return true;

}

return false;

}

// 5、返回最近定時任務觸發時間,作為timeout的引數

time_t TimeToSleep() {

auto iter = timermap。begin();

if (iter == timermap。end()) {

return -1;

}

// 最近任務的觸發時間 = 最近任務初始設定的觸發時間 - 當前時間

time_t diss = iter->expire - GetTick();

// 最近要觸發的任務時間 > 0,繼續等待;= 0,立即處理任務

return diss > 0 ? diss : 0;

}

private:

// 產生 id 的方法

static int64_t GenID() {

return gid++;

}

static int64_t gid;

// 利用 set 排序快速查詢要到期的任務

set> timermap;

};

int64_t Timer::gid = 0;

int main() {

// 定時器驅動

int epfd = epoll_create(1);

// 建立定時器

unique_ptr timer = make_unique();

int i = 0;

timer->AddTimer(1000, [&](const TimerNode &node) {

cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;

});

timer->AddTimer(1000, [&](const TimerNode &node) {

cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;

});

timer->AddTimer(3000, [&](const TimerNode &node) {

cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;

});

auto node = timer->AddTimer(2100, [&](const TimerNode &node) {

cout << Timer::GetTick() << “node id:” << node。id << “ revoked times:” << ++i << endl;

});

timer->DelTimer(node);

cout << “now time:” << Timer::GetTick() << endl;

epoll_event ev[64] = {0};

while (true) {

// 最近任務的觸發時間介面:TimeToSlee,作為 timeout 引數

int n = epoll_wait(epfd, ev, 64, timer->TimeToSleep());

for (int i = 0; i < n; i++) {

/*。。。 處理網路事件 。。。*/

}

// 處理定時事件

while(timer->CheckTimer());

}

return 0;

}

5、多級時間輪

時間輪 timewheel 是一個環形結構,使用 hash + list 實現,類似時鐘。時鐘上的格子(槽位 slot)代表一段時間,slot大小表示時間精度,slot的數量表示時間範圍。每個格子指向一條定時器 list,儲存該格子上所有到期的任務。同一格子的任務觸發時間相同,用 list 解決了 hash 衝突。錶盤上的指標隨時間一格一格轉動 tick,當指標指向一個格子,執行格子對應 list 中的所有到期的任務。任務透過取模決定應該放入哪個格子。

定時器的實現方案

簡單時間輪

如果任務的時間跨度很大,數量也多,傳統的單輪時間輪會造成任務的 round 很大,單個格子的任務 list 很長,並會維持很長一段時間。這時可將時間輪按時間粒度分級

定時器的實現方案

多級時間輪

多級時間輪,低一級輪轉動一圈,高一級輪轉動一格,本質就是計數進位制。秒針在每個定時週期移動一個格子,當秒針轉一圈後,分鐘轉動一個格子。秒針輪指標主動轉動,而分鐘輪時針輪無法主動轉動,只能等待低階輪進位才能轉動,同時自動把即將到期的定時器任務遷移到低一級輪子裡。

根據輪子的型別,可以分為主動輪和從動輪。

主動輪

:當刻度指標指向當前槽的時候,槽內的任務被順序執行。

從動輪

:當對應輪的刻度指標指向當前槽位的時候,槽內的任務鏈依次向低階輪(序號較高的輪)轉移,從動輪沒有執行任務許可權,只是對任務進行記錄與快取。

為什麼時間輪分成多個層級?

減少空間佔用。若採用單級時間輪,則需要 12 * 60 * 60 個 slot ,若採用上述三級結構,則需要空間大小 60 + 60 + 12 個slot,極大地減少了空間佔用。

只需要關注最近要觸發的定時任務(主動輪),按照任務觸發的優先順序組織任務,減少對任務的檢測

多級時間輪定時器的應用場景很多,比如 linux 核心,skynet,kafka,netty等

以 skynet 為例,skynet 作為單 reacto r模型,適用於 cpu 密集型的場景。timer 由 timer 執行緒管理,當有定時任務時將任務派發給其他執行緒執行。下圖即為多執行緒環境下 skynet 時間輪執行圖。

定時器的實現方案

skynet 執行原理圖

接下來,對 skynet 原始碼做了簡化改動,介紹多級時間輪的實現方式。

5.1、定時器的驅動

首先,選擇定時器驅動的方式,這裡選擇 usleep 來實現

時間精度:usleep gettime兩個介面確定,10ms

時間範圍:uint32_t time

// 檢測定時器,每過1/4時間精度執行一次

// 原因是 dispatch 分發任務花費時間,影響精度

void expire_timer(void) {

// 獲取當前系統執行時間,不受使用者的影響

uint64_t cp = gettime();

// 當前系統啟動時間與定時器記錄的系統啟動時間不相等

if (cp != TI->current_point) {

// 獲取上述兩者的差值

uint32_t diff = (uint32_t)(cp - TI->current_point);

// 更新定時器記錄的系統執行時間

TI->current_point = cp;

// 更新timer的執行時間

TI->current += diff;

// 更新定時器的時間(time的值),並執行對應的過期任務

int i;

for (i=0; i

// 每執行一次timer_update,其內部的函式

// timer_shift: time+1,time代表從timer啟動後至今一共經歷了多少次tick

// timer_execute: 執行near中的定時器

timer_update(TI);

}

}

}

// timer 執行緒中,每過1/4時間精度,即2。5ms,執行一次定時器的檢測

while (!ctx。quit) {

expire_timer();

usleep(2500);

}

5.2、資料結構設計

定時器的設計

指標陣列

以 skynet、linux 核心定時器的多級時間輪為例,定義了5個連結串列陣列,每個數組裡包含多個定時器連結串列,near 陣列大小為28,其餘陣列大小為26,表示的時間範圍 28+6+6+6+6 = 232。

定時器的實現方案

多級指標陣列

time 指標

我們僅關注儲存最近要觸發事件的 near 陣列,其他陣列指標可以透過計算獲得,所以只需要一個時間指標 time 即可。該指標記錄 timer 自啟動到現在的 tick 數,指向時間輪上的一個槽 slot。當 tick 在低一級移動一圈,需要將高一級輪子中的定時任務重新對映。

在 skynet 中 32 位無符號整數 time 就是該指標,不同位數分別對應陣列near[256] 和t[4][64],每過 10 ms 增加一次。當 time 溢位時,32位無符號迴圈,再次從0開始計數。

定時器的實現方案

time 指標

定時器的定義如下:

typedef struct timer {

link_list_t near[TIME_NEAR]; // 最低階的時間輪,主動輪

link_list_t t[4][TIME_LEVEL]; // 其他層級的時間輪,從動輪

struct spinlock lock; // 自旋鎖,O(1)

uint32_t time; // tick 指標,當前時間片

uint64_t current; // timer執行時間,時間精度10ms

uint64_t current_point; // 系統執行時間,時間精度10ms

}s_timer_t;

任務結點的設計

任務結點使用連結串列儲存,連結串列中儲存同一時間觸發的任務結點

struct timer_node {

struct timer_node *next; // 相同過期時間的待執行的下一個任務

uint32_t expire; // 任務過期時間

handler_pt callback; // 任務回撥函式

uint8_t cancel; // 刪除任務的標記,取消任務的執行

int id; // 執行該任務的執行緒 id

};

5.3、timer 介面實現

初始化定時器

// 初始化定時器

void init_timer(void) {

TI = timer_create_timer(); // 建立定時器

TI->current_point = gettime(); // 獲取系統當前執行時間

}

// 建立定時器

s_timer_t* timer_create_timer() {

s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t));

memset(r, 0, sizeof(*r));

int i, j;

// 建立主動輪,最低階時間輪

for (i = 0; i < TIME_NEAR; ++i) {

link_clear(&r->near[i]);

}

// 建立從動輪,高層級時間輪

for (i = 0; i < 4; ++i) {

for (j = 0;j < TIME_LEVEL; ++j) {

link_clear(&r->t[i][j]);

}

}

// 初始化自旋鎖

spinlock_init(&r->lock);

r->current = 0;

return r;

}

// 獲取系統當前執行時間,時間精度10ms

uint64_t gettime() {

uint64_t t;

#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)

struct timespec ti;

clock_gettime(CLOCK_MONOTONIC, &ti); // CLOCK_MONOTONIC

t = (uint64_t)ti。tv_sec * 1000;

t += ti。tv_nsec / 1000000;

#else

struct timeval tv;

gettimeofday(&tv, NULL);

t = (uint64_t)tv。tv_sec * 100;

t += tv。tv_usec / 10000;

#endif

return t;

}

定時器的新增

計算定時器到期時間 expire 和當前定時器啟動時間 time 的差值,記為 msec

根據 msec 的值判斷結點應該新增到定時器指標陣列的哪一層

// 新增任務結點到定時器中

// 根據 msec 判斷結點應該放入時間輪的層級

void add_node(s_timer_t *T, timer_node_t *node) {

uint32_t time = node->expire; // 過期時間

uint32_t current_time=T->time; // 當前時間

uint32_t msec = time - current_time; // 剩餘時間

//根據 expire-time 的差值將結點放入相應的層級

//[0, 2^8)

if (msec < TIME_NEAR) {

link(&T->near[time&TIME_NEAR_MASK],node);

}

//[2^8, 2^14)

else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {

link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node);

}

//[2^14, 2^20)

else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {

link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

//[2^20, 2^26)

else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {

link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

//[2^26, 2^32)

else {

link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

}

// 新增定時任務

timer_node_t* add_timer(int time, handler_pt func, int threadid) {

timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));

spinlock_lock(&TI->lock);

// 設定定時任務結點的屬性

node->expire = time + TI->time; // 新增觸發時間 = 觸發時間間隔 + 當前時間

node->callback = func; // 新增任務回撥函式

node->id = threadid; // 新增執行該任務的執行緒id

// 判斷是否需要立即執行任務

if (time <= 0) {

spinlock_unlock(&TI->lock);

node->callback(node);

free(node);

return NULL;

}

// 新增任務結點到定時器中

add_node(TI, node);

spinlock_unlock(&TI->lock);

return node;

}

定時器的刪除

由於結點位置可能發生變化(重新對映),不能找到任務結點的位置,無法刪除。

在結點中新增一個 cancel 欄位,任務觸發碰到該標記則不執行任務,之後統一釋放空間。

void del_timer(timer_node_t *node) {

node->cancel = 1;

}

定時器的更新

主要包括對到期任務的處理和對從動輪任務(time高24位對應的連結串列)的重新對映。

void timer_update(s_timer_t *T) {

spinlock_lock(&T->lock);

// 執行任務

timer_execute(T);

// time+1,並判斷是否進行重新對映

timer_shift(T);

// 若發生重新對映,若time的指向有任務,則需要執行

timer_execute(T);

spinlock_unlock(&T->lock);

}

到期任務的處理

在每次tick事件中,定時器以當前 tick 值的低8位作為索引,取出 near 陣列中對應的 list,list 裡面包含了所有在該 tick 到期的定時器列表。

// 執行任務

// 以time的低8位對應的near陣列的索引,取出該位置對應的list

void timer_execute(s_timer_t *T) {

// 取出time低8位對應的索引值

int idx = T->time & TIME_NEAR_MASK;

// 如果低8位值對應的near陣列元素有連結串列,則取出

while (T->near[idx]。head。next) {

// 取出對應的定時器list

timer_node_t *current = link_clear(&T->near[idx]);

spinlock_unlock(&T->lock);

// 將連結串列各結點的任務派發出去

dispatch_list(current);

spinlock_lock(&T->lock);

}

}

// 任務派發

void dispatch_list(timer_node_t *current) {

do {

timer_node_t *temp = current;

current=current->next;

// cancel 標記為0,執行任務回撥函式;否則,不執行任務回撥

if (temp->cancel == 0)

temp->callback(temp);

free(temp);

} while (current);

}

重新對映

為什麼要對結點重新對映

只有主動輪的結點要執行,從動輪只是儲存結點,主動輪結點執行完後,需要從動輪補充。

重新對映的流程

確定重新對映的位置

取出該指標指向槽位的所有任務,time = tick0 + time - tick

再次新增結點到時間輪中

如何確定重新對映的位置

檢查time是否溢位,如果溢位則將t[3][0]這個連結串列取出並依次將該連結串列中的節點新增(即實現該連結串列的移動操作),如果time未溢位,則進行下一步。

檢查 time 的第1-8位是否溢位產生進位,沒有則結束,有則說明 time 恰走完 near 陣列。繼續判斷檢查第9-14位是否溢位。有則進行下一步,沒有則說明是t[0]陣列移動。此時將 time 的第9-14位的值作為索引 idx,取出 t[0][idx] 中的 list,並將 list 中的結點依次重新新增到定時器中。而這些結點一定會新增到 near 陣列中,實現了 t[0]向 near的遷移。

檢查 time 的第9-14位是否溢位產生進位,沒有則結束,有則說明 time 恰走完t[0]陣列。繼續判斷檢查第15-20位是否溢位,有則進行下一步,沒有則說明是t[1]陣列移動。此時將 time 的第15-20位作為索引 idx,取出t[1][idx]中的 list,將 list 中的結點依次重新新增到定時器中,而這些結點一定會新增到 t[0] 陣列中,實現了 t[1]向 t[0]的遷移。

……

後續所有操作以此類推,時間輪由低向高逐級判斷,因為高階輪溢位,低階一定溢位,這樣實現了多級定時器逐級遷移的過程

// 重新對映,判斷是否需要重新對映

// 時間片time自加1,將高24位對應的4個6位的陣列中的各個元素的連結串列往低位移

void timer_shift(s_timer_t *T) {

int mask = TIME_NEAR;

// 時間片+1

uint32_t ct = ++T->time;

// 時間片溢位,無符號整數,迴圈,time重置0

if (ct == 0) {

// 將對應的t[3][0]連結串列取出,重新移動到定時器中

move_list(T, 3, 0);

}

else {

// ct右移8位,進入到從動輪

uint32_t time = ct >> TIME_NEAR_SHIFT;

// 第 i 層時間輪

int i = 0;

// 判斷是否需要重新對映?

// 即迴圈判斷當前層級對應的數位是否全0,即溢位產生進位

while ((ct & (mask-1))==0) {

// 取當前層級的索引值

int idx = time & TIME_LEVEL_MASK;

// idx=0 說明當前層級溢位,繼續迴圈判斷直至當前層級不溢位

if (idx != 0) {

// 將對應的t[i][idx]連結串列取出,依次移動到定時器中

move_list(T, i, idx);

break;

}

mask <<= TIME_LEVEL_SHIFT; // mask 右移

time >>= TIME_LEVEL_SHIFT; // time 左移

++i; // 時間輪層級增加

}

}

}

5.4、程式碼實現

// timer_wheel。h

#ifndef _MARK_TIMEWHEEL_

#define _MARK_TIMEWHEEL_

#include

#define TIME_NEAR_SHIFT 8

#define TIME_NEAR (1 << TIME_NEAR_SHIFT)

#define TIME_LEVEL_SHIFT 6

#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)

#define TIME_NEAR_MASK (TIME_NEAR-1)

#define TIME_LEVEL_MASK (TIME_LEVEL-1)

typedef struct timer_node timer_node_t;

typedef void (*handler_pt) (struct timer_node *node);

// 任務結點

struct timer_node {

struct timer_node *next; // 相同過期時間的待執行的下一個任務

uint32_t expire; // 任務過期時間

handler_pt callback; // 任務回撥函式

uint8_t cancel; // 刪除任務,遇到該標記則取消任務的執行

int id; // 此時攜帶引數

};

timer_node_t* add_timer(int time, handler_pt func, int threadid);

void expire_timer(void);

void del_timer(timer_node_t* node);

void init_timer(void);

void clear_timer();

#endif

// timer_wheel。c

#include “spinlock。h”

#include “timewheel。h”

#include

#include

#include

#if defined(__APPLE__)

#include

#include

#include

#include

#else

#include

#endif

typedef struct link_list {

timer_node_t head;

timer_node_t *tail;

}link_list_t;

// 定時器的資料結構

typedef struct timer {

link_list_t near[TIME_NEAR]; // 最低階的時間輪,主動輪

link_list_t t[4][TIME_LEVEL]; // 其他層級的時間輪,從動輪

struct spinlock lock; // 自旋鎖,O(1)

uint32_t time; // tick 指標,當前時間片

uint64_t current; // timer執行時間,精度10ms

uint64_t current_point; // 系統執行時間,精度10ms

}s_timer_t;

static s_timer_t * TI = NULL;

// 清空連結串列

// 並返回指向連結串列的第一個結點的指標

timer_node_t* link_clear(link_list_t *list) {

// 指向頭指標的下一個位置

timer_node_t * ret = list->head。next;

// 頭結點斷鏈

list->head。next = 0;

// 尾指標指向頭結點

list->tail = &(list->head);

return ret;

}

// 尾插法

void link(link_list_t *list, timer_node_t *node) {

list->tail->next = node;

list->tail = node;

node->next=0;

}

// 新增任務結點到定時器中

// 根據 time 判斷結點應該放入時間輪的層級

void add_node(s_timer_t *T, timer_node_t *node) {

uint32_t time = node->expire; // 過期時間

uint32_t current_time=T->time; // 當前時間

uint32_t msec = time - current_time; // 剩餘時間

//根據 expire-time 的差值將結點放入相應的層級

//[0, 2^8)

if (msec < TIME_NEAR) {

link(&T->near[time&TIME_NEAR_MASK],node);

}

//[2^8, 2^14)

else if (msec < (1 << (TIME_NEAR_SHIFT+TIME_LEVEL_SHIFT))) {

link(&T->t[0][((time>>TIME_NEAR_SHIFT) & TIME_LEVEL_MASK)],node);

}

//[2^14, 2^20)

else if (msec < (1 << (TIME_NEAR_SHIFT+2*TIME_LEVEL_SHIFT))) {

link(&T->t[1][((time>>(TIME_NEAR_SHIFT + TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

//[2^20, 2^26)

else if (msec < (1 << (TIME_NEAR_SHIFT+3*TIME_LEVEL_SHIFT))) {

link(&T->t[2][((time>>(TIME_NEAR_SHIFT + 2*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

//[2^26, 2^32)

else {

link(&T->t[3][((time>>(TIME_NEAR_SHIFT + 3*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);

}

}

// 新增定時任務

timer_node_t* add_timer(int time, handler_pt func, int threadid) {

timer_node_t *node = (timer_node_t *)malloc(sizeof(*node));

spinlock_lock(&TI->lock);

// 設定定時任務結點的屬性

node->expire = time + TI->time; // 新增任務觸發時間 expire = time + tick

node->callback = func; // 新增任務回撥函式

node->id = threadid; // 新增執行該任務的執行緒id

// 判斷是否需要立即執行任務

if (time <= 0) {

spinlock_unlock(&TI->lock);

node->callback(node);

free(node);

return NULL;

}

// 新增任務結點到定時器中

add_node(TI, node);

spinlock_unlock(&TI->lock);

return node;

}

// 移動連結串列

// 第level層第idx個位置的連結串列結點重新新增到定時器T中

void move_list(s_timer_t *T, int level, int idx) {

timer_node_t *current = link_clear(&T->t[level][idx]);

while (current) {

timer_node_t *temp = current->next;

add_node(T, current);

current = temp;

}

}

// 重新對映,判斷是否需要重新對映

// 時間片time自加1,將高24位對應的4個6位的陣列中的各個元素的連結串列往低位移

void timer_shift(s_timer_t *T) {

int mask = TIME_NEAR;

// 時間片+1

uint32_t ct = ++T->time;

// 時間片溢位,無符號整數,迴圈,time重置0

if (ct == 0) {

// 將對應的t[3][0]連結串列取出,重新移動到定時器中

move_list(T, 3, 0);

}

else {

// ct右移8位,進入到從動輪

uint32_t time = ct >> TIME_NEAR_SHIFT;

// 第 i 層時間輪

int i = 0;

// 判斷是否需要重新對映?

// 即迴圈判斷當前層級對應的數位是否全0,即溢位產生進位

while ((ct & (mask-1))==0) {

// 取當前層級的索引值

int idx = time & TIME_LEVEL_MASK;

// idx=0 說明當前層級溢位,繼續迴圈判斷直至當前層級不溢位

if (idx != 0) {

// 將對應的t[i][idx]連結串列取出,依次移動到定時器中

move_list(T, i, idx);

break;

}

mask <<= TIME_LEVEL_SHIFT; // mask 右移

time >>= TIME_LEVEL_SHIFT; // time 左移

++i; // 時間輪層級增加

}

}

}

// 任務派發給其他執行緒執行

void dispatch_list(timer_node_t *current) {

do {

timer_node_t *temp = current;

current=current->next;

// cancel 標記為0,執行任務回撥函式

if (temp->cancel == 0)

temp->callback(temp);

free(temp);

} while (current);

}

// 執行任務

// 以time的低8位對應的near陣列的索引,取出該位置對應的list

void timer_execute(s_timer_t *T) {

// 取出time低8位對應的值

int idx = T->time & TIME_NEAR_MASK;

// 如果低8位值對應的near陣列元素有連結串列,則取出

while (T->near[idx]。head。next) {

// 取出對應的定時器list

timer_node_t *current = link_clear(&T->near[idx]);

spinlock_unlock(&T->lock);

// 將連結串列各結點的任務派發出去

dispatch_list(current);

spinlock_lock(&T->lock);

}

}

// 定時器更新

void timer_update(s_timer_t *T) {

spinlock_lock(&T->lock);

// 執行任務

timer_execute(T);

/// time+1,並判斷是否進行重新對映

timer_shift(T);

// 若發生重新對映,若time的指向有任務,則需要執行

timer_execute(T);

spinlock_unlock(&T->lock);

}

// 刪除定時器任務

void del_timer(timer_node_t *node) {

node->cancel = 1;

}

// 建立定時器

s_timer_t * timer_create_timer() {

s_timer_t *r = (s_timer_t *)malloc(sizeof(s_timer_t));

memset(r, 0, sizeof(*r));

int i, j;

// 建立主動輪,最低階時間輪

for (i = 0; i < TIME_NEAR; ++i) {

link_clear(&r->near[i]);

}

// 建立從動輪,高層級時間輪

for (i = 0; i < 4; ++i) {

for (j = 0;j < TIME_LEVEL; ++j) {

link_clear(&r->t[i][j]);

}

}

// 初始化自旋鎖

spinlock_init(&r->lock);

r->current = 0;

return r;

}

// 獲取當前時間,時間精度10ms

uint64_t gettime() {

uint64_t t;

#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)

struct timespec ti;

clock_gettime(CLOCK_MONOTONIC, &ti);// CLOCK_MONOTONIC,從系統啟動這一刻起開始計時,不受系統時間被使用者改變的影響

t = (uint64_t)ti。tv_sec * 1000;

t += ti。tv_nsec / 1000000;

#else

struct timeval tv;

gettimeofday(&tv, NULL);

t = (uint64_t)tv。tv_sec * 100;

t += tv。tv_usec / 10000;

#endif

return t;

}

// 檢測定時器,時間精度10ms,每過1/4時間精度2。5ms執行1次

// 原因是dispatch分發任務花費時間,影響精度

void expire_timer(void) {

// 獲取當前系統執行時間,不受系統時間被使用者的影響

uint64_t cp = gettime();

// 當前系統啟動時間與定時器記錄的系統啟動時間不相等

if (cp != TI->current_point) {

// 獲取上述兩者的差值

uint32_t diff = (uint32_t)(cp - TI->current_point);

// 更新定時器記錄的系統執行時間

TI->current_point = cp;

// 更新timer的執行時間

TI->current += diff;

// 更新定時器的時間(time的值),並執行對應的過期任務

int i;

for (i=0; i

// 每執行一次timer_update,其內部的函式

// timer_shift: time+1,time代表從timer啟動後至今一共經歷了多少次tick

// timer_execute: 執行near中的定時器

timer_update(TI);

}

}

}

// 初始化定時器

void init_timer(void) {

TI = timer_create_timer(); // 建立定時器

TI->current_point = gettime(); // 獲取當前時間

}

void clear_timer() {

int i,j;

for (i=0;i

link_list_t * list = &TI->near[i];

timer_node_t* current = list->head。next;

while(current) {

timer_node_t * temp = current;

current = current->next;

free(temp);

}

link_clear(&TI->near[i]);

}

for (i=0;i<4;i++) {

for (j=0;j

link_list_t * list = &TI->t[i][j];

timer_node_t* current = list->head。next;

while (current) {

timer_node_t * temp = current;

current = current->next;

free(temp);

}

link_clear(&TI->t[i][j]);

}

}

}

// tw-timer。c

// gcc tw-timer。c timewheel。c -o tw -I。/ -lpthread #include

#include

#include

#include

#include

#include “timewheel。h”

struct context {

int quit;

int thread;

};

struct thread_param {

struct context *ctx;

int id;

};

static struct context ctx = {0};

void do_timer(timer_node_t *node) {

printf(“do_timer expired:%d - thread-id:%d\n”, node->expire, node->id);

add_timer(100, do_timer, node->id);

}

void do_clock(timer_node_t *node) {

static int time;

time ++;

printf(“——-time = %d ——-\n”, time);

add_timer(100, do_clock, node->id);

}

void* thread_worker(void *p) {

struct thread_param *tp = p;

int id = tp->id;

struct context *ctx = tp->ctx;

int expire = rand() % 200;

add_timer(expire, do_timer, id);

while (!ctx->quit) {

usleep(1000);

}

printf(“thread_worker:%d exit!\n”, id);

return NULL;

}

void do_quit(timer_node_t * node) {

ctx。quit = 1;

}

int main() {

srand(time(NULL));

ctx。thread = 2;

pthread_t pid[ctx。thread];

init_timer();

add_timer(6000, do_quit, 100);

add_timer(0, do_clock, 100);

struct thread_param task_thread_p[ctx。thread];

int i;

for (i = 0; i < ctx。thread; i++) {

task_thread_p[i]。id = i;

task_thread_p[i]。ctx = &ctx;

if (pthread_create(&pid[i], NULL, thread_worker, &task_thread_p[i])) {

fprintf(stderr, “create thread failed\n”);

exit(1);

}

}

while (!ctx。quit) {

expire_timer();

usleep(2500); // 2。5ms

}

clear_timer();

for (i = 0; i < ctx。thread; i++) {

pthread_join(pid[i], NULL);

}

printf(“all thread is closed\n”);

return 0;

}

6、參考資料

skynet 原始碼閱讀筆記 —— skynet 中的定時器機制

Skynet定時器原理

標簽: timer  node  TIME  定時器