Presentation is loading. Please wait.

Presentation is loading. Please wait.

CHAPTER 9 使用Hadoop實作MapReduce.

Similar presentations


Presentation on theme: "CHAPTER 9 使用Hadoop實作MapReduce."— Presentation transcript:

1 CHAPTER 9 使用Hadoop實作MapReduce

2 Outline 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例
Hadoop MapReduce專題

3 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

4 開發環境設定 Hadoop程式開發環境的架設可分為兩種;
不透過Integrated Development Environment (IDE) 透過IDE

5 不使用IDE環境(1/2) 首先設定環境變數如下: 在/etc/profile中加入Java及Hadoop的CLASSPATH:
~# vi /etc/profile CLASSPATH=/opt/hadoop/hadoop core.jar ←加入這兩行 export CLASSPATH 透過source讓設定的profile環境變數生效,或者重新登入也可以: ~# source /etc/profile 將撰寫好的Java程式編譯成class檔: ~# javac [程式名稱].java

6 不使用IDE環境(2/2) 在Hadoop上執行程式 (只有一個class檔時):
若有多個class檔,則需要將class檔包裝成jar檔: ~# jar cvf [jar檔名稱].jar [程式名稱].class 在Hadoop上執行包裝好的jar檔: /hadoop# bin/hadoop jar [jar檔名稱].jar [主函式名稱] [參數0] [參數1] …

7 使用IDE (Eclipse) (1/2) 先到Eclipse官網 ( 下載Eclipse安裝檔 Linux系統上需先安裝圖形化套件 目前下載版本是Eclipse Classic 3.6.2 下載完後解壓縮,並將解壓縮後的目錄移到/opt/eclipse

8 使用IDE (Eclipse) (2/2) 也可以開啟終端機輸入下列指令:
~# wget ~# tar zxvf eclipse-SDK linux-gtk.tar.gz ~# mv eclipse /opt/ 在/usr/local/bin/裡建立eclipse執行檔的連結: ~# ln -sf /opt/eclipse/eclipse /usr/local/bin/ 將/opt/hadoop裡的eclipse plugin搬到eclipse/plugin裡: ~# cp /opt/hadoop/contrib/eclipse-plugin/hadoop eclipse-plugin.jar /opt/eclipse/plugins/ 接下來便可以啟動Eclipse了: ~# eclipse &

9 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

10 新增專案(1/15)

11 新增專案(2/15)

12 新增專案(3/15)

13 新增專案(4/15)

14 新增專案(5/15)

15 新增專案(6/15)

16 新增專案(7/15)

17 新增專案(8/15)

18 新增專案(9/15)

19 新增專案(10/15)

20 新增專案(11/15)

21 新增專案(12/15)

22 新增專案(13/15)

23 新增專案(14/15)

24 新增專案(15/15)

25 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

26 MapReduce程式架構 MapReduce程式主要可分為三個部份 MapReduce Driver Mapper Reducer
利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合 Reducer 接受一個中間key的值和相關的一個value值的集合

27 MapReduce Driver 01. Class MapReduceDriver類別名稱 { 02. main(){
Configuration conf = new Configuration(); Job job = new Job(conf, Job名稱); job.setJarByClass( MapReduceDriver類別(即此類別) ); job.setMapperClass( Mapper類別 ); job.setReducerClass( Reducer類別 ); FileInputFormat.addInputPath( job, new Path(args[0])); FileOutputFormat.setOutputPath( job, new Path(args[1])); 10. 其它參數設定 job.waitForCompletion(true); 12. } 13. }

28 Mapper程式架構 01. class Mapper類別名稱 extends
全域變數 03. public void map( 輸入鍵類型 key, 輸入值類型 value, Context context) throws IOException, InterruptedException { Map程式碼區 context.write(IntermediateKey, IntermediateValue); 06. } 07. }

29 Reducer程式架構 01. class Reducer類別名稱 extends
Redcuer < 輸入鍵類型, 輸入值類型, 輸出鍵類型, 輸出鍵值型 > { 全域變數 03. public void reduce( 輸入鍵類型 key, Iterable< 輸入值類型 > value, Context context) throws IOException, InterruptedException { Reduce程式碼 context.write(ResultKey, ResultValue); 06. } 07. }

30 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

31 MapReduce基礎實作範例(1/2) 本範例以一個簡單的maxCPU程式說明如何使用Eclipse開發MapReduce程式
此範例中,系統每小時記錄一次CPU使用率到日誌檔中,而maxCPU程式會分析日誌檔,並透過MapReduce的方式,找出每天最高的CPU使用率。 本範例中將以這個日誌檔做為輸入檔,先在HDFS上新創一個log目錄,再根據上述格式建立一個日誌檔並上傳到log目錄中。

