您當前的位置:首頁 > 書法

kafka的leader切換速度受哪些因素影響?

作者:由 星河Aria 發表于 書法時間:2018-07-24

kafka的leader切換速度受哪些因素影響?huxihx2018-07-24 12:03:48

謝邀

關於這個問題說說我的淺見~ leader切換的確和很多因素有關,事實上很難定量地分析具體的耗時,不過我們依然可以試一試~

首先說一下,Kafka如何判斷leader掛了? 簡單來說是依靠Zookeeper的臨時節點機制:一旦發現某個broker不可用,其/brokers/ids/節點會自動消失,Zookeeper的監聽器至少保證會在zookeeper。session。timeout。ms時間內(預設6s)發現此節點變更,之後broker會將此訊息包裝成一個BrokerChange事件插入到controller的阻塞佇列。那麼與該佇列有關的有兩個JMX指標可以監控一下:一個是EventQueueSize;另一個是EventQueueTimeMs。前者是controller的阻塞隊列當前大小,如果很多的話說明controller端有很多積壓任務,無法及時響應leader選舉;另一個表示事件在佇列中的等待時間,也可以用於表明controller端的繁忙程度。

之後,當BrokerChange事件從佇列中被獲取之後,對應的執行緒會進行處理之。可以監控JMX指標:kafka。controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs,檢視總體的處理時間。當然這是總體的處理時間,具體到分割槽leader選舉的部分, 可以再重點關注一下StopRelica/LeaderAndIsr以及UpdateMetadata處理時間方面的表現。

最後,對leader選舉速度影響最大的一個可能的原因就是controller本身承載了大量的客戶端請求處理任務,使得無法立即響應這些請求。目前社群有個KIP正在討論是否需要將這些需求分成不同的優先順序進行處理,有興趣的可以看看:KIP-291: Have separate queues for control requests and data requests

kafka的leader切換速度受哪些因素影響?石臻臻的雜貨鋪2021-12-21 11:06:10

【福利】速進高質量滴滴技術交流群Kafka大全

大家好,我是石臻臻,這是 <

a href="htt

ps://

http://

mp。weixin。qq。com/mp/app

msgalbum?__biz=Mzg4ODY1NTcxNg==&action=getalbum&album_id=1966026980307304450

“> 「kafka專欄」 連載中的第「N」篇文章。。。

前幾天有個群友問我: kafka如何修改優先副本? 他們有個需求是, 想指定某個分割槽中的其中一個副本為

Leader

kafka的leader切換速度受哪些因素影響?

需求分析

對於這麼一個問題,在我們生產環境還是挺常見的,經常有需要修改某個Topic中某分割槽的

Leader

比如

topic1-0

這個分割槽有3個副本

[0,1,2]

, 按照「優先副本」的規則,那麼

0

號副本肯定就是

Leader

了 我們都知道分割槽中的只有

Leader

副本才會提供讀寫副本,其他副本作為備份 假如在某些情況下,

「0」

號副本效能資源不夠,或者網路不太好,或者IO壓力比較大,那麼肯定對Topic的整體讀寫效能有很大影響, 這個時候切換一臺壓力較小副本作為

Leader

就顯得很重要;

優先副本: 分割槽中的

AR

(所有副本)資訊, 優先選擇排在第一位的副本作為Leader Leader機制: 分割槽中只有一個Leader來承擔讀寫,其他副本只是作為備份

那麼如何實現這樣一個需求呢?

解決方案

知道了原理之後,我們就能想到對應的解決方案了 只要將 分割槽的

AR

中的第一個位置,替換成你指定副本就行了; AR = { 0,1,2 } ==> AR = {2,1,0}

一般能夠達到這個目的有兩種方案,下面我們來分析一下

方案一: 分割槽副本重分配

之前關於

分割槽副本重分配

我已經寫過很多文章了,如果想詳細瞭解

分割槽副本重分配、資料遷移、副本擴縮容

可以看看連結的文章, 這裡我就簡單說一下;

一般分割槽副本重分配主要有三個流程

生成推薦的遷移Json檔案

執行遷移Json檔案

驗證遷移流程是否完成

這裡我們主要看第2步驟, 來看看遷移檔案一般是什麼樣子的

{

”version“: 1,

”partitions“: [{

”topic“: ”topic1“,

”partition“: 0,

”replicas“: [0,1,2]

}]

}

這個遷移Json意思是, 把

