您當前的位置:首頁 > 繪畫

Flink的業務場景到底是什麼?

作者:由 MyEclipse 發表于 繪畫時間:2021-05-17

Flink的業務場景到底是什麼?Snail2021-06-02 09:48:32

Flink的業務場景到底是什麼?王知無2021-07-04 22:46:01

大資料領域自 2010 年開始,以 Hadoop、Hive 為代表的離線計算開始進入各大公司的視野。大資料領域開始瞭如火如荼的發展。我個人在學校期間就開始關注大資料領域的技術迭代和更新,並且有幸在畢業後成為大資料領域的開發者。

在過去的這幾年時間裡,以 Storm、Spark、Flink 為代表的實時計算技術接踵而至。2019 年阿里巴巴內部 Flink 正式開源。整個實時計算領域風起雲湧,一些普通的開發者因為業務需要或者個人興趣開始接觸Flink。Apache Flink(以下簡稱 Flink)一改過去實時計算領域為人詬病的缺陷,以其強大的計算能力和先進的設計理念,迅速成為實時計算領域先進生產力的代表。各大小公司紛紛開始在 Flink 的應用上進行探索。

其中最引人矚目的兩個方向便是:

實時計算平臺和實時資料倉庫。

我放幾張圖給你看:

Flink的業務場景到底是什麼?

Flink的業務場景到底是什麼?

Flink的業務場景到底是什麼?

如果你還不明白,可以參考這篇文章:

Flink在實時在實時計算平臺和實時數倉中的企業級應用小結

Flink的業務場景到底是什麼?知乎使用者2021-09-25 21:11:40

盤點Flink實戰踩過的坑:

資料傾斜導致子任務積壓

業務背景

一個流程中,有兩個重要子任務:一是資料遷移,將kafka實時資料落Es,二是將kafka資料做視窗聚合落hbase,兩個子任務接的是同一個Topic GroupId。上游 Topic 的 tps 高峰達到5-6w。

問題描述

給 24個 TaskManager(CPU) 都會出現來不及消費的情況

問題原因

做視窗聚合的任務的分組欄位,分組粒度太小,hash不能打散,資料傾斜嚴重,導致少數 TaskManager 上壓力過大,從而影響落Es的效率,導致背壓。

解決方式

將兩個任務獨立開來,作為不同的流程。

結果

修改之前 24個 TaskManager(CPU) 來不及消費,改完之後 20 個 CPU 可完成任務。Kafka實時資料落Es的16個TaskManager,將kafka資料做視窗聚合落hbase的4個TaskManager。

另:

同樣的資料、同樣的Tps作為資料輸入,Hbase的輸出能力遠超過Es,考慮實時任務落資料進Es要慎重。

Flink任務落Es時要考慮設定微批落資料,設定 bulk。flush。max。actions 和 bulk。flush。interval。ms至合適值,否則影響吞吐量。

Kafka 訊息大小預設配置太小,導致資料未處理

業務背景

正常的Flink任務消費 Topic 資料,但是Topic中的資料為 XML 以及 JSON,單條資料較大

問題描述

Flink各項metrics指標正常,但是沒處理到資料

問題原因

Topic中單條資料 > 1M,超過 Kafka Consumer 處理單條資料的預設最大值。

解決方式

有三種可選方式:擴大kafka consumer 單條資料的資料大小:fetch。message。max。bytes。對訊息進行壓縮:上游 kafka producer 設定 compression。codec 和 commpressed。topics。業務上對資料切片,在上游 kafka producer 端將資料切片為 10K,使用分割槽主鍵確保同一條資料傳送到同一Partition,consumer對訊息重組。

結果

方式一:按業務要求擴大 Kafka Consumer 可處理的單條資料位元組數即可正常處理業務 方式二:Kafka Consumer 需先解碼,再進行業務處理。方式三:Kafka Consumer 需先重組資料,再進行業務處理。

Tps 很大,Kafka Ack 預設配置 拖慢訊息處理速度