32 MapReduce基礎實作範例(2/2) 日誌檔中記錄的欄位分別為日期、時段及CPU使用率,目誌檔部份內容如下
2011/01/01 00:00 40 2011/01/01 01:00 30 2011/01/02 22:00 40 2011/01/02 23:00 30

33 新增Mapper類別(1/3)

34 新增Mapper類別(2/3) 之後在HadoopLab專案中便會新增一個新的package MR_Lab及mymapper.java ,並修改mymapper.java的內容: 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String day = line.substring(0, 10); String data = line.substring(17); tday.set(day); idata.set(Integer.valueOf(data)); context.write(tday, idata); 11. } 12. }

35 新增Mapper類別(3/3)

36 新增Reducer類別(1/3)

37 新增Reducer類別(2/3) 在MR_Lab的package中出現myreducer.java ,並修改myreducer.java的內容為: 01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable val : values) { maxValue = Math.max(maxValue, val.get()); } cpuUtil.set(maxValue); context.write(key, cpuUtil); 10. } 11. }

38 新增Reducer類別(3/3)

39 新增MapReduce Driver類別(1/4)

40 新增MapReduce Driver類別(2/4)
在MR_Lab的package中出現maxCPU.java ,並修改maxCPU.java的內容為: 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: maxCPU <in> <out>"); System.exit(2); } Job job = new Job(conf, "max CPU"); job.setJarByClass(maxCPU.class); job.setMapperClass(mymapper.class); job.setCombinerClass(myreducer.class); job.setReducerClass(myreducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

41 新增MapReduce Driver類別(3/4)
boolean status = job.waitForCompletion(true); if (status) { System.exit(0); } else { System.err.print("Not Complete!"); System.exit(1); } 25. } 26. }

42 新增MapReduce Driver類別(4/4)

43 在Hadoop上執行MapReduce程式(1/3)

44 在Hadoop上執行MapReduce程式(2/3)

45 在Hadoop上執行MapReduce程式(3/3)
執行結束後,在HDFS上的output目錄中即可看到最後結果: 2011/01/ 2011/01/ 2011/01/ 2011/01/

46 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

47 MapReduce進階實作範例 本範例將所介紹的MapReduce程式 (maxCPU),加入HDFS及HBase的相關操作。 7.
4. 3. Mapper Reducer HBase 6. 2. 5. HDFS 1. Local host

48 新增maxCPU類別(1/2) 新增一個MapReduce Driver類別並命名為maxCPU,maxCPU類別負責MapReduce相關設定及運作流程 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: maxCPU <in> <out>"); System.exit(2); } Job job = new Job(conf, "max CPU"); job.setJarByClass(maxCPU.class); job.setMapperClass(mymapper.class); job.setCombinerClass(myreducer.class); job.setReducerClass(myreducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

49 新增maxCPU類別(2/2) FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); CheckDir.check(otherArgs[0].toString(), conf); LocalToHdfs.localToHdfs(otherArgs[0].toString(), otherArgs[0].toString(), conf); CheckDir.check(otherArgs[1].toString(), conf); CheckTable.check("CPU"); CheckTable.addFamily("CPU", "CPUUtil"); boolean status = job.waitForCompletion(true); if (status) { OutputResult.output(otherArgs[1].toString(), conf); System.exit(0); } else { System.err.print("Not Complete!"); System.exit(1); } 31. } 32. }

50 新增mymapper類別 新增一個Mapper類別並命名為mymapper,其功能為整理輸入的鍵/值,並在第10行呼叫AddData類別將資料存入HBase 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String day = line.substring(0, 10); String time = line.substring(11, 16); String data = line.substring(17); try { AddData.add("CPU", "CPUUtil", day + " " + time, data); } catch (Exception e) { System.err.print("ERROR! (add data to HBase)"); 13. } 14. tday.set(day); 15. idata.set(Integer.valueOf(data)); 16. context.write(tday, idata); 17. } 18. }

51 新增AddData類別 此類別負責將資料插入HBase中 01. public class AddData {
02. public static Configuration configuration = null; 03. static { configuration = HBaseConfiguration.create(); configuration.set("hbase.master", "Host01:60000"); configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. static void add(String table, String family, String dtime, String data) throws Exception { HTable htable = new HTable(configuration, table); Put row = new Put(dtime.getBytes()); row.add(family.getBytes(), new String("data").getBytes(), data.getBytes()); htable.put(row); htable.flushCommits(); 15. } 16. }

52 新增myreducer類別 新增一個Reducer類別並命名為myreducer,其程式碼如下
01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable val : values) { maxValue = Math.max(maxValue, val.get()); } cpuUtil.set(maxValue); context.write(key, cpuUtil); 10. } 11. }

