kafka的leader切換速度受哪些因素影響?
謝邀
關於這個問題說說我的淺見~ leader切換的確和很多因素有關,事實上很難定量地分析具體的耗時,不過我們依然可以試一試~
首先說一下,Kafka如何判斷leader掛了? 簡單來說是依靠Zookeeper的臨時節點機制:一旦發現某個broker不可用,其/brokers/ids/
之後,當BrokerChange事件從佇列中被獲取之後,對應的執行緒會進行處理之。可以監控JMX指標:kafka。controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs,檢視總體的處理時間。當然這是總體的處理時間,具體到分割槽leader選舉的部分, 可以再重點關注一下StopRelica/LeaderAndIsr以及UpdateMetadata處理時間方面的表現。
最後,對leader選舉速度影響最大的一個可能的原因就是controller本身承載了大量的客戶端請求處理任務,使得無法立即響應這些請求。目前社群有個KIP正在討論是否需要將這些需求分成不同的優先順序進行處理,有興趣的可以看看:KIP-291: Have separate queues for control requests and data requests
【福利】速進高質量滴滴技術交流群Kafka大全
大家好,我是石臻臻,這是 <
a href="htt
ps://
http://
mp。weixin。qq。com/mp/app
msgalbum?__biz=Mzg4ODY1NTcxNg==&action=getalbum&album_id=1966026980307304450
“> 「kafka專欄」 連載中的第「N」篇文章。。。
前幾天有個群友問我: kafka如何修改優先副本? 他們有個需求是, 想指定某個分割槽中的其中一個副本為
Leader
需求分析
對於這麼一個問題,在我們生產環境還是挺常見的,經常有需要修改某個Topic中某分割槽的
Leader
比如
topic1-0
這個分割槽有3個副本
[0,1,2]
, 按照「優先副本」的規則,那麼
0
號副本肯定就是
Leader
了 我們都知道分割槽中的只有
Leader
副本才會提供讀寫副本,其他副本作為備份 假如在某些情況下,
「0」
號副本效能資源不夠,或者網路不太好,或者IO壓力比較大,那麼肯定對Topic的整體讀寫效能有很大影響, 這個時候切換一臺壓力較小副本作為
Leader
就顯得很重要;
優先副本: 分割槽中的
AR
(所有副本)資訊, 優先選擇排在第一位的副本作為Leader Leader機制: 分割槽中只有一個Leader來承擔讀寫,其他副本只是作為備份
那麼如何實現這樣一個需求呢?
解決方案
知道了原理之後,我們就能想到對應的解決方案了 只要將 分割槽的
AR
中的第一個位置,替換成你指定副本就行了; AR = { 0,1,2 } ==> AR = {2,1,0}
一般能夠達到這個目的有兩種方案,下面我們來分析一下
方案一: 分割槽副本重分配
之前關於
分割槽副本重分配
我已經寫過很多文章了,如果想詳細瞭解
分割槽副本重分配、資料遷移、副本擴縮容
可以看看連結的文章, 這裡我就簡單說一下;
一般分割槽副本重分配主要有三個流程
生成推薦的遷移Json檔案
執行遷移Json檔案
驗證遷移流程是否完成
這裡我們主要看第2步驟, 來看看遷移檔案一般是什麼樣子的
{
”version“: 1,
”partitions“: [{
”topic“: ”topic1“,
”partition“: 0,
”replicas“: [0,1,2]
}]
}
這個遷移Json意思是, 把
topic1
的「0」號分割槽的副本分配成
[0,1,2]
,也就是說
topic1-0
號分割槽最終有3個副本分別在 {brokerId-0,brokerId-1,brokerId-2} ; 如果你有看過我之前寫的
分割槽副本重分配原理原始碼分析
,那麼肯定就知道,不管你之前的分配方式是什麼樣子的, 最終副本分配都是 [0,1,2] , 之前副本多的,會被刪掉,少的會被新增;
那麼我們想要實現 我們的需求 是不是把這個Json檔案 中的 ”replicas“: [0,1,2] 改一下就行了,比如改成 ”replicas“: [2,1,0] 改完Json後執行,執行
execute
, 正式開始重分配流程! 遷移完成之後, 就會發現,Leader已經變成上面的第一個位置的副本「2」 了
優缺點
優點: 實現了需求, 並且主動切換了Leader
缺點: 操作比較複雜容易出錯,需要先獲取原先的分割槽分配資料,然後手動修改Json檔案,這裡比較容易出錯,影響會比較大,當然這些都可以透過校驗介面來做好限制, 最重要的一點是 副本重分配當前只能有一個任務 ! 假如你當前有一個「副本重分配」的任務在,那麼這裡就不能夠執行了, 「副本重分配」是一個比較「重」 了的操作,出錯對叢集的影響比較大
方案二: 手動修改AR順序
首先,我們知道分割槽副本的分配資料是儲存在zookeeper中的節點
brokers/topics/{topicName}
中; 我們看個
Topic1
的節點資料例子;
{
”version“: 2,
”partitions“: {
”2“: [3, 2, 1],
”1“: [2, 1, 3],
”4“: [2, 3, 1],
”0“: [1, 3, 2],
”3“: [1, 2, 3]
},
”adding_replicas“: {},
”removing_replicas“: {}
}
資料解釋:
version:
版本資訊, 現在有 「1」、「2」 兩個版本
removing_replicas:
需要刪除的副本資料, 在進行分割槽副本重分配過程中, 多餘的副本會在資料遷移快完成的時候被刪除掉,刪除成功這裡的資料會被清除
adding_replicas:
需要新增的副本資料,在進行分割槽副本重分配過程中, 新增加的副本將會被新增,新增完成這裡的資料會清除;
partitions:
Topic的所有分割槽副本分配方式; 上面表示總共有5個分割槽,以及對應的副本位置;
知道了這些之後,想要修改優先副本,是不是可以透過直接修改zookeeper中的節點資料就行了; 比如 我們把 「1」號分割槽的副本位置改成 [2,1,3]
改成這樣之後, 還需要 執行 重新進行優先副本選舉操作 ,例如透過kafka的命令執行
sh bin/kafka-leader-election。sh ——bootstrap-server xxxx:9090 ——topic Topic1——election-type PREFERRED ——partition 1
——election-type
:
PREFERRED
這個表示的以優先副本的方式進行重新選舉
那麼做完這兩步之後, 我們的
修改優先副本的目的就達成了.........嗎 ?
實則並沒有, 因為這裡僅僅只是修改了
zookeeper
節點的資料, 而
bin/kafka-leader-election。sh
重選舉的操作是
Controller
來進行的; 如果你對
Controller
的作用和原始碼足夠了解, 肯定知道Controller裡面儲存了每個Topic的分割槽副本資訊, 是儲存在JVM記憶體中的, 然後我們手動修改Zookeeper中的節點,並沒有觸發
Controller
更新自身的記憶體 也就是說 就算我們執行了
kafka-leader-election。sh
, 它也不會有任何變化,因為優先副本沒有被感知到修改了;
解決這個問題也很簡單,讓
Controller
感知到資料的變更就行了 最簡單的方法, 讓
Controller
發生重新選舉, 資料重新載入!
總結
手動修改zookeeper中的「AR」順序
Controller 重新選舉
執行 分割槽副本重選舉操作(優先副本策略)
簡單程式碼
當然上面功能,肯定是要整合到
LogiKM
中的咯; 簡單程式碼如下
// 這裡轉換成HashMap型別,切勿自定義型別,以防kafka節點資料後續新增資料節點,導致資料丟失
HashMap partitionMap = zkConfig。get(ZkPathUtil。getBrokerTopicRoot(topicName), HashMap。class);
JSONObject partitionJson = (JSONObject)partitionMap。get(”partitions“);
JSONArray partitions = (JSONArray)partitionJson。get(partition);
//部分程式碼省略
//調換序列 優先副本
Integer first = partitions。getInteger(0);
partitions。set(0,targetBroker);
partitions。set(index,first);
zkUtils = ZookeeperUtils。getKafkaZkUtils(clusterDO。getZookeeper());
String json = JSON。toJSONString(partitionMap);
zkUtils。updatePersistentPath(ZkPathUtil。getBrokerTopicRoot(topicName), json,null);
//寫入成功之後觸發一下 非同步去優先副本選舉
new Thread(()->{
try {
// 1。 先讓Controller重新選舉 (不然上面修改的還沒有生效) (TODO。。 待最佳化 -> 頻繁的Controller重選舉對叢集效能會有影響)
zkConfig。deletePath(ZkPathUtil。CONTROLLER_ROOT_NODE);
// 等待 Controller 選舉一下
Thread。sleep(1000);
//2。 然後再發起副本重新選舉
preferredReplicalElectCommand。preferredReplicaElection(clusterId,topicName,partition,”“);
} catch (ConfigException | InterruptedException e) {
LOGGER。error(”重新選舉異常。e:{}“,e);
e。printStackTrace();
}
})。start();
優缺點
優點: 實現了目標需求, 簡單, 操作方便
缺點: 頻繁的
Controller
重選舉對生產環境來說會有一些影響;
最佳化與改進
第二種方案中,需要
Controller
重選舉, 頻繁的選舉肯定是對生產環境有影響的;
Controller
承擔了非常多的責任,比如
分割槽副本重分配
、
刪除topic
、
Leader選舉
等等還有很多都是它在幹
那麼如何不進行Controller的重選舉,也能達到我們的需求呢?
我們的需求是,當我們 修改了zookeeper中的節點資料的時候,能夠迅速的讓Controller感知到,並更新自己的記憶體資料就行了;
對於這個問題,我會在下一期文章中介紹
問題
看完這篇文章,提幾個相關的問題給大家思考一下;
如果我在修改zk中的「AR」資訊時候不僅僅是調換順序,而是有新增或者刪除副本會發生什麼情況呢?
如果手動修改
brokers/topics/{topicName}/partitions/{分割槽號}/state
節點裡面的leader資訊,能不能直接更新Leader?
副本選舉的整個流程是什麼樣子的?
大家可以思考一下, 問題答案我會在後面的文章中一一講解!
點個關注, 推送更多 乾貨 內容, 一起進 【滴滴技術答疑群 】 跟眾多技術專家交流技術吧!
【福利】速進高質量滴滴技術交流群Kafka大全
目錄
思考幾個問題
什麼是分割槽狀態機?
建立Topic的時候如何選舉Leader?
分割槽的所有副本都不線上, 這個時候啟動一臺之前不在ISR內的副本,它會當選為Leader嗎?
當所有副本都不線上,然後一個一個重啟Broker上副本上線,誰會當選為Leader?誰先啟動就誰當選嗎?
Broker下線了,Leader切換給了其他副本, 當Broker重啟的時候,Leader會還給之前的副本嗎?
選舉成功的那一刻, 生產者和消費著都做了哪些事情?
Leader選舉期間對分割槽的影響
分割槽Leader選舉流程分析
在開始原始碼分析之前, 大家先看下面這張圖, 好讓自己對Leader選舉有一個非常清晰的認知,然後再去看後面的原始碼分析文章,會更容易理解。
整個流程分為三大塊
觸發選舉場景 圖左
執行選舉流程 圖中
Leader選舉策略 圖右
分割槽狀態機
❝首先大家得了解兩個狀態機
1. 分割槽狀態機
控制分割槽狀態流轉
2. 副本狀態機
控制副本狀態流轉
這裡我們主要講解分割槽狀態機,這張圖表示的是分割槽狀態機
NonExistentPartition
:分割槽在將要被建立之前的初始狀態是這個,表示不存在
NewPartition
: 表示正在建立新的分割槽, 是一箇中間狀態, 這個時候只是在Controller的記憶體中存了狀態資訊
OnlinePartition
: 線上狀態, 正常的分割槽就應該是這種狀態,只有在線的分割槽才能夠提供服務
OfflinePartition
: 下線狀態, 分割槽可能因為Broker宕機或者刪除Topic等原因流轉到這個狀態, 下線了就不能提供服務了
NonExistentPartition
: 分割槽不存在的狀態, 當Topic刪除完成成功之後, 就會流轉到這個狀態, 當還處在刪除中的時候,還是停留在下線狀態。
我們今天要講的Leader選舉
就是在
之前狀態=>OnlinePartition狀態的時候發生的。
Leader選舉流程分析
原始碼入口:
PartitionStateMachine#electLeaderForPartitions
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
可以看到 我們最終是呼叫了
doElectLeaderForPartitions
執行分割槽Leader選舉。
PartitionStateMachine#doElectLeaderForPartitions
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
總結一下上面的原始碼
去zookeeper節點
/broker/topics/{topic名稱}/partitions/{分割槽號}/state
節點讀取基本資訊。
遍歷從zk中獲取的
leaderIsrAndControllerEpoch
資訊,做一些簡單的校驗:zk中獲取的資料的controllerEpoch必須<=當前的Controller的controller_epoch。最終得到
validLeaderAndIsrs
, controller_epoch 就是用來防止腦裂的, 當有兩個Controller當選的時候,他們的epoch肯定不一樣, 那麼最新的epoch才是真的Controller
如果沒有獲取到有效的
validLeaderAndIsrs
資訊 則直接返回
根據入參
partitionLeaderElectionStrategy
來匹配不同的Leader選舉策略。來選出合適的Leader和ISR資訊
根據上面的選舉策略選出的
LeaderAndIsr
資訊進行遍歷, 將它們一個個寫入到zookeeper節點
/broker/topics/{topic名稱}/partitions/{分割槽號}/state
中。 (當然如果上面沒有選擇出合適的leader,那麼久不會有這個過程了)
遍歷上面寫入zk成功的分割槽, 然後更新Controller裡面的分割槽leader和isr的記憶體資訊 併發送
LeaderAndISR
請求,通知對應的Broker Leader更新了。
看上面的Leader選舉策略是不是很簡單, 但是中間究竟是如何選擇Leader的? 這個是根據傳入的策略型別, 來做不同的選擇
❝那麼有哪些策略呢?以及什麼時候觸發這些選舉呢?
分割槽的幾種策略以及對應的觸發場景
1. OfflinePartitionLeaderElectionStrategy
❝遍歷分割槽的AR, 找到第一個滿足以下條件的副本:
副本線上
在ISR中
。
如果找不到滿足條件的副本,那麼再根據 傳入的引數
allowUnclean
判斷
allowUnclean=true
:AR順序中所有
線上副本
中的第一個副本。
allowUnclean=false
: 需要去查詢配置
unclean。leader。election。enable
的值。
若=true ,則跟上面 1一樣 。
若=false,直接返回None,沒有找到合適的Leader。
原始碼位置:
Election#leaderForOffline
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
先組裝所有給定的
validLeaderAndIsrs
的資訊 其實主要還是要去獲取每個Topic的對應的
unclean。leader。election。enable
屬性值。
預設情況下,我們呼叫到這裡的時候 這個入參
allowUnclean=false
。
如果是false
那我們需要去查詢一下指定的topic它的屬性
unclean。leader。election。enable
是什麼
如果是true
則表示直接覆蓋了
unclean。leader。election。enable
的配置為true。
找到 第一個滿足條件:
副本線上
&& 在
ISR中的副本
。
如果沒有滿足條件的 則判斷入
uncleanLeaderElectionEnabled
的配置 如果是true,則從不在isr中的存活副本中獲取副本作為leader。 當然這個
uncleanLeaderElectionEnabled
引數是上 步驟1中決定的。
觸發場景:Controller 重新載入
❝Controller 當選的時候會啟動
分割槽狀態機
partitionStateMachine
, 啟動的時候會重新載入所有分割槽的狀態到記憶體中, 並觸發 對處於
NewPartition
或
OfflinePartition
狀態的所有分割槽嘗試變更為
OnlinePartition
狀態的狀態。把新建立的分割槽和離線的分割槽觸發一下選舉流程啊
觸發原始碼入口:
KafkaController#onControllerFailover
partitionStateMachine。startup()
partitionStateMachine。triggerOnlinePartitionStateChange()
觸發場景:指令碼執行髒選舉
❝當執行
kafka-leader-election。sh
的時候並且模式選擇的是
UNCLEAN
。 則會觸發這個模式。
這裡注意一下,入參
allowUnclean
= (electionTrigger == AdminClientTriggered)
意思是: 當觸發的場景是
AdminClientTriggered
的時候, 則
allowUnclean=true
,表示 不關心配置引數
unclean。leader。election。enable
是什麼
如果沒有找到符合條件的Leader, 則就去非ISR 列表找Leader。 剛好我們執行指令碼的時候觸發器就是
AdminClientTriggered
其他觸發器有:
AutoTriggered
: 定時自動觸發。
ZkTriggered
:Controller切換的時候觸發的(zk節點/controller 的變更便是Controller角色的切換) AdminClientTriggered:客戶端主動觸發。
觸發場景:Controller 監聽到有Broker啟動了
❝同上。
觸發原始碼入口:
KafkaController#processBrokerChange#onBrokerStartup
partitionStateMachine。triggerOnlinePartitionStateChange()
觸發場景:Controller 監聽 LeaderAndIsrResponseReceived請求
❝同上。
當Controller向對應的Broker發起
LeaderAndIsrRequest
請求的時候。
有一個回撥函式callback, 這個回撥函式會向Controller發起一個事件為
LeaderAndIsrResponseReceived
請求。
具體原始碼在:
ControllerChannelManager#sendLeaderAndIsrRequest
Controller收到這個事件的請求之後,根據返回的
leaderAndIsrResponse
資料
會判斷一下有沒有
新增加
的離線副本(一般都是由於磁碟訪問有問題)
如果有新的離線副本,則需要將這個離線副本標記為Offline狀態
原始碼入口:
KafkaController#onReplicasBecomeOffline
partitionStateMachine。triggerOnlinePartitionStateChange()
觸發場景:Controller 監聽 UncleanLeaderElectionEnable請求
❝當我們在修改動態配置的時候, 將動態配置:
unclean。leader。election。enable
設定為 true 的時候
會觸發向Controller發起
UncleanLeaderElectionEnable
的請求,這個時候則需要觸發一下。觸發請求
同上
。
觸發原始碼入口:
KafkaController#processTopicUncleanLeaderElectionEnable
partitionStateMachine。triggerOnlinePartitionStateChange(topic)
上面的觸發呼叫的程式碼就是下面的介面
對處於
NewPartition
或
OfflinePartition
狀態的所有分割槽嘗試變更為
OnlinePartition
的狀態。 狀態的流程觸發了Leader選舉。
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
獲取所有 OfflinePartition 、NewPartition 的分割槽狀態
嘗試將 所有 NewPartition or OfflinePartition 狀態的分割槽全部轉別成 OnlinePartition狀態, 但是如果對應的Topic正在刪除中,則會被排除掉
分割槽狀態機進行狀態流轉 使用
OfflinePartitionLeaderElectionStrategy
選舉策略(
allowUnclean=true
表示如果從isr中沒有選出leader,則允許從非isr列表中選舉leader ,
allowUnclean=false
表示如果從isr中沒有選出leader, 則需要去讀取配置檔案的配置
unclean。leader。election。enable
來決定是否允許從非ISR列表中選舉Leader。 )
2. ReassignPartitionLeaderElectionStrategy
❝
分割槽副本重分配選舉策略:
當執行分割槽副本重分配的時候, 原來的Leader可能有變更, 則需要觸發一下 Leader選舉。
只有當之前的Leader副本在經過重分配之後不存在了
。
例如: [2,0] ==> [1,0] 。 原來2是Leader副本,經過重分配之後變成了 [1,0]。2已經不復存在了,所以需要重新選舉Leader。
當原來的分割槽Leader副本 因為某些異常,下線了
。需要重新選舉Leader
分割槽副本重分配發生的Leader選舉.
Election#leaderForReassign
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
總結:
❝從當前的副本分配列表中,獲取
副本線上
&&
副本在ISR中
的 第一個副本,遍歷的順序是當前副本的分配方式(
AR
),跟ISR的順序沒有什麼關係。
觸發場景:分割槽副本重分配
❝並不是每次執行分割槽副本重分配都會觸發這個Leader選舉策略, 下面兩種情況才會觸發
只有當之前的Leader副本在經過重分配之後不存在了
。例如: [2,0] ==> [1,0] 。 原來2是Leader副本,經過重分配之後變成了 [1,0]。2已經不復存在了,所以需要重新選舉Leader。
當原來的分割槽Leader副本 因為某些異常,下線了
。需要重新選舉Leader
對應的判斷條件程式碼如下:
KafkaController#moveReassignedPartitionLeaderIfRequired
private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition,
newAssignment: ReplicaAssignment): Unit = {
// 重分配之後的所有副本
val reassignedReplicas = newAssignment。replicas
//當前的分割槽Leader是哪個
val currentLeader = controllerContext。partitionLeadershipInfo(topicPartition)。leaderAndIsr。leader
// 如果分配後的副本不包含當前Leader副本,則需要重新選舉
if (!reassignedReplicas。contains(currentLeader)) {
//觸發Leader重選舉,策略是ReassignPartitionLeaderElectionStrategy
partitionStateMachine。handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
} else if (controllerContext。isReplicaOnline(currentLeader, topicPartition)) {
// 上面2種情況都不符合, 那麼就沒有必要leader重選舉了, 更新一下leaderEpoch就行 了
updateLeaderEpochAndSendRequest(topicPartition, newAssignment)
} else {
//觸發Leader重選舉,策略是ReassignPartitionLeaderElectionStrategy
partitionStateMachine。handleStateChanges(Seq(topicPartition), OnlinePartition, Some(ReassignPartitionLeaderElectionStrategy))
}
}
點選檢視分割槽重分配的原始碼解析
3. PreferredReplicaPartitionLeaderElectionStrategy
❝優先副本選舉策略, 必須滿足三個條件:
是第一個副本&&副本線上&&副本在ISR列表中。
滿足上面三個條件才會當選leader,不滿足則不會做變更。
def leaderForPreferredReplica(controllerContext: ControllerContext,
leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
leaderAndIsrs。map { case (partition, leaderAndIsr) =>
leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)
}
}
private def leaderForPreferredReplica(partition: TopicPartition,
leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext): ElectionResult = {
// AR列表
val assignment = controllerContext。partitionReplicaAssignment(partition)
// 線上副本
val liveReplicas = assignment。filter(replica => controllerContext。isReplicaOnline(replica, partition))
val isr = leaderAndIsr。isr
// 找出第一個副本 是否線上 並且在ISR中。
val leaderOpt = PartitionLeaderElectionAlgorithms。preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas。toSet)
// 組裝leaderandisr返回 ,注意這裡是沒有修改ISR資訊的
val newLeaderAndIsrOpt = leaderOpt。map(leader => leaderAndIsr。newLeader(leader))
ElectionResult(partition, newLeaderAndIsrOpt, assignment)
}
def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
assignment。headOption。filter(id => liveReplicas。contains(id) && isr。contains(id))
}
從記憶體中獲取TopicPartition的分配方式
過濾不線上的副本
找到第一個副本判斷一下是否線上&&在ISR列表中。如果滿足,則選他為leader,如果不滿足,也不會再找其他副本了。
返回leaderAndIsr資訊, 這裡的ISR是沒有做修改的。
觸發場景:自動定時執行優先副本選舉任務
❝Controller 啟動的時候,會啟動一個定時任務 。每隔一段時間就去執行
優先副本選舉
任務。
與之相關配置:
## 如果為true表示會建立定時任務去執行 優先副本選舉,為false則不會建立
auto。leader。rebalance。enable=true
## 每隔多久執行一次 ; 預設300秒;
leader。imbalance。check。interval。seconds partition = 300
##標識每個 Broker 失去平衡的比率,如果超過該比率,則執行重新選舉 Broker 的 leader;預設比例是10%;
##這個比率的演算法是 :broker不平衡率=非優先副本的leader個數/總分割槽數,
##假如一個topic有3個分割槽[0,1,2],並且有3個副本 ,正常情況下,[0,1,2]分別都為一個leader副本; 這個時候 0/3=0%;
leader。imbalance。per。broker。percentage = 10
觸發場景: Controller 重新載入的時候
❝在這個觸發之前還有執行
partitionStateMachine。startup()
相當於是先把 OfflinePartition、NewPartition狀態的分割槽執行了
OfflinePartitionLeaderElectionStrategy
策略。
然後又執行了
PreferredReplicaPartitionLeaderElectionStrategy
策略 這裡是從zk節點
/admin/preferred_replica_election
讀取資料, 來進行判斷是否有需要執行Leader選舉的分割槽
它是在執行
kafka-preferred-replica-election
命令的時候會建立這個zk節點
但是這個已經被標記為廢棄了,並且在3。0的時候直接移除了。
原始碼位置:
KafkaController#onControllerFailover
// 從zk節點/admin/preferred_replica_election找到哪些符合條件需要執行優先副本選舉的分割槽
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
// 這裡的觸發型別 是 ZkTriggered
onReplicaElection(pendingPreferredReplicaElections, ElectionType。PREFERRED, ZkTriggered)
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
// 去zk讀取節點 /admin/preferred_replica_election
val partitionsUndergoingPreferredReplicaElection = zkClient。getPreferredReplicaElection
// 如果指定分割槽的 leader 已經是AR的第一個副本 或者 topic被刪除了,則 過濾掉這個分割槽(沒有必要執行leader選舉了)
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection。filter { partition =>
val replicas = controllerContext。partitionReplicaAssignment(partition)
val topicDeleted = replicas。isEmpty
val successful =
if (!topicDeleted) controllerContext。partitionLeadershipInfo(partition)。leaderAndIsr。leader == replicas。head else false
successful || topicDeleted
}
// 將zk獲取到的分割槽資料 - 剛剛需要忽略的資料 = 還需要執行選舉的資料
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection —— partitionsThatCompletedPreferredReplicaElection
// 找到哪些分割槽正在刪除
val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion。filter(partition => topicDeletionManager。isTopicQueuedUpForDeletion(partition。topic))
// 待刪除的分割槽也過濾掉
val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion —— pendingPreferredReplicaElectionsSkippedFromTopicDeletion
// 返回最終需要執行優先副本選舉的資料。
pendingPreferredReplicaElections
}
觸發場景:執行優先副本選舉指令碼的時候
❝執行指令碼
kafka-leader-election。sh
並且選擇的模式是
PREFERRED
(優先副本選舉) 則會選擇
PreferredReplicaPartitionLeaderElectionStrategy
策略選舉
4. ControlledShutdownPartitionLeaderElectionStrategy
❝
受控關機選舉策略
:
當Broker關機的過程中,會向Controller發起一個請求, 讓它重新發起一次選舉, 把在所有正在關機(也就是發起請求的那個Broker,或其它同時正在關機的Broker) 的Broker裡面的副本給剔除掉。
根據演算法算出leader:找到第一個滿足條件的副本:
副本線上 && 副本在ISR中 && 副本所在的Broker不在正在關閉的Broker集合中 。
構造新的ISR列表: 在之前的isr列表中將 正在被關閉的Broker裡面的副本 給剔除掉
Election#leaderForControlledShutdown
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
觸發場景:Broker關機的時候
❝當Broker關閉的時候, 會向Controller發一起一個
ControlledShutdownRequest
請求, Controller收到這個請求會針對性的做一些善後事件。比如說
執行Leader重選舉
等等之類的。
原始碼位置:
KafkaServer#controlledShutdown
Controller收到請求的原始碼位置:
KafkaController#doControlledShutdown
與之相關的配置有:
controlled。shutdown。enable : 是否啟用受控關閉操作
controlled。shutdown。max。retries 受控關機操作 最大重試的次數
controlled。shutdown。retry。backoff。ms 失敗後等等多久再次重試
其他場景
新建立的Topic Leader選舉策略
❝建立新的Topic的時候,並沒有發生Leader選舉的操作, 而是預設從分割槽對應的所有線上副本中選擇第一個為leader, 然後isr就為 所有線上副本,再組裝一下當前的
controller_epoch
資訊,寫入到zk節點
/brokers/topics/{Topic名稱}/partitions/{分割槽號}/state
中。
最後發起
LeaderAndIsrRequest
請求,通知 leader 的變更。
詳細看看原始碼:
PartitionStateMachine#doHandleStateChanges
分割槽狀態從
NewPartition
流轉到
OnlinePartition
因篇幅原因原始碼省略
想獲得更好的閱讀體驗和【檢視原始碼】 請點選【閱讀原文】
從當前的Controller 記憶體中獲取所有入參的分割槽對應的副本資訊
過濾那些已經下線的副本( Broker宕機、網路異常、磁碟離線、等等都有可能造成副本下線) 。
每個分割槽對應的所有線上副本資訊 為 ISR 資訊,然後取ISR的第一個副本為leader分割槽。當然特別注意一下, 這個時候獲取的isr資訊的順序就是 分割槽建立時候分配好的AR順序, 獲取第一個線上的。(因為在其他情況下 ISR的順序跟AR的順序並不一致)
組裝 上面的
isr
、
leader
、
controller_epoch
等資訊 寫入到zk節點 /brokers/topics/{Topic名稱}/partitions/{分割槽號}/state例如下面所示{”controller_epoch“:1,”leader“:0,”version“:1,”leader_epoch“:0,”isr“:[0,1,2]}
然後向其他相關Broker 發起
LeaderAndIsrRequest
請求,通知他們Leader和Isr資訊已經變更了,去做一下想要的處理。比如去新的leader發起Fetcher請求同步資料。
可以看看之前我們分析過的 Topic建立的原始碼解析 的原理圖 如下
重點看:
回答上面的問題
現在,看完全文之後,我想你應該對下面的問題很清楚了吧!
什麼是分割槽狀態機
❝所有的分割槽狀態的流轉都是透過分割槽狀態機來進行的, 統一管理! 每個分割槽狀態的流轉 都是有嚴格限制並且固定的,流轉到不同狀態需要執行的操作不一樣, 例如 當分割槽狀態流轉到
OnlinePartition
的時候, 就需要判斷是否需要執行
Leader選舉
,
建立Topic的時候如何選舉Leader?
❝建立Topic的時候並沒有發生 Leader選舉, 而是預設將 線上的第一個副本設定為Leader,所有線上的副本列表 為 ISR 列表。 寫入到了zookeeper中。
分割槽的所有副本都不線上, 這個時候啟動一臺之前不在ISR內的副本的Broker,它會當選為Leader嗎?
❝視情況而定。 首先, 啟動一臺Broker, 會用什麼策略選舉?
看上面的圖,我們可以知道是
OfflinePartitionLeaderElectionStrategy
然後看下這個策略是如何選舉的?
那麼最終結果就是:
所有副本不線上,那麼一個Leader的候選者都當選不了
那麼這個時候就會判斷
unclean。leader。election。enable
配置是否為true。
如果是true
, 則當前線上的副本就是隻有自己這個剛啟動的線上副本,自然而然就會當選Leader了。
如果是fase
, 則沒有副本能夠當前Leader, 次數處於一個無Leader的狀態。
當所有副本都不線上,然後一個一個重啟Broker上副本上線,誰會當選為Leader?誰先啟動就誰當選嗎?
❝不是, 跟上一個問題同理
根據
unclean。leader。election。enable
配置決定。
如果是true
, 則誰先啟動,誰就當選(會丟失部分資料)
如果是false
,則第一個在ISR列表中的副本當選。
順便再提一句, 雖然在這裡可能不是AR中的第一個副本當選Leader。
但是最終還是會自動執行Leader均衡的,自動均衡使用的策略是
PreferredReplicaPartitionLeaderElectionStrategy
(前提是開啟了自動均衡:
auto。leader。rebalance。enable=true
)
Broker下線了,Leader切換給了其他副本, 當Broker重啟的時候,Leader會還給之前的副本嗎?
❝根據配置
auto。leader。rebalance。enable=true
決定。
true:
會自動執行Leader均衡, 自動均衡策略是
PreferredReplicaPartitionLeaderElectionStrategy
策略
false:
不執行自動均衡。 那麼久不會還回去。 關於更詳細的 Leader均衡機制請看 Leader 均衡機制
Leader選舉期間對分割槽的影響
❝Leader的選舉基本上不會造成什麼影響, Leader的切換非常快, 每個分割槽不可用的時間在幾毫秒內。
希望能對你有所幫助!
下一篇:關於平安普惠那些事