Map-Reduce Programming 王耀聰 陳威宇 jazz@nchc.org.tw waue@nchc.org.tw 國家高速網路與計算中心(NCHC) 1
Outline 概念 程式基本框架及執行步驟方法 範例一: 範例二: Hadoop 的 Hello World => Word Count 說明 動手做 範例二: 進階版=> Word Count 2 2 2
Program Prototype (v 0.18) Map 程式碼 Reduce 程式碼 其他的設定參數程式碼 程式基本 框架 Class MR{ Class Mapper …{ } Class Reducer …{ main(){ JobConf conf = new JobConf(“MR.class”); conf.setMapperClass(Mapper.class); conf.setReduceClass(Reducer.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); }} Map 程式碼 Map區 Reduce 程式碼 Reduce 區 設定區 其他的設定參數程式碼 3 3
程式基本 框架 Class Mapper 1 2 3 4 5 6 7 8 9 class MyMap extends MapReduceBase implements Mapper < , , , > { // 全域變數區 public void map ( key , value , OutputCollector < , > output, Reporter reporter) throws IOException { // 區域變數與程式邏輯區 output.collect( NewKey, NewValue); } INPUT KEY INPUT VALUE OUTPUT KEY OUTPUT VALUE INPUT KEY INPUT VALUE OUTPUT KEY OUTPUT VALUE 4 4
程式基本 框架 Class Reducer 1 2 3 4 5 6 7 8 9 class MyRed extends MapReduceBase implements Reducer < , , , > { // 全域變數區 public void reduce ( key, Iterator< > values, OutputCollector< , > output, Reporter reporter) throws IOException { // 區域變數與程式邏輯區 output.collect( NewKey, NewValue); } INPUT KEY INPUT VALUE OUTPUT KEY OUTPUT VALUE INPUT KEY INPUT VALUE OUTPUT KEY OUTPUT VALUE 5 5
程式基本 框架 Class Combiner 指定一個combiner,它負責對中間過程的輸出進行聚集,這會有助於降低從Mapper 到Reducer數據傳輸量。 可不用設定交由Hadoop預設 也可不實做此程式,引用Reducer 設定 JobConf.setCombinerClass(Class) 6 6
Run Job runJob(JobConf ) 提交作業,僅當作業完成時返回。 submitJob(JobConf ) 程式基本 框架 Run Job runJob(JobConf ) 提交作業,僅當作業完成時返回。 submitJob(JobConf ) 只提交作業,之後需要你輪詢它返回的 RunningJob 句柄的狀態,並根據情況調度。 JobConf.setJobEndNotificationURI(String) 設置一個作業完成通知,可避免輪詢。 7 7
Word Count Sample (1) 範例 程式一 class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map( LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = ((Text) value).toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); output.collect(word, one); }}} 1 2 3 4 5 6 7 8 9 Input key <word,one> …………………. ………………… No news is a good news. /user/hadooper/input/a.txt < no , 1 > no news is good a < news , 1 > itr itr itr itr itr itr itr < is , 1 > Input value line < a, 1 > < good , 1 > 8 < news, 1 > 8
Word Count Sample (2) 範例 程式一 news 1 1 3 4 5 6 7 8 class ReduceClass extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable> { IntWritable SumValue = new IntWritable(); public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) sum += values.next().get(); SumValue.set(sum); output.collect(key, SumValue); }} <word,one> < no , 1 > news <key,SunValue> < news , 1 > 1 1 < is , 1 > < news , 2 > < a, 1 > < good , 1 > 9 < news, 1 > 9
Word Count Sample (3) 範例 程式一 conf.setCombinerClass(Reduce.class); Class WordCount{ main() JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); // set path FileInputFormat.setInputPaths(new Path(args[0])); FileOutputFormat.setOutputPath(new Path(args[1])); // set map reduce conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(ReduceClass.class); // run JobClient.runJob(conf); }} 10 10
編譯與執行 jar Δ -cvf Δ MyJar.jar Δ -C Δ MyJava Δ . 編譯 執行流程 基本步驟 javac Δ -classpath Δ hadoop-*-core.jar Δ -d Δ MyJava Δ MyCode.java 封裝 jar Δ -cvf Δ MyJar.jar Δ -C Δ MyJava Δ . 執行 bin/hadoop Δ jar Δ MyJar.jar Δ MyCode Δ HDFS_Input/ Δ HDFS_Output/ 所在的執行目錄為Hadoop_Home ./MyJava = 編譯後程式碼目錄 Myjar.jar = 封裝後的編譯檔 先放些文件檔到HDFS上的input目錄 ./input = hdfs的輸入目錄 ./ouput = hdfs的輸出目錄 11 11
WordCount1 練習 (I) 範例一 動手做 cd $HADOOP_HOME bin/hadoop dfs -mkdir input echo "I like NCHC Cloud Course." > inputwc/input1 echo "I like nchc Cloud Course, and we enjoy this crouse." > inputwc/input2 bin/hadoop dfs -put inputwc inputwc bin/hadoop dfs -ls input 12 12
WordCount1 練習 (II) 範例一 動手做 編輯WordCount.java http://trac.nchc.org.tw/cloud/attachment/wiki/jazz/Hadoop_Lab6/WordCount.java?format=raw mkdir MyJava javac -classpath hadoop-*-core.jar -d MyJava WordCount.java jar -cvf wordcount.jar -C MyJava . bin/hadoop jar wordcount.jar WordCount input/ output/ 所在的執行目錄為Hadoop_Home(因為hadoop-*-core.jar ) javac編譯時需要classpath, 但hadoop jar時不用 wordcount.jar = 封裝後的編譯檔,但執行時需告知class name Hadoop進行運算時,只有 input 檔要放到hdfs上,以便hadoop分析運算;執行檔(wordcount.jar)不需上傳,也不需每個node都放,程式的載入交由java處理 13 13
範例一 動手做 WordCount1 練習(III) 14 14
範例一 動手做 WordCount1 練習(IV) 15 15
WordCount 進階版 WordCount2 功能 步驟 (接續 WordCount 的環境) 範例二 動手做 不計標點符號 不管大小寫 http://trac.nchc.org.tw/cloud/raw-attachment/wiki/jazz/Hadoop_Lab6/WordCount2.java 功能 不計標點符號 不管大小寫 步驟 (接續 WordCount 的環境) echo "\." >pattern.txt && echo "\," >>pattern.txt bin/hadoop dfs -put pattern.txt ./ mkdir MyJava2 javac -classpath hadoop-*-core.jar -d MyJava2 WordCount2.java jar -cvf wordcount2.jar -C MyJava2 . 16 16
範例二 動手做 不計標點符號 執行 bin/hadoop jar wordcount2.jar WordCount2 input output2 -skip pattern.txt dfs -cat output2/part-00000 17 17
範例二 動手做 不管大小寫 執行 bin/hadoop jar wordcount2.jar WordCount2 -Dwordcount.case.sensitive=false input output3 -skip pattern.txt 18 18
Tool 處理Hadoop命令執行的選項 -conf <configuration file> 範例二 補充說明 Tool 處理Hadoop命令執行的選項 -conf <configuration file> -D <property=value> -fs <local|namenode:port> -jt <local|jobtracker:port> 透過介面交由程式處理 ToolRunner.run(Tool, String[]) 19 19
DistributedCache 設定特定有應用到相關的、超大檔案、或只用來參考卻不加入到分析目錄的檔案 如pattern.txt檔 範例二 補充說明 DistributedCache 設定特定有應用到相關的、超大檔案、或只用來參考卻不加入到分析目錄的檔案 如pattern.txt檔 DistributedCache.addCacheFile(URI,conf) URI=hdfs://host:port/FilePath 20 20