碰這東西也好一段時間了,也跟碩論有關
單純想用自己的話來描述MapReduce執行過程中的流程細節,並做個記錄
如果我的理解或敘述有誤還煩請告知^^
資料來源(1):
http://www.cs.rutgers.edu/~pxk/417/notes/content/mapreduce.html
資料來源(2):
hadoop作業調優參數整理及原理
1. MapReduce概述
一種主要用來處理大量資料的程式模型,並使用於電腦叢集的分散式運算
程式設計者只需學習使用單純的API(包含map函數和reduce函數),就可輕易的將job分割成許多tasks平行執行在許多node上。(Hadoop中,一台server上可能可以部署許多個nodes,以VM的方式切割)
而Hadoop就是MapReduce這個程式模型其中一種實現出來的平台,另外還有GridGain、Phoenix、Mars,都是使用MapReduce的概念做資料的平行化,但使用的目的與特色都與Hadoop有些許不同。
程式設計者使用此類型的平台,撰寫平行化程式,就不用處理細部如何平行處理、遠端執行、資料分配、負載平衡、容錯機制等等的問題。
2. MapReduce Running System
(1) fork: 由User寫好的Program開始,將要執行的MapReduce程式複製到Master與每一台Worker node上(此處的Master與Worker在Hadoop中即為JobTracker與TaskTracker)
(2) assign map: 由Master node決定Map和Reduce兩個function內的程式,分別要用哪些Worker node執行
(3) read: split指的是原輸入資料分割後的資料區塊(在Hadoop中input files要存放在HDFS分散式檔案系統上),要執行Map程式的Worker node,就會照著Master分配的資料link,到HDFS上去找到他要執行的資料區塊。
(4) local write: 將資料區塊依照Map程式處理完後,會產生中間資料(intermediate files),暫存在Worker node上,但在Hadoop中其實不單純只是將中間資料存在本機硬碟,還會經過記憶體buffer緩存和預排序等階段,目的是優化map的性能。
(5) remote read: 要執行Reduce程式的Worker node,要從這些在不同位置的Worker node上,下載屬於它要執行的中間資料,作為Reduce程式的input。而遠端讀取中間資料的過程分成三階段:copy(shuffle)->sort->reduce
(6) write: 最後將使用者需要的運算結果輸出。
3. MapReduce: More Detail
另一個較詳細的解釋,分為七個步驟,說明圖可參考資料來源(1)
Step 1. Split input
平行化程式要執行的輸入檔案通常資料量相當大,因此必須將資料切割成許多資料區塊,每個資料區塊稱為split或shard。
如果有M個map workers,就會將原本的資料至少切成M塊,這樣每個map worker都有事情做,也可以切成更多塊,但就會變成要分到下一批執行,Master node會負責分配每個map worker要執行的split數目。
(在Hadoop中,map task的個數主要是由 總輸入檔案大小 除以 每個split檔案大小 來決定的,舉例來說,input為一個500MB的檔案,而每個split檔案大小預設為64MB,Hadoop會自動產生8個map task,但map worker的個數是有限制的,這部分的設定是與硬體資源(CPU core個數)有關 Ex: 2個Hadoop node=2個tasktracker,預設最多只能同時執行4個map task,因為每個node的最大map task數量預設為2,但map task的總個數仍有個參考參數可調整)
切割的方式由資料的位置與格式有所不同,而user program也可自行設定,如:換行符號。
Step 2. Fork processes
Master node負責發送job給Workers,並追蹤執行的進度及回傳結果。
Master node會選擇目前空閒的Worker node,指定他要執行map task或reduce task。
一個map task只會處理原本輸入資料的其中一個shard部份,reduce task則處理map task執行完後產生的中間資料。
M map tasks ; R reduce tasks(reduce task的個數可由user決定)
worker會接收master的訊息,決定執行map或reduce task,還有要去哪裡讀哪些資料做為input。
Step 3. Map
每個map task會從input shard中讀取指派給他執行的資料,並將有興趣需要處理的資料parse成(key,value) pairs,map function會在此丟棄很多沒有興趣處理的資料,也就是map function抓到它要的input後,執行完function內的程式,intermediate files就是程式內撰寫的輸出結果,當然其他的資料就會被丟棄。
藉由許多map worker同時執行,提升performance。
Step 4. Map Worker: Partition
map worker執行完map function內的程式後,最後的output仍是(key,value)這個資料串,這些中間資料會先緩存在記憶體中,並階段性的存到map worker的本機硬碟上。
(當map的輸出超過一定的臨界值 ex.80MB,就必須將記憶體buffer內的資料寫成一個檔案暫存在本機硬碟,不然buffer會爆炸呀,新的map輸出也進不來,而這個過程就叫做spill,當然,每個spill內的map輸出們,都會先照key做排序)
spill的過程,是由background thread來執行,因此同時間map function的輸出仍會持續產生
若map執行完後有多個spill檔,最後還會進行merge,將所有spill檔merge成一個,如何將spill檔做merge可由Combiner function決定。
merge完的這個中間資料檔案,會由Partition function分割成R個區域,partition function負責決定R個reduce workers中,哪些人負責處理哪些key的資料,預設的partition function是單純將key值除以R取餘數做hash,使用者如果要指定哪些key的資料給哪台reduce worker執行,可以在此修改partition function。
這個階段蠻複雜的,有個更詳細的網頁可參考:
http://grepalex.com/2012/09/24/map-partition-sort-spill/
Step 5. Reduce: Sort(Shuffle)
當所有map worker的工作完成時,Master node就會通知Reduce workers開始工作。
Reduce workers第一件要做的事,就是拿到reduce function要執行的input資料,reduce worker會透過remote procedure calls,向map worker拿屬於這個reduce worker要處理的partition中的(key,value)資料,而這些資料一樣會透過key進行排序。
排序是必須的動作,因為資料可能會有相同的key值,若無排序會發生很多不同key值的資料卻散亂的分給同一個reduce worker(即是在同一個partition)。
排序完成後,目的是把所有有相同key值的資料聚集在一起,也容易抓取有相同key值的所有資料。
而這個階段也稱做Shuffle。
Step 6. Reduce function
當資料照key值排序好後,user寫的reduce function就可以開始執行。
每當讀入一個新的key值時,reduce worker就會呼叫一次reduce function,這個function會傳入兩個參數,1. key值 2. 所有有相同key的value組成的list。
執行完reduce function內要做的運算後,將output寫入記憶體,output仍為(key,value)型態。
Step 7. Done!
當所有reduce worker完成工作,Master會將控制權轉回user program,MapReduce的輸出會存成R個輸出檔(在Hadoop中可看到輸出為part_XX..),分別就是由R個reduce worker創建的。
4. 相關說明