您當前的位置:首頁 > 書法

圖解Kafka Producer 訊息快取模型

作者:由 石臻臻的雜貨鋪 發表于 書法時間:2022-03-17

GitHub - didi/LogiKM: 一站式Apache Kafka叢集指標監控與運維管控平臺

✏️更強大的管控能力✏️ 更高效的問題定位能力 更便捷的叢集運維能力 更專業的資源治理 更友好的運維生態

大家好,我是彥祖呀

在閱讀本文之前, 希望你可以思考一下下面幾個問題, 帶著問題去閱讀文章會獲得更好的效果。

傳送訊息的時候, 當Broker掛掉了,訊息體還能寫入到訊息快取中嗎?

當訊息還儲存在快取中的時候, 假如Producer客戶端掛掉了,訊息是不是就丟失了?

當最新的ProducerBatch還有空餘的記憶體,但是接下來的一條訊息很大,不足以加上上一個Batch中,會怎麼辦呢?

那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?

原文地址:圖解Kafka Producer 訊息快取模型

什麼是訊息累加器RecordAccumulator

kafka為了提高Producer客戶端的傳送吞吐量和提高效能,選擇了將訊息暫時快取起來,等到滿足一定的條件, 再進行批次傳送, 這樣可以減少網路請求,提高吞吐量。

而快取這個訊息的就是RecordAccumulator類。

圖解Kafka Producer 訊息快取模型

上圖就是整個訊息存放的快取模型,我們接下來一個個來講解。

訊息快取模型

圖解Kafka Producer 訊息快取模型

上圖表示的就是 訊息快取的模型, 生產的訊息就是暫時存放在這個裡面。

每條訊息,我們按照TopicPartition維度,把他們放在不同的

Deque

佇列裡面。 TopicPartition相同,會在相同

Deque

的裡面。

ProducerBatch

: 表示同一個批次的訊息, 訊息真正傳送到Broker端的時候都是按照批次來發送的, 這個批次可能包含一條或者多條訊息。

如果沒有找到訊息對應的ProducerBatch佇列, 則建立一個佇列。

找到ProducerBatch佇列隊尾的Batch,發現Batch還可以塞下這條訊息,則將訊息直接塞到這個Batch中

找到ProducerBatch佇列隊尾的Batch,發現Batch中剩餘記憶體,不夠塞下這條訊息,則會建立新的Batch

當訊息傳送成功之後, Batch會被釋放掉。

ProducerBatch的記憶體大小

❝ 那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?

先說結論: 當訊息預估記憶體大於

batch。size

的時候,則按照訊息預估記憶體建立, 否則按照

batch。size

的大小建立(預設16k)。

我們來看一段程式碼,這段程式碼就是在建立ProducerBatch的時候預估記憶體的大小

RecordAccumulator#append

/**

* 公眾號: 石臻臻的雜貨鋪

* 微信:szzdzhp001

**/

// 找到 batch。size 和 這條訊息在batch中的總記憶體大小的 最大值

int size = Math。max(this。batchSize, AbstractRecords。estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

// 申請記憶體

buffer = free。allocate(size, maxTimeToBlock);

假設當前生產了一條訊息為M, 剛好訊息M找不到可以存放訊息的ProducerBatch(不存在或者滿了),那麼這個時候就需要建立一個新的ProducerBatch了

預估訊息的大小 跟

batch。size

預設大小16384(16kb)。 對比,取最大值用於申請的記憶體大小的值。

原文地址:圖解Kafka Producer 訊息快取模型

❝ 那麼, 這個訊息的預估是如何預估的?純粹的是訊息體的大小嗎?

DefaultRecordBatch#estimateBatchSizeUpperBound

預估需要的Batch大小,是一個預估值,因為沒有考慮壓縮演算法從額外開銷

/**

* 使用給定的鍵和值獲取只有一條記錄的批次大小的上限。

* 這只是一個估計,因為它沒有考慮使用的壓縮演算法的額外開銷。

**/

static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) {

return RECORD_BATCH_OVERHEAD + DefaultRecord。recordSizeUpperBound(key, value, headers);

}

預估這個訊息M的大小 + 一個

RECORD_BATCH_OVERHEAD

的大小

RECORD_BATCH_OVERHEAD

是一個Batch裡面的一些基本元資訊,總共佔用了

61B

訊息M的大小也並不是單單的只有訊息體的大小,總大小=(key,value,headers)的大小+

MAX_RECORD_OVERHEAD

MAX_RECORD_OVERHEAD

:一條訊息頭最大佔用空間, 最大值為

21B

也就是說建立一個ProducerBatch,最少就要83B 。

比如我傳送一條訊息 “ 1 ” , 預估得到的大小是 86B, 跟

batch。size(預設16384)

相比取最大值。 那麼申請記憶體的時候取最大值 16384 。