業務背景

實時任務,上游接流量頁面點選事件的資料,下游輸出Kafka,輸出tps很大。流量資料不重要,可接受丟失的情況

問題描述

CPU資源耗費較多的情況下,才能正常消費,考慮如果縮減資源。

問題原因

Kafka Producer 預設 acks=1,即Partition Leader接收到訊息而且寫入本地磁碟了,就認為成功了

解決方式

Kafka Producer 設定 :props。put(“acks”, “0”); 將 acks=0,即KafkaProducer在客戶端,只要把訊息傳送出去,不管那條資料有沒有在哪怕Partition Leader上落到磁碟,直接就認為這個訊息傳送成功了。

結果

資源降低三分之一。

The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed.

org。apache。flink。util。FlinkException: The assigned slot container_e08_1539148828017_15937_01_003564_0 was removed。 at org。apache。flink。runtime。resourcemanager。slotmanager。SlotManager。removeSlot(SlotManager。java:786) at org。apache。flink。runtime。resourcemanager。slotmanager。SlotManager。removeSlots(SlotManager。java:756) at org。apache。flink。runtime。resourcemanager。slotmanager。SlotManager。internalUnregisterTaskManager(SlotManager。java:948) at org。apache。flink。runtime。resourcemanager。slotmanager。SlotManager。unregisterTaskManager(SlotManager。java:372) at org。apache。flink。runtime。resourcemanager。ResourceManager。closeTaskManagerConnection(ResourceManager。java:803) at org。apache。flink。yarn。YarnResourceManager。lambda$onContainersCompleted$0(YarnResourceManager。java:340) at org。apache。flink。runtime。rpc。akka。AkkaRpcActor。handleRunAsync(AkkaRpcActor。java:332) at org。apache。flink。runtime。rpc。akka。AkkaRpcActor。handleRpcMessage(AkkaRpcActor。java:158) at org。apache。flink。runtime。rpc。akka。FencedAkkaRpcActor。handleRpcMessage(FencedAkkaRpcActor。java:70) at org。apache。flink。runtime。rpc。akka。AkkaRpcActor。onReceive(AkkaRpcActor。java:142)

程式記憶體佔用過大,導致TaskManager在yarn上kill了,分析原因應該是資源不夠,可以將程式放在資源更大的叢集上,再不行就設定減少Slot中共享的task的個數,也可能是記憶體洩露或記憶體資源配置不合理造成,需要進行合理分配。

The heartbeat of TaskManager with id container ....... timed out

此錯誤是container心跳超時,出現此種錯誤一般有兩種可能:

1、分散式物理機網路失聯,這種原因一般情況下failover後作業能正常恢復,如果出現的不頻繁可以不用關注;2、failover的節點對應TM的記憶體設定太小,GC嚴重導致心跳超時,建議調大對應節點的記憶體值。

Caused by: java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@flink88:15265/user/taskmanager_0#6643546564]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation".

在flink-conf。yaml中新增或修改:akka。ask。timeout: 100s web。timeout: 100000

Checkpoint:Checkpoint expired before completing

checkpointConf。setCheckpointTimeout(5000L)這個值設定過小,預設是10min,需要進行調大測試。

Kafka partition leader切換導致Flink重啟

Flink重啟,檢視日誌,顯示:

java。lang。Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition。 at org。apache。flink。streaming。connectors。kafka。FlinkKafkaProducerBase。checkErroneous(FlinkKafkaProducerBase。java:373) at org。apache。flink。streaming。connectors。kafka。FlinkKafkaProducerBase。invoke(FlinkKafkaProducerBase。java:280) at org。apache。flink。streaming。api。operators。StreamSink。processElement(StreamSink。java:41) at org。apache。flink。streaming。runtime。io。StreamInputProcessor。processInput(StreamInputProcessor。java:206) at org。apache。flink。streaming。runtime。tasks。OneInputStreamTask。run(OneInputStreamTask。java:69) at org。apache。flink。streaming。runtime。tasks。StreamTask。invoke(StreamTask。java:263) at org。apache。flink。runtime。taskmanager。Task。run(Task。java:702) at java。lang。Thread。run(Thread。java:748) Caused by: org。apache。kafka。common。errors。NotLeaderForPartitionException: This server is not the leader for that topic-partition。

