DLF +DDI 一站式資料湖構建與分析最佳實踐
背景
隨著資料時代的不斷髮展,資料量爆發式增長,資料形式也變的更加多樣。傳統資料倉庫模式的成本高、響應慢、格式少等問題日益凸顯。於是擁有成本更低、資料形式更豐富、分析計算更靈活的資料湖應運而生。
資料湖作為一個集中化的資料儲存倉庫,支援的資料型別具有多樣性,包括結構化、半結構化以及非結構化的資料,資料來源上包含資料庫資料、binglog 增量資料、日誌資料以及已有數倉上的存量資料等。資料湖能夠將這些不同來源、不同格式的資料集中儲存管理在高性價比的儲存如 OSS 等物件儲存中,並對外提供統一的資料目錄,支援多種計算分析方式,有效解決了企業中面臨的資料孤島問題,同時大大降低了企業儲存和使用資料的成本。
資料湖架構及關鍵技術
企業級資料湖架構如下:
資料湖儲存與格式
資料湖儲存主要以雲上物件儲存作為主要介質,其具有低成本、高穩定性、高可擴充套件性等優點。
資料湖上我們可以採用支援 ACID 的資料湖儲存格式,如 Delta Lake、Hudi、Iceberg。這些資料湖格式有自己的資料 meta 管理能力,能夠支援 Update、Delete 等操作,以批流一體的方式解決了大資料場景下資料實時更新的問題。在當前方案中,我們主要介紹Delta Lake的核心能力和應用場景。
Delta Lake 的核心能力
Delta Lake 是一個統一的資料管理系統,為雲上資料湖帶來資料可靠性和快速分析。Delta Lake 執行在現有資料湖之上,並且與 Apache Spark 的 API 完全相容。使用Delta Lake,您可以加快高質量資料匯入資料湖的速度,團隊也可以在雲服務上快速使用這些資料,安全且可擴充套件。
ACID 事務性
:Delta Lake 在多個寫操作之間提供 ACID 事務性。每一次寫操作都是一個事務操作,事務日誌(Transaction Log)中記錄的寫操作都有一個順序序列。事務日誌(Transaction Log)跟蹤了檔案級別的寫操作,並使用了樂觀鎖進行併發控制,這非常適用於資料湖,因為嘗試修改相同檔案的多次寫操作的情況並不經常發生。當發生衝突時,Delta Lake 會丟擲一個併發修改異常,拋給供使用者處理並重試其作業。Delta Lake 還提供了最高級別的隔離(可序列化隔離),允許工程師不斷地向目錄或表寫入資料,而使用者不斷地從同一目錄或表讀取資料,讀取資料時會看到資料的最新快照。
Schema 管理(Schema management)
:Delta Lake 會自動驗證正在寫入的DataFrame 的 Schema 是否與表的 Schema 相容。若表中存在但 DataFrame 中不存在的列則會被設定為 null。如果 DataFrame 中有額外的列不在表中,那麼該操作將會丟擲異常。Delta Lake 具有 DDL(資料定義語言)顯式新增新列的功能,並且能夠自動更新 Schema。
可伸縮的元資料(Metadata)處理
:Delta Lake 將表或目錄的元資料資訊儲存在事務日誌(Transaction Log)中,而不是元資料 Metastore 中。這使得 Delta Lake夠在固定時間內列出大目錄中的檔案,並且在讀取資料時效率很高。
資料版本控制和時間旅行(Time Travel)
:Delta Lake 允許使用者讀取表或目錄的歷史版本快照。當檔案在寫入過程中被修改時,Delta Lake 會建立檔案的新的版本並保留舊版本。當用戶想要讀取表或目錄的較舊版本時,他們可以向 Apach Spark的 read API 提供時間戳或版本號,Delta Lake 根據事務日誌(Transaction Log)中的資訊來構建該
時間戳
或版本的完整快照。這非常方便使用者來複現實驗和報告,如果需要,還可以將表還原為舊版本。
統一批流一體
:除了批處理寫入之外,Delta Lake 還可以作為 Apache Spark 的結構化流的高效流接收器(Streaming Sink)。與 ACID 事務和可伸縮元資料處理相結合,高效的流接收器(Streaming Sink)支援大量近實時的分析用例,而無需維護複雜的流和批處理管道。
記錄更新和刪除
:Delta Lake 將支援合併、更新和刪除的 DML(資料管理語言)命令。這使得工程師可以輕鬆地在資料湖中插入和刪除記錄,並簡化他們的變更資料捕獲和 GDPR(一般資料保護條例)用例。由於 Delta Lake 在檔案級粒度上進行跟蹤和修改資料,因此它比讀取和覆蓋整個分割槽或表要高效得多。
資料湖構建與管理
1. 資料入湖
企業的原始資料存在於多種資料庫或儲存系統,如關係資料庫 MySQL、日誌系統SLS、NoSQL 儲存 HBase、訊息資料庫 Kafka 等。其中大部分的線上儲存都面向線上事務型業務,並不適合線上分析的場景,所以需要將資料以無侵入的方式同步至成本更低且更適合計算分析的物件儲存。
常用的資料同步方式有基於 DataX、Sqoop 等資料同步工具做批次同步;同時在對於實時性要求較高的場景下,配合使用 Kafka+spark Streaming / flink 等流式同步鏈路。目前很多雲廠商提供了一站式入湖的解決方案,幫助客戶以更快捷更低成本的方式實現資料入湖,如阿里雲 DLF 資料入湖。
2. 統一元資料服務
物件儲存本身是沒有面向大資料分析的語義的,需要結合 Hive Metastore Service 等元資料服務為上層各種分析引擎提供資料的 Meta 資訊。資料湖元資料服務的設計目標是能夠在大資料引擎、儲存多樣性的環境下,構建不同儲存系統、格式和不同計算引擎統一元資料檢視,並具備統一的許可權、元資料,且需要相容和擴充套件開源大資料生態元資料服務,支援自動獲取元資料,並達到一次管理多次使用的目的,這樣既能夠相容開源生態,也具備極大的易用性。
資料湖計算與分析
相比於
資料倉庫
,資料湖以更開放的方式對接多種不同的計算引擎,如傳統開源大資料計算引擎 Hive、Spark、Presto、Flink 等,同時也支援雲廠商自研的大資料引擎,如阿里雲 MaxCompute、Hologres 等。在資料湖儲存與計算引擎之間,一般還會提供資料湖加速的服務,以提高計算分析的效能,同時減少頻寬的成本和壓力。
Databricks 資料洞察-商業版的 Spark 資料計算與分析引擎
DataBricks 資料洞察(DDI)做為阿里雲上全託管的 Spark 分析引擎,能夠簡單快速幫助使用者對資料湖的資料進行計算與分析。
Saas 全託管 Spark:
免運維,無需關注底層資源情況,降低運維成本,聚焦分析業務
完整 Spark 技術棧整合:
一站式整合 Spark 引擎和 Delta Lake 資料湖,100%相容開源 Spark 社群版;Databricks 做商業支援,最快體驗 Spark 最新版本特性
總成本降低:
商業版本 Spark 及 Delta Lake 效能優勢顯著;同時基於計算儲存分離架構,儲存依託阿里雲 OSS 物件儲存,藉助阿里雲 JindoFS 快取層加速;能夠有效降低叢集總體使用成本
高品質支援以及 SLA 保障:
阿里雲和 Databricks 提供覆蓋 Spark 全棧的技術支援;提供商業化 SLA 保障與7*24小時 Databricks 專家支援服務
Databricks 資料洞察+ DLF 資料湖構建與流批一體分析實踐
企業構建和應用資料湖一般需要經歷資料入湖、資料湖儲存與管理、資料湖探索與分析等幾個過程。本文主要介紹基於阿里雲資料湖構建(DLF)+Databricks 資料洞察(DDI)構建一站式的資料入湖,批流一體資料分析實戰。
流處理場景:
實時場景維護更新兩張 Delta 表:
delta_aggregates_func 表:RDS 資料實時入湖 。
delta_aggregates_metrics
表:工業 metric 資料透過 IoT 平臺採集到雲 Kafka ,經由 Spark Structured Streaming 實時入湖。
批處理場景:
以實時場景生成兩張 Delta 作為資料來源,進行資料分析執行 Spark jobs,透過 Databrick 資料洞察作業排程定時執行。
前置條件
1. 服務開通
確保 DLF、OSS、Kafka、DDI、RDS、DTS 等雲產品服務已開通。注意 DLF、RDS、Kafka、DDI 例項均需在同一 Region 下。
2. RDS 資料準備
RDS 資料準備,在 RDS 中建立資料庫 dlfdb。在賬戶中心建立能夠讀取 engine_funcs資料庫的使用者賬號,如 dlf_admin。
透過 DMS 登入資料庫,執行一下語句建立
engine_funcs
表,及插入少量資料。
CREATE TABLE `engine_funcs` ( `emp_no` int(11) NOT NULL,
`engine_serial_number` varchar(20) NOT NULL,
`engine_serial_name` varchar(20) NOT NULL,
`target_engine_serial_number` varchar(20) NOT NULL,
`target_engine_serial_name` varchar(20) NOT NULL,
`operator` varchar(16) NOT NULL,
`create_time` DATETIME NOT NULL,
`update_time` DATETIME NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
INSERT INTO `engine_funcs` VALUES (10001,‘1107108133’,‘temperature’,‘1107108144’,‘temperature’,‘/’, now(), now());
INSERT INTO `engine_funcs` VALUES (10002,‘1107108155’,‘temperature’,‘1107108133’,‘temperature’,‘/’, now(), now());
INSERT INTO `engine_funcs` VALUES (10003,‘1107108155’,‘runTime’,‘1107108166’,‘speed’,‘/’, now(), now());
INSERT INTO `engine_funcs` VALUES (10004,‘1107108177’,‘pressure’,‘1107108155’,‘electricity’,‘/’, now(), now());
INSERT INTO `engine_funcs` VALUES (10005,‘1107108188’,‘flow’ ,‘1107108111’,‘runTime’,‘/’, now(), now());
RDS資料實時入湖
1. 建立資料來源
進入 DLF 控制檯介面:
https://
dlf。console。aliyun。com/
cn-hangzhou/home
,點選選單 資料入湖 -> 資料來源管理。
點選 新建資料來源。填寫連線名稱,選擇資料準備中的使用的 RDS 例項,填寫賬號密碼,點選“連線測試”驗證網路連通性及賬號可用性。
點選下一步,確定,完成資料來源建立。
2. 建立元資料庫
在 OSS 中新建 Bucket,
databricks-data-source
;
點選左側選單“元資料管理”->“元資料庫”,點選“新建元資料庫”。填寫名稱,新建目錄 dlf/,並選擇。
3. 建立入湖任務
點選選單“資料入湖”->“入湖任務管理”,點選“新建入湖任務”。
選擇“
關係資料庫
實時入湖”,按照下圖的資訊填寫資料來源、目標資料湖、任務配置等資訊。並儲存。
配置資料來源,選擇剛才新建的“dlf”連線,使用表路徑 “dlf/engine_funcs”,選擇新建 dts 訂閱,填寫名稱。
回到任務管理頁面,點選“執行”新建的入湖任務。就會看到任務進入“初始化中”狀態,隨後會進入“執行”狀態。
點選“詳情”進入任務詳情頁,可以看到相應的資料庫表資訊。
該資料入湖任務,屬於全量+增量入湖,大約3至5分鐘後,全量資料會完成匯入,隨後自動進入實時監聽狀態。如果有資料更新,則會自動更新至 Delta Lake 資料中。
資料湖探索與分析
DLF 資料查詢探索
DLF 產品提供了輕量級的資料預覽和探索功能,點選選單“資料探索”->“SQL 查詢”進入資料查詢頁面。
在元資料庫表中,找到“fjl_dlf”,展開後可以看到 engine_funcs_delta 表已經自動建立完成。雙擊該表名稱,右側 sql 編輯框會出現查詢該表的 sql 語句,點選“執行”,即可獲得資料查詢結果。
回到 DMS 控制檯,執行下方 DELETE 和 INSERT SQL 語句。
DELETE FROM `engine_funcs` where `emp_no` = 10001;
UPDATE `engine_funcs` SET `operator` = ‘+’, `update_time` = NOW() WHERE `emp_no` =10002;
INSERT INTO `engine_funcs` VALUES (20001,‘1107108199’,‘speed’,‘1107108122’,‘runTime’,‘*’, now(), now());
大約1至3分鐘後,在 DLF 資料探索再次執行剛才的 select 語句,所有的資料更新已經同步至資料湖中。
建立 Databricks 資料洞察(DDI)叢集
叢集建立完成後,點選“詳情”進入詳情頁,添加當前訪問機器 ip 白名單。
點選 Notebook 進入互動式分析頁查詢同步至 Delta Lake 中 engine_funcs_delta 表資料。
IoT 平臺採集到雲 Kafka 資料實時寫入 Delta Lake
1.引入 spark-sql-kafka 三方依賴
%spark。conf
spark。jars。packages org。apache。spark:spark-sql-kafka-0-10_2。12:3。0。1
2.使用 UDF 函式定義流資料寫入 Delta Lake 的 Merge 規則
發往 Kafka 的測試資料的格式:
{“sn”: “1107108111”,“temperature”: “12” ,“speed”:“1115”, “runTime”:“160”,“pressure”:“210”,“electricity”:“380”,“flow”:“740”,“dia”:“330”}
{“sn”: “1107108122”,“temperature”: “13” ,“speed”:“1015”, “runTime”:“150”,“pressure”:“220”,“electricity”:“390”,“flow”:“787”,“dia”:“340”}
{“sn”: “1107108133”,“temperature”: “14” ,“speed”:“1215”, “runTime”:“140”,“pressure”:“230”,“electricity”:“377”,“flow”:“777”,“dia”:“345”}
{“sn”: “1107108144”,“temperature”: “15” ,“speed”:“1315”, “runTime”:“145”,“pressure”:“240”,“electricity”:“367”,“flow”:“730”,“dia”:“430”}
{“sn”: “1107108155”,“temperature”: “16” ,“speed”:“1415”, “runTime”:“155”,“pressure”:“250”,“electricity”:“383”,“flow”:“750”,“dia”:“345”}
{“sn”: “1107108166”,“temperature”: “10” ,“speed”:“1515”, “runTime”:“145”,“pressure”:“260”,“electricity”:“350”,“flow”:“734”,“dia”:“365”}
{“sn”: “1107108177”,“temperature”: “12” ,“speed”:“1115”, “runTime”:“160”,“pressure”:“210”,“electricity”:“377”,“flow”:“733”,“dia”:“330”}
{“sn”: “1107108188”,“temperature”: “13” ,“speed”:“1015”, “runTime”:“150”,“pressure”:“220”,“electricity”:“381”,“flow”:“737”,“dia”:“340”}
{“sn”: “1107108199”,“temperature”: “14” ,“speed”:“1215”, “runTime”:“140”,“pressure”:“230”,“electricity”:“378”,“flow”:“747”,“dia”:“345”}
%spark
import org。apache。spark。sql。_
import io。delta。tables。_
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF。createOrReplaceTempView(“dataStream”)
// 對流資料DF執行列轉行的操作;
val df=microBatchOutputDF。sparkSession。sql(s“”“
select `sn`,
stack(7, ‘temperature’, `temperature`, ‘speed’, `speed`, ‘runTime’, `runTime`, ‘pressure’, `pressure`, ‘electricity’, `electricity`, ‘flow’, `flow` , ‘dia’, `dia`) as (`name`, `value` )
from dataStream
”“”)
df。createOrReplaceTempView(“updates”)
// 實現實時更新動態的資料,結果merge到表裡面
val mergedf=df。sparkSession。sql(s“”“
MERGE INTO delta_aggregates_metrics t
USING updates s
ON s。sn = t。sn and s。name=t。name
WHEN MATCHED THEN UPDATE SET
t。value = s。value,
t。update_time=current_timestamp()
WHEN NOT MATCHED THEN INSERT
(t。sn,t。name,t。value ,t。create_time,t。update_time)
values (s。sn,s。name,s。value,current_timestamp(),current_timestamp())
”“”)
}
3.使用 Spark Structured Streaming 實時流寫入 Delta Lake
%spark
import org。apache。spark。sql。functions。_
import org。apache。spark。sql。streaming。Trigger
def getquery(checkpoint_dir:String,servers:String,topic:String ){
var streamingInputDF =
spark。readStream
。format(“kafka”)
。option(“kafka。bootstrap。servers”, servers)
。option(“subscribe”, topic)
。option(“startingOffsets”, “latest”)
。option(“minPartitions”, “10”)
。option(“failOnDataLoss”, “true”)
。load()
var streamingSelectDF =
streamingInputDF
。select(
get_json_object
(($“value”)。cast(“string”), “$。sn”)。alias(“sn”),
get_json_object(($“value”)。cast(“string”), “$。temperature”)。alias(“temperature”),
get_json_object(($“value”)。cast(“string”), “$。speed”)。alias(“speed”),
get_json_object(($“value”)。cast(“string”), “$。runTime”)。alias(“runTime”),
get_json_object(($“value”)。cast(“string”), “$。electricity”)。alias(“electricity”),
get_
json_object
(($“value”)。cast(“string”), “$。flow”)。alias(“flow”),
get_json_object(($“value”)。cast(“string”), “$。dia”)。alias(“dia”),
get_json_object(($“value”)。cast(“string”), “$。pressure”)。alias(“pressure”)
)
val query = streamingSelectDF
。writeStream
。format(“delta”)
。option(“checkpointLocation”, checkpoint_dir)
。trigger(Trigger。ProcessingTime(“5 seconds”)) // 執行流處理時間間隔
。foreachBatch(upsertToDelta _) //引用upsertToDelta函式
。outputMode(“update”)
。start()
}
4. 執行程式
%spark
val my_checkpoint_dir=“oss://databricks-data-source/checkpoint/ck”
val servers= “***。***。***。***:9092”
val topic= “your-topic”
getquery(my_checkpoint_dir,servers,topic)
5. 啟動 Kafka 並向生產裡傳送測試資料
查詢資料實時寫入並更新
查詢從 MySQL 實時同步入湖的 engine_funcs_delta 資料
%spark
val rds_dataV=spark。table(“fjl_dlf。engine_funcs_delta”)
rds_dataV。show()
批處理作業
結合業務,需要將對應的 delta_aggregates_metrics 裡的 Value 引數 join 到engine_funcs_delta 表裡
%spark
//讀取實時更新的delta_aggregates_metrics資料表
val aggregateDF=spark。table(“log_data_warehouse_dlf。delta_aggregates_metrics”)
//讀取實時更新的engine_funcs_delta函式表
val rds_dataV=spark。table(“fjl_dlf。engine_funcs_delta”)。drop(“create_time”,“update_time”)
// rds_dataV。show()
val aggregateSDF= aggregateDF。withColumnRenamed(“value”,“esn_value”)。withColumnRenamed(“name”,“engine_serial_name”)。withColumnRenamed(“sn”,“engine_serial_number”)
// aggregateSDF。show()
val aggregateTDF=aggregateDF。withColumnRenamed(“value”,“tesn_value”)。withColumnRenamed(“name”,“target_engine_serial_name”)。withColumnRenamed(“sn”,“target_engine_serial_number”)。drop(“create_time”,“update_time”)
// aggregateTDF。show()
//將對應的delta_aggregates_metrics裡的Value引數 join到engine_funcs_delta表裡;
val resdf=rds_dataV。join(aggregateSDF,Seq(“engine_serial_name”,“engine_serial_number”),“left”)。join(aggregateTDF,Seq(“target_engine_serial_number”,“target_engine_serial_name”),“left”)
。selectExpr(“engine_serial_number”,“engine_serial_name”,“esn_value”,“target_engine_serial_number”,“target_engine_serial_name”,“tesn_value”,“operator”,“create_time”,“update_time”)
//資料展示
resdf。show(false)
// 將結果寫入到Delta表裡面
resdf。write。format(“delta”)
。mode(“append”)
。saveAsTable(“log_data_warehouse_dlf。delta_result”)
效能最佳化:OPTIMIZE & Z-Ordering
在流處理場景下會產生大量的小檔案,大量小檔案的存在會嚴重影響資料系統的讀效能。Delta Lake 提供了 OPTIMIZE 命令,可以將小檔案進行合併壓縮,另外,針對 Ad-Hoc 查詢場景,由於涉及對單表多個維度資料的查詢,我們藉助 Delta Lake 提供的 Z-Ordering 機制,可以有效提升查詢的效能。從而極大提升讀取表的效能。DeltaLake 本身提供了 Auto Optimize 選項,但是會犧牲少量寫效能,增加資料寫入 delta 表的延遲。相反,執行 OPTIMIZE 命令並不會影響寫的效能,因為 Delta Lake 本身支援 MVCC,支援 OPTIMIZE 的同時併發執行寫操作。因此,我們採用定期觸發執行 OPTIMIZE 的方案,每小時透過 OPTIMIZE 做一次合併小檔案操作,同時執行 VACCUM 來清理過期資料檔案:
OPTIMIZE log_data_warehouse_dlf。delta_result ZORDER by engine_serial_number;
VACUUM log_data_warehouse_dlf。delta_result RETAIN 1 HOURS;
作者:
陳鑫偉(
熙康
),阿里雲 計算平臺事業部 技術專家 、馮加亮(
加亮
),阿里雲 計算平臺事業部 技術研發
原文連結
本文為阿里雲原創內容,未經允許不得轉載。