Hadoop MapReduce
http://hadoop.apache.org/common/docs/r0.17.0/mapred_tutorial.html

Hadoop Map-Reduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A Map-Reduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them, and re-executing the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the Map-Reduce framework and the Distributed FileSystem run on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The Map-Reduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them, and re-executing the failed tasks. The slaves execute the tasks as directed by the master.

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks, monitoring them, and providing status and diagnostic information to the job client.

Although the Hadoop framework is implemented in Java™, Map-Reduce applications need not be written in Java.
Hadoop MapReduce
Hadoop MapReduce is a software framework for easily writing applications to process vast amounts of data in HDFS. It:
- provides highly reliable computation
- provides a fault-tolerance mechanism
- reduces the network bandwidth needed for data transfer
- provides load balancing
MapReduce Application Areas in Cloud Computing
- analysis and statistics over massive data sets
- sorting and consolidation of massive data sets
- analysis of web access logs
- …
Hadoop MapReduce Architecture
Job / Task: the basic units of work in MapReduce; each job is carried out as Map Tasks and Reduce Tasks.
JobTracker: manages task scheduling and collects the progress of every job, so as to coordinate the execution of all jobs on the system.
TaskTracker: executes the various tasks and reports execution progress back to the JobTracker.
Job Submission
- Validate the job's input and output: check that an output directory is specified and whether it already exists (the job fails if it does).
- Compute the input splits for the job's input files.
- Copy the job's jar file (the packaged Java executable) and its configuration into the MapReduce system directory on HDFS.
- Submit the job to the JobTracker queue and monitor its status.
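As a minimal sketch of what the job client does, using the classic org.apache.hadoop.mapred API (the pass-through IdentityMapper/IdentityReducer job here is illustrative, not part of the lab):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

// Illustrative pass-through job: copies its input to the output directory.
public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(SubmitSketch.class);
    conf.setJobName("submit-sketch");
    FileInputFormat.setInputPaths(conf, new Path(args[0]));  // input must exist on HDFS
    FileOutputFormat.setOutputPath(conf, new Path(args[1])); // output must not exist yet

    conf.setMapperClass(IdentityMapper.class);
    conf.setReducerClass(IdentityReducer.class);

    // submitJob() hands the job to the JobTracker queue and returns a
    // handle for monitoring; JobClient.runJob() (used by the WordCount
    // example later) submits and then blocks, printing progress.
    JobClient client = new JobClient(conf);
    RunningJob running = client.submitJob(conf);
    System.out.println("Tracking URL: " + running.getTrackingURL());
  }
}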
Job Scheduling
By default, the FIFO scheduler runs jobs in submission order.
Fair scheduler: a multi-user scheduling policy.
- Gives every user a fair share of the cluster capacity over time. As more jobs are submitted, free task slots are given to the jobs in such a way as to give each user a fair share of the cluster.
- Supports preemption: if a pool has not received its fair share for a certain period of time, the scheduler will kill tasks in pools running over capacity in order to give the slots to the pool running under capacity.
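As a sketch: on later classic Hadoop releases that bundle the Fair Scheduler contrib module, the JobTracker is switched away from FIFO via mapred-site.xml; the property below is taken from the Fair Scheduler documentation (the contrib jar must also be on the JobTracker's classpath):

<property>
  <name>mapred.jobtracker.taskScheduler</name>
  <value>org.apache.hadoop.mapred.FairScheduler</value>
</property>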
Job Input
Input Split: a fragment of the input data produced by splitting it.
- Splits have a fixed (but configurable) size.
- Each split produces one Map Task.
- Split placement takes data locality optimization into account.
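A sketch of adjusting split granularity from the job configuration; mapred.min.split.size is the classic property name, and these are illustrative lines one could add to the WordCount driver shown later in this deck:

// Raise the minimum split size so each Map Task processes more data;
// by default, splits track the HDFS block size.
conf.setLong("mapred.min.split.size", 64L * 1024 * 1024); // 64 MB
conf.setNumMapTasks(10); // only a hint; the actual count follows the splits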
Map Phase
After the job's input files have been divided into splits, the Map phase transforms them into intermediate data.
The intermediate map output is partitioned and sorted, then staged on the TaskTracker's local disk.
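The partition step decides which Reduce Task receives each intermediate key; by default a hash of the key is used. A sketch of a custom partitioner in the classic mapred API (the even/odd routing rule and the types are purely illustrative):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Illustrative: send even keys to one reducer and odd keys to another.
public class ParityPartitioner implements Partitioner<IntWritable, Text> {
  public void configure(JobConf job) { } // no per-job setup needed

  public int getPartition(IntWritable key, Text value, int numPartitions) {
    // Must return a partition number in [0, numPartitions).
    return (key.get() & 1) % numPartitions;
  }
}
// Enabled with: conf.setPartitionerClass(ParityPartitioner.class);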
Combine Phase
A Map Task can run a combine step before emitting its intermediate data, which:
- saves part of the bandwidth needed to transfer the data
- speeds up writing to disk
- saves disk space
Example (the combine function here is max):
Map Task 1 emits (12, 30), (12, 10), (12, 15), (12, 19); Combine 1 outputs (12, 30).
Map Task 2 emits (12, 22), (12, 18), (12, 9); Combine 2 outputs (12, 22).
Without combiners, the Reduce Task receives (12, [30, 10, 15, 19, 22, 18, 9]); with combiners, it receives only (12, [30, 22]).
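A sketch of the max-style combiner from the example above, written as a classic-API reducer (in the WordCount example later in this deck, the reducer itself doubles as the combiner instead):

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.*;

// Keep only the maximum value per key, matching the (12, 30) / (12, 22)
// example. Because max is associative and commutative, applying it
// per map task does not change the final result.
public class MaxCombiner extends MapReduceBase
    implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  public void reduce(IntWritable key, Iterator<IntWritable> values,
                     OutputCollector<IntWritable, IntWritable> output,
                     Reporter reporter) throws IOException {
    int max = Integer.MIN_VALUE;
    while (values.hasNext()) {
      max = Math.max(max, values.next().get());
    }
    output.collect(key, new IntWritable(max));
  }
}
// Enabled with: conf.setCombinerClass(MaxCombiner.class);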
Reduce Phase
Shuffle: the process of transferring map outputs to the reducers' inputs.
- Intermediate data is downloaded from multiple Map Tasks concurrently.
- Wait times can be tuned to the cluster's network speed.
- A memory buffer stages the downloaded intermediate data.
Reduce: processes the intermediate data and stores the final result in HDFS.
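As a sketch, two classic reduce-side knobs (illustrative lines for a job driver; mapred.reduce.parallel.copies is the classic property for the number of concurrent map-output fetches, default 5):

conf.setNumReduceTasks(2);                        // how many Reduce Tasks to run
conf.setInt("mapred.reduce.parallel.copies", 10); // concurrent shuffle downloads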
MapReduce Workflow
Divide and Conquer
Divide: the Job is split into many Map Tasks. Conquer/combine: Reduce Tasks merge the map outputs into the Final Result. There can be more Reduce Tasks than Map Tasks.
Architecture
Map tasks and Reduce tasks are the HDFS clients. In a typical cluster, the JobTracker (JT) runs alongside the NameNode (NN) on the master, and each worker node runs a TaskTracker (TT) alongside a DataNode (DN).
Legend: JT: JobTracker, TT: TaskTracker, NN: NameNode, DN: DataNode
Work Flow
- A MapReduce job usually splits the input data-set into independent chunks.
- The chunks are processed by the map tasks.
- The framework shuffles the outputs of the maps, which are then input to the reduce tasks.
- The reduce tasks combine the outputs of the maps and store the final output in the file system.
[Figure: files in the native file system are copied into HDFS and split into input chunks; the map tasks produce intermediate output, which the shuffle turns into reduce input; the reduce tasks write the final output back to HDFS.]
Failure Handling
Task failure: most commonly the user's program throws a runtime exception while a Map or Reduce Task is executing.
TaskTracker failure: a TaskTracker crashes or runs too slowly.
JobTracker failure: there is currently no mechanism for handling JobTracker failure; such failures are rare, because the probability of that particular machine failing is low.
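Failed tasks are simply re-executed, up to a per-task attempt limit. A sketch of the classic JobConf setters (four attempts is the usual default; illustrative lines for a job driver):

conf.setMaxMapAttempts(4);    // give up on the job after 4 failed tries of a map task
conf.setMaxReduceAttempts(4); // likewise for reduce tasks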
Running simple Java programs on a Hadoop cluster: see Pro Hadoop (pro-hadoop.PDF), Chapter 10, p. 329.
WordCount Example
WordCount Example
Environment: a cluster with one NameNode and two DataNodes.
(1) First, upload the text files to be word-counted to the HDFS file system.
Explanation:
- Go to the home directory.
- Create a directory named input and add two text files, file1.txt and file2.txt.
- Go to the Hadoop home directory and copy the input directory to HDFS.
- List the files on HDFS.
WordCount Example
Steps:
mkdir input
cd input
echo "hello world bye world" > file1.txt
echo "hello hadoop bye hadoop" > file2.txt
cd /opt/hadoop
bin/hadoop dfs -copyFromLocal ~/input input
bin/hadoop dfs -ls input
WordCount Example
(2) Edit the WordCount.java program. It can be downloaded from:
http://trac.nche.org.tw/cloud/attachment/wiki/jazz/Hadoop_Lab6/WordCount.java?format=raw
WordCount Example (1/3)

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Emit (word, 1) for every token in the input line.
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }
WordCount Example (2/3)

  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {

    // Sum the counts collected for each word.
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
WordCount Example (3/3)

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class); // the reducer doubles as the combiner
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf); // submit and block until the job completes
  }
}
WordCount Example
(3) Package and compile WordCount.java into wordcount.jar.
Steps:
mkdir MyJava
javac -classpath hadoop-*-core.jar -d MyJava WordCount.java
jar -cvf wordcount.jar -C MyJava .
bin/hadoop jar wordcount.jar WordCount input/ output/
WordCount Example
(4) View the execution results:
bin/hadoop dfs -cat output/part-00000
The output is as follows:
bye 2
hadoop 2
hello 2
world 2
Reference: http://www.slideshare.net/waue/hadoop-map-reduce-3019713