53 新增CheckDir類別 此類別的功能為檢查HDFS上是否有已存在某個檔案或目錄,若有則刪除該檔案或目錄,因此在每次執行程式時,都能自動刪除HDFS上的輸出檔案或目錄 01. public class CheckDir { 02. static void check(final String path, Configuration conf) { Path dstPath = new Path(path); try { FileSystem hdfs = dstPath.getFileSystem(conf); if (hdfs.exists(dstPath)) { hdfs.delete(dstPath, true); } } catch (IOException e) { e.printStackTrace(); } 12. } 13. }

54 新增LocalToHdfs類別 此類別將本地端檔案系統中路徑為src的檔案或目錄上傳至HDFS中路徑為dst的目錄中
01. public class LocalToHdfs { 02. static void localToHdfs(String src, String dst, Configuration conf) { Path dstPath = new Path(dst); try { FileSystem hdfs = dstPath.getFileSystem(conf); hdfs.copyFromLocalFile(false, new Path(src), new Path(dst)); } catch (IOException e) { e.printStackTrace(); } 10. } 11. }

55 新增CheckTable類別(1/2) 此類別中的.check() method與CheckDir類別的功能類似;先檢查HBase中是否有相同的表格,若有則刪除該表格 而.addFamily() method的功能為新增表格並設定其column family。 01. public class CheckTable { 02. public static Configuration configuration = null; 03. static { configuration = HBaseConfiguration.create(); configuration.set("hbase.master", "Host01:60000"); configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void check(String table) throws Exception { HBaseAdmin admin = new HBaseAdmin(configuration); if (admin.tableExists(table)) { System.out.println("delete the table "); admin.disableTable(table);

56 新增CheckTable類別(2/2) 14. admin.deleteTable(table); 15. } 16. }
} 16. } 17. public static void addFamily(String table, String family) throws Exception { HBaseAdmin admin = new HBaseAdmin(configuration); HTableDescriptor tableDescripter = new HTableDescriptor(table .getBytes()); tableDescripter.addFamily(new HColumnDescriptor(family)); admin.createTable(tableDescripter); 22. } 23. }

57 新增OutputResult類別(1/2) 此類別由HDFS讀取myreducer的執行結果,得知每日最高CPU使用率,再呼叫ScanTable類別並傳送相關資訊,以找出每日CPU使用率最高的所有時段。 01. public class OutputResult { 02. static void output(final String path, Configuration conf) { Path dst_path = new Path(path); String day = null; String value = null; try { FileSystem hdfs = dst_path.getFileSystem(conf); FSDataInputStream in = null; if (hdfs.exists(dst_path)) { in = hdfs.open(new Path(dst_path.toString() + "/part-r-00000")); String messagein = null; while ((messagein = in.readLine()) != null) { StringTokenizer itr = new StringTokenizer(messagein); day = itr.nextToken(); value = itr.nextToken(); ScanTable.setFilter("CPU", day, value); }

58 新增OutputResult類別(2/2) 18. in.close(); 19. }
} } catch (IOException e) { e.printStackTrace(); } 23. } 24. }

59 新增ScanTable類別(1/2) 類別根據myreducer類別所計算出的每日最高CPU使用率,再到HBase表格中過濾出每日最高使用率的時段並顯示出來 01. public class ScanTable { 02. public static Configuration configuration = null; 03. static { configuration = HBaseConfiguration.create(); configuration.set("hbase.master", "Host01:60000"); configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void setFilter(String tablename, String day, String value) throws IOException { HTable table = new HTable(configuration, tablename); Scan scan = new Scan((day + " 00:00").getBytes(), (day + " 23:00").getBytes()); FilterList filterList = new FilterList(); filterList.addFilter(new SingleColumnValueFilter("CPUUtil".getBytes(), "data".getBytes(), CompareOp.EQUAL, value.getBytes())); scan.setFilter(filterList);

60 新增ScanTable類別(2/2) ResultScanner ResultScannerFilterList = table.getScanner(scan); for (Result rs = ResultScannerFilterList.next(); rs != null; rs = ResultScannerFilterList.next()) { for (KeyValue kv : rs.list()) { System.out.println(new String(kv.getRow()) + " " + new String(kv.getValue())); } } 21. } 22. }

61 執行結果 最終的輸出結果如下: 2011/01/01 16:00 100 2011/01/01 17:00 100
2011/01/02 15:00 90 2011/01/03 16:00 80 2011/01/03 17:00 80 2011/01/03 18:00 80 2011/01/04 00:00 40

62 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

63 Hadoop專題 專題內容必須包含下列技術: 可考慮加入Hadoop其他子專案技術 MapReduce HDFS Hbase
考慮網路上大量的資料,分別存放在不同的Node,並建立一組HDFS Hbase 利用Hbase過濾出重要資訊,並顯示出來 可考慮加入Hadoop其他子專案技術 Avro、Pig、Hive、Chukwa


Download ppt "CHAPTER 9 使用Hadoop實作MapReduce."

Similar presentations


Ads by Google