計(jì)算機(jī)的基本工作就是處理數(shù)據(jù),包括磁盤文件中的數(shù)據(jù),通過網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)流或數(shù)據(jù)包,數(shù)據(jù)庫(kù)中的結(jié)構(gòu)化數(shù)據(jù)等。隨著互聯(lián)網(wǎng)、物聯(lián)網(wǎng)等技術(shù)得到越來越廣泛的應(yīng)用,數(shù)據(jù)規(guī)模不斷增加,TB、PB量級(jí)成為常態(tài),對(duì)數(shù)據(jù)的處理已無(wú)法由單臺(tái)計(jì)算機(jī)完成,而只能由多臺(tái)機(jī)器共同承擔(dān)計(jì)算任務(wù)。而在分布式環(huán)境中進(jìn)行大數(shù)據(jù)處理,除了與存儲(chǔ)系統(tǒng)打交道外,還涉及計(jì)算任務(wù)的分工,計(jì)算負(fù)荷的分配,計(jì)算機(jī)之間的數(shù)據(jù)遷移等工作,并且要考慮計(jì)算機(jī)或網(wǎng)絡(luò)發(fā)生故障時(shí)的數(shù)據(jù)安全,情況要復(fù)雜得多。
舉一個(gè)簡(jiǎn)單的例子,假設(shè)我們要從銷售記錄中統(tǒng)計(jì)各種商品銷售額。在單機(jī)環(huán)境中,我們只需把銷售記錄掃描一遍,對(duì)各商品的銷售額進(jìn)行累加即可。如果銷售記錄存放在關(guān)系數(shù)據(jù)庫(kù)中,則更省事,執(zhí)行一個(gè)SQL語(yǔ)句就可以了。現(xiàn)在假定銷售記錄實(shí)在太多,需要設(shè)計(jì)出由多臺(tái)計(jì)算機(jī)來統(tǒng)計(jì)銷售額的方案。為保證計(jì)算的正確、可靠、高效及方便,這個(gè)方案需要考慮下列問題:
如何為每臺(tái)機(jī)器分配任務(wù),是先按商品種類對(duì)銷售記錄分組,不同機(jī)器處理不同商品種類的銷售記錄,還是隨機(jī)向各臺(tái)機(jī)器分發(fā)一部分銷售記錄進(jìn)行統(tǒng)計(jì),最后把各臺(tái)機(jī)器的統(tǒng)計(jì)結(jié)果按商品種類合并?
上述兩種方式都涉及數(shù)據(jù)的排序問題,應(yīng)選擇哪種排序算法?應(yīng)該在哪臺(tái)機(jī)器上執(zhí)行排序過程?
如何定義每臺(tái)機(jī)器處理的數(shù)據(jù)從哪里來,處理結(jié)果到哪里去?數(shù)據(jù)是主動(dòng)發(fā)送,還是接收方申請(qǐng)時(shí)才發(fā)送?如果是主動(dòng)發(fā)送,接收方處理不過來怎么辦?如果是申請(qǐng)時(shí)才發(fā)送,那發(fā)送方應(yīng)該保存數(shù)據(jù)多久?
會(huì)不會(huì)任務(wù)分配不均,有的機(jī)器很快就處理完了,有的機(jī)器一直忙著?甚至,閑著的機(jī)器需要等忙著的機(jī)器處理完后才能開始執(zhí)行?
如果增加一臺(tái)機(jī)器,它能不能減輕其他機(jī)器的負(fù)荷,從而縮短任務(wù)執(zhí)行時(shí)間?
如果一臺(tái)機(jī)器掛了,它沒有完成的任務(wù)該交給誰(shuí)?會(huì)不會(huì)遺漏統(tǒng)計(jì)或重復(fù)統(tǒng)計(jì)?
統(tǒng)計(jì)過程中,機(jī)器之間如何協(xié)調(diào),是否需要專門的一臺(tái)機(jī)器指揮調(diào)度其他機(jī)器?如果這臺(tái)機(jī)器掛了呢?
(可選)如果銷售記錄在源源不斷地增加,統(tǒng)計(jì)還沒執(zhí)行完新記錄又來了,如何保證統(tǒng)計(jì)結(jié)果的準(zhǔn)確性?能不能保證結(jié)果是實(shí)時(shí)更新的?再次統(tǒng)計(jì)時(shí)能不能避免大量重復(fù)計(jì)算?
(可選)能不能讓用戶執(zhí)行一句SQL就可以得到結(jié)果?
上述問題中,除了第1個(gè)外,其余的都與具體任務(wù)無(wú)關(guān),在其他分布式計(jì)算的場(chǎng)合也會(huì)遇到,而且解決起來都相當(dāng)棘手。即使第1個(gè)問題中的分組、統(tǒng)計(jì),在很多數(shù)據(jù)處理場(chǎng)合也會(huì)涉及,只是具體方式不同。如果能把這些問題的解決方案封裝到一個(gè)計(jì)算框架中,則可大大簡(jiǎn)化這類應(yīng)用程序的開發(fā)。
2004年前后,Google先后發(fā)表三篇論文分別介紹分布式文件系統(tǒng)GFS、并行計(jì)算模型MapReduce、非關(guān)系數(shù)據(jù)存儲(chǔ)系統(tǒng)BigTable,第一次提出了針對(duì)大數(shù)據(jù)分布式處理的可重用方案。在Google論文的啟發(fā)下,Yahoo的工程師Doug Cutting和Mike Cafarella開發(fā)了Hadoop。在借鑒和改進(jìn)Hadoop的基礎(chǔ)上,又先后誕生了數(shù)十種應(yīng)用于分布式環(huán)境的大數(shù)據(jù)計(jì)算框架。本文在參考業(yè)界慣例的基礎(chǔ)上,對(duì)這些框架按下列標(biāo)準(zhǔn)分類:
如果不涉及上面提出的第8、9兩個(gè)問題,則屬于批處理框架。批處理框架重點(diǎn)關(guān)心數(shù)據(jù)處理的吞吐量,又可分為非迭代式和迭代式兩類,迭代式包括DAG(有向無(wú)環(huán)圖)、圖計(jì)算等模型。
若針對(duì)第8個(gè)問題提出來應(yīng)對(duì)方案,則分兩種情況:如果重點(diǎn)關(guān)心處理的實(shí)時(shí)性,則屬于流計(jì)算框架;如果側(cè)重于避免重復(fù)計(jì)算,則屬于增量計(jì)算框架。
如果重點(diǎn)關(guān)注的是第9個(gè)問題,則屬于交互式分析框架。
本文下面分別討論批處理、流計(jì)算、交互式分析三種類別的框架,然后簡(jiǎn)要介紹大數(shù)據(jù)計(jì)算框架的一些發(fā)展趨勢(shì)。文章最后介紹這一領(lǐng)域的學(xué)習(xí)資料。
批處理框架
Hadoop
Hadoop最初主要包含分布式文件系統(tǒng)HDFS和計(jì)算框架MapReduce兩部分,是從Nutch中獨(dú)立出來的項(xiàng)目。在2.0版本中,又把資源管理和任務(wù)調(diào)度功能從MapReduce中剝離形成YARN,使其他框架也可以像MapReduce那樣運(yùn)行在Hadoop之上。與之前的分布式計(jì)算框架相比,Hadoop隱藏了很多繁瑣的細(xì)節(jié),如容錯(cuò)、負(fù)載均衡等,更便于使用。
Hadoop也具有很強(qiáng)的橫向擴(kuò)展能力,可以很容易地把新計(jì)算機(jī)接入到集群中參與計(jì)算。在開源社區(qū)的支持下,Hadoop不斷發(fā)展完善,并集成了眾多優(yōu)秀的產(chǎn)品如非關(guān)系數(shù)據(jù)庫(kù)HBase、數(shù)據(jù)倉(cāng)庫(kù)Hive、數(shù)據(jù)處理工具Sqoop、機(jī)器學(xué)習(xí)算法庫(kù)Mahout、一致性服務(wù)軟件ZooKeeper、管理工具Ambari等,形成了相對(duì)完整的生態(tài)圈和分布式計(jì)算事實(shí)上的標(biāo)準(zhǔn)。
圖2. Hadoop生態(tài)圈(刪減版)
MapReduce可以理解為把一堆雜亂無(wú)章的數(shù)據(jù)按照某種特征歸并起來,然后處理并得到最后的結(jié)果。基本處理步驟如下:
把輸入文件按照一定的標(biāo)準(zhǔn)分片,每個(gè)分片對(duì)應(yīng)一個(gè)map任務(wù)。一般情況下,MapReduce和HDFS運(yùn)行在同一組計(jì)算機(jī)上,也就是說,每臺(tái)計(jì)算機(jī)同時(shí)承擔(dān)存儲(chǔ)和計(jì)算任務(wù),因此分片通常不涉及計(jì)算機(jī)之間的數(shù)據(jù)復(fù)制。
按照一定的規(guī)則把分片中的內(nèi)容解析成鍵值對(duì)。通常選擇一種預(yù)定義的規(guī)則即可。
執(zhí)行map任務(wù),處理每個(gè)鍵值對(duì),輸出零個(gè)或多個(gè)鍵值對(duì)。
MapReduce獲取應(yīng)用程序定義的分組方式,并按分組對(duì)map任務(wù)輸出的鍵值對(duì)排序。默認(rèn)每個(gè)鍵名一組。
待所有節(jié)點(diǎn)都執(zhí)行完上述步驟后,MapReduce啟動(dòng)Reduce任務(wù)。每個(gè)分組對(duì)應(yīng)一個(gè)Reduce任務(wù)。
執(zhí)行reduce任務(wù)的進(jìn)程通過網(wǎng)絡(luò)獲取指定組的所有鍵值對(duì)。
把鍵名相同的值合并為列表。
執(zhí)行reduce任務(wù),處理每個(gè)鍵對(duì)應(yīng)的列表,輸出結(jié)果。
圖3. MapReduce處理過程
在上面的步驟中,應(yīng)用程序主要負(fù)責(zé)設(shè)計(jì)map和reduce任務(wù),其他工作均由框架負(fù)責(zé)。在定義map任務(wù)輸出數(shù)據(jù)的方式時(shí),鍵的選擇至關(guān)重要,除了影響結(jié)果的正確性外,也決定數(shù)據(jù)如何分組、排序、傳輸,以及執(zhí)行reduce任務(wù)的計(jì)算機(jī)如何分工。前面提到的商品銷售統(tǒng)計(jì)的例子,可選擇商品種類為鍵。MapReduce執(zhí)行商品銷售統(tǒng)計(jì)的過程大致如下:
把銷售記錄分片,分配給多臺(tái)機(jī)器。
每條銷售記錄被解析成鍵值對(duì),其中值為銷售記錄的內(nèi)容,鍵可忽略。
執(zhí)行map任務(wù),每條銷售記錄被轉(zhuǎn)換為新的鍵值對(duì),其中鍵為商品種類,值為該條記錄中商品的銷售額。
MapReduce把map任務(wù)生成的數(shù)據(jù)按商品種類排序。
待所有節(jié)點(diǎn)都完成排序后,MapReduce啟動(dòng)reduce任務(wù)。每個(gè)商品種類對(duì)應(yīng)一個(gè)reduce任務(wù)。
執(zhí)行reduce任務(wù)的進(jìn)程通過網(wǎng)絡(luò)獲取指定商品種類的各次銷售額。
MapReduce把同一種商品下的各次銷售額合并到列表中。
執(zhí)行reduce任務(wù),累加各次銷售額,得到該種商品的總銷售額。
上面的過程還有優(yōu)化的空間。在傳輸各種商品每次的銷售額數(shù)據(jù)前,可先在map端對(duì)各種商品的銷售額進(jìn)行小計(jì),由此可大大減少網(wǎng)絡(luò)傳輸?shù)呢?fù)荷。MapReduce通過一個(gè)可選的combine任務(wù)支持該類型的優(yōu)化。
DAG模型
現(xiàn)在假設(shè)我們的目標(biāo)更進(jìn)一步,希望知道銷售得最好的前10種商品。我們可以分兩個(gè)環(huán)節(jié)來計(jì)算:
統(tǒng)計(jì)各種商品的銷售額。通過MapReduce實(shí)現(xiàn),這在前面已經(jīng)討論過。
對(duì)商品種類按銷售額排名。可以通過一個(gè)排序過程完成。假定商品種類非常多,需要通過多臺(tái)計(jì)算機(jī)來加快計(jì)算速度的話,我們可以用另一個(gè)MapReduce過程來實(shí)現(xiàn),其基本思路是把map和reduce分別當(dāng)作小組賽和決賽,先計(jì)算各分片的前10名,匯總后再計(jì)算總排行榜的前10名。
從上面的例子可以看出,通過多個(gè)MapReduce的組合,可以表達(dá)復(fù)雜的計(jì)算問題。不過,組合過程需要人工設(shè)計(jì),比較麻煩。另外,每個(gè)階段都需要所有的計(jì)算機(jī)同步,影響了執(zhí)行效率。
為克服上述問題,業(yè)界提出了DAG(有向無(wú)環(huán)圖)計(jì)算模型,其核心思想是把任務(wù)在內(nèi)部分解為若干存在先后順序的子任務(wù),由此可更靈活地表達(dá)各種復(fù)雜的依賴關(guān)系。Microsoft Dryad、Google FlumeJava、Apache Tez是最早出現(xiàn)的DAG模型。Dryad定義了串接、全連接、融合等若干簡(jiǎn)單的DAG模型,通過組合這些簡(jiǎn)單結(jié)構(gòu)來描述復(fù)雜的任務(wù),F(xiàn)lumeJava、Tez則通過組合若干MapReduce形成DAG任務(wù)。
圖4. MapReduce(左)與Tez(右)
執(zhí)行復(fù)雜任務(wù)時(shí)對(duì)比
MapReduce的另一個(gè)不足之處是使用磁盤存儲(chǔ)中間結(jié)果,嚴(yán)重影響了系統(tǒng)的性能,這在機(jī)器學(xué)習(xí)等需要迭代計(jì)算的場(chǎng)合更為明顯。加州大學(xué)伯克利分校AMP實(shí)驗(yàn)室開發(fā)的Spark克服了上述問題。Spark對(duì)早期的DAG模型作了改進(jìn),提出了基于內(nèi)存的分布式存儲(chǔ)抽象模型RDD(Resilient Distributed Datasets,可恢復(fù)分布式數(shù)據(jù)集),把中間數(shù)據(jù)有選擇地加載并駐留到內(nèi)存中,減少磁盤IO開銷。與Hadoop相比,Spark基于內(nèi)存的運(yùn)算要快100倍以上,基于磁盤的運(yùn)算也要快10倍以上。
圖5. MapReduce與Spark中間結(jié)果
保存方式對(duì)比
Spark為RDD提供了豐富的操作方法,其中map、 filter、 flatMap、 sample、groupByKey、 reduceByKey、union、join、cogroup、mapValues、sort、partionBy用于執(zhí)行數(shù)據(jù)轉(zhuǎn)換,生成新的RDD,而count、collect、 reduce、lookup、save用于收集或輸出計(jì)算結(jié)果。如前面統(tǒng)計(jì)商品銷售額的例子,在Spark中只需要調(diào)用map和reduceByKey兩個(gè)轉(zhuǎn)換操作就可以實(shí)現(xiàn),整個(gè)程序包括加載銷售記錄和保存統(tǒng)計(jì)結(jié)果在內(nèi)也只需要寥寥幾行代碼,并且支持Java、Scala、Python、R等多種開發(fā)語(yǔ)言,比MapReduce編程要方便得多。下圖說明reduceByKey的內(nèi)部實(shí)現(xiàn)。
圖6. RDD reduceByKey內(nèi)部實(shí)現(xiàn)
RDD由于把數(shù)據(jù)存放在內(nèi)存中而不是磁盤上,因此需要比Hadoop更多地考慮容錯(cuò)問題。分布式數(shù)據(jù)集的容錯(cuò)有兩種方式:數(shù)據(jù)檢查點(diǎn)和記錄數(shù)據(jù)的更新。處理海量數(shù)據(jù)時(shí),數(shù)據(jù)檢查點(diǎn)操作成本很高, 因此Spark默認(rèn)選擇記錄更新的方式。不過如果更新粒度太細(xì)太多,記錄更新成本也不低。因此,RDD只支持粗粒度轉(zhuǎn)換,即只記錄單個(gè)塊上執(zhí)行的單個(gè)操作,然后將創(chuàng)建RDD的一系列變換序列記錄下來,類似于數(shù)據(jù)庫(kù)中的日志。
當(dāng)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),Spark根據(jù)之前記錄的演變過程重新運(yùn)算,恢復(fù)丟失的數(shù)據(jù)分區(qū)。Spark生態(tài)圈的另一項(xiàng)目Alluxio(原名Tachyon)也采用類似的思路,使數(shù)據(jù)寫入速度比HDFS有數(shù)量級(jí)的提升。
下面總結(jié)Spark對(duì)MapReduce的改進(jìn):
MapReduce抽象層次低,需要手工編寫代碼完成;Spark基于RDD抽象,使數(shù)據(jù)處理邏輯的代碼非常簡(jiǎn)短。
MapReduce只提供了map和reduce兩個(gè)操作,表達(dá)力欠缺;Spark提供了很多轉(zhuǎn)換和動(dòng)作,很多關(guān)系數(shù)據(jù)庫(kù)中常見的操作如JOIN、GROUP BY已經(jīng)在RDD中實(shí)現(xiàn)。
MapReduce中,只有map和reduce兩個(gè)階段,復(fù)雜的計(jì)算需要大量的組合,并且由開發(fā)者自己定義組合方式;Spark中,RDD可以連續(xù)執(zhí)行多個(gè)轉(zhuǎn)換操作,如果這些操作對(duì)應(yīng)的RDD分區(qū)不變的話,還可以放在同一個(gè)任務(wù)中執(zhí)行。
MapReduce處理邏輯隱藏在代碼中,不直觀;Spark代碼不包含操作細(xì)節(jié),邏輯更清晰。
MapReduce中間結(jié)果放在HDFS中;Spark中間結(jié)果放在內(nèi)存中,內(nèi)存放不下時(shí)才寫入本地磁盤而不是HDFS,這顯著提高了性能,特別是在迭代式數(shù)據(jù)處理的場(chǎng)合。
MapReduce中,reduce任務(wù)需要等待所有map任務(wù)完成后才可以開始;在Spark中,分區(qū)相同的轉(zhuǎn)換構(gòu)成流水線放到同一個(gè)任務(wù)中運(yùn)行。
流計(jì)算框架
流計(jì)算概述
在大數(shù)據(jù)時(shí)代,數(shù)據(jù)通常都是持續(xù)不斷動(dòng)態(tài)產(chǎn)生的。在很多場(chǎng)合,數(shù)據(jù)需要在非常短的時(shí)間內(nèi)得到處理,并且還要考慮容錯(cuò)、擁塞控制等問題,避免數(shù)據(jù)遺漏或重復(fù)計(jì)算。流計(jì)算框架則是針對(duì)這一類問題的解決方案。流計(jì)算框架一般采用DAG(有向無(wú)環(huán)圖)模型。圖中的節(jié)點(diǎn)分為兩類:一類是數(shù)據(jù)的輸入節(jié)點(diǎn),負(fù)責(zé)與外界交互而向系統(tǒng)提供數(shù)據(jù);另一類是數(shù)據(jù)的計(jì)算節(jié)點(diǎn),負(fù)責(zé)完成某種處理功能如過濾、累加、合并等。從外部系統(tǒng)不斷傳入的實(shí)時(shí)數(shù)據(jù)則流經(jīng)這些節(jié)點(diǎn),把它們串接起來。如果把數(shù)據(jù)流比作水的話,輸入節(jié)點(diǎn)好比是噴頭,源源不斷地出水,計(jì)算節(jié)點(diǎn)則相當(dāng)于水管的轉(zhuǎn)接口。如下圖所示。
圖7. 流計(jì)算DAG模型示意圖
為提高并發(fā)性,每一個(gè)計(jì)算節(jié)點(diǎn)對(duì)應(yīng)的數(shù)據(jù)處理功能被分配到多個(gè)任務(wù)(相同或不同計(jì)算機(jī)上的線程)。在設(shè)計(jì)DAG時(shí),需要考慮如何把待處理的數(shù)據(jù)分發(fā)到下游計(jì)算節(jié)點(diǎn)對(duì)應(yīng)的各個(gè)任務(wù),這在實(shí)時(shí)計(jì)算中稱為分組(Grouping)。最簡(jiǎn)單的方案是為每個(gè)任務(wù)復(fù)制一份,不過這樣效率很低,更好的方式是每個(gè)任務(wù)處理數(shù)據(jù)的不同部分。隨機(jī)分組能達(dá)到負(fù)載均衡的效果,應(yīng)優(yōu)先考慮。不過在執(zhí)行累加、數(shù)據(jù)關(guān)聯(lián)等操作時(shí),需要保證同一屬性的數(shù)據(jù)被固定分發(fā)到對(duì)應(yīng)的任務(wù),這時(shí)應(yīng)采用定向分組。在某些情況下,還需要自定義分組方案。
圖8. 流計(jì)算分組
由于應(yīng)用場(chǎng)合的廣泛性,目前市面上已經(jīng)有不少流計(jì)算平臺(tái),包括Google MillWheel、Twitter Heron和Apache項(xiàng)目Storm、Samza、S4、Flink、Apex、Gearpump。
Storm及Trident
在流計(jì)算框架中,目前人氣最高,應(yīng)用最廣泛的要數(shù)Storm。這是由于Storm具有簡(jiǎn)單的編程模型,且支持Java、Ruby、Python等多種開發(fā)語(yǔ)言。Storm也具有良好的性能,在多節(jié)點(diǎn)集群上每秒可以處理上百萬(wàn)條消息。Storm在容錯(cuò)方面也設(shè)計(jì)得很優(yōu)雅。下面介紹Storm確保消息可靠性的思路。
在DAG模型中,確保消息可靠的難點(diǎn)在于,原始數(shù)據(jù)被當(dāng)前的計(jì)算節(jié)點(diǎn)成功處理后,還不能被丟棄,因?yàn)樗傻臄?shù)據(jù)仍然可能在后續(xù)的計(jì)算節(jié)點(diǎn)上處理失敗,需要由該消息重新生成。而如果要對(duì)消息在各個(gè)計(jì)算節(jié)點(diǎn)的處理情況都作跟蹤記錄的話,則會(huì)消耗大量資源。
Storm的解決思路,是為每條消息分派一個(gè)ID作為唯一性標(biāo)識(shí),并在消息中包含原始輸入消息的ID。同時(shí)用一個(gè)響應(yīng)中心(Acker)維護(hù)每條原始輸入消息的狀態(tài),狀態(tài)的初值為該原始輸入消息的ID。每個(gè)計(jì)算節(jié)點(diǎn)成功執(zhí)行后,則把輸入和輸出消息的ID進(jìn)行異或,再異或?qū)?yīng)的原始輸入消息的狀態(tài)。由于每條消息在生成和處理時(shí)分別被異或一次,則成功執(zhí)行后所有消息均被異或兩次,對(duì)應(yīng)的原始輸入消息的狀態(tài)為0。因此當(dāng)狀態(tài)為0后可安全清除原始輸入消息的內(nèi)容,而如果超過指定時(shí)間間隔后狀態(tài)仍不為0,則認(rèn)為處理該消息的某個(gè)環(huán)節(jié)出了問題,需要重新執(zhí)行。
圖9. Storm保證消息可靠性過程示意圖
Storm還實(shí)現(xiàn)了更高層次的抽象框架Trident。Trident以微批處理的方式處理數(shù)據(jù)流,比如每次處理100條記錄。Trident提供了過濾、分組、連接、窗口操作、聚合、狀態(tài)管理等操作,支持跨批次進(jìn)行聚合處理,并對(duì)執(zhí)行過程進(jìn)行優(yōu)化,包括多個(gè)操作的合并、數(shù)據(jù)傳輸前的本地聚合等。以微批處理方式處理數(shù)據(jù)流的框架還有Spark Streaming。
(1) 實(shí)時(shí)流處理
(2) 微批處理
圖10. 實(shí)時(shí)流處理與微批處理比較
下面是Storm、Trident與另外幾種流計(jì)算框架的對(duì)比:
交互式分析框架
概述
在解決了大數(shù)據(jù)的可靠存儲(chǔ)和高效計(jì)算后,如何為數(shù)據(jù)分析人員提供便利日益受到關(guān)注,而最便利的分析方式莫過于交互式查詢。這幾年交互式分析技術(shù)發(fā)展迅速,目前這一領(lǐng)域知名的平臺(tái)有十余個(gè),包括Google開發(fā)的Dremel和PowerDrill,F(xiàn)acebook開發(fā)的Presto, Hadoop服務(wù)商Cloudera和HortonWorks分別開發(fā)的Impala和Stinger,以及Apache項(xiàng)目Hive、Drill、Tajo、Kylin、MRQL等。
一些批處理和流計(jì)算平臺(tái)如Spark和Flink也分別內(nèi)置了交互式分析框架。由于SQL已被業(yè)界廣泛接受,目前的交互式分析框架都支持用類似SQL的語(yǔ)言進(jìn)行查詢。早期的交互式分析平臺(tái)建立在Hadoop的基礎(chǔ)上,被稱作SQL-on-Hadoop。后來的分析平臺(tái)改用Spark、Storm等引擎,不過SQL-on-Hadoop的稱呼還是沿用了下來。SQL-on-Hadoop也指為分布式數(shù)據(jù)存儲(chǔ)提供SQL查詢功能。
Hive
Apache Hive是最早出現(xiàn)的架構(gòu)在Hadoop基礎(chǔ)之上的大規(guī)模數(shù)據(jù)倉(cāng)庫(kù),由Facebook設(shè)計(jì)并開源。Hive的基本思想是,通過定義模式信息,把HDFS中的文件組織成類似傳統(tǒng)數(shù)據(jù)庫(kù)的存儲(chǔ)系統(tǒng)。Hive 保持著 Hadoop 所提供的可擴(kuò)展性和靈活性。Hive支持熟悉的關(guān)系數(shù)據(jù)庫(kù)概念,比如表、列和分區(qū),包含對(duì)非結(jié)構(gòu)化數(shù)據(jù)一定程度的 SQL 支持。它支持所有主要的原語(yǔ)類型(如整數(shù)、浮點(diǎn)數(shù)、字符串)和復(fù)雜類型(如字典、列表、結(jié)構(gòu))。它還支持使用類似 SQL 的聲明性語(yǔ)言 Hive Query Language (HiveQL) 表達(dá)的查詢,任何熟悉 SQL 的人都很容易理解它。HiveQL被編譯為MapReduce過程執(zhí)行。下圖說明如何通過MapReduce實(shí)現(xiàn)JOIN和GROUP BY。
(1) 實(shí)現(xiàn)JOIN
(2) 實(shí)現(xiàn)GROUP BY
圖11. 部分HiveQL操作的實(shí)現(xiàn)方式
Hive與傳統(tǒng)關(guān)系數(shù)據(jù)庫(kù)對(duì)比如下:
Hive的主要弱點(diǎn)是由于建立在MapReduce的基礎(chǔ)上,性能受到限制。很多交互式分析平臺(tái)基于對(duì)Hive的改進(jìn)和擴(kuò)展,包括Stinger、Presto、Kylin等。其中Kylin是中國(guó)團(tuán)隊(duì)提交到Apache上的項(xiàng)目,其與眾不同的地方是提供多維分析(OLAP)能力。Kylin對(duì)多維分析可能用到的度量進(jìn)行預(yù)計(jì)算,供查詢時(shí)直接訪問,由此提供快速查詢和高并發(fā)能力。Kylin在eBay、百度、京東、網(wǎng)易、美團(tuán)均有應(yīng)用。
SQL引擎Calcite
對(duì)于交互式分析,SQL查詢引擎的優(yōu)劣對(duì)性能的影響舉足輕重。Spark開發(fā)了自己的查詢引擎Catalyst,而包括Hive、Drill、Kylin、Flink在內(nèi)的很多交互式分析平臺(tái)及數(shù)據(jù)倉(cāng)庫(kù)使用Calcite(原名optiq)作為SQL引擎。Calcite是一個(gè)Apache孵化項(xiàng)目,其創(chuàng)建者Julian Hyde曾是Oracle數(shù)據(jù)庫(kù)SQL引擎的主要開發(fā)者。Calcite具有下列幾個(gè)技術(shù)特點(diǎn):
支持標(biāo)準(zhǔn)SQL語(yǔ)言。
支持OLAP。
支持對(duì)流數(shù)據(jù)的查詢。
獨(dú)立于編程語(yǔ)言和數(shù)據(jù)源,可以支持不同的前端和后端。
支持關(guān)系代數(shù)、可定制的邏輯規(guī)劃規(guī)則和基于成本模型優(yōu)化的查詢引擎。
支持物化視圖(materialized view)的管理。
由于分布式場(chǎng)景遠(yuǎn)比傳統(tǒng)的數(shù)據(jù)存儲(chǔ)環(huán)境更復(fù)雜,Calcite和Catalyst都還處于向Oracle、MySQL等經(jīng)典關(guān)系數(shù)據(jù)庫(kù)引擎學(xué)習(xí)的階段,在性能優(yōu)化的道路上還有很長(zhǎng)的路要走。
其他類型的框架
除了上面介紹的幾種類型的框架外,還有一些目前還不太熱門但具有重要潛力的框架類型。圖計(jì)算是DAG之外的另一種迭代式計(jì)算模型,它以圖論為基礎(chǔ)對(duì)現(xiàn)實(shí)世界建模和計(jì)算,擅長(zhǎng)表達(dá)數(shù)據(jù)之間的關(guān)聯(lián)性,適用于PageRank計(jì)算、社交網(wǎng)絡(luò)分析、推薦系統(tǒng)及機(jī)器學(xué)習(xí)。這一類框架有Google Pregel、Apache Giraph、Apache Hama、PowerGraph、,其中PowerGraph是這一領(lǐng)域目前最杰出的代表。很多圖數(shù)據(jù)庫(kù)也內(nèi)置圖計(jì)算框架。
另一類是增量計(jì)算框架,探討如何只對(duì)部分新增數(shù)據(jù)進(jìn)行計(jì)算來極大提升計(jì)算過程的效率,可應(yīng)用到數(shù)據(jù)增量或周期性更新的場(chǎng)合。這一類框架包括Google Percolator、Microsoft Kineograph、阿里Galaxy等。
另外還有像Apache Ignite、Apache Geode(GemFire的開源版本)這樣的高性能事務(wù)處理框架。
總結(jié)與展望
從Hadoop橫空出世到現(xiàn)在10余年的時(shí)間中,大數(shù)據(jù)分布式計(jì)算技術(shù)得到了迅猛發(fā)展。不過由于歷史尚短,這方面的技術(shù)遠(yuǎn)未成熟。各種框架都還在不斷改進(jìn),并相互競(jìng)爭(zhēng)。
性能優(yōu)化毫無(wú)疑問是大數(shù)據(jù)計(jì)算框架改進(jìn)的重點(diǎn)方向之一。而性能的提高很大程度上取決于內(nèi)存的有效利用。這包括前面提到的內(nèi)存計(jì)算,現(xiàn)已在各種類型的框架中廣泛采用。內(nèi)存資源的分配管理對(duì)性能也有重要影響,JVM垃圾回收在給開發(fā)人員帶來便利的同時(shí),也制約了內(nèi)存的有效利用。另外,Java的對(duì)象創(chuàng)建及序列化也比較浪費(fèi)資源。在內(nèi)存優(yōu)化方面做足功夫的代表是Flink。出于性能方面的考慮,F(xiàn)link很多組件自行管理內(nèi)存,無(wú)需依賴JVM垃圾回收機(jī)制。Flink還用到開辟內(nèi)存池、用二進(jìn)制數(shù)據(jù)代替對(duì)象、量身定制序列化、定制緩存友好的算法等優(yōu)化手段。Flink還在任務(wù)的執(zhí)行方面進(jìn)行優(yōu)化,包括多階段并行執(zhí)行和增量迭代。
擁抱機(jī)器學(xué)習(xí)和人工智能也是大數(shù)據(jù)計(jì)算的潮流之一。Spark和Flink分別推出機(jī)器學(xué)習(xí)庫(kù)Spark ML和Flink ML。更多的平臺(tái)在第三方大數(shù)據(jù)計(jì)算框架上提供機(jī)器學(xué)習(xí),如Mahout、Oryx及一干Apache孵化項(xiàng)目SystemML、HiveMall、PredictionIO、SAMOA、MADLib。這些機(jī)器學(xué)習(xí)平臺(tái)一般都同時(shí)支持多個(gè)計(jì)算框架,如Mahout同時(shí)以Spark、Flink、H2O為引擎,SAMOA則使用S4、Storm、Samza。在深度學(xué)習(xí)掀起熱潮后,又有社區(qū)探索把深度學(xué)習(xí)框架與現(xiàn)有分布式計(jì)算框架結(jié)合起來,這樣的項(xiàng)目有SparkNet、Caffe on Spark、TensorFrames等。
在同一平臺(tái)上支持多種框架也是發(fā)展趨勢(shì)之一,尤其對(duì)于那些開發(fā)實(shí)力較為雄厚的社區(qū)。Spark以批處理模型為核心,實(shí)現(xiàn)了交互式分析框架Spark SQL、流計(jì)算框架Spark Streaming(及正在實(shí)現(xiàn)的Structured Streaming)、圖計(jì)算框架GraphX、機(jī)器學(xué)習(xí)庫(kù)Spark ML。而Flink在提供低延遲的流計(jì)算的同時(shí),批處理、關(guān)系計(jì)算、圖計(jì)算、機(jī)器學(xué)習(xí),一個(gè)也沒落下,目標(biāo)直奔大數(shù)據(jù)通用計(jì)算平臺(tái)。Google的BEAM(意為Batch+strEAM)則試圖把Spark、Flink、Apex這樣的計(jì)算框架納入自己制定的標(biāo)準(zhǔn)之下,頗有號(hào)令江湖之意。
Google的那幾篇論文這里就不一一列出了,網(wǎng)上很容易搜到。其他推薦的論文如下: