您當前的位置:首頁 > 繪畫

位元組跳動基於Flink的MQ-Hive實時資料整合

作者:由 Flink 中文社群 發表于 繪畫時間:2020-07-15

在資料中臺建設過程中,一個典型的資料整合場景是將 MQ (Message Queue,例如 Kafka、RocketMQ 等)的資料匯入到 Hive 中,以供下游數倉建設以及指標統計。由於 MQ-Hive 是數倉建設第一層,因此對資料的準確性以及實時性要求比較高。

本文主要圍繞 MQ-Hive 場景,針對目前位元組跳動內已有解決方案的痛點,提出基於 Flink 的實時解決方案,並介紹新方案在位元組跳動內部的使用現狀。

已有方案及痛點

位元組跳動內已有解決方案如下圖所示,主要分了兩個步驟:

透過 Dump 服務將 MQ 的資料寫入到 HDFS 檔案

再透過 Batch ETL 將 HDFS 資料匯入到 Hive 中,並新增 Hive 分割槽

位元組跳動基於Flink的MQ-Hive實時資料整合

痛點

任務鏈較長,原始資料需要經過多次轉換最終才能進入 Hive

實時性比較差,Dump Service、Batch ETL 延遲都會導致最終資料產出延遲

儲存、計算開銷大,MQ 資料重複儲存和計算

基於原生 Java 打造,資料流量持續增長後,存在單點故障和機器負載不均衡等問題

運維成本較高,架構上無法複用公司內 Hadoop/Flink/Yarn 等現有基礎設施

不支援異地容災

基於 Flink 實時解決方案

優勢

針對目前公司傳統解決方案的痛點,我們提出基於 Flink 的實時解決方案,將 MQ 的資料實時寫入到 Hive,並支援事件時間以及 Exactly Once 語義。相比老方案,新方案優勢如下所示:

基於流式引擎 Flink 開發,支援 Exactly Once 語義

實時性更高,MQ 資料直接進入 Hive,無中間計算環節

減少中間儲存,整個流程資料只會落地一次

支撐 Yarn 部署模式,方便使用者遷移

資源管理彈性,方便擴容以及運維

支援雙機房容災

整體架構

整體架構如下圖所示,主要包括 DTS(Data Transmission Service) Source、DTS Core、DTS Sink 三大模組,具體功能如下:

DTS Source 接入不同 MQ 資料來源,支援 Kafka、RocketMQ 等

DTS Sink 將資料輸出到目標資料來源,支援 HDFS、Hive 等

DTS Core 貫穿整個資料同步流程,透過 Source 讀取源端資料,經過 DTS Framework 處理,最後透過 Sink 將資料輸出到目標端。

DTS Framework 整合型別系統、檔案切分、Exactly Once、任務資訊採集、事件時間、髒資料收集等核心功能

支援 Yarn 部署模式,資源排程、管理比較彈性

位元組跳動基於Flink的MQ-Hive實時資料整合

DTS Dump架構圖

Exactly Once

Flink 框架透過 Checkpoint 機制,能夠提供 Exactly Once 或者 At Least Once 語義。為了實現 MQ-Hive 全鏈路支援 Exactly-once 語義,還需要 MQ Source、Hive Sink 端支援 Exactly Once 語義。本文透過 Checkpoint + 2PC 協議實現,具體過程如下:

資料寫入時,Source 端從上游 MQ 拉取資料併發送到 Sink 端;Sink 端將資料寫入到臨時目錄中

Checkpoint Snapshot 階段,Source 端將 MQ Offset 儲存到 State 中;Sink 端關閉寫入的檔案控制代碼,並儲存當前 Checkpoint ID 到 State 中;

Checkpoint Complete 階段,Source 端 Commit MQ Offset;Sink 端將臨時目錄中的資料移動到正式目錄下

Checkpoint Recover 階段,載入最新一次成功的 Checkpoint 目錄並恢復 State 資訊,其中 Source 端將 State 中儲存的 MQ Offset 作為起始位置;Sink 端恢復最新一次成功的 Checkpoint ID,並將臨時目錄的資料移動到正式目錄下