topic1

的「0」號分割槽的副本分配成

[0,1,2]

,也就是說

topic1-0

號分割槽最終有3個副本分別在 {brokerId-0,brokerId-1,brokerId-2} ; 如果你有看過我之前寫的

分割槽副本重分配原理原始碼分析

,那麼肯定就知道,不管你之前的分配方式是什麼樣子的, 最終副本分配都是 [0,1,2] , 之前副本多的,會被刪掉,少的會被新增;

那麼我們想要實現 我們的需求 是不是把這個Json檔案 中的 ”replicas“: [0,1,2] 改一下就行了,比如改成 ”replicas“: [2,1,0] 改完Json後執行,執行

execute

, 正式開始重分配流程! 遷移完成之後, 就會發現,Leader已經變成上面的第一個位置的副本「2」 了

優缺點

優點: 實現了需求, 並且主動切換了Leader

缺點: 操作比較複雜容易出錯,需要先獲取原先的分割槽分配資料,然後手動修改Json檔案,這裡比較容易出錯,影響會比較大,當然這些都可以透過校驗介面來做好限制, 最重要的一點是 副本重分配當前只能有一個任務 ! 假如你當前有一個「副本重分配」的任務在,那麼這裡就不能夠執行了, 「副本重分配」是一個比較「重」 了的操作,出錯對叢集的影響比較大

方案二: 手動修改AR順序

首先,我們知道分割槽副本的分配資料是儲存在zookeeper中的節點

brokers/topics/{topicName}

中; 我們看個

Topic1

的節點資料例子;

{

”version“: 2,

”partitions“: {

”2“: [3, 2, 1],

”1“: [2, 1, 3],

”4“: [2, 3, 1],

”0“: [1, 3, 2],

”3“: [1, 2, 3]

},

”adding_replicas“: {},

”removing_replicas“: {}

}

資料解釋:

version:

版本資訊, 現在有 「1」、「2」 兩個版本

removing_replicas:

需要刪除的副本資料, 在進行分割槽副本重分配過程中, 多餘的副本會在資料遷移快完成的時候被刪除掉,刪除成功這裡的資料會被清除

adding_replicas:

需要新增的副本資料,在進行分割槽副本重分配過程中, 新增加的副本將會被新增,新增完成這裡的資料會清除;

partitions:

Topic的所有分割槽副本分配方式; 上面表示總共有5個分割槽,以及對應的副本位置;

知道了這些之後,想要修改優先副本,是不是可以透過直接修改zookeeper中的節點資料就行了; 比如 我們把 「1」號分割槽的副本位置改成 [2,1,3]

kafka的leader切換速度受哪些因素影響?

改成這樣之後, 還需要 執行 重新進行優先副本選舉操作 ,例如透過kafka的命令執行

sh bin/kafka-leader-election。sh ——bootstrap-server xxxx:9090 ——topic Topic1——election-type PREFERRED ——partition 1

——election-type

PREFERRED

這個表示的以優先副本的方式進行重新選舉

那麼做完這兩步之後, 我們的

修改優先副本的目的就達成了.........嗎 ?

實則並沒有, 因為這裡僅僅只是修改了

zookeeper

節點的資料, 而

bin/kafka-leader-election。sh

重選舉的操作是

Controller

來進行的; 如果你對

Controller

的作用和原始碼足夠了解, 肯定知道Controller裡面儲存了每個Topic的分割槽副本資訊, 是儲存在JVM記憶體中的, 然後我們手動修改Zookeeper中的節點,並沒有觸發

Controller

更新自身的記憶體 也就是說 就算我們執行了

kafka-leader-election。sh

, 它也不會有任何變化,因為優先副本沒有被感知到修改了;

解決這個問題也很簡單,讓

Controller

感知到資料的變更就行了 最簡單的方法, 讓

Controller

發生重新選舉, 資料重新載入!

總結

手動修改zookeeper中的「AR」順序

Controller 重新選舉

執行 分割槽副本重選舉操作(優先副本策略)

簡單程式碼

當然上面功能,肯定是要整合到

LogiKM

中的咯; 簡單程式碼如下

// 這裡轉換成HashMap型別,切勿自定義型別,以防kafka節點資料後續新增資料節點,導致資料丟失

HashMap partitionMap = zkConfig。get(ZkPathUtil。getBrokerTopicRoot(topicName), HashMap。class);

