圖解Kafka Producer 訊息快取模型
❝
GitHub - didi/LogiKM: 一站式Apache Kafka叢集指標監控與運維管控平臺
✏️更強大的管控能力✏️ 更高效的問題定位能力 更便捷的叢集運維能力 更專業的資源治理 更友好的運維生態
大家好,我是彥祖呀
在閱讀本文之前, 希望你可以思考一下下面幾個問題, 帶著問題去閱讀文章會獲得更好的效果。
傳送訊息的時候, 當Broker掛掉了,訊息體還能寫入到訊息快取中嗎?
當訊息還儲存在快取中的時候, 假如Producer客戶端掛掉了,訊息是不是就丟失了?
當最新的ProducerBatch還有空餘的記憶體,但是接下來的一條訊息很大,不足以加上上一個Batch中,會怎麼辦呢?
那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?
原文地址:圖解Kafka Producer 訊息快取模型
什麼是訊息累加器RecordAccumulator
kafka為了提高Producer客戶端的傳送吞吐量和提高效能,選擇了將訊息暫時快取起來,等到滿足一定的條件, 再進行批次傳送, 這樣可以減少網路請求,提高吞吐量。
而快取這個訊息的就是RecordAccumulator類。
上圖就是整個訊息存放的快取模型,我們接下來一個個來講解。
訊息快取模型
上圖表示的就是 訊息快取的模型, 生產的訊息就是暫時存放在這個裡面。
每條訊息,我們按照TopicPartition維度,把他們放在不同的
Deque
佇列裡面。 TopicPartition相同,會在相同
Deque
的裡面。
ProducerBatch
: 表示同一個批次的訊息, 訊息真正傳送到Broker端的時候都是按照批次來發送的, 這個批次可能包含一條或者多條訊息。
如果沒有找到訊息對應的ProducerBatch佇列, 則建立一個佇列。
找到ProducerBatch佇列隊尾的Batch,發現Batch還可以塞下這條訊息,則將訊息直接塞到這個Batch中
找到ProducerBatch佇列隊尾的Batch,發現Batch中剩餘記憶體,不夠塞下這條訊息,則會建立新的Batch
當訊息傳送成功之後, Batch會被釋放掉。
ProducerBatch的記憶體大小
❝ 那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?
先說結論: 當訊息預估記憶體大於
batch。size
的時候,則按照訊息預估記憶體建立, 否則按照
batch。size
的大小建立(預設16k)。
我們來看一段程式碼,這段程式碼就是在建立ProducerBatch的時候預估記憶體的大小
RecordAccumulator#append
/**
* 公眾號: 石臻臻的雜貨鋪
* 微信:szzdzhp001
**/
// 找到 batch。size 和 這條訊息在batch中的總記憶體大小的 最大值
int size = Math。max(this。batchSize, AbstractRecords。estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
// 申請記憶體
buffer = free。allocate(size, maxTimeToBlock);
假設當前生產了一條訊息為M, 剛好訊息M找不到可以存放訊息的ProducerBatch(不存在或者滿了),那麼這個時候就需要建立一個新的ProducerBatch了
預估訊息的大小 跟
batch。size
預設大小16384(16kb)。 對比,取最大值用於申請的記憶體大小的值。
原文地址:圖解Kafka Producer 訊息快取模型
❝ 那麼, 這個訊息的預估是如何預估的?純粹的是訊息體的大小嗎?
DefaultRecordBatch#estimateBatchSizeUpperBound
預估需要的Batch大小,是一個預估值,因為沒有考慮壓縮演算法從額外開銷
/**
* 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。
* 這只是一個估計,因為它沒有考慮使用的壓縮演算法的額外開銷。
**/
static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {
return RECORD_BATCH_OVERHEAD + DefaultRecord。recordSizeUpperBound(key, value, headers);
}
預估這個訊息M的大小 + 一個
RECORD_BATCH_OVERHEAD
的大小
RECORD_BATCH_OVERHEAD
是一個Batch裡面的一些基本元資訊,總共佔用了
61B
訊息M的大小也並不是單單的只有訊息體的大小,總大小=(key,value,headers)的大小+
MAX_RECORD_OVERHEAD
MAX_RECORD_OVERHEAD
:一條訊息頭最大佔用空間, 最大值為
21B
也就是說建立一個ProducerBatch,最少就要83B 。
比如我傳送一條訊息 “ 1 ” , 預估得到的大小是 86B, 跟
batch。size(預設16384)
相比取最大值。 那麼申請記憶體的時候取最大值 16384 。
關於Batch的結構和訊息的結構,我們回頭
單獨用一篇文章來講解
。
記憶體分配
我們都知道RecordAccumulator裡面的快取大小是一開始定義好的, 由
buffer。memory
控制, 預設33554432 (32M)
當生產的速度大於傳送速度的時候,就可能出現Producer寫入阻塞。
而且頻繁的建立和釋放ProducerBatch,會導致頻繁GC, 所有kafka中有個快取池的概念,這個快取池會被重複使用,但是隻有固定( batch。size)的大小才能夠使用快取池。
PS:以下16k指得是 batch.size的預設值.
Batch的建立和釋放
1. 記憶體16K 快取池中有可用記憶體
①。 建立Batch的時候, 會去快取池中,獲取隊首的一塊記憶體ByteBuffer 使用。
②。 訊息傳送完成,釋放Batch, 則會把這個ByteBuffer,放到快取池的隊尾中,並且呼叫
ByteBuffer。clear
清空資料。以便下次重複使用
2. 記憶體16K 快取池中無可用記憶體
①。 建立Batch的時候, 去非快取池中的記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池nonPooledAvailableMemory 減少 16K 的記憶體, 然後Batch正常建立就行了,
不要誤以為好像真的發生了記憶體的轉移。
②。 訊息傳送完成,釋放Batch, 則會把這個ByteBuffer,放到快取池的隊尾中,並且呼叫
ByteBuffer。clear
清空資料, 以便下次重複使用
原文地址:圖解Kafka Producer 訊息快取模型
3. 記憶體非16K 非快取池中記憶體夠用
①。 建立Batch的時候, 去非快取池(nonPooledAvailableMemory)記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池(nonPooledAvailableMemory)
減少
對應的記憶體, 然後Batch正常建立就行了,
不要誤以為好像真的發生了記憶體的轉移。
②。 訊息傳送完成,釋放Batch, 純粹的是在非快取池(nonPooledAvailableMemory)中加上剛剛釋放的Batch記憶體大小。 當然這個Batch會被GC掉
4. 記憶體非16K 非快取池記憶體不夠用
①。 先嚐試將 快取池中的記憶體一個一個釋放到 非快取池中, 直到非快取池中的記憶體夠用與建立Batch了
②。 建立Batch的時候, 去非快取池(nonPooledAvailableMemory)記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池(nonPooledAvailableMemory)
減少
對應的記憶體, 然後Batch正常建立就行了,
不要誤以為好像真的發生了記憶體的轉移。
③。 訊息傳送完成,釋放Batch, 純粹的是在非快取池(nonPooledAvailableMemory)中加上剛剛釋放的Batch記憶體大小。 當然這個Batch會被GC掉
例如: 下面我們需要建立 48k的batch, 因為超過了16k,所以需要在非快取池中分配記憶體, 但是非快取池中當前可用記憶體為0 , 分配不了, 這個時候就會嘗試去 快取池裡面釋放一部分記憶體到 非快取池。
釋放第一個ByteBuffer(16k) 不夠,則繼續釋放第二個,直到釋放了3個之後總共48k,發現記憶體這時候夠了, 再去建立Batch。
注意:這裡我們涉及到的 非快取池中的記憶體分配, 僅僅指的的記憶體數字的增加和減少。
問題和答案
❝
傳送訊息的時候, 當Broker掛掉了,訊息體還能寫入到訊息快取中嗎?
當Broker掛掉了,Producer會提示下面的警告⚠️, 但是
傳送訊息
過程中
這個訊息體還是可以
寫入到 訊息快取中
的,也僅僅是寫到到快取中而已。
WARN [Producer clientId=console-producer] Connection to node 0 (/172。23。164。192:9090) could not be established。 Broker may not be available
❝
當最新的ProducerBatch還有空餘的記憶體,但是接下來的一條訊息很大,不足以加上上一個Batch中,會怎麼辦呢?
那麼會建立新的ProducerBatch。
❝
那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?
觸發建立ProducerBatch的那條訊息預估大小大於batch。size ,則以預估記憶體建立。 否則,以batch。size建立。
還有一個問題供大家思考:
當訊息還儲存在快取中的時候, 假如Producer客戶端掛掉了,訊息是不是就丟失了?
進由滴滴工程師們建立的kafka中文社群