■ 實現最佳化

在實際使用場景中,特別是大併發場景下,HDFS 寫入延遲容易有毛刺,因為個別 Task Snapshot 超時或者失敗,導致整個 Checkpoint 失敗的問題會比較明顯。因此針對 Checkpoint 失敗,提高系統的容錯性以及穩定性就比較重要。這裡充分利用 Checkpoint ID 嚴格單調遞增的特性,每一次做 Checkpoint 時,當前 Checkpoint ID 一定比以前大,因此在 Checkpoint Complete 階段,可以提交小於等於當前 Checkpoint ID 的臨時資料。具體最佳化策略如下:

Sink 端臨時目錄為{dump_path}/{next_cp_id},這裡 next_cp_id 的定義是當前最新的 cp_id + 1

Checkpoint Snapshot 階段,Sink 端儲存當前最新 cp_id 到 State,同時更新 next_cp_id 為 cp_id + 1

Checkpoint Complete 階段,Sink 端將臨時目錄中所有小於等於當前 cp_id 的資料移動到正式目錄下

Checkpoint Recover 階段,Sink 端恢復最新一次成功的 cp_id,並將臨時目錄中小於等於當前 cp_id 的資料移動到正式目錄下

型別系統

由於不同資料來源支援的資料型別不一樣,為了解決不同資料來源間的資料同步以及不同型別轉換相容的問題,我們支援了 DTS 型別系統,DTS 型別可細化為基礎型別和複合型別,其中複合型別支援型別巢狀,具體轉換流程如下:

在 Source 端,將源資料型別,統一轉成系統內部的 DTS 型別

在 Sink 端,將系統內部的 DTS 型別轉換成目標資料來源型別

其中 DTS 型別系統支援不同型別間的相互轉換,比如 String 型別與 Date 型別的相互轉換

位元組跳動基於Flink的MQ-Hive實時資料整合

DTS Dump架構圖

Rolling Policy

Sink 端是併發寫入,每個 Task 處理的流量不一樣,為了避免生成太多的小檔案或者生成的檔案過大,需要支援自定義檔案切分策略,以控制單個檔案的大小。目前支援三種檔案切分策略:檔案大小、檔案最長未更新時間、Checkpoint。

■ 最佳化策略

Hive 支援 Parquet、Orc、Text 等多種儲存格式,不同的儲存格式資料寫入過程不太一樣,具體可以分為兩大類:

RowFormat:基於單條寫入,支援按照 Offset 進行 HDFS Truncate 操作,例如 Text 格式

BulkFormat:基於 Block 寫入,不支援 HDFS Truncate 操作,例如 Parquet、ORC 格式

為了保障 Exactly Once 語義,並同時支援 Parquet、Orc、Text 等多種格式,在每次 Checkpoint 時,強制做檔案切分,保證所有寫入的檔案都是完整的,Checkpoint 恢復時不用做 Truncate 操作。

容錯處理

理想情況下流式任務會一直執行不需要重啟,但實際不可避免會遇到以下幾個場景:

Flink 計算引擎升級,需要重啟任務

上游資料增加,需要調整任務併發度

Task Failover

■ 併發度調整

目前 Flink 原生支援 State Rescale。具體實現中,在 Task 做 Checkpoint Snapshot 時,將 MQ Offset 儲存到 ListState 中;Job 重啟後,Job Master 會根據 Operator 併發度,將 ListState 平均分配到各個 Task 上。

■ Task Failover

由於網路抖動、寫入超時等外部因素的影響,Task 不可避免會出現寫入失敗,如何快速、準確的做 Task Failover 就顯得比較重要。目前 Flink 原生支援多種 Task Failover 策略,本文使用 Region Failover 策略,將失敗 Task 所在 Region 的所有 Task 都重啟。

異地容災

■ 背景