JSONObject partitionJson = (JSONObject)partitionMap。get(”partitions“);

JSONArray partitions = (JSONArray)partitionJson。get(partition);

//部分程式碼省略

//調換序列 優先副本

Integer first = partitions。getInteger(0);

partitions。set(0,targetBroker);

partitions。set(index,first);

zkUtils = ZookeeperUtils。getKafkaZkUtils(clusterDO。getZookeeper());

String json = JSON。toJSONString(partitionMap);

zkUtils。updatePersistentPath(ZkPathUtil。getBrokerTopicRoot(topicName), json,null);

//寫入成功之後觸發一下 非同步去優先副本選舉

new Thread(()->{

try {

// 1。 先讓Controller重新選舉 (不然上面修改的還沒有生效) (TODO。。 待最佳化 -> 頻繁的Controller重選舉對叢集效能會有影響)

zkConfig。deletePath(ZkPathUtil。CONTROLLER_ROOT_NODE);

// 等待 Controller 選舉一下

Thread。sleep(1000);

//2。 然後再發起副本重新選舉

preferredReplicalElectCommand。preferredReplicaElection(clusterId,topicName,partition,”“);

} catch (ConfigException | InterruptedException e) {

LOGGER。error(”重新選舉異常。e:{}“,e);

e。printStackTrace();

}

})。start();

優缺點

優點: 實現了目標需求, 簡單, 操作方便

缺點: 頻繁的

Controller

重選舉對生產環境來說會有一些影響;

最佳化與改進

第二種方案中,需要

Controller

重選舉, 頻繁的選舉肯定是對生產環境有影響的;

Controller

承擔了非常多的責任,比如

分割槽副本重分配

刪除topic

Leader選舉

等等還有很多都是它在幹

那麼如何不進行Controller的重選舉,也能達到我們的需求呢?

我們的需求是,當我們 修改了zookeeper中的節點資料的時候,能夠迅速的讓Controller感知到,並更新自己的記憶體資料就行了;

對於這個問題,我會在下一期文章中介紹

問題

看完這篇文章,提幾個相關的問題給大家思考一下;

如果我在修改zk中的「AR」資訊時候不僅僅是調換順序,而是有新增或者刪除副本會發生什麼情況呢?

如果手動修改

brokers/topics/{topicName}/partitions/{分割槽號}/state

節點裡面的leader資訊,能不能直接更新Leader?

副本選舉的整個流程是什麼樣子的?

大家可以思考一下, 問題答案我會在後面的文章中一一講解!

點個關注, 推送更多 乾貨 內容, 一起進 【滴滴技術答疑群 】 跟眾多技術專家交流技術吧!

【福利】速進高質量滴滴技術交流群Kafka大全

kafka的leader切換速度受哪些因素影響?知乎使用者3HSaHD2022-01-13 19:35:08

目錄

kafka的leader切換速度受哪些因素影響?

思考幾個問題

什麼是分割槽狀態機?

建立Topic的時候如何選舉Leader?

分割槽的所有副本都不線上, 這個時候啟動一臺之前不在ISR內的副本,它會當選為Leader嗎?

當所有副本都不線上,然後一個一個重啟Broker上副本上線,誰會當選為Leader?誰先啟動就誰當選嗎?

Broker下線了,Leader切換給了其他副本, 當Broker重啟的時候,Leader會還給之前的副本嗎?

選舉成功的那一刻, 生產者和消費著都做了哪些事情?

Leader選舉期間對分割槽的影響

分割槽Leader選舉流程分析

在開始原始碼分析之前, 大家先看下面這張圖, 好讓自己對Leader選舉有一個非常清晰的認知,然後再去看後面的原始碼分析文章,會更容易理解。

kafka的leader切換速度受哪些因素影響?

整個流程分為三大塊

觸發選舉場景 圖左

執行選舉流程 圖中

Leader選舉策略 圖右

分割槽狀態機

❝首先大家得了解兩個狀態機

1. 分割槽狀態機

控制分割槽狀態流轉

2. 副本狀態機

控制副本狀態流轉

這裡我們主要講解分割槽狀態機,這張圖表示的是分割槽狀態機

kafka的leader切換速度受哪些因素影響?

NonExistentPartition

:分割槽在將要被建立之前的初始狀態是這個,表示不存在

NewPartition

: 表示正在建立新的分割槽, 是一箇中間狀態, 這個時候只是在Controller的記憶體中存了狀態資訊

