作者 | 張軍
策劃 | 蔡芳芳
過去幾年,數(shù)據(jù)倉庫和數(shù)據(jù)湖方案在快速演進(jìn)和彌補(bǔ)自身缺陷的同時(shí),二者之間的邊界也逐漸淡化。云原生的新一代數(shù)據(jù)架構(gòu)不再遵循數(shù)據(jù)湖或數(shù)據(jù)倉庫的單一經(jīng)典架構(gòu),而是在一定程度上結(jié)合二者的優(yōu)勢(shì)重新構(gòu)建。在云廠商和開源技術(shù)方案的共同推動(dòng)之下,2021 年我們將會(huì)看到更多“湖倉一體”的實(shí)際落地案例。InfoQ 希望通過選題的方式對(duì)數(shù)據(jù)湖和數(shù)倉融合架構(gòu)在不同企業(yè)的落地情況、實(shí)踐過程、改進(jìn)優(yōu)化方案等內(nèi)容進(jìn)行呈現(xiàn)。本文將分享同程藝龍將 Flink 與 Iceberg 深度集成的落地經(jīng)驗(yàn)和思考。
背景及痛點(diǎn)
業(yè)務(wù)背景
同程藝龍是一個(gè)提供機(jī)票、住宿、交通等服務(wù)的在線旅游服務(wù)平臺(tái),目前我所在的部門屬于公司的研發(fā)部門,主要職責(zé)是為公司內(nèi)其他業(yè)務(wù)部門提供一些基礎(chǔ)服務(wù),我們的大數(shù)據(jù)系統(tǒng)主要承接的業(yè)務(wù)是部門內(nèi)的一些大數(shù)據(jù)相關(guān)的數(shù)據(jù)統(tǒng)計(jì)、分析工作等。數(shù)據(jù)來源有網(wǎng)關(guān)日志數(shù)據(jù)、服務(wù)器監(jiān)控?cái)?shù)據(jù)、K8s 容器的相關(guān)日志數(shù)據(jù),App 的打點(diǎn)日志, MySQL 的 binlog 日志等。我們主要的大數(shù)據(jù)任務(wù)是基于上述日志構(gòu)建實(shí)時(shí)報(bào)表,提供基于 Presto 的報(bào)表展示和即時(shí)查詢服務(wù),同時(shí)也會(huì)基于 Flink 開發(fā)一些實(shí)時(shí)、批處理任務(wù),為業(yè)務(wù)方提供準(zhǔn)確及時(shí)的數(shù)據(jù)支撐。
原架構(gòu)方案
由于我們所有的原始數(shù)據(jù)都是存儲(chǔ)在 Kafka 的,所以原來的技術(shù)架構(gòu)就是首先是 Flink 任務(wù)消費(fèi) Kafka 的數(shù)據(jù),經(jīng)過 Flink SQL 或者 Flink jar 的各種處理之后實(shí)時(shí)寫入 Hive,其中絕大部分任務(wù)都是 Flink SQL 任務(wù),因?yàn)槲艺J(rèn)為 SQL 開發(fā)相對(duì)代碼要簡單的多,并且維護(hù)方便、好理解,所以能用 SQL 寫的都盡量用 SQL 來寫。
提交 Flink 的平臺(tái)使用的是 Zeppelin,其中提交 Flink SQL 任務(wù)是 Zeppelin 自帶的功能,提交 jar 包任務(wù)是我自己基于 Application 模式開發(fā)的 Zeppelin 插件。
對(duì)于落地到 Hive 的數(shù)據(jù),使用開源的報(bào)表系統(tǒng) metabase (底層使用 Presto) 提供實(shí)時(shí)報(bào)表展示、定時(shí)發(fā)送郵件報(bào)表,以及自定義 SQL 查詢服務(wù)。由于業(yè)務(wù)對(duì)數(shù)據(jù)的實(shí)時(shí)性要求比較高,希望數(shù)據(jù)能盡快的展示出來,所以我們很多的 Flink 流式任務(wù)的 checkpoint 設(shè)置為 1 分鐘,數(shù)據(jù)格式采用的是 orc 格式。
痛點(diǎn)
由于采用的是列式存儲(chǔ)格式 ORC,無法像行式存儲(chǔ)格式那樣進(jìn)行追加操作,所以不可避免的產(chǎn)生了一個(gè)大數(shù)據(jù)領(lǐng)域非常常見且非常棘手的問題,即 HDFS 小文件問題。
開始的時(shí)候我們的小文件解決方案是自己寫的一個(gè)小文件壓縮工具,定期去合并,我們的 Hive 分區(qū)一般都是天級(jí)別的,所以這個(gè)工具的原理就是每天凌晨啟動(dòng)一個(gè)定時(shí)任務(wù)去壓縮昨天的數(shù)據(jù),首先把昨天的數(shù)據(jù)寫入一個(gè)臨時(shí)文件夾,壓縮完,和原來的數(shù)據(jù)進(jìn)行記錄數(shù)的比對(duì)檢驗(yàn),數(shù)據(jù)條數(shù)一致之后,用壓縮后的數(shù)據(jù)覆蓋原來的數(shù)據(jù),但是由于無法保證事務(wù),所以出現(xiàn)了很多問題:
壓縮的同時(shí)由于延遲數(shù)據(jù)的到來導(dǎo)致昨天的 Hive 分區(qū)又有數(shù)據(jù)寫入了,檢驗(yàn)就會(huì)失敗,導(dǎo)致合并小文件失敗。
替換舊數(shù)據(jù)的操作是沒有事務(wù)保證的,如果替換的過程中舊分區(qū)有新的數(shù)據(jù)寫入,就會(huì)覆蓋新寫入的數(shù)據(jù),造成數(shù)據(jù)丟失。
沒有事務(wù)的支持,無法實(shí)時(shí)合并當(dāng)前分區(qū)的數(shù)據(jù),只能合并壓縮前一個(gè)分區(qū)的,最新的分區(qū)數(shù)據(jù)仍然有小文件的問題,導(dǎo)致最新數(shù)據(jù)查詢性能提高不了。
Flink+Iceberg 的落地
Iceberg 技術(shù)調(diào)研
所以基于以上的 HDFS 小文件、查詢慢等問題,結(jié)合我們的現(xiàn)狀,我調(diào)研了目前市面上的數(shù)據(jù)湖技術(shù):Delta、Apache Iceberg 和 Apache Hudi,考慮了目前數(shù)據(jù)湖框架支持的功能和以后的社區(qū)規(guī)劃,最終我們是選擇了 Iceberg,其中考慮的原因有以下幾方面:
Iceberg 深度集成 Flink
前面講到,我們的絕大部分任務(wù)都是 Flink 任務(wù),包括批處理任務(wù)和流處理任務(wù),目前這三個(gè)數(shù)據(jù)湖框架,Iceberg 是集成 Flink 做的最完善的,如果采用 Iceberg 替代 Hive 之后,遷移的成本非常小,對(duì)用戶幾乎是無感知的,
比如我們?cè)瓉淼?SQL 是這樣的:
遷移到 Iceberg 以后,只需要修改 catalog 就行。
Presto 查詢也是和這個(gè)類似,只需要修改 catalog 就行了。
Iceberg 的設(shè)計(jì)架構(gòu)使得查詢更快
在 Iceberg 的設(shè)計(jì)架構(gòu)中,manifest 文件存儲(chǔ)了分區(qū)相關(guān)信息、data files 的相關(guān)統(tǒng)計(jì)信息(max/min)等,去查詢一些大的分區(qū)的數(shù)據(jù),就可以直接定位到所要的數(shù)據(jù),而不是像 Hive 一樣去 list 整個(gè) HDFS 文件夾,時(shí)間復(fù)雜度從 O(n) 降到了 O(1),使得一些大的查詢速度有了明顯的提升,在 Iceberg PMC Chair Ryan Blue 的演講中,我們看到命中 filter 的任務(wù)執(zhí)行時(shí)間從 61.5 小時(shí)降到了 22 分鐘。
使用 Flink SQL 將 CDC 數(shù)據(jù)寫入 Iceberg
Flink CDC 提供了直接讀取 MySQL binlog 的方式,相對(duì)以前需要使用 canal 讀取 binlog 寫入 Iceberg,然后再去消費(fèi) Iceberg 數(shù)據(jù)。少了兩個(gè)組件的維護(hù),鏈路減少了,節(jié)省了維護(hù)的成本和出錯(cuò)的概率。并且可以實(shí)現(xiàn)導(dǎo)入全量數(shù)據(jù)和增量數(shù)據(jù)的完美對(duì)接,所以使用 Flink SQL 將 MySQL binlog 數(shù)據(jù)導(dǎo)入 Iceberg 來做 MySQL->Iceberg 的導(dǎo)入將會(huì)是一件非常有意義的事情。
此外對(duì)于我們最初的壓縮小文件的需求,雖然 Iceberg 目前還無法實(shí)現(xiàn)自動(dòng)壓縮,但是它提供了一個(gè)批處理任務(wù),已經(jīng)能滿足我們的需求。
Hive 表遷移 Iceberg 表
遷移準(zhǔn)備工作
目前我們的所有數(shù)據(jù)都是存儲(chǔ)在 Hive 表的,在驗(yàn)證完 Iceberg 之后,我們決定將 Hive 的數(shù)據(jù)遷移到 Iceberg,所以我寫了一個(gè)工具,可以使用 Hive 的數(shù)據(jù),然后新建一個(gè) Iceberg 表,為其建立相應(yīng)的元數(shù)據(jù),但是測(cè)試的時(shí)候發(fā)現(xiàn),如果采用這種方式,需要把寫入 Hive 的程序停止,因?yàn)槿绻?Iceberg 和 Hive 使用同一個(gè)數(shù)據(jù)文件,而壓縮程序會(huì)不斷地壓縮 Iceberg 表的小文件,壓縮完之后,不會(huì)馬上刪除舊數(shù)據(jù),所以 Hive 表就會(huì)查到雙份的數(shù)據(jù),故我們采用雙寫的策略,原來寫入 Hive 的程序不動(dòng),新啟動(dòng)一套程序?qū)懭?Iceberg,這樣能對(duì) Iceberg 表觀察一段時(shí)間。還能和原來 Hive 中的數(shù)據(jù)進(jìn)行比對(duì),來驗(yàn)證程序的正確性。
經(jīng)過一段時(shí)間觀察,每天將近幾十億條數(shù)據(jù)、壓縮后幾個(gè) T 大小的 Hive 表和 Iceberg 表,一條數(shù)據(jù)也不差。所以在最終對(duì)比數(shù)據(jù)沒有問題之后,把 Hive 表停止寫入,使用新的 Iceberg 表。
遷移工具
我將這個(gè) Hive 表遷移 Iceberg 表的工具做成了一個(gè)基于 Flink batch job 的 Iceberg Action,提交了社區(qū),不過目前還沒合并:https://github.com/apache/iceberg/pull/2217。這個(gè)功能的思路是使用 Hive 原始的數(shù)據(jù)不動(dòng),然后新建一個(gè) Iceberg table,再為這個(gè)新的 Iceberg table 生成對(duì)應(yīng)的元數(shù)據(jù),大家有需要的話可以先看看。
此外,Iceberg 社區(qū),還有一個(gè)把現(xiàn)有的數(shù)據(jù)遷移到已存在的 Iceberg table 的工具,類似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存儲(chǔ)過程做的,大家也可以關(guān)注下:https://github.com/apache/iceberg/pull/2210
Iceberg 優(yōu)化實(shí)踐
壓縮小文件
目前壓縮小文件是采用的一個(gè)額外批任務(wù)來進(jìn)行的,Iceberg 提供了一個(gè) Spark 版本的 action,我在做功能測(cè)試的時(shí)候發(fā)現(xiàn)了一些問題,此外我對(duì) Spark 也不是非常熟悉,擔(dān)心出了問題不好排查,所以參照 Spark 版本的自己實(shí)現(xiàn)了一個(gè) Flink 版本,并修復(fù)了一些 bug,進(jìn)行了一些功能的優(yōu)化。
由于我們的 Iceberg 的元數(shù)據(jù)都是存儲(chǔ)在 Hive 中的,也就是我們使用了 HiveCatalog,所以壓縮程序的邏輯是把 Hive 中所有的 Iceberg 表全部都查出來,依次壓縮。壓縮沒有過濾條件,不管是分區(qū)表還是非分區(qū)表,都進(jìn)行全表的壓縮,這樣做是為了處理某些使用 eventtime 的 Flink 任務(wù)。如果有延遲的數(shù)據(jù)的到來,就會(huì)把數(shù)據(jù)寫入以前的分區(qū),如果不是全表壓縮只壓縮當(dāng)天分區(qū)的話,新寫入的其他天的數(shù)據(jù)就不會(huì)被壓縮。
之所以沒有開啟定時(shí)任務(wù)來壓縮,是因?yàn)楸热缍〞r(shí)五分鐘壓縮一個(gè)表,如果五分鐘之內(nèi)這個(gè)壓縮任務(wù)沒完成,沒有提交新的 snapshot,下一個(gè)定時(shí)任務(wù)又開啟了,就會(huì)把上一個(gè)沒有完成的壓縮任務(wù)中的數(shù)據(jù)重新壓縮一次,所以每個(gè)表依次壓縮的策略可以保證某一時(shí)刻一個(gè)表只有一個(gè)任務(wù)在壓縮。
代碼示例參考:
目前系統(tǒng)運(yùn)行穩(wěn)定,已經(jīng)完成了幾萬次任務(wù)的壓縮。
注意:
不過目前對(duì)于新發(fā)布的 Iceberg 0.11 來說,還有一個(gè)已知的 bug,即當(dāng)壓縮前的文件大小大于要壓縮的大小(targetSizeInBytes)時(shí),會(huì)造成數(shù)據(jù)丟失,其實(shí)這個(gè)問題我在最開始測(cè)試小文件壓縮的時(shí)候就發(fā)現(xiàn)了,并且提了一個(gè) pr,我的策略是大于目標(biāo)文件的數(shù)據(jù)文件不參與壓縮,不過這個(gè) pr 沒有合并到 0.11 版本中,后來社區(qū)另外一個(gè)兄弟也發(fā)現(xiàn)了相同的問題,提交了一個(gè) pr( https://github.com/apache/iceberg/pull/2196 ) ,策略是將這個(gè)大文件拆分到目標(biāo)文件大小,目前已經(jīng)合并到 master,會(huì)在下一個(gè) bug fix 版本 0.11.1 中發(fā)布。
查詢優(yōu)化
批處理定時(shí)任務(wù)
目前對(duì)于定時(shí)調(diào)度中的批處理任務(wù),F(xiàn)link 的 SQL 客戶端還沒 Hive 那樣做的很完善,比如執(zhí)行 hive-f 來執(zhí)行一個(gè)文件。而且不同的任務(wù)需要不同的資源,并行度等。
所以我自己封裝了一個(gè) Flink 程序,通過調(diào)用這個(gè)程序來進(jìn)行處理,讀取一個(gè)指定文件里面的 SQL,來提交批任務(wù)。在命令行控制任務(wù)的資源和并行度等。
優(yōu)化
批任務(wù)的查詢這塊,我做了一些優(yōu)化工作,比如 limit 下推,filter 下推,查詢并行度推斷等,可以大大提高查詢的速度,這些優(yōu)化都已經(jīng)推回給社區(qū),并且在 Iceberg 0.11 版本中發(fā)布。
運(yùn)維管理
清理 orphan 文件
1. 定時(shí)任務(wù)刪除
在使用 Iceberg 的過程中,有時(shí)候會(huì)有這樣的情況,我提交了一個(gè) Flink 任務(wù),由于各種原因,把它停了,這個(gè)時(shí)候 Iceberg 還沒提交相應(yīng)的快照。此外由于一些異常導(dǎo)致程序失敗,會(huì)產(chǎn)生一些不在 Iceberg 元數(shù)據(jù)里面的孤立的數(shù)據(jù)文件,這些文件對(duì) Iceberg 來說是不可達(dá)的,也是沒用的。所以我們需要像 jvm 的垃圾回收一樣來清理這些文件。
目前 Iceberg 提供了一個(gè) Spark 版本的 action 來處理這些沒用的文件,我們采取的策略和壓縮小文件一樣,獲取 Hive 中的所有的 Iceberg 表。每隔一個(gè)小時(shí)執(zhí)行一次定時(shí)任務(wù)來刪除這些沒用的文件。
2. 踩坑
我們?cè)诔绦蜻\(yùn)行過程中出現(xiàn)了正常的數(shù)據(jù)文件被刪除的問題,經(jīng)過調(diào)研,由于快照保留設(shè)置是一小時(shí),這個(gè)清理程序清理時(shí)間也是設(shè)置一個(gè)小時(shí),通過日志發(fā)現(xiàn)是這個(gè)清理程序刪除了正常的數(shù)據(jù)。查了查代碼,應(yīng)該是設(shè)置了一樣的時(shí)間,在清理孤立文件的時(shí)候,有其他程序正在讀取要 expired 的 snapshot,導(dǎo)致刪除了正常的數(shù)據(jù)。最后把這個(gè)清理程序的清理時(shí)間改成默認(rèn)的三天,沒有再出現(xiàn)刪除數(shù)據(jù)文件的問題。
當(dāng)然,為了保險(xiǎn)起見,我們可以覆蓋原來的刪除文件的方法,改成將文件到一個(gè)備份文件夾,檢查沒有問題之后,手工刪除。
快照過期處理
我們的快照過期策略,是和壓縮小文件的批處理任務(wù)寫在一起的,壓縮完小文件之后,進(jìn)行表的快照過期處理,目前保留的時(shí)間是一個(gè)小時(shí)。這是因?yàn)閷?duì)于有一些比較大的表,分區(qū)比較多,而且 checkpoint 比較短,如果保留的快照過長的話,還是會(huì)保留過多小文件,我們暫時(shí)沒有查詢歷史快照的需求,所以我將快照的保留時(shí)間設(shè)置了一個(gè)小時(shí)。
數(shù)據(jù)管理
寫入了數(shù)據(jù)之后,當(dāng)想查看相應(yīng)的快照有多少數(shù)據(jù)文件時(shí),直接查詢 Spark 無法知道哪個(gè)是有用的,哪個(gè)是沒用的。所以需要有對(duì)應(yīng)的管理工具。目前 Flink 這塊還不太成熟,我們可以使用 Spark3 提供的工具來查看。
1. DDL
目前 create table 這些操作我們是通過 Flink SQL Client 來做的。其他相關(guān)的 DDL 的操作可以使用 Spark 來做:
https://iceberg.apache.org/spark/#ddl-commands
2. DML
一些相關(guān)的數(shù)據(jù)的操作,比如刪除數(shù)據(jù)等可以通過 MySQL 來實(shí)現(xiàn),Presto 目前只支持分區(qū)級(jí)別的刪除功能。
3. show partitions & show create table
在我們操作 Hive 的時(shí)候,有一些很常用的操作,比如 show partitions、 show create table 等,這些目前 Flink 還沒有支持,所以在操作 Iceberg 的時(shí)候就很不方便,我們自己基于 Flink 1.12 做 了修改,不過目前還沒有完全提交到社區(qū),后續(xù)有時(shí)間會(huì)提交到 Flink 和 Iceberg 社區(qū)。
后續(xù)工作
Flink SQL 接入 CDC 數(shù)據(jù)到 Iceberg
目前在我們內(nèi)部的版本中,我已經(jīng)測(cè)試通過可以使用 Flink SQL 將 CDC 數(shù)據(jù)(比如 MySQL binlog)寫入 Iceberg,社區(qū)的版本中實(shí)現(xiàn)該功能還需要做一些工作,我也提交了一些相關(guān)的 PR 來推進(jìn)這個(gè)工作。
使用 SQL 進(jìn)行刪除和更新
使用 Flink SQL 進(jìn)行 streaming read
在工作中會(huì)有一些這樣的場(chǎng)景,由于數(shù)據(jù)比較大,Iceberg 的數(shù)據(jù)只存了較短的時(shí)間,如果很不幸因?yàn)槌绦驅(qū)戝e(cuò)了等原因,想從更早的時(shí)間來消費(fèi)就無能為力了。
當(dāng)引入了 Iceberg 的 streaming read 之后,這些問題就可以解決了,因?yàn)?Iceberg 存儲(chǔ)了所有的數(shù)據(jù),當(dāng)然這里有一個(gè)前提就是對(duì)于數(shù)據(jù)沒有要求特別精確,比如達(dá)到秒級(jí)別,因?yàn)槟壳?Flink 寫入 Iceberg 的事務(wù)提交是基于 Flink Checkpoint 間隔的。
收益及總結(jié)
經(jīng)過對(duì) Iceberg 大概一個(gè)季度的調(diào)研,測(cè)試,優(yōu)化和 bug 修復(fù),我們將現(xiàn)有的 Hive 表都遷移到了 Iceberg,完美解決了原來的所有的痛點(diǎn)問題,目前系統(tǒng)穩(wěn)定運(yùn)行,而且相對(duì) Hive 得到了很多的收益:
Flink 寫入的資源減少
舉一個(gè)例子,默認(rèn)配置下,原來一個(gè) flink 讀取 kafka 寫入 hive 的任務(wù),需要60個(gè)并行度才不會(huì)讓 Kafka 產(chǎn)生積壓。改成寫入 iceberg 之后,只需要20個(gè)并行度就夠了.
查詢速度變快
前面我們講到 Iceberg 查詢的時(shí)候不會(huì)像 Hive 一樣去 list 整個(gè)文件夾來獲取分區(qū)數(shù)據(jù),而是先從 manifest 文件中獲取相關(guān)數(shù)據(jù),查詢的性能得到了顯著的提升,一些大的報(bào)表的查詢速度從 50 秒提高到 30 秒。
并發(fā)讀寫
由于 Iceberg 的事務(wù)支持,我們可以實(shí)現(xiàn)對(duì)一個(gè)表進(jìn)行并發(fā)讀寫,F(xiàn)link 流式數(shù)據(jù)實(shí)時(shí)入湖,壓縮程序同時(shí)壓縮小文件,清理過期文件和快照的程序同時(shí)清理無用的文件,這樣就能更及時(shí)的提供數(shù)據(jù),做到分鐘級(jí)的延遲,查詢最新分區(qū)數(shù)據(jù)的速度大大加快了,并且由于 Iceberg 的 ACID 特性可以保證數(shù)據(jù)的準(zhǔn)確性。
time travel
可以回溯查詢以前某一時(shí)刻的數(shù)據(jù)。
總結(jié)一下,我們目前可以實(shí)現(xiàn)使用 Flink SQL 對(duì) Iceberg 進(jìn)行批、流的讀寫,并可以對(duì)小文件進(jìn)行實(shí)時(shí)的壓縮,使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作,后續(xù)可以使用 Flink SQL 將 CDC 的數(shù)據(jù)寫入 Iceberg。目前對(duì) Iceberg 的所有的優(yōu)化和 bug fix,我已經(jīng)貢獻(xiàn)給社區(qū)。由于筆者水平有限,有時(shí)候也難免有錯(cuò)誤,還請(qǐng)大家不吝賜教。
作者介紹:
張軍,同程藝龍大數(shù)據(jù)開發(fā)工程師