大資料時代,資料的準確性和實時性顯得尤為重要。本文提供多機房部署及異地容災解決方案,當主機房因為斷網、斷電、地震、火災等原因暫時無法對外提供服務時,能快速將服務切換到備災機房,並同時保障 Exactly Once 語義。

■ 容災元件

整體解決方案需要多個容災元件一起配合實現,容災元件如下圖所示,主要包括 MQ、YARN、HDFS,具體如下:

MQ 需要支援多機房部署,當主機房故障時,能將 Leader 切換到備機房,以供下游消費

Yarn 叢集在主機房、備機房都有部署,以便 Flink Job 遷移

下游 HDFS 需要支援多機房部署,當主機房故障時,能將 Master 切換到備機房

Flink Job 執行在 Yarn 上,同時任務 State Backend 儲存到 HDFS,透過 HDFS 的多機房支援保障 State Backend 的多機房

位元組跳動基於Flink的MQ-Hive實時資料整合

■ 容災過程

整體容災過程如下所示:

正常情況下,MQ Leader 以及 HDFS Master 部署在主機房,並將資料同步到備機房。同時 Flink Job 執行在主機房,並將任務 State 寫入到 HDFS 中,注意 State 也是多機房部署模式

災難情況下,MQ Leader 以及 HDFS Master 從主機房遷移到備災機房,同時 Flink Job 也遷移到備災機房,並透過 State 恢復災難前的 Offset 資訊,以提供 Exactly Once 語義

位元組跳動基於Flink的MQ-Hive實時資料整合

位元組跳動基於Flink的MQ-Hive實時資料整合

事件時間歸檔

■ 背景

在數倉建設中,處理時間(Process Time)和事件時間(Event Time)的處理邏輯不太一樣,對於處理時間會將資料寫到當前系統時間所對應的時間分割槽下;對於事件時間,則是根據資料的生產時間將資料寫到對應時間分割槽下,本文也簡稱為歸檔。在實際場景中,不可避免會遇到各種上下游故障,並在持續一段時間後恢復,如果採用 Process Time 的處理策略,則事故期間的資料會寫入到恢復後的時間分割槽下,最終導致分割槽空洞或者資料漂移的問題;如果採用歸檔的策略,會按照事件時間寫入,則沒有此類問題。由於上游資料事件時間會存在亂序,同時 Hive 分割槽生成後就不應該再繼續寫入,因此實際寫入過程中不可能做到無限歸檔,只能在一定時間範圍內歸檔。歸檔的難點在於如何確定全域性最小歸檔時間以及如何容忍一定的亂序。

全域性最小歸檔時間

Source 端是併發讀取,並且一個 Task 可能同時讀取多個 MQ Partition 的資料,對於 MQ 的每一個 Parititon 會儲存當前分割槽歸檔時間,取分割槽中最小值作為 Task 的最小歸檔時間,最終取 Task 中最小值,作為全域性最小歸檔時間。

位元組跳動基於Flink的MQ-Hive實時資料整合

■ 亂序處理

為了支援亂序的場景,會支援一個歸檔區間的設定,其中 Global Min Watermark 為全域性最小歸檔時間,Partition Watermark 為分割槽當前歸檔時間,Partition Min Watermark 為分割槽最小歸檔時間,只有當事件時間滿足以下條件時,才會進行歸檔:

事件時間大於全域性最小歸檔時間

事件時間大於分割槽最小歸檔時間

位元組跳動基於Flink的MQ-Hive實時資料整合

Hive 分割槽生成

■ 原理

Hive 分割槽生成的難點在於如何確定分割槽的資料是否就緒以及如何新增分割槽。由於 Sink 端是併發寫入,同時會有多個 Task 寫同一個分割槽資料,因此只有當所有 Task 分割槽資料寫入完成,才能認為分割槽資料是就緒,本文解決思路如下:

在 Sink 端,對於每個 Task 儲存當前最小處理時間,需要滿足單調遞增的特性

在 Checkpoint Complete 時,Task 上報最小處理時間到 JM 端