OnlinePartition

: 線上狀態, 正常的分割槽就應該是這種狀態,只有在線的分割槽才能夠提供服務

OfflinePartition

: 下線狀態, 分割槽可能因為Broker宕機或者刪除Topic等原因流轉到這個狀態, 下線了就不能提供服務了

NonExistentPartition

: 分割槽不存在的狀態, 當Topic刪除完成成功之後, 就會流轉到這個狀態, 當還處在刪除中的時候,還是停留在下線狀態。

我們今天要講的Leader選舉

就是在

之前狀態=>OnlinePartition狀態的時候發生的。

Leader選舉流程分析

原始碼入口:

PartitionStateMachine#electLeaderForPartitions

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

可以看到 我們最終是呼叫了

doElectLeaderForPartitions

執行分割槽Leader選舉。

PartitionStateMachine#doElectLeaderForPartitions

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

總結一下上面的原始碼

去zookeeper節點

/broker/topics/{topic名稱}/partitions/{分割槽號}/state

節點讀取基本資訊。

遍歷從zk中獲取的

leaderIsrAndControllerEpoch

資訊,做一些簡單的校驗:zk中獲取的資料的controllerEpoch必須<=當前的Controller的controller_epoch。最終得到

validLeaderAndIsrs

, controller_epoch 就是用來防止腦裂的, 當有兩個Controller當選的時候,他們的epoch肯定不一樣, 那麼最新的epoch才是真的Controller

如果沒有獲取到有效的

validLeaderAndIsrs

資訊 則直接返回

根據入參

partitionLeaderElectionStrategy

來匹配不同的Leader選舉策略。來選出合適的Leader和ISR資訊

根據上面的選舉策略選出的

LeaderAndIsr

資訊進行遍歷, 將它們一個個寫入到zookeeper節點

/broker/topics/{topic名稱}/partitions/{分割槽號}/state

中。 (當然如果上面沒有選擇出合適的leader,那麼久不會有這個過程了)

遍歷上面寫入zk成功的分割槽, 然後更新Controller裡面的分割槽leader和isr的記憶體資訊 併發送

LeaderAndISR

請求,通知對應的Broker Leader更新了。

kafka的leader切換速度受哪些因素影響?

看上面的Leader選舉策略是不是很簡單, 但是中間究竟是如何選擇Leader的? 這個是根據傳入的策略型別, 來做不同的選擇

❝那麼有哪些策略呢?以及什麼時候觸發這些選舉呢?

分割槽的幾種策略以及對應的觸發場景

1. OfflinePartitionLeaderElectionStrategy

❝遍歷分割槽的AR, 找到第一個滿足以下條件的副本:

副本線上

在ISR中

如果找不到滿足條件的副本,那麼再根據 傳入的引數

allowUnclean

判斷

allowUnclean=true

:AR順序中所有

線上副本

中的第一個副本。

allowUnclean=false

: 需要去查詢配置

unclean。leader。election。enable

的值。

若=true ,則跟上面 1一樣 。

若=false,直接返回None,沒有找到合適的Leader。

kafka的leader切換速度受哪些因素影響?

原始碼位置:

Election#leaderForOffline

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

先組裝所有給定的

validLeaderAndIsrs

的資訊 其實主要還是要去獲取每個Topic的對應的

unclean。leader。election。enable

屬性值。

預設情況下,我們呼叫到這裡的時候 這個入參

allowUnclean=false

如果是false

那我們需要去查詢一下指定的topic它的屬性

unclean。leader。election。enable

是什麼

如果是true

則表示直接覆蓋了

unclean。leader。election。enable

的配置為true。

kafka的leader切換速度受哪些因素影響?

找到 第一個滿足條件:

副本線上

&& 在

ISR中的副本

如果沒有滿足條件的 則判斷入

uncleanLeaderElectionEnabled

的配置 如果是true,則從不在isr中的存活副本中獲取副本作為leader。 當然這個

uncleanLeaderElectionEnabled

引數是上 步驟1中決定的。

觸發場景:Controller 重新載入

❝Controller 當選的時候會啟動

分割槽狀態機

partitionStateMachine

, 啟動的時候會重新載入所有分割槽的狀態到記憶體中, 並觸發 對處於

NewPartition

OfflinePartition

狀態的所有分割槽嘗試變更為

OnlinePartition

