Hive SQL遷移Spark SQL在網易傳媒的實踐
在整個遷移過程,除了前期踩坑階段,期間線上基本沒出什么問題,十分平滑的將2000左右的任務遷移到了sparkSql,而且也沒耗費過多人力,這說明整個遷移方案的設計和實施是比較成功的。

引言:把基于mapreduce的離線hiveSQL任務遷移到sparkSQL,不但能大幅縮短任務運行時間,還能節省不少計算資源。最近我們也把組內2000左右的hivesql任務遷移到了sparkSQL,這里做個簡單的記錄和分享,本文偏重于具體條件下的方案選擇。
遷移背景
- SQL任務運行慢Hive SQL處理任務雖然較為穩定,但是其時效性已經達瓶頸,無法再進一步提升,同一個SQL,Hive比Spark執行的時間更長。
- Spark SQL的發展遠超HSQL隨著 Spark 以及其社區的不斷發展,Spark SQL 本身技術也在不斷成熟,Spark在技術架構和性能上都展示出Hive無法比擬的優勢。
遷移方案設計在遷移過程中,首先需要確認如何解決執行引擎sql語法不兼容以及執行結果不一致的問題。常用的做法是在執行引擎層面進行兼容,用戶無感知,這是較為理想的方法。但對于我們來說,這是一個不可選項,組內基本工作在頂層業務層面,沒有對底層spark引擎特別熟的同事,數科同事也不會對我們的祖傳代碼負責,所以我們只能通過修改業務sql代碼解決兼容性問題。
確定了修改SQL代碼的兼容方式后,接下來便需要在此基礎上確定以下問題的解決方法:
- 如何將任務執行引擎切換為spark
- 如何測試任務,并驗證數據一致性
- 如何自動化處理
這些問題明顯是與任務的具體運行形式相關的,所以有必要先介紹下現有的hiveSQL的運行方式?,F有的hiveSQL任務主要以shell腳本的方式提交,腳本代碼結構如下所示:
#!/bin/bash
#結構1
#根據腳本參數、環境變量等計算時間參數
dayStr='xxx'
#結構2
#根據時間參數拼接hiveSql字符串sqlStr="insert overwrite t1 partition(day='${dayStr}') select XXX from t2 where day='${dayStr}';"
#結構3
#運行hiveSQLhive -e "${sqlStr}"
#結構4
#可選,將結果導入MySQL、impala等存儲系統
腳本的好處在于可以通過git管理代碼的變更、通過IDE搜索代碼引用,壞處在于沒有和公司的中臺進行整合,很多功能用不上。
如何將任務執行引擎切換為spark,是分歧最大的地方。
第一種選擇是直接將SQL任務遷移到公司中臺上,這樣可以直接在平臺選擇執行引擎,讓平臺幫我們屏蔽掉任務的運行細節。這個方法也不是不行,但工作量和難度都很大,很快就被排除了。為控制工作量,保持任務仍以腳本的形式運行是必須的,所以SQL任務的提交方式也只能通過shell命令。如何在shell中提交sparkSQL也有2個選擇,第一種是直接在我們的az調度機上部署spark客戶端,然后通過本地的spark客戶端運行任務,另一種則是通過beeline將SQL提交到類似于hiveserver的服務上??紤]之后,我們選擇了第二種,具體來說,是使用了網易有數姚老師、尤老師等大佬的作品kyuubi。
選擇kyuubi的決定是非常正確的。首先,因為我們希望使用抽象的sparkSQL運行服務,把SQL扔給它就完了,業務代碼和spark部署、任務運行環境等細節完全解耦,kyuubi在這方面非常合適。其次kyuubi本身的功能要遠超Spark Thrift Server,主要集中在可以隔離不同的SQL任務、支持高可用、自動優化任務等方面,能讓用戶偷懶的組件就是好組件。最后的原因是在人員合作方面,使用kyuubi等于找到了一個穩定的背鍋俠,出了問題直接讓數科spark團隊處理就完了,溝通非常直接,如果使用的是spark客戶端或中臺的SQL運行功能,中間可能隔著好幾個人。
反映到具體代碼細節上,我們首先編寫了一個名為sparkUtils.sh的腳本,里面定義了所有與sparkSQL相關的實用函數,比如runSql、setResource等,這樣其他腳本只需要source一下這個腳本,然后將hive -e改為runSql就可以將任務執行方式改為spark。
在解決如何運行任務后,接下來便是解決如何測試任務的問題。由于沒有隔離的測試環境,所以在測試任務時,必須將任務代碼無害化,包括替換insert的表為臨時表、注釋任何可能影響線上數據的操作。這樣一來,同一個腳本,我們至少需要有3個版本:線上的hiveSQL版、未上線的sparkSQL版、無害化的使用sparkSQL運行的測試版,而且我們需要在他們之間同步代碼更改,比如測試途中,線上版有更改,那么我們需要將更改合并到sparksql版和測試版,如果測試中發現不兼容問題,需要先在sparksql版上修改,然后合并到測試版,最后上線操作則是使用sparksql版覆蓋線上版。整個過程基本以git版本管理為核心。
為確定任務是否可上線,必須計算出數據的一致程度。這里的采用的方法比較簡單,先計算測試表分區和線上表分區相同的數據條數,然后算出一致度,比如線上分區數據是2條,測試分區數據是8條,相同數據是1條,那么一致度是(1*2)/(2+8)=0.2。為方便確定哪些列的數據不一致,我們計算了列hash。
切換執行引擎、測試、上線的方案都確定后,最后一個問題便是如何實現自動化處理,畢竟有2000個任務,全靠人工處理工作量太大。但由于shell腳本代碼并不是結構化的數據,全自動處理也不現實,線上數據出問題的鍋可背不起。
所以我們制定的策略是半自動化,人工審核代碼修改,其他部分盡可能自動化,特別是測試和計算數據一致性部分??偟墓ぷ髁魅缦聢D所示:

除了3個代碼分支,這里多了一個測試機,用于自動化測試并計算數據一致性。各步驟具體介紹如下:
- 檢測未測試代碼:通過公司中臺api獲取有調度的腳本,拷貝到測試庫,完全自動化;
- 執行方式切換為spark:自動化處理,人工審核后提交;
- 生成測試代碼:通過正則匹配insert語句,替換為臨時表,并在之前插入創建臨時表語句,注釋其他影響線上數據的操作。自動化處理,人工審核提交;
- 拉取代碼、測試、計算一致程度:完全自動化,測試后本地生成一個csv的測試報告;
- 判斷是否可上線:人工審核;
- 修改不兼容語法:基本靠人工修改,一些簡單的參數替換可自動化;
- 合并修改:人工處理,不過基本可以自動合并;
- 刪除測試代碼,代碼上線:自動處理,人工審核提交。
遷移成果
如下圖所示,在2個月的時間內,我們spark程序所使用的資源從10%左右上升到了80%左右。

CPU累計使用(CPU核數*分鐘)的變化曲線如下所示。由于遷移過程中也下線了不少任務,所以CPU總量的下降不光是遷移的功勞。

任務遷移前后的運行情況如下圖所示,橫軸代表任務運行時間,縱軸代表任務累計CPU使用(CPU核數*分鐘)。這里共選取了600多個任務,藍色的點代表遷移前MapReduce的運行情況,紅色點代表遷移后sparkSQL的運行情況。根據統計,遷移后平均運行時間節省70%,CPU累計使用平均節省35%。

總的來看,收益超過預期,特別是在運行時間方面。
總結
回顧整個方案的設計過程,實際上沒有太多選擇的余地,在沒法在spark引擎層做兼容的前提,和以腳本提交任務的現狀下,只能選擇基于git版本管理的自動化遷移流程。
方案能這么順利實施,主要因為任務代碼是以腳本的形式存在,這樣我們可以很方便的用各種程序處理腳本源代碼,避免了大量重復性的工作,特別是用git進行版本管理,如果我們的任務都寫在了公司中臺上,那么遷移工作量會大很多。
在整個遷移過程,除了前期踩坑階段,期間線上基本沒出什么問題,十分平滑的將2000左右的任務遷移到了sparkSql,而且也沒耗費過多人力,這說明整個遷移方案的設計和實施是比較成功的。