close

碰這東西也好一段時間了,也跟碩論有關

單純想用自己的話來描述MapReduce執行過程中的流程細節,並做個記錄

如果我的理解或敘述有誤還煩請告知^^

 


 

資料來源(1):

http://www.cs.rutgers.edu/~pxk/417/notes/content/mapreduce.html

資料來源(2):

hadoop作業調優參數整理及原理

 


1. MapReduce概述

一種主要用來處理大量資料的程式模型,並使用於電腦叢集的分散式運算

程式設計者只需學習使用單純的API(包含map函數和reduce函數),就可輕易的將job分割成許多tasks平行執行在許多node上。(Hadoop中,一台server上可能可以部署許多個nodes,以VM的方式切割)

而Hadoop就是MapReduce這個程式模型其中一種實現出來的平台,另外還有GridGain、Phoenix、Mars,都是使用MapReduce的概念做資料的平行化,但使用的目的與特色都與Hadoop有些許不同。

程式設計者使用此類型的平台,撰寫平行化程式,就不用處理細部如何平行處理、遠端執行、資料分配、負載平衡、容錯機制等等的問題。

 


 

2. MapReduce Running Systemmapreduce running system

(1) fork: 由User寫好的Program開始,將要執行的MapReduce程式複製到Master與每一台Worker node上(此處的Master與Worker在Hadoop中即為JobTracker與TaskTracker)

(2) assign map: 由Master node決定Map和Reduce兩個function內的程式,分別要用哪些Worker node執行

(3) read: split指的是原輸入資料分割後的資料區塊(在Hadoop中input files要存放在HDFS分散式檔案系統上),要執行Map程式的Worker node,就會照著Master分配的資料link,到HDFS上去找到他要執行的資料區塊。

(4) local write: 將資料區塊依照Map程式處理完後,會產生中間資料(intermediate files),暫存在Worker node上,但在Hadoop中其實不單純只是將中間資料存在本機硬碟,還會經過記憶體buffer緩存和預排序等階段,目的是優化map的性能。

(5) remote read: 要執行Reduce程式的Worker node,要從這些在不同位置的Worker node上,下載屬於它要執行的中間資料,作為Reduce程式的input。而遠端讀取中間資料的過程分成三階段:copy(shuffle)->sort->reduce

(6) write: 最後將使用者需要的運算結果輸出。

 


 

3. MapReduce: More Detail

 

另一個較詳細的解釋,分為七個步驟,說明圖可參考資料來源(1)

 

Step 1. Split input

平行化程式要執行的輸入檔案通常資料量相當大,因此必須將資料切割成許多資料區塊,每個資料區塊稱為split或shard。

如果有M個map workers,就會將原本的資料至少切成M塊,這樣每個map worker都有事情做,也可以切成更多塊,但就會變成要分到下一批執行,Master node會負責分配每個map worker要執行的split數目。

(在Hadoop中,map task的個數主要是由 總輸入檔案大小 除以 每個split檔案大小 來決定的,舉例來說,input為一個500MB的檔案,而每個split檔案大小預設為64MB,Hadoop會自動產生8個map task,但map worker的個數是有限制的,這部分的設定是與硬體資源(CPU core個數)有關 Ex: 2個Hadoop node=2個tasktracker,預設最多只能同時執行4個map task,因為每個node的最大map task數量預設為2,但map task的總個數仍有個參考參數可調整)

切割的方式由資料的位置與格式有所不同,而user program也可自行設定,如:換行符號。

 

Step 2. Fork processes

Master node負責發送job給Workers,並追蹤執行的進度及回傳結果。

Master node會選擇目前空閒的Worker node,指定他要執行map task或reduce task。

一個map task只會處理原本輸入資料的其中一個shard部份,reduce task則處理map task執行完後產生的中間資料。

M map tasks ; R reduce tasks(reduce task的個數可由user決定)

worker會接收master的訊息,決定執行map或reduce task,還有要去哪裡讀哪些資料做為input。

 

Step 3. Map

每個map task會從input shard中讀取指派給他執行的資料,並將有興趣需要處理的資料parse成(key,value) pairs,map function會在此丟棄很多沒有興趣處理的資料,也就是map function抓到它要的input後,執行完function內的程式,intermediate files就是程式內撰寫的輸出結果,當然其他的資料就會被丟棄。

藉由許多map worker同時執行,提升performance。

 

Step 4. Map Worker: Partition

map worker執行完map function內的程式後,最後的output仍是(key,value)這個資料串,這些中間資料會先緩存在記憶體中,並階段性的存到map worker的本機硬碟上。

(當map的輸出超過一定的臨界值 ex.80MB,就必須將記憶體buffer內的資料寫成一個檔案暫存在本機硬碟,不然buffer會爆炸呀,新的map輸出也進不來,而這個過程就叫做spill,當然,每個spill內的map輸出們,都會先照key做排序)

spill的過程,是由background thread來執行,因此同時間map function的輸出仍會持續產生

若map執行完後有多個spill檔,最後還會進行merge,將所有spill檔merge成一個,如何將spill檔做merge可由Combiner function決定。

merge完的這個中間資料檔案,會由Partition function分割成R個區域,partition function負責決定R個reduce workers中,哪些人負責處理哪些key的資料,預設的partition function是單純將key值除以R取餘數做hash,使用者如果要指定哪些key的資料給哪台reduce worker執行,可以在此修改partition function。

這個階段蠻複雜的,有個更詳細的網頁可參考:

http://grepalex.com/2012/09/24/map-partition-sort-spill/

 

Step 5. Reduce: Sort(Shuffle)

當所有map worker的工作完成時,Master node就會通知Reduce workers開始工作。

Reduce workers第一件要做的事,就是拿到reduce function要執行的input資料,reduce worker會透過remote procedure calls,向map worker拿屬於這個reduce worker要處理的partition中的(key,value)資料,而這些資料一樣會透過key進行排序。

排序是必須的動作,因為資料可能會有相同的key值,若無排序會發生很多不同key值的資料卻散亂的分給同一個reduce worker(即是在同一個partition)。

排序完成後,目的是把所有有相同key值的資料聚集在一起,也容易抓取有相同key值的所有資料。

而這個階段也稱做Shuffle。

 

Step 6. Reduce function

當資料照key值排序好後,user寫的reduce function就可以開始執行。

每當讀入一個新的key值時,reduce worker就會呼叫一次reduce function,這個function會傳入兩個參數,1. key值 2. 所有有相同key的value組成的list。

執行完reduce function內要做的運算後,將output寫入記憶體,output仍為(key,value)型態。

 

Step 7. Done!

當所有reduce worker完成工作,Master會將控制權轉回user program,MapReduce的輸出會存成R個輸出檔(在Hadoop中可看到輸出為part_XX..),分別就是由R個reduce worker創建的。

 


 

4. 相關說明

detail1  

 

 

 

arrow
arrow
    全站熱搜

    Celia 發表在 痞客邦 留言(3) 人氣()