狀態的狀態。把新建立的分割槽和離線的分割槽觸發一下選舉流程啊

觸發原始碼入口:

KafkaController#onControllerFailover

partitionStateMachine。startup()

partitionStateMachine。triggerOnlinePartitionStateChange()

觸發場景:指令碼執行髒選舉

❝當執行

kafka-leader-election。sh

的時候並且模式選擇的是

UNCLEAN

。 則會觸發這個模式。

這裡注意一下,入參

allowUnclean

= (electionTrigger == AdminClientTriggered)

意思是: 當觸發的場景是

AdminClientTriggered

的時候, 則

allowUnclean=true

,表示 不關心配置引數

unclean。leader。election。enable

是什麼

如果沒有找到符合條件的Leader, 則就去非ISR 列表找Leader。 剛好我們執行指令碼的時候觸發器就是

AdminClientTriggered

其他觸發器有:

AutoTriggered

: 定時自動觸發。

ZkTriggered

:Controller切換的時候觸發的(zk節點/controller 的變更便是Controller角色的切換) AdminClientTriggered:客戶端主動觸發。

觸發場景:Controller 監聽到有Broker啟動了

❝同上。

觸發原始碼入口:

KafkaController#processBrokerChange#onBrokerStartup

partitionStateMachine。triggerOnlinePartitionStateChange()

觸發場景:Controller 監聽 LeaderAndIsrResponseReceived請求

❝同上。

當Controller向對應的Broker發起

LeaderAndIsrRequest

請求的時候。

有一個回撥函式callback, 這個回撥函式會向Controller發起一個事件為

LeaderAndIsrResponseReceived

請求。

具體原始碼在:

ControllerChannelManager#sendLeaderAndIsrRequest

kafka的leader切換速度受哪些因素影響?

Controller收到這個事件的請求之後,根據返回的

leaderAndIsrResponse

資料

會判斷一下有沒有

新增加

的離線副本(一般都是由於磁碟訪問有問題)

如果有新的離線副本,則需要將這個離線副本標記為Offline狀態

原始碼入口:

KafkaController#onReplicasBecomeOffline

partitionStateMachine。triggerOnlinePartitionStateChange()

觸發場景:Controller 監聽 UncleanLeaderElectionEnable請求

❝當我們在修改動態配置的時候, 將動態配置:

unclean。leader。election。enable

設定為 true 的時候

會觸發向Controller發起

UncleanLeaderElectionEnable

的請求,這個時候則需要觸發一下。觸發請求

同上

觸發原始碼入口:

KafkaController#processTopicUncleanLeaderElectionEnable

partitionStateMachine。triggerOnlinePartitionStateChange(topic)

上面的觸發呼叫的程式碼就是下面的介面

對處於

NewPartition

OfflinePartition

狀態的所有分割槽嘗試變更為

OnlinePartition

的狀態。 狀態的流程觸發了Leader選舉。

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

獲取所有 OfflinePartition 、NewPartition 的分割槽狀態

嘗試將 所有 NewPartition or OfflinePartition 狀態的分割槽全部轉別成 OnlinePartition狀態, 但是如果對應的Topic正在刪除中,則會被排除掉

分割槽狀態機進行狀態流轉 使用

OfflinePartitionLeaderElectionStrategy

選舉策略(

allowUnclean=true

表示如果從isr中沒有選出leader,則允許從非isr列表中選舉leader ,

allowUnclean=false

表示如果從isr中沒有選出leader, 則需要去讀取配置檔案的配置

unclean。leader。election。enable

來決定是否允許從非ISR列表中選舉Leader。 )

2. ReassignPartitionLeaderElectionStrategy

分割槽副本重分配選舉策略:

當執行分割槽副本重分配的時候, 原來的Leader可能有變更, 則需要觸發一下 Leader選舉。

只有當之前的Leader副本在經過重分配之後不存在了

例如: [2,0] ==> [1,0] 。 原來2是Leader副本,經過重分配之後變成了 [1,0]。2已經不復存在了,所以需要重新選舉Leader。

當原來的分割槽Leader副本 因為某些異常,下線了

。需要重新選舉Leader

kafka的leader切換速度受哪些因素影響?

分割槽副本重分配發生的Leader選舉.

Election#leaderForReassign

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

總結:

❝從當前的副本分配列表中,獲取

副本線上

&&

副本在ISR中