檢視Kafka的Controller日誌,顯示:

INFO [SessionExpirationListener on 10], ZK expired; shut down all controller components and try to re-elect (kafka。controller。KafkaController$SessionExpirationListener)

關於producer引數設定,設定retries引數,可以在Kafka的Partition發生leader切換時,Flink不重啟,而是做3次嘗試:

kafkaProducerConfig

{

“bootstrap。servers”: “192。169。2。20:9093,192。169。2。21:9093,192。169。2。22:9093”

“retries”:3

}

注意 mapWithState & TTL 的重要性

在處理包含無限多鍵的資料時,要考慮到 keyed 狀態保留策略(透過 TTL 定時器來在給定的時間之後清理未使用的資料)是很重要的。術語『無限』在這裡有點誤導,因為如果你要處理的 key 以 128 位編碼,則 key 的最大數量將會有個限制(等於 2 的 128 次方)。但這是一個巨大的數字!你可能無法在狀態中儲存那麼多值,所以最好考慮你的鍵空間是無界的,同時新鍵會隨著時間不斷出現。

如果你的 keyed 狀態包含在某個 Flink 的預設視窗中,則將是安全的:即使未使用 TTL,在處理視窗的元素時也會註冊一個清除計時器,該計時器將呼叫 clearAllState 函式,並刪除與該視窗關聯的狀態及其元資料。

如果要使用 Keyed State Descriptor 來管理狀態,可以很方便地新增 TTL 配置,以確保在狀態中的鍵數量不會無限制地增加。

但是,你可能會想使用更簡便的 mapWithState 方法,該方法可讓你訪問 valueState 並隱藏操作的複雜性。雖然這對於測試和少量鍵的資料來說是很好的選擇,但如果在生產環境中遇到無限多鍵值時,會引發問題。由於狀態是對你隱藏的,因此你無法設定 TTL,並且預設情況下未配置任何 TTL。這就是為什麼值得考慮做一些額外工作的原因,如宣告諸如 RichMapFunction 之類的東西,這將使你能更好的控制狀態的生命週期。

部署和資源問題

(0) JDK版本過低

這不是個顯式錯誤,但是JDK版本過低很有可能會導致Flink作業出現各種莫名其妙的問題,因此在生產環境中建議採用JDK 8的較高update(我們使用的是181)。

(1) Could not build the program from JAR file

該資訊不甚準確,因為絕大多數情況下都不是JAR包本身有毛病,而是在作業提交過程中出現異常退出了。因此需要檢視本次提交產生的客戶端日誌(預設位於$FLINK_HOME/logs目錄下),再根據其中的資訊定位並解決問題。

(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/。。。

一般都是因為使用者依賴第三方包的版本與Flink框架依賴的版本有衝突導致。

(3) Deployment took more than 60 seconds。 Please check if the requested resources are available in the YARN cluster

就是字面意思,YARN叢集內沒有足夠的資源啟動Flink作業。檢查一下當前YARN叢集的狀態、正在執行的YARN App以及Flink作業所處的佇列,釋放一些資源或者加入新的資源。

(4) java。util。concurrent。TimeoutException: Slot allocation request timed out

slot分配請求超時,是因為TaskManager申請資源時無法正常獲得,按照上一條的思路檢查即可。

(5) org。apache。flink。util。FlinkException: The assigned slot < container_id> was removed

TaskManager的Container因為使用資源超限被kill掉了。首先需要保證每個slot分配到的記憶體量足夠,特殊情況下可以手動配置SlotSharingGroup來減少單個slot中共享Task的數量。如果資源沒問題,那麼多半就是程式內部發生了記憶體洩露。建議仔細檢視TaskManager日誌,並按處理JVM OOM問題的常規操作來排查。

(6) java。util。concurrent。TimeoutException: Heartbeat of TaskManager with id < tm_id>timed out

TaskManager心跳超時。有可能是TaskManager已經失敗,如果沒有失敗,那麼有可能是因為網路不好導致JobManager沒能收到心跳訊號,或者TaskManager忙於GC,無法傳送心跳訊號。JobManager會重啟心跳超時的TaskManager,如果頻繁出現此異常,應該透過日誌進一步定位問題所在。

在Flink中,資源的隔離是透過Slot進行的,也就是說多個Slot會執行在同一個JVM中,這種隔離很弱,尤其對於生產環境。Flink App上線之前要在一個單獨的Flink叢集上進行測試,否則一個不穩定、存在問題的Flink App上線,很可能影響整個Flink叢集上的App。

(7)資源不足導致 container 被 kill

The assigned slot container_container編號 was removed。

Flink App 丟擲此類異常,透過檢視日誌,一般就是某一個 Flink App 記憶體佔用大,導致 TaskManager(在 Yarn 上就是 Container )被Kill 掉。

但是並不是所有的情況都是這個原因,還需要進一步看 yarn 的日誌( 檢視 yarn 任務日誌:yarn logs -applicationId -appOwner),如果程式碼寫的沒問題,就確實是資源不夠了,其實 1G Slot 跑多個Task( Slot Group Share )其實挺容易出現的。

因此有兩種選擇,可以根據具體情況,權衡選擇一個。

將該 Flink App 排程在 Per Slot 記憶體更大的叢集上。透過 slotSharingGroup(“xxx”) ,減少 Slot 中共享 Task 的個數

(8)啟動報錯,提示找不到 jersey 的類

java。lang。NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties 解決辦法進入 yarn中 把 lib 目中的一下兩個問價複製到flink的lib中 hadoop/share/hadoop/yarn/lib/jersey-client-1。9。jar /hadoop/share/hadoop/yarn/lib/jersey-core-1。9。jar

(9)Scala版本衝突

java。lang。NoSuchMethodError:scala。collection。immutable。HashSet$。empty()Lscala/collection/ 解決辦法,新增: import org。apache。flink。api。scala。_

(10)沒有使用回撤流報錯

Table is not an append一only table。 Use the toRetractStream() in order to handle add and retract messages。 這個是因為動態表不是 append-only 模式的,需要用 toRetractStream ( 回撤流) 處理就好了。

tableEnv。toRetractStreamPerson。print()

(11)OOM 問題解決思路 java。lang。OutOfMemoryError: GC overhead limit exceeded java。lang。OutOfMemoryError: GC overhead limit exceeded at java。util。Arrays。copyOfRange(Arrays。java:3664) at java。lang。String。(String。java:207) at com。esotericsoftware。kryo。io。Input。readString(Input。java:466) at com。esotericsoftware。kryo。serializers。DefaultSerializers$StringSerializer。read(DefaultSerializers。java:177) …… at org。apache。flink。streaming。runtime。tasks。OperatorChain$CopyingChainingOutput。collect(OperatorChain。java:524)

解決方案:

檢查 slot 槽位夠不夠或者 slot 分配的數量有沒有生效。程式起的並行是否都正常分配了(會有這樣的情況出現,假如 5 個並行,但是隻有 2 個在幾點上生效了,另外 3 個沒有資料流動)。檢查flink程式有沒有資料傾斜,可以透過 flink 的 ui 介面檢視每個分割槽子節點處理的資料量。

(12)解析返回值型別失敗報錯

The return type of function could not be determined automatically Exception in thread “main” org。apache。flink。api。common。functions。InvalidTypesException: The return type of function ‘main(RemoteEnvironmentTest。java:27)’ could not be determined automatically, due to type erasure。 You can give type information hints by using the returns(。。。) method on the result of the transformation call, or by letting your function implement the ‘ResultTypeQueryable’ interface。 at org。apache。flink。api。java。DataSet。getType(DataSet。java:178) at org。apache。flink。api。java。DataSet。collect(DataSet。java:410) at org。apache。flink。api。java。DataSet。print(DataSet。java:1652)

解決方案:產生這種現象的原因一般是使用 lambda 表示式沒有明確返回值型別,或者使用特使的資料結構 flink 無法解析其型別,這時候我們需要在方法的後面新增返回值型別,比如字串。

input。flatMap((Integer number, Collector< String> out) -> { …… }) // 提供返回值型別 。returns(Types。STRING)

(13)Hadoop jar 包衝突

Caused by: java。io。IOException: The given file system URI (hdfs:///data/checkpoint-data/abtest) did not describe the authority (like for example HDFS NameNode address/port or S3 host)。 The attempt to use a configured default authority failed: Hadoop configuration did not contain an entry for the default file system (‘fs。defaultFS’)。 at org。apache。flink。runtime。fs。hdfs。HadoopFsFactory。create(HadoopFsFactory。java:135) at org。apache。flink。core。fs。FileSystem。getUnguardedFileSystem(FileSystem。java:399) at org。apache。flink。core。fs。FileSystem。get(FileSystem。java:318) at org。apache。flink。core。fs。Path。getFileSystem(Path。java:298)

解決:pom 檔案中去掉和 hadoop 相關的依賴就好了

作業問題

(1)org。apache。flink。streaming。runtime。tasks。ExceptionInChainedOperatorException: Could not forward element to next operator

該異常幾乎都是由於程式業務邏輯有誤,或者資料流裡存在未處理好的髒資料導致的,繼續向下追溯異常棧一般就可以看到具體的出錯原因,比較常見的如POJO內有空欄位,或者抽取事件時間的時間戳為null等。

(2) java。lang。IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down

很多童鞋拿著這兩條異常資訊來求助,但實際上它們只是表示BufferPool、MemoryManager這些Flink執行時元件被銷燬,亦即作業已經失敗。具體的原因多種多樣,根據經驗,一般是上一條描述的情況居多(即Could not forward element to next operator錯誤會伴隨出現),其次是JDK版本問題。具體情況還是要根據TaskManager日誌具體分析。

(3) akka。pattern。AskTimeoutException: Ask timed out on [Actor[akka://。。。]] after [10000 ms]

Akka超時導致,一般有兩種原因:一是叢集負載比較大或者網路比較擁塞,二是業務邏輯同步呼叫耗時的外部服務。如果負載或網路問題無法徹底緩解,需考慮調大akka。ask。timeout引數的值(預設只有10秒);另外,呼叫外部服務時儘量非同步操作(Async I/O)。

(4) java。io。IOException: Too many open files

這個異常我們應該都不陌生,首先檢查系統ulimit -n的檔案描述符限制,再注意檢查程式內是否有資源(如各種連線池的連線)未及時釋放。值得注意的是,Flink使用RocksDB狀態後端也有可能會丟擲這個異常,此時需修改flink-conf。yaml中的state。backend。rocksdb。files。open引數,如果不限制,可以改為-1。

(5) org。apache。flink。api。common。function。InvalidTypesException: The generic type parameters of ‘< class>’ are missing

在Flink內使用Java Lambda表示式時,由於型別擦除造成的副作用,注意呼叫returns()方法指定被擦除的型別。

(6)Checkpoint失敗:Checkpoint expired before completing

原因是因為checkpointConf。setCheckpointTimeout(8000L)。設定的太小了,預設是10min,這裡只設置了8sec。當一個Flink App背壓的時候(例如由外部元件異常引起),Barrier會流動的非常緩慢,導致Checkpoint時長飆升。

檢查點和狀態問題

(1) Received checkpoint barrier for checkpoint < cp_id> before completing current checkpoint < cp_id>。 Skipping current checkpoint

在當前檢查點還未做完時,收到了更新的檢查點的barrier,表示當前檢查點不再需要而被取消掉,一般不需要特殊處理。

(2) Checkpoint < cp_id> expired before completing

首先應檢查CheckpointConfig。setCheckpointTimeout()方法設定的檢查點超時,如果設的太短,適當改長一點。另外就是考慮發生了反壓或資料傾斜,或者barrier對齊太慢。

(3) org。apache。flink。util。StateMigrationException: The new state serializer cannot be incompatible

我們知道Flink的狀態是按key組織並儲存的,如果程式邏輯內改了keyBy()邏輯或者key的序列化邏輯,就會導致檢查點/儲存點的資料無法正確恢復。所以如果必須要改key相關的東西,就棄用之前的狀態資料吧。

(4) org。apache。flink。util。StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed。 However, migration for MapState currently isn‘t supported

在1。9之前的Flink版本中,如果我們使用RocksDB狀態後端,並且更改了自用MapState的schema,恢復作業時會丟擲此異常,表示不支援更改schema。這個問題已經在FLINK-11947解決,升級版本即可。

(5)時鐘不同步導致無法啟動

啟動Flink任務的時候報錯 Caused by: java。lang。RuntimeException: Couldn’t deploy Yarn cluster。

然後仔細看發現:system times on machines may be out of sync。

意思說是機器上的系統時間可能不同步。同步叢集機器時間即可。

這些都是我閱讀過的優秀書籍,裡面涵蓋了大資料所有的知識,我也做了詳細的歸類,已經整理到網盤了,大家可以自行下載。

計算機必備書籍(持續更新,附下載地址)​mp。weixin。qq。com/s?__biz=Mzg3NjI4OTg1Mw==&mid=100016617&idx=1&sn=1113864c265dcfc9ab496682c0d4cdd5&chksm=4f360bbb784182ada8c0099d89165166548ae8754af5c47ab3006b19f497a8b9a1d3c480e566#rd

Flink的業務場景到底是什麼?

Flink的業務場景到底是什麼?王簡2021-10-14 23:34:54

Flink的業務場景到底是什麼?大資料技術派2022-01-24 23:32:36

你好,歡迎來到第 01 課時,本課時我們主要介紹 Flink 的應用場景和架構模型。

實時計算最好的時代

在過去的十年裡,面向資料時代的

實時計算技術

接踵而至。從我們最初認識的 Storm,再到 Spark 的異軍突起,迅速佔領了整個實時計算領域。直到 2019 年 1 月底,阿里巴巴內部版本 Flink 正式開源!一石激起千層浪,Flink 開源的訊息立刻刷爆朋友圈,整個大資料計算領域一直以來由 Spark 獨領風騷,瞬間成為兩強爭霸的時代。

Apache Flink(以下簡稱 Flink)以其先進的設計理念、強大的計算能力備受關注,如何將 Flink 快速應用在生產環境中,更好的與現有的大資料生態技術完美結合,充分挖掘資料的潛力,成為了眾多開發者面臨的難題。

Flink 實際應用場景

Flink 自從 2019 年初開源以來,迅速成為大資料實時計算領域炙手可熱的技術框架。

作為 Flink 的主要貢獻者阿里巴巴率先將其在全集團進行推廣使用

,另外由於 Flink 天然的

流式特性

,更為領先的架構設計,使得 Flink 一出現便在各大公司掀起了應用的熱潮。

阿里巴巴、騰訊、百度、位元組跳動、滴滴、華為等眾多網際網路公司已經將 Flink 作為未來技術重要的發力點,迫切地在各自公司內部進行技術升級和推廣使用。同時,Flink 已經成為 Apache 基金會和 GitHub 社群

最為活躍的專案之一

我們來看看 Flink 支援的眾多應用場景。

實時資料計算

如果你對大資料技術有所接觸,那麼下面的這些需求場景你應該並不陌生:

阿里巴巴每年雙十一都會直播,實時監控大屏是如何做到的?

公司想看一下大促中銷量最好的商品 TOP5?

我是公司的運維,希望能實時接收到伺服器的負載情況?

……

Flink的業務場景到底是什麼?

我們可以看到,資料計算場景需要從原始資料中提取有價值的資訊和指標,比如上面提到的實時銷售額、銷量的 TOP5,以及伺服器的負載情況等。

傳統的分析方式通常是利用

批查詢

,或將事件(生產上一般是訊息)記錄下來並基於此形成有限資料集(表)構建應用來完成。為了得到最新資料的計算結果,必須先將它們寫入表中並重新執行 SQL 查詢,然後將結果寫入儲存系統比如 MySQL 中,再生成報告。

Apache Flink 同時支援流式及批次分析應用,這就是我們所說的

批流一體

。Flink 在上述的需求場景中承擔了

資料的實時採集

實時計算

下游傳送

實時資料倉庫和 ETL

ETL

(Extract-Transform-Load)的目的是將業務系統的資料經過抽取、清洗轉換之後載入到資料倉庫的過程。

Flink的業務場景到底是什麼?

傳統的離線資料倉庫將業務資料集中進行儲存後,以固定的計算邏輯定時進行 ETL 和其他建模後產出報表等應用。離線資料倉庫主要是構建 T+1 的離線資料,透過定時任務每天拉取增量資料,然後建立各個業務相關的主題維度資料,對外提供 T+1 的資料查詢介面。

上圖展示了離線資料倉庫 ETL 和實時資料倉庫的差異,可以看到離線資料倉庫的計算和資料的實時性均較差。資料本身的價值隨著時間的流逝會逐步減弱,因此資料發生後必須儘快的達到使用者的手中,

實時數倉的構建

需求也應運而生。

實時資料倉庫的建設是“資料智慧 BI”必不可少的一環,也是大規模資料應用中必然面臨的挑戰。

Flink 在實時數倉和實時 ETL 中有天然的優勢:

狀態管理,

實時數倉裡面會進行很多的聚合計算,這些都需要對於狀態進行訪問和管理,Flink 支援強大的狀態管理;

豐富的 API,

Flink 提供極為豐富的多層次 API,包括 Stream API、Table API 及 Flink SQL;

生態完善

,實時數倉的用途廣泛,Flink 支援多種儲存(HDFS、ES 等);

批流一體,

Flink 已經在將流計算和批計算的 API 進行統一。

Flink的業務場景到底是什麼?

事件驅動型應用

你是否有這樣的需求:

我們公司有幾萬臺伺服器,希望能從伺服器上報的訊息中將 CPU、MEM、LOAD 資訊分離出來做分析,然後觸發自定義的規則進行報警?

我是公司的安全運維人員,希望能從每天的訪問日誌中識別爬蟲程式,並且進行 IP 限制?

……

事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取資料,並根據到來的事件觸發計算、狀態更新或其他外部動作。

在傳統架構中,我們需要讀寫遠端事務型資料庫,比如 MySQL。在事件驅動應用中資料和計算不會分離,應用只需訪問本地(記憶體或磁碟)即可獲取資料,所以具有更高的吞吐和更低的延遲。

Flink的業務場景到底是什麼?

Flink 的以下特性完美的支援了事件驅動型應用:

高效的狀態管理

,Flink 自帶的 State Backend 可以很好的儲存中間狀態資訊;

豐富的視窗支援

,Flink 支援包含滾動視窗、滑動視窗及其他視窗;

多種時間語義

,Flink 支援 Event Time、Processing Time 和 Ingestion Time;

不同級別的容錯

,Flink 支援 At Least Once 或 Exactly Once 容錯級別。

小結

Apache Flink 從底層支援了針對多種不同場景的應用開發。

Flink 的主要特性包括:批流一體、Exactly-Once、強大的狀態管理等。同時,Flink 還支援執行在包括 YARN、 Mesos、Kubernetes 在內的多種資源管理框架上。阿里巴巴已經率先將 Flink 在全集團進行推廣使用,事實證明,Flink 已經可以擴充套件到數千核心,其狀態可以達到 TB 級別,且仍能保持

高吞吐

低延遲

的特性。

因此,Flink 已經成為我們在

實時計算的領域的第一選擇

Flink 的架構模型

Flink 的分層模型

Flink的業務場景到底是什麼?

Flink 自身提供了不同級別的抽象來支援我們開發流式或者批次處理程式,上圖描述了 Flink 支援的 4 種不同級別的抽象。

對於我們開發者來說,大多數應用程式不需要上圖中的最低級別的 Low-level 抽象,而是針對 Core API 程式設計, 比如 DataStream API(有界/無界流)和 DataSet API (有界資料集)。這些流暢的 API 提供了用於資料處理的通用構建塊,比如各種形式使用者指定的轉換、連線、聚合、視窗、狀態等。

Table API 和 SQL

是 Flink 提供的更為高階的 API 操作,Flink SQL 是 Flink 實時計算為簡化計算模型,降低使用者使用實時計算門檻而設計的一套符合標準 SQL 語義的開發語言。

Flink 的資料流模型

Flink 程式的基礎構建模組是

(Streams)與

轉換

(Transformations),每一個數據流起始於一個或多個

Source

,並終止於一個或多個

Sink

。資料流類似於

有向無環圖

(DAG)。

我們以一個最經典的 WordCount 計數程式舉例:

Flink的業務場景到底是什麼?

在上圖中,程式消費 Kafka 資料,這便是我們的

Source

部分。

然後經過 Map、Keyby、TimeWindow 等方法進行邏輯計算,該部分就是我們的

Transformation

轉換部分,而其中的 Map、Keyby、TimeWindow 等方法被稱為

運算元

。通常,程式中的

轉換

與資料流中的

運算元

之間存在對應關係,有時一個轉換可能包含多個轉換運算元。

最後,經過計算的資料會被寫入到我們執行的檔案中,這便是我們的

Sink

部分。

實際上面對複雜的生產環境,Flink 任務大都是並行進行和分佈在各個計算節點上。在 Flink 任務執行期間,每一個數據流都會有多個

分割槽

,並且每個運算元都有多個

運算元任務

並行進行。運算元子任務的數量是該特定運算元的

並行度(Parallelism)

,對並行度的設定是 Flink 任務進行調優的重要手段,我們會在後面的課程中詳細講解。

Flink的業務場景到底是什麼?

從上圖中可以看到,在上面的 map 和 keyBy/window 之間,以及 keyBy/window 和 Sink 之間,因為並行度的差異,資料流都進行了重新分配。

Flink 中的視窗和時間

視窗

時間

是 Flink 中的核心概念之一。在實際成產環境中,對資料流上的聚合需要由

視窗

來劃定範圍,比如“計算過去的 5 分鐘”或者“最後 100 個元素的和”。

Flink 支援了多種視窗模型比如

滾動視窗(Tumbling Window)、滑動視窗(Sliding Window)

會話視窗(Session Window)

等。

下圖展示了 Flink 支援的多種視窗模型:

Flink的業務場景到底是什麼?

同時,Flink 支援了

事件時間(Event Time)

攝取時間(Ingestion Time)

處理時間(Processing Time)

三種時間語義用來滿足實際生產中對於時間的特殊需求。

Flink的業務場景到底是什麼?

其他

此外,Flink 自身還支援了有狀態的運算元操作、容錯機制、Checkpoint、Exactly-once 語義等更多高階特性,來支援使用者在不同的業務場景中的需求。

點選這裡下載本課程原始碼。

總結

本課時從實時計算的背景入手介紹了當前實時計算的發展歷程,Flink 作為實時計算領域的一匹黑馬,先進的設計思想、強大的效能和豐富的業務場景支援,已經是我們開發者必須要學習的技能之一,Flink 已經成為實時計算領域最鋒利的武器!

標簽: Flink  Java  org  apache