Map Reduce Programming 王耀聰 陳威宇 Jazz@nchc.org.tw waue@nchc.org.tw 2008. 04 . 27-28 國家高速網路與計算中心(NCHC)
Outline 概念 程式基本框架及執行步驟方法 範例一: 範例二: Hadoop 的 Hello World => Word Count 說明 動手做 範例二: 進階版=> Word Count 2
概念 MapReduce 圖解
概念 MapReduce in Parallel
Program Prototype 程式基本 框架 Map 程式碼 Reduce 程式碼 其他的設定參數程式碼 Class MR{ Class Map …{ } Class Reduce …{ main(){ JobConf conf = new JobConf(“MR.class”); conf.setInputPath(“the_path_of_HDFS ”); conf.setOutputPath(“the_path_of_HDFS ”); conf.setMapperClass(Map.class); conf.setReduceClass(Reduce.class); JobClient.runJob(conf); }} Map 區 Map 程式碼 Reduce 區 Reduce 程式碼 設定區 其他的設定參數程式碼
Process Prototype 編譯 jar -cvf MyJar.jar -C MyJava . 執行流程 基本步驟 javac -classpath hadoop-*-core.jar -d MyJava MyCode.java 封裝 jar -cvf MyJar.jar -C MyJava . 執行 bin/hadoop jar MyJar.jar MyCode HDFS_Input/ HDFS_Output/ 所在的執行目錄為Hadoop_Home ./MyJava = 編譯後程式碼目錄 Myjar.jar = 封裝後的編譯檔 先放些文件檔到HDFS上的input目錄 ./input; ./ouput = hdfs的輸入、輸出目錄
Word Count Sample (1) 範例 程式一 class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map( LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = ((Text) value).toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); }}} 1 2 3 4 5 6 7 8 9
Word Count Sample (2) 範例 程式一 class ReduceClass extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable> { IntWritable SumValue = new IntWritable(); public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) sum += values.next().get(); SumValue.set(sum); output.collect(key, SumValue); }} 1 2 3 4 5 6 7 8
Word Count Sample (3) 範例 程式一 conf.setCombinerClass(Reduce.class); Class WordCount{ main() JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); // set path conf.setInputPath(new Path(args[0])); conf.setOutputPath(new Path(args[1])); // set map reduce conf.setOutputKeyClass(Text.class); // set every word as key conf.setOutputValueClass(IntWritable.class); // set 1 as value conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(ReduceClass.class); onf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); // run JobClient.runJob(conf); }}
核心 Mapper <key/value > 的映射集合 設定 每次map的輸入 map完後的輸出 範例一 說明 核心 Mapper <key/value > 的映射集合 設定 conf.setMapperClass(MapClass.class); 每次map的輸入 map ( WritableComparable, Writable, OutputCollector, Reporter) map完後的輸出 OutputCollector.collect ( WritableComparable,Writable )
範例一 說明 核心Combiner 指定一個combiner,它負責對中間過程的輸出進行本地的聚集,這會有助於降低從Mapper 到 Reducer數據傳輸量。 設定 JobConf.setCombinerClass(Class)
核心Reducer 將Map送來的<key/value > ,對每個key作value的整合 範例一 說明 核心Reducer 將Map送來的<key/value > ,對每個key作value的整合 輸入: <key, (list of values)> Reduce (WritableComparable, Iterator, OutputCollector, Reporter) 輸出 OutputCollector.collect(WritableComparable, Writable) 若沒有Reduce要執行,可以不編寫
配置JobConf 範例一 說明 Hadoop程式架構內主要的執行設定類別 指定Mapper、Combiner、 Partitioner、Reducer、InputFormat和OutputFormat 的類別為何 指定輸入文件 setInputPaths(JobConf, Path...) / addInputPath(JobConf, Path)) 指定輸出文件 setOutputPath(Path) debug script setMapDebugScript(String) / setReduceDebugScript(String) 最多的嘗試次數 setMaxMapAttempts(int) / setMaxReduceAttempts(int)) 容許任務失敗的百分比 setMaxMapTaskFailuresPercent(int) / setMaxReduceTaskFailuresPercent(int) ……
任務執行 runJob(JobConf ): submitJob(JobConf ): 範例一 說明 任務執行 runJob(JobConf ): 提交作業,僅當作業完成時返回。 submitJob(JobConf ): 只提交作業,之後需要你輪詢它返回的 RunningJob 句柄的狀態,並根據情況調度。 JobConf.setJobEndNotificationURI(String ): 設置一個作業完成通知,可避免輪詢。
WordCount練習 (前置) 範例一 動手做 cd $HADOOP_HOME bin/hadoop dfs -mkdir input echo "I like NCHC Cloud Course." > input1 echo "I like nchc Cloud Course, and we enjoy this crouse." > input2 bin/hadoop dfs -put input1 input bin/hadoop dfs -put input2 input bin/hadoop dfs -ls input 編輯WordCount.java http://trac.nchc.org.tw/cloud/attachment/wiki/jazz/Hadoop_Lab6/WordCount.java?format=raw mkdir MyJava
WordCount練習 (執行) 編譯 jar -cvf wordcount.jar -C MyJava . 範例一 動手做 javac -classpath hadoop-*-core.jar -d MyJava WordCount.java 封裝 jar -cvf wordcount.jar -C MyJava . 執行 bin/hadoop jar wordcount.jar WordCount input/ output/ 所在的執行目錄為Hadoop_Home ./MyJava = 編譯後程式碼目錄 wordcount.jar = 封裝後的編譯檔 先放些文件檔到HDFS上的input目錄 ./input; ./ouput = hdfs的輸入、輸出目錄
範例一 動手做 WordCount練習 (執行)
範例一 動手做 WordCount練習 (結果)
WordCount 進階版 WordCount2 功能 步驟 (接續 WordCount 的環境) 範例二 動手做 不計標點符號 不管大小寫 http://trac.nchc.org.tw/cloud/attachment/wiki/jazz/Hadoop_Lab6/WordCount2.java?format=raw 功能 不計標點符號 不管大小寫 步驟 (接續 WordCount 的環境) echo "\." >pattern.txt && echo "\," >>pattern.txt bin/hadoop dfs -put pattern.txt ./ mkdir MyJava2 javac -classpath hadoop-*-core.jar -d MyJava2 WordCount2.java jar -cvf wordcount2.jar -C MyJava2 .
範例二 動手做 不計標點符號 執行 bin/hadoop jar wordcount2.jar WordCount2 input output2 -skip pattern.txt dfs -cat output2/part-00000
範例二 動手做 不管大小寫 執行 bin/hadoop jar wordcount2.jar WordCount2 -Dwordcount.case.sensitive=false input output3 -skip pattern.txt
Tool 處理Hadoop命令執行的選項 透過介面交由程式處理 ToolRunner.run(Tool, String[]) 範例二 補充說明 Tool 處理Hadoop命令執行的選項 -conf <configuration file> -D <property=value> -fs <local|namenode:port> -jt <local|jobtracker:port> 透過介面交由程式處理 ToolRunner.run(Tool, String[])
DistributedCache 設定特定有應用到相關的、超大檔案、或只用來參考卻不加入到分析目錄的檔案 範例二 補充說明 DistributedCache 設定特定有應用到相關的、超大檔案、或只用來參考卻不加入到分析目錄的檔案 如pattern.txt檔 DistributedCache.addCacheFile(URI,conf) URI=hdfs://host:port/FilePath