的 第一個副本,遍歷的順序是當前副本的分配方式(

AR

),跟ISR的順序沒有什麼關係。

觸發場景:分割槽副本重分配

❝並不是每次執行分割槽副本重分配都會觸發這個Leader選舉策略, 下面兩種情況才會觸發

只有當之前的Leader副本在經過重分配之後不存在了

。例如: [2,0] ==> [1,0] 。 原來2是Leader副本,經過重分配之後變成了 [1,0]。2已經不復存在了,所以需要重新選舉Leader。

當原來的分割槽Leader副本 因為某些異常,下線了

。需要重新選舉Leader

對應的判斷條件程式碼如下:

KafkaController#moveReassignedPartitionLeaderIfRequired

private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,

newAssignment: ReplicaAssignment): Unit = {

// 重分配之後的所有副本

val reassignedReplicas = newAssignment。replicas

//當前的分割槽Leader是哪個

val currentLeader = controllerContext。partitionLeadershipInfo(topicPartition)。leaderAndIsr。leader

// 如果分配後的副本不包含當前Leader副本,則需要重新選舉

if (!reassignedReplicas。contains(currentLeader)) {

//觸發Leader重選舉,策略是ReassignPartitionLeaderElectionStrategy

partitionStateMachine。handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))

} else if (controllerContext。isReplicaOnline(currentLeader, topicPartition)) {

// 上面2種情況都不符合, 那麼就沒有必要leader重選舉了, 更新一下leaderEpoch就行 了

updateLeaderEpochAndSendRequest(topicPartition, newAssignment)

} else {

//觸發Leader重選舉,策略是ReassignPartitionLeaderElectionStrategy

partitionStateMachine。handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))

}

}

kafka的leader切換速度受哪些因素影響?

點選檢視分割槽重分配的原始碼解析

3. PreferredReplicaPartitionLeaderElectionStrategy

❝優先副本選舉策略, 必須滿足三個條件:

是第一個副本&&副本線上&&副本在ISR列表中。

滿足上面三個條件才會當選leader,不滿足則不會做變更。

kafka的leader切換速度受哪些因素影響?

def leaderForPreferredReplica(controllerContext: ControllerContext,

leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {

leaderAndIsrs。map { case (partition, leaderAndIsr) =>

leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)

}

}

private def leaderForPreferredReplica(partition: TopicPartition,

leaderAndIsr: LeaderAndIsr,

controllerContext: ControllerContext): ElectionResult = {

// AR列表

val assignment = controllerContext。partitionReplicaAssignment(partition)

// 線上副本

val liveReplicas = assignment。filter(replica => controllerContext。isReplicaOnline(replica, partition))

val isr = leaderAndIsr。isr

// 找出第一個副本 是否線上 並且在ISR中。

val leaderOpt = PartitionLeaderElectionAlgorithms。preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas。toSet)

// 組裝leaderandisr返回 ,注意這裡是沒有修改ISR資訊的

val newLeaderAndIsrOpt = leaderOpt。map(leader => leaderAndIsr。newLeader(leader))

ElectionResult(partition, newLeaderAndIsrOpt, assignment)

}

def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {

assignment。headOption。filter(id => liveReplicas。contains(id) && isr。contains(id))

}

從記憶體中獲取TopicPartition的分配方式

過濾不線上的副本

找到第一個副本判斷一下是否線上&&在ISR列表中。如果滿足,則選他為leader,如果不滿足,也不會再找其他副本了。

返回leaderAndIsr資訊, 這裡的ISR是沒有做修改的。

觸發場景:自動定時執行優先副本選舉任務

❝Controller 啟動的時候,會啟動一個定時任務 。每隔一段時間就去執行

優先副本選舉

任務。

與之相關配置:

## 如果為true表示會建立定時任務去執行 優先副本選舉,為false則不會建立

auto。leader。rebalance。enable=true

## 每隔多久執行一次 ; 預設300秒;

leader。imbalance。check。interval。seconds partition = 300

##標識每個 Broker 失去平衡的比率,如果超過該比率,則執行重新選舉 Broker 的 leader;預設比例是10%;

##這個比率的演算法是 :broker不平衡率=非優先副本的leader個數/總分割槽數,

##假如一個topic有3個分割槽[0,1,2],並且有3個副本 ,正常情況下,[0,1,2]分別都為一個leader副本; 這個時候 0/3=0%;

leader。imbalance。per。broker。percentage = 10

觸發場景: Controller 重新載入的時候

❝在這個觸發之前還有執行

partitionStateMachine。startup()

相當於是先把 OfflinePartition、NewPartition狀態的分割槽執行了

OfflinePartitionLeaderElectionStrategy

策略。

然後又執行了

PreferredReplicaPartitionLeaderElectionStrategy

策略 這裡是從zk節點

/admin/preferred_replica_election

讀取資料, 來進行判斷是否有需要執行Leader選舉的分割槽

它是在執行

kafka-preferred-replica-election

命令的時候會建立這個zk節點

但是這個已經被標記為廢棄了,並且在3。0的時候直接移除了。

原始碼位置:

KafkaController#onControllerFailover

// 從zk節點/admin/preferred_replica_election找到哪些符合條件需要執行優先副本選舉的分割槽

val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()

// 這裡的觸發型別 是 ZkTriggered

onReplicaElection(pendingPreferredReplicaElections, ElectionType。PREFERRED, ZkTriggered)

private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {

// 去zk讀取節點 /admin/preferred_replica_election

val partitionsUndergoingPreferredReplicaElection = zkClient。getPreferredReplicaElection

// 如果指定分割槽的 leader 已經是AR的第一個副本 或者 topic被刪除了,則 過濾掉這個分割槽(沒有必要執行leader選舉了)

val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection。filter { partition =>

val replicas = controllerContext。partitionReplicaAssignment(partition)

val topicDeleted = replicas。isEmpty

val successful =

if (!topicDeleted) controllerContext。partitionLeadershipInfo(partition)。leaderAndIsr。leader == replicas。head else false

successful || topicDeleted

}

// 將zk獲取到的分割槽資料 - 剛剛需要忽略的資料 = 還需要執行選舉的資料

val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection —— partitionsThatCompletedPreferredReplicaElection

// 找到哪些分割槽正在刪除

val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion。filter(partition => topicDeletionManager。isTopicQueuedUpForDeletion(partition。topic))

// 待刪除的分割槽也過濾掉

val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion —— pendingPreferredReplicaElectionsSkippedFromTopicDeletion

// 返回最終需要執行優先副本選舉的資料。

pendingPreferredReplicaElections

}

觸發場景:執行優先副本選舉指令碼的時候

❝執行指令碼

kafka-leader-election。sh

並且選擇的模式是

PREFERRED

(優先副本選舉) 則會選擇

PreferredReplicaPartitionLeaderElectionStrategy

策略選舉

4. ControlledShutdownPartitionLeaderElectionStrategy

受控關機選舉策略

當Broker關機的過程中,會向Controller發起一個請求, 讓它重新發起一次選舉, 把在所有正在關機(也就是發起請求的那個Broker,或其它同時正在關機的Broker) 的Broker裡面的副本給剔除掉。

根據演算法算出leader:找到第一個滿足條件的副本:

副本線上 && 副本在ISR中 && 副本所在的Broker不在正在關閉的Broker集合中 。

構造新的ISR列表: 在之前的isr列表中將 正在被關閉的Broker裡面的副本 給剔除掉

kafka的leader切換速度受哪些因素影響?

Election#leaderForControlledShutdown

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

觸發場景:Broker關機的時候

❝當Broker關閉的時候, 會向Controller發一起一個

ControlledShutdownRequest

請求, Controller收到這個請求會針對性的做一些善後事件。比如說

執行Leader重選舉

等等之類的。

原始碼位置:

KafkaServer#controlledShutdown

Controller收到請求的原始碼位置:

KafkaController#doControlledShutdown

與之相關的配置有:

controlled。shutdown。enable : 是否啟用受控關閉操作

controlled。shutdown。max。retries 受控關機操作 最大重試的次數

controlled。shutdown。retry。backoff。ms 失敗後等等多久再次重試

kafka的leader切換速度受哪些因素影響?

其他場景

新建立的Topic Leader選舉策略

❝建立新的Topic的時候,並沒有發生Leader選舉的操作, 而是預設從分割槽對應的所有線上副本中選擇第一個為leader, 然後isr就為 所有線上副本,再組裝一下當前的

controller_epoch

資訊,寫入到zk節點

/brokers/topics/{Topic名稱}/partitions/{分割槽號}/state

中。

最後發起

LeaderAndIsrRequest

請求,通知 leader 的變更。

詳細看看原始碼:

PartitionStateMachine#doHandleStateChanges

分割槽狀態從

NewPartition

流轉到

OnlinePartition

因篇幅原因原始碼省略

想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】

從當前的Controller 記憶體中獲取所有入參的分割槽對應的副本資訊

過濾那些已經下線的副本( Broker宕機、網路異常、磁碟離線、等等都有可能造成副本下線) 。

每個分割槽對應的所有線上副本資訊 為 ISR 資訊,然後取ISR的第一個副本為leader分割槽。當然特別注意一下, 這個時候獲取的isr資訊的順序就是 分割槽建立時候分配好的AR順序, 獲取第一個線上的。(因為在其他情況下 ISR的順序跟AR的順序並不一致)

組裝 上面的

isr

leader

controller_epoch

等資訊 寫入到zk節點 /brokers/topics/{Topic名稱}/partitions/{分割槽號}/state例如下面所示{”controller_epoch“:1,”leader“:0,”version“:1,”leader_epoch“:0,”isr“:[0,1,2]}

然後向其他相關Broker 發起

LeaderAndIsrRequest

請求,通知他們Leader和Isr資訊已經變更了,去做一下想要的處理。比如去新的leader發起Fetcher請求同步資料。

可以看看之前我們分析過的 Topic建立的原始碼解析 的原理圖 如下

kafka的leader切換速度受哪些因素影響?

重點看:

kafka的leader切換速度受哪些因素影響?

回答上面的問題

現在,看完全文之後,我想你應該對下面的問題很清楚了吧!

什麼是分割槽狀態機

❝所有的分割槽狀態的流轉都是透過分割槽狀態機來進行的, 統一管理! 每個分割槽狀態的流轉 都是有嚴格限制並且固定的,流轉到不同狀態需要執行的操作不一樣, 例如 當分割槽狀態流轉到

OnlinePartition

的時候, 就需要判斷是否需要執行

Leader選舉

kafka的leader切換速度受哪些因素影響?

建立Topic的時候如何選舉Leader?

❝建立Topic的時候並沒有發生 Leader選舉, 而是預設將 線上的第一個副本設定為Leader,所有線上的副本列表 為 ISR 列表。 寫入到了zookeeper中。

分割槽的所有副本都不線上, 這個時候啟動一臺之前不在ISR內的副本的Broker,它會當選為Leader嗎?

❝視情況而定。 首先, 啟動一臺Broker, 會用什麼策略選舉?

看上面的圖,我們可以知道是

OfflinePartitionLeaderElectionStrategy

然後看下這個策略是如何選舉的?

kafka的leader切換速度受哪些因素影響?

那麼最終結果就是:

所有副本不線上,那麼一個Leader的候選者都當選不了

那麼這個時候就會判斷

unclean。leader。election。enable

配置是否為true。

如果是true

, 則當前線上的副本就是隻有自己這個剛啟動的線上副本,自然而然就會當選Leader了。

如果是fase

, 則沒有副本能夠當前Leader, 次數處於一個無Leader的狀態。

當所有副本都不線上,然後一個一個重啟Broker上副本上線,誰會當選為Leader?誰先啟動就誰當選嗎?

❝不是, 跟上一個問題同理

根據

unclean。leader。election。enable

配置決定。

如果是true

, 則誰先啟動,誰就當選(會丟失部分資料)

如果是false

,則第一個在ISR列表中的副本當選。

順便再提一句, 雖然在這裡可能不是AR中的第一個副本當選Leader。

但是最終還是會自動執行Leader均衡的,自動均衡使用的策略是

PreferredReplicaPartitionLeaderElectionStrategy

(前提是開啟了自動均衡:

auto。leader。rebalance。enable=true

Broker下線了,Leader切換給了其他副本, 當Broker重啟的時候,Leader會還給之前的副本嗎?

❝根據配置

auto。leader。rebalance。enable=true

決定。

true:

會自動執行Leader均衡, 自動均衡策略是

PreferredReplicaPartitionLeaderElectionStrategy

策略

false:

不執行自動均衡。 那麼久不會還回去。 關於更詳細的 Leader均衡機制請看 Leader 均衡機制

Leader選舉期間對分割槽的影響

❝Leader的選舉基本上不會造成什麼影響, Leader的切換非常快, 每個分割槽不可用的時間在幾毫秒內。

希望能對你有所幫助!

標簽: 副本  Leader  分割槽  選舉