關於Batch的結構和訊息的結構,我們回頭

單獨用一篇文章來講解

記憶體分配

我們都知道RecordAccumulator裡面的快取大小是一開始定義好的, 由

buffer。memory

控制, 預設33554432 (32M)

當生產的速度大於傳送速度的時候,就可能出現Producer寫入阻塞。

而且頻繁的建立和釋放ProducerBatch,會導致頻繁GC, 所有kafka中有個快取池的概念,這個快取池會被重複使用,但是隻有固定( batch。size)的大小才能夠使用快取池。

PS:以下16k指得是 batch.size的預設值.

Batch的建立和釋放

1. 記憶體16K 快取池中有可用記憶體

①。 建立Batch的時候, 會去快取池中,獲取隊首的一塊記憶體ByteBuffer 使用。

②。 訊息傳送完成,釋放Batch, 則會把這個ByteBuffer,放到快取池的隊尾中,並且呼叫

ByteBuffer。clear

清空資料。以便下次重複使用

圖解Kafka Producer 訊息快取模型

2. 記憶體16K 快取池中無可用記憶體

①。 建立Batch的時候, 去非快取池中的記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池nonPooledAvailableMemory 減少 16K 的記憶體, 然後Batch正常建立就行了,

不要誤以為好像真的發生了記憶體的轉移。

②。 訊息傳送完成,釋放Batch, 則會把這個ByteBuffer,放到快取池的隊尾中,並且呼叫

ByteBuffer。clear

清空資料, 以便下次重複使用

圖解Kafka Producer 訊息快取模型

原文地址:圖解Kafka Producer 訊息快取模型

3. 記憶體非16K 非快取池中記憶體夠用

①。 建立Batch的時候, 去非快取池(nonPooledAvailableMemory)記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池(nonPooledAvailableMemory)

減少

對應的記憶體, 然後Batch正常建立就行了,

不要誤以為好像真的發生了記憶體的轉移。

②。 訊息傳送完成,釋放Batch, 純粹的是在非快取池(nonPooledAvailableMemory)中加上剛剛釋放的Batch記憶體大小。 當然這個Batch會被GC掉

圖解Kafka Producer 訊息快取模型

4. 記憶體非16K 非快取池記憶體不夠用

①。 先嚐試將 快取池中的記憶體一個一個釋放到 非快取池中, 直到非快取池中的記憶體夠用與建立Batch了

②。 建立Batch的時候, 去非快取池(nonPooledAvailableMemory)記憶體獲取一部分記憶體用於建立Batch。 注意:這裡說的獲取記憶體給Batch, 其實就是讓 非快取池(nonPooledAvailableMemory)

減少

對應的記憶體, 然後Batch正常建立就行了,

不要誤以為好像真的發生了記憶體的轉移。

③。 訊息傳送完成,釋放Batch, 純粹的是在非快取池(nonPooledAvailableMemory)中加上剛剛釋放的Batch記憶體大小。 當然這個Batch會被GC掉

例如: 下面我們需要建立 48k的batch, 因為超過了16k,所以需要在非快取池中分配記憶體, 但是非快取池中當前可用記憶體為0 , 分配不了, 這個時候就會嘗試去 快取池裡面釋放一部分記憶體到 非快取池。

釋放第一個ByteBuffer(16k) 不夠,則繼續釋放第二個,直到釋放了3個之後總共48k,發現記憶體這時候夠了, 再去建立Batch。

圖解Kafka Producer 訊息快取模型

注意:這裡我們涉及到的 非快取池中的記憶體分配, 僅僅指的的記憶體數字的增加和減少。

問題和答案

傳送訊息的時候, 當Broker掛掉了,訊息體還能寫入到訊息快取中嗎?

當Broker掛掉了,Producer會提示下面的警告⚠️, 但是

傳送訊息

過程中

這個訊息體還是可以

寫入到 訊息快取中

的,也僅僅是寫到到快取中而已。

WARN [Producer clientId=console-producer] Connection to node 0 (/172。23。164。192:9090) could not be established。 Broker may not be available

圖解Kafka Producer 訊息快取模型

當最新的ProducerBatch還有空餘的記憶體,但是接下來的一條訊息很大,不足以加上上一個Batch中,會怎麼辦呢?

那麼會建立新的ProducerBatch。

那麼建立ProducerBatch的時候,應該分配多少的記憶體呢?

觸發建立ProducerBatch的那條訊息預估大小大於batch。size ,則以預估記憶體建立。 否則,以batch。size建立。

還有一個問題供大家思考:

當訊息還儲存在快取中的時候, 假如Producer客戶端掛掉了,訊息是不是就丟失了?

進由滴滴工程師們建立的kafka中文社群

圖解Kafka Producer 訊息快取模型

圖解Kafka Producer 訊息快取模型

標簽: 快取  batch  記憶體  訊息  建立