CHAPTER 9 使用Hadoop實作MapReduce
Outline 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop MapReduce專題
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
開發環境設定 Hadoop程式開發環境的架設可分為兩種; 不透過Integrated Development Environment (IDE) 透過IDE
不使用IDE環境(1/2) 首先設定環境變數如下: 在/etc/profile中加入Java及Hadoop的CLASSPATH: ~# vi /etc/profile CLASSPATH=/opt/hadoop/hadoop-0.20.2-core.jar ←加入這兩行 export CLASSPATH 透過source讓設定的profile環境變數生效,或者重新登入也可以: ~# source /etc/profile 將撰寫好的Java程式編譯成class檔: ~# javac [程式名稱].java
不使用IDE環境(2/2) 在Hadoop上執行程式 (只有一個class檔時): 若有多個class檔,則需要將class檔包裝成jar檔: ~# jar cvf [jar檔名稱].jar [程式名稱].class 在Hadoop上執行包裝好的jar檔: /hadoop# bin/hadoop jar [jar檔名稱].jar [主函式名稱] [參數0] [參數1] …
使用IDE (Eclipse) (1/2) 先到Eclipse官網 (http://www.eclipse.org) 下載Eclipse安裝檔 Linux系統上需先安裝圖形化套件 目前下載版本是Eclipse Classic 3.6.2 下載完後解壓縮,並將解壓縮後的目錄移到/opt/eclipse
使用IDE (Eclipse) (2/2) 也可以開啟終端機輸入下列指令: ~# wget http://ftp.cs.pu.edu.tw/pub/eclipse/eclipse/downloads/drops/R-3.6.2-201102101200/eclipse-SDK-3.6.2-linux-gtk.tar.gz ~# tar zxvf eclipse-SDK-3.6.2-linux-gtk.tar.gz ~# mv eclipse /opt/ 在/usr/local/bin/裡建立eclipse執行檔的連結: ~# ln -sf /opt/eclipse/eclipse /usr/local/bin/ 將/opt/hadoop裡的eclipse plugin搬到eclipse/plugin裡: ~# cp /opt/hadoop/contrib/eclipse-plugin/hadoop-0.20.2-eclipse-plugin.jar /opt/eclipse/plugins/ 接下來便可以啟動Eclipse了: ~# eclipse &
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
新增專案(1/15)
新增專案(2/15)
新增專案(3/15)
新增專案(4/15)
新增專案(5/15)
新增專案(6/15)
新增專案(7/15)
新增專案(8/15)
新增專案(9/15)
新增專案(10/15)
新增專案(11/15)
新增專案(12/15)
新增專案(13/15)
新增專案(14/15)
新增專案(15/15)
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
MapReduce程式架構 MapReduce程式主要可分為三個部份 MapReduce Driver Mapper Reducer 利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合 Reducer 接受一個中間key的值和相關的一個value值的集合
MapReduce Driver 01. Class MapReduceDriver類別名稱 { 02. main(){ 03. Configuration conf = new Configuration(); 04. Job job = new Job(conf, Job名稱); 05. job.setJarByClass( MapReduceDriver類別(即此類別) ); 06. job.setMapperClass( Mapper類別 ); 07. job.setReducerClass( Reducer類別 ); 08. FileInputFormat.addInputPath( job, new Path(args[0])); 09. FileOutputFormat.setOutputPath( job, new Path(args[1])); 10. 其它參數設定 11. job.waitForCompletion(true); 12. } 13. }
Mapper程式架構 01. class Mapper類別名稱 extends 02. 全域變數 03. public void map( 輸入鍵類型 key, 輸入值類型 value, Context context) throws IOException, InterruptedException { 04. Map程式碼區 05. context.write(IntermediateKey, IntermediateValue); 06. } 07. }
Reducer程式架構 01. class Reducer類別名稱 extends Redcuer < 輸入鍵類型, 輸入值類型, 輸出鍵類型, 輸出鍵值型 > { 02. 全域變數 03. public void reduce( 輸入鍵類型 key, Iterable< 輸入值類型 > value, Context context) throws IOException, InterruptedException { 04. Reduce程式碼 05. context.write(ResultKey, ResultValue); 06. } 07. }
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
MapReduce基礎實作範例(1/2) 本範例以一個簡單的maxCPU程式說明如何使用Eclipse開發MapReduce程式 此範例中,系統每小時記錄一次CPU使用率到日誌檔中,而maxCPU程式會分析日誌檔,並透過MapReduce的方式,找出每天最高的CPU使用率。 本範例中將以這個日誌檔做為輸入檔,先在HDFS上新創一個log目錄,再根據上述格式建立一個日誌檔並上傳到log目錄中。
MapReduce基礎實作範例(2/2) 日誌檔中記錄的欄位分別為日期、時段及CPU使用率,目誌檔部份內容如下 2011/01/01 00:00 40 2011/01/01 01:00 30 … 2011/01/02 22:00 40 2011/01/02 23:00 30
新增Mapper類別(1/3)
新增Mapper類別(2/3) 之後在HadoopLab專案中便會新增一個新的package MR_Lab及mymapper.java ,並修改mymapper.java的內容: 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 05. String line = value.toString(); 06. String day = line.substring(0, 10); 07. String data = line.substring(17); 08. tday.set(day); 09. idata.set(Integer.valueOf(data)); 10. context.write(tday, idata); 11. } 12. }
新增Mapper類別(3/3)
新增Reducer類別(1/3)
新增Reducer類別(2/3) 在MR_Lab的package中出現myreducer.java ,並修改myreducer.java的內容為: 01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 04. int maxValue = Integer.MIN_VALUE; 05. for (IntWritable val : values) { 06. maxValue = Math.max(maxValue, val.get()); 07. } 08. cpuUtil.set(maxValue); 09. context.write(key, cpuUtil); 10. } 11. }
新增Reducer類別(3/3)
新增MapReduce Driver類別(1/4)
新增MapReduce Driver類別(2/4) 在MR_Lab的package中出現maxCPU.java ,並修改maxCPU.java的內容為: 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { 03. Configuration conf = new Configuration(); 04. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 05. if (otherArgs.length != 2) { 06. System.err.println("Usage: maxCPU <in> <out>"); 07. System.exit(2); 08. } 09. Job job = new Job(conf, "max CPU"); 10. job.setJarByClass(maxCPU.class); 11. job.setMapperClass(mymapper.class); 12. job.setCombinerClass(myreducer.class); 13. job.setReducerClass(myreducer.class); 14. job.setOutputKeyClass(Text.class); 15. job.setOutputValueClass(IntWritable.class); 16. FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 17. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
新增MapReduce Driver類別(3/4) 18. boolean status = job.waitForCompletion(true); 19. if (status) { 20. System.exit(0); 21. } else { 22. System.err.print("Not Complete!"); 23. System.exit(1); 24. } 25. } 26. }
新增MapReduce Driver類別(4/4)
在Hadoop上執行MapReduce程式(1/3)
在Hadoop上執行MapReduce程式(2/3)
在Hadoop上執行MapReduce程式(3/3) 執行結束後,在HDFS上的output目錄中即可看到最後結果: 2011/01/01 100 2011/01/02 90 2011/01/03 80 2011/01/04 30
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
MapReduce進階實作範例 本範例將所介紹的MapReduce程式 (maxCPU),加入HDFS及HBase的相關操作。 7. 4. 3. Mapper Reducer HBase 6. 2. 5. HDFS 1. Local host
新增maxCPU類別(1/2) 新增一個MapReduce Driver類別並命名為maxCPU,maxCPU類別負責MapReduce相關設定及運作流程 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { 03. Configuration conf = new Configuration(); 04. String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); 05. if (otherArgs.length != 2) { 06. System.err.println("Usage: maxCPU <in> <out>"); 07. System.exit(2); 08. } 09. Job job = new Job(conf, "max CPU"); 10. job.setJarByClass(maxCPU.class); 11. job.setMapperClass(mymapper.class); 12. job.setCombinerClass(myreducer.class); 13. job.setReducerClass(myreducer.class); 14. job.setOutputKeyClass(Text.class); 15. job.setOutputValueClass(IntWritable.class); 16. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
新增maxCPU類別(2/2) 17. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 18. CheckDir.check(otherArgs[0].toString(), conf); 19. LocalToHdfs.localToHdfs(otherArgs[0].toString(), otherArgs[0].toString(), conf); 20. CheckDir.check(otherArgs[1].toString(), conf); 21. CheckTable.check("CPU"); 22. CheckTable.addFamily("CPU", "CPUUtil"); 23. boolean status = job.waitForCompletion(true); 24. if (status) { 25. OutputResult.output(otherArgs[1].toString(), conf); 26. System.exit(0); 27. } else { 28. System.err.print("Not Complete!"); 29. System.exit(1); 30. } 31. } 32. }
新增mymapper類別 新增一個Mapper類別並命名為mymapper,其功能為整理輸入的鍵/值,並在第10行呼叫AddData類別將資料存入HBase 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 05. String line = value.toString(); 06. String day = line.substring(0, 10); 07. String time = line.substring(11, 16); 08. String data = line.substring(17); 09. try { 10. AddData.add("CPU", "CPUUtil", day + " " + time, data); 11. } catch (Exception e) { 12. System.err.print("ERROR! (add data to HBase)"); 13. } 14. tday.set(day); 15. idata.set(Integer.valueOf(data)); 16. context.write(tday, idata); 17. } 18. }
新增AddData類別 此類別負責將資料插入HBase中 01. public class AddData { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. static void add(String table, String family, String dtime, String data) throws Exception { 10. HTable htable = new HTable(configuration, table); 11. Put row = new Put(dtime.getBytes()); 12. row.add(family.getBytes(), new String("data").getBytes(), data.getBytes()); 13. htable.put(row); 14. htable.flushCommits(); 15. } 16. }
新增myreducer類別 新增一個Reducer類別並命名為myreducer,其程式碼如下 01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 04. int maxValue = Integer.MIN_VALUE; 05. for (IntWritable val : values) { 06. maxValue = Math.max(maxValue, val.get()); 07. } 08. cpuUtil.set(maxValue); 09. context.write(key, cpuUtil); 10. } 11. }
新增CheckDir類別 此類別的功能為檢查HDFS上是否有已存在某個檔案或目錄,若有則刪除該檔案或目錄,因此在每次執行程式時,都能自動刪除HDFS上的輸出檔案或目錄 01. public class CheckDir { 02. static void check(final String path, Configuration conf) { 03. Path dstPath = new Path(path); 04. try { 05. FileSystem hdfs = dstPath.getFileSystem(conf); 06. if (hdfs.exists(dstPath)) { 07. hdfs.delete(dstPath, true); 08. } 09. } catch (IOException e) { 10. e.printStackTrace(); 11. } 12. } 13. }
新增LocalToHdfs類別 此類別將本地端檔案系統中路徑為src的檔案或目錄上傳至HDFS中路徑為dst的目錄中 01. public class LocalToHdfs { 02. static void localToHdfs(String src, String dst, Configuration conf) { 03. Path dstPath = new Path(dst); 04. try { 05. FileSystem hdfs = dstPath.getFileSystem(conf); 06. hdfs.copyFromLocalFile(false, new Path(src), new Path(dst)); 07. } catch (IOException e) { 08. e.printStackTrace(); 09. } 10. } 11. }
新增CheckTable類別(1/2) 此類別中的.check() method與CheckDir類別的功能類似;先檢查HBase中是否有相同的表格,若有則刪除該表格 而.addFamily() method的功能為新增表格並設定其column family。 01. public class CheckTable { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void check(String table) throws Exception { 10. HBaseAdmin admin = new HBaseAdmin(configuration); 11. if (admin.tableExists(table)) { 12. System.out.println("delete the table "); 13. admin.disableTable(table);
新增CheckTable類別(2/2) 14. admin.deleteTable(table); 15. } 16. } 15. } 16. } 17. public static void addFamily(String table, String family) throws Exception { 18. HBaseAdmin admin = new HBaseAdmin(configuration); 19. HTableDescriptor tableDescripter = new HTableDescriptor(table .getBytes()); 20. tableDescripter.addFamily(new HColumnDescriptor(family)); 21. admin.createTable(tableDescripter); 22. } 23. }
新增OutputResult類別(1/2) 此類別由HDFS讀取myreducer的執行結果,得知每日最高CPU使用率,再呼叫ScanTable類別並傳送相關資訊,以找出每日CPU使用率最高的所有時段。 01. public class OutputResult { 02. static void output(final String path, Configuration conf) { 03. Path dst_path = new Path(path); 04. String day = null; 05. String value = null; 06. try { 07. FileSystem hdfs = dst_path.getFileSystem(conf); 08. FSDataInputStream in = null; 09. if (hdfs.exists(dst_path)) { 10. in = hdfs.open(new Path(dst_path.toString() + "/part-r-00000")); 11. String messagein = null; 12. while ((messagein = in.readLine()) != null) { 13. StringTokenizer itr = new StringTokenizer(messagein); 14. day = itr.nextToken(); 15. value = itr.nextToken(); 16. ScanTable.setFilter("CPU", day, value); 17. }
新增OutputResult類別(2/2) 18. in.close(); 19. } 19. } 20. } catch (IOException e) { 21. e.printStackTrace(); 22. } 23. } 24. }
新增ScanTable類別(1/2) 類別根據myreducer類別所計算出的每日最高CPU使用率,再到HBase表格中過濾出每日最高使用率的時段並顯示出來 01. public class ScanTable { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void setFilter(String tablename, String day, String value) throws IOException { 10. HTable table = new HTable(configuration, tablename); 11. Scan scan = new Scan((day + " 00:00").getBytes(), (day + " 23:00").getBytes()); 12. FilterList filterList = new FilterList(); 13. filterList.addFilter(new SingleColumnValueFilter("CPUUtil".getBytes(), "data".getBytes(), CompareOp.EQUAL, value.getBytes())); 14. scan.setFilter(filterList);
新增ScanTable類別(2/2) 15. ResultScanner ResultScannerFilterList = table.getScanner(scan); 16. for (Result rs = ResultScannerFilterList.next(); rs != null; rs = ResultScannerFilterList.next()) { 17. for (KeyValue kv : rs.list()) { 18. System.out.println(new String(kv.getRow()) + " " + new String(kv.getValue())); 19. } 20. } 21. } 22. }
執行結果 最終的輸出結果如下: 2011/01/01 16:00 100 2011/01/01 17:00 100 2011/01/02 15:00 90 2011/01/03 16:00 80 2011/01/03 17:00 80 2011/01/03 18:00 80 2011/01/04 00:00 40
開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題
Hadoop專題 專題內容必須包含下列技術: 可考慮加入Hadoop其他子專案技術 MapReduce HDFS Hbase 考慮網路上大量的資料,分別存放在不同的Node,並建立一組HDFS Hbase 利用Hbase過濾出重要資訊,並顯示出來 可考慮加入Hadoop其他子專案技術 Avro、Pig、Hive、Chukwa