JM 拿到所有 Task 的最小處理時間後,可以得到全域性最小處理時間,並以此作為 Hive 分割槽的最小就緒時間

當最小就緒時間更新時,可判斷是否新增 Hive 分割槽

位元組跳動基於Flink的MQ-Hive實時資料整合

■ 動態分割槽

動態分割槽是根據上游輸入資料的值,確定資料寫到哪個分割槽目錄,而不是寫到固定分割槽目錄,例如 date={date}/hour={hour}/app={app}的場景,根據分割槽時間以及 app 欄位的值確定最終的分割槽目錄,以實現每個小時內,相同的 app 資料在同一個分割槽下。在靜態分割槽場景下,每個 Task 每次只會寫入一個分割槽檔案,但在動態分割槽場景下,每個 Task 可能同時寫入多個分割槽檔案。對於 Parque 格式的寫入,會先將資料寫到做本地快取,然後批次寫入到 Hive,當 Task 同時處理的檔案控制代碼過多時,容易出現 OOM。為了防止單 Task OOM,會週期性對檔案控制代碼做探活檢測,及時釋放長時間沒有寫入的檔案控制代碼。

位元組跳動基於Flink的MQ-Hive實時資料整合

Messenger

Messenger 模組用於採集 Job 執行狀態資訊,以便衡量 Job 健康度以及大盤指標建設。

元資訊採集

元資訊採集的原理如下所示,在 Sink 端透過 Messenger 採集 Task 的核心指標,例如流量、QPS、髒資料、寫入 Latency、事件時間寫入效果等,並透過 Messenger Collector 彙總。其中髒資料需要輸出到外部儲存中,任務執行指標輸出到 Grafana,用於大盤指標展示。

位元組跳動基於Flink的MQ-Hive實時資料整合

■ 髒資料收集

資料整合場景下,不可避免會遇到髒資料,例如型別配置錯誤、欄位溢位、型別轉換不相容等場景。對於流式任務來說,由於任務會一直執行,因此需要能夠實時統計髒資料流量,並且將髒資料儲存到外部儲存中以供排查,同時在執行日誌中取樣輸出。

■ 大盤監控

大盤指標覆蓋全域性指標以及單個 Job 指標,包括寫入成功流量和 QPS、寫入 Latency、寫入失敗流量和 QPS、歸檔效果統計等,具體如下圖所示:

位元組跳動基於Flink的MQ-Hive實時資料整合

位元組跳動基於Flink的MQ-Hive實時資料整合

未來規劃

基於 Flink 實時解決方案目前已在公司上線和推廣,未來主要關注以下幾個方面:

資料整合功能增強,支援更多資料來源的接入,支援使用者自定義資料轉換邏輯等

Data Lake 打通,支援 CDC 資料實時匯入

流批架構統一,支援全量、增量場景資料整合

架構升級,支援更多部署環境,比如 K8S

服務化完善,降低使用者接入成本

總結

隨著位元組跳動業務產品逐漸多元化快速發展,位元組跳動內部一站式大資料開發平臺功能也越來越豐富,並提供離線、實時、全量、增量場景下全域資料整合解決方案,從最初的幾百個任務規模增長到數萬級規模,日處理資料達到 PB 級,其中基於 Flink 實時解決方案目前已在公司內部大力推廣和使用,並逐步替換老的 MQ-Hive 鏈路。

參考文獻:

Real-time Exactly-once ETL with Apache Flink

http://

shzhangji。com/blog/2018

/12/23/real-time-exactly-once-etl-with-apache-flink/

Implementing the Two-Phase Commit Operator in Flink

https://

flink。apache。org/featur

es/2018/03/01/end-to-end-exactly-once-apache-flink。html

A Deep Dive into Rescalable State in Apache Flink

https://

flink。apache。org/featur

es/2017/07/04/flink-rescalable-state。html

Data Streaming Fault Tolerance

https://

ci。apache。org/projects/

flink/flink-docs-release-1。9/internals/stream_checkpointing。html

標簽: 寫入  task  資料  分割槽  MQ