CHAPTER 9 使用Hadoop實作MapReduce.

Slides:



Advertisements
Similar presentations
软件编程基础 一、程序的编辑 Java 源程序是以 Java 为后缀的简单的文本文件,可以用各种 Java 集成开发环境中的源代码编辑器来编写,也可以用其他文 本编辑工具,如 Windows 中的记事本或 DOS 中的 EDIT 软件等。 利用文字编辑器编写下列程序 public class Hello.
Advertisements

程序设计语言基础 软件工程系 秦晓燕. 课程目标 了解面向对象程序设计的思想,以及软件 开发流程。 学习 Java 语言的基本概念和编程方法,基 本掌握 Java 类库的使用。 能够利用所学的方法开发简单的小型应用 程序.
第四章 类、对象和接口.
3.2 Java的类 Java 类库的概念 语言规则——程序的书写规范 Java语言 类库——已有的有特定功能的Java程序模块
第一單元 建立java 程式.
第八讲 基于Hadoop的数据仓库Hive (PPT版本号:2016年4月6日版本)
四資二甲 第三週作業 物件導向程式設計.
基于Hadoop的Map/Reduce框架研究报告
Ch.8. 基于MapReduce的图算法 MapReduce海量数据并行处理
Map-Reduce Programming
第二章 JAVA语言基础.
Introduction to MapReduce
王耀聰 陳威宇 國家高速網路與計算中心(NCHC)
設置Hadoop環境 王耀聰 陳威宇 楊順發 國家高速網路與計算中心(NCHC)
Hadoop 單機設定與啟動 step 1. 設定登入免密碼 step 2. 安裝java step 3. 下載安裝Hadoop
程式設計實作.
第3章 分布式文件系统HDFS (PPT版本号:2017年2月版本)
2.1 基本資料型別 2.2 變數 2.3 運算式與運算子 2.4 輸出與輸入資料 2.5 資料型別轉換 2.6 實例
臺北市立大學 資訊科學系(含碩士班) 賴阿福
程式設計概論 1.1 程式設計概論 程式語言的演進 物件導向程式 程式開發流程 1.2 C++開發工具
Java簡介.
JAVA vs. SQL Server 建國科技大學 資管系 饒瑞佶 2013/4 V1.
基于Hadoop的数据仓库Hive.
JDK 安裝教學 (for Win7) Soochow University
第1章 認識Arduino.
《大数据技术原理与应用》 第七章 MapReduce (2016春季学期) 林子雨 厦门大学计算机科学系 主页:
Java基础 JavaSE异常.
Unit 06 雲端分散式Hadoop實驗 -II
CHAPTER 6 認識MapReduce.
西南科技大学网络教育系列课程 高级语程序设计(Java) 第五章 继承、接口与范型.
厦门大学数据库实验室 MapReduce 连接
Google Data API Spreadsheet
Hadoop平台與應用規劃實作 報告者:劉育維.
Cloud Computing MapReduce进阶.
類別(class) 類別class與物件object.
Java语言程序设计 第五部分 Java异常处理.
Map Reduce Programming
R教學 安裝RStudio 羅琪老師.
王豐緒 銘傳大學資訊工程學系 問題:JAVA 物件檔輸出入.
安裝JDK 安裝Eclipse Eclipse 中文化
Homework 1(上交时间:10月14号) 倒排索引.
Windoop操作步驟 於作業系統Windows 10 專業版.
9.1 程式偵錯 9.2 捕捉例外 9.3 自行拋出例外 9.4 自定例外類別 9.5 多執行緒
Java 程式設計 講師:FrankLin.
异常及处理.
C/C++/Java 哪些值不是头等程序对象
Java程式設計 Eclipse.
第一單元 建立java 程式.
4.2通讯服务模块线程之间传递信息 信息工程系 向模军 Tel: QQ:
Hadoop入门
《JAVA程序设计》 语音答疑 辅导老师:高旻.
Unit 05 雲端分散式Hadoop實驗 -I M. S. Jian
Install OpenCV C++ with Visual Studio 2017 on Windows PC
Java程式初體驗大綱 大綱 在學程式之前及本書常用名詞解釋 Hello Java!程式 在Dos下編譯、執行程式
Interfaces and Packages
基于MapReduce的Join算法优化
第二章 Java语法基础.
Unix 安裝過程 使用2個磁片 到 rawwrite bootnet.img drvnet.img 利用rawwrite 將image檔寫入磁片.
基本指令.
第二章 Java基本语法 讲师:复凡.
班級:博碩子一甲 授課老師:鐘國家 助教:陳國政
Quiz1 繳交期限: 9/28(四).
安裝JDK 配置windows win7 環境變數
JAVA 程式設計與資料結構 第三章 物件的設計.
第2章 Java语言基础.
SQLite資料庫 靜宜大學資管系 楊子青.
第二章 Java基础语法 北京传智播客教育
Unix指令4-文字編輯與程式撰寫.
Develop and Build Drives by Visual C++ IDE
InputStreamReader Console Scanner
Presentation transcript:

CHAPTER 9 使用Hadoop實作MapReduce

Outline 開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop MapReduce專題

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

開發環境設定 Hadoop程式開發環境的架設可分為兩種; 不透過Integrated Development Environment (IDE) 透過IDE

不使用IDE環境(1/2) 首先設定環境變數如下: 在/etc/profile中加入Java及Hadoop的CLASSPATH: ~# vi /etc/profile CLASSPATH=/opt/hadoop/hadoop-0.20.2-core.jar ←加入這兩行 export CLASSPATH 透過source讓設定的profile環境變數生效,或者重新登入也可以: ~# source /etc/profile 將撰寫好的Java程式編譯成class檔: ~# javac [程式名稱].java

不使用IDE環境(2/2) 在Hadoop上執行程式 (只有一個class檔時): 若有多個class檔,則需要將class檔包裝成jar檔: ~# jar cvf [jar檔名稱].jar [程式名稱].class 在Hadoop上執行包裝好的jar檔: /hadoop# bin/hadoop jar [jar檔名稱].jar [主函式名稱] [參數0] [參數1] …

使用IDE (Eclipse) (1/2) 先到Eclipse官網 (http://www.eclipse.org) 下載Eclipse安裝檔 Linux系統上需先安裝圖形化套件 目前下載版本是Eclipse Classic 3.6.2 下載完後解壓縮,並將解壓縮後的目錄移到/opt/eclipse

使用IDE (Eclipse) (2/2) 也可以開啟終端機輸入下列指令: ~# wget http://ftp.cs.pu.edu.tw/pub/eclipse/eclipse/downloads/drops/R-3.6.2-201102101200/eclipse-SDK-3.6.2-linux-gtk.tar.gz ~# tar zxvf eclipse-SDK-3.6.2-linux-gtk.tar.gz ~# mv eclipse /opt/ 在/usr/local/bin/裡建立eclipse執行檔的連結: ~# ln -sf /opt/eclipse/eclipse /usr/local/bin/ 將/opt/hadoop裡的eclipse plugin搬到eclipse/plugin裡: ~# cp /opt/hadoop/contrib/eclipse-plugin/hadoop-0.20.2-eclipse-plugin.jar /opt/eclipse/plugins/ 接下來便可以啟動Eclipse了: ~# eclipse &

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

新增專案(1/15)

新增專案(2/15)

新增專案(3/15)

新增專案(4/15)

新增專案(5/15)

新增專案(6/15)

新增專案(7/15)

新增專案(8/15)

新增專案(9/15)

新增專案(10/15)

新增專案(11/15)

新增專案(12/15)

新增專案(13/15)

新增專案(14/15)

新增專案(15/15)

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

MapReduce程式架構 MapReduce程式主要可分為三個部份 MapReduce Driver Mapper Reducer 利用一個輸入key/value pair集合來產生一個輸出的key/value pair集合 Reducer 接受一個中間key的值和相關的一個value值的集合

MapReduce Driver 01. Class MapReduceDriver類別名稱 { 02. main(){ 03. Configuration conf = new Configuration(); 04. Job job = new Job(conf, Job名稱); 05. job.setJarByClass( MapReduceDriver類別(即此類別) ); 06. job.setMapperClass( Mapper類別 ); 07. job.setReducerClass( Reducer類別 ); 08. FileInputFormat.addInputPath( job, new Path(args[0])); 09. FileOutputFormat.setOutputPath( job, new Path(args[1])); 10. 其它參數設定 11. job.waitForCompletion(true); 12. } 13. }

Mapper程式架構 01. class Mapper類別名稱 extends 02. 全域變數 03. public void map( 輸入鍵類型 key, 輸入值類型 value, Context context) throws IOException, InterruptedException { 04. Map程式碼區 05. context.write(IntermediateKey, IntermediateValue); 06. } 07. }

Reducer程式架構 01. class Reducer類別名稱 extends Redcuer < 輸入鍵類型, 輸入值類型, 輸出鍵類型, 輸出鍵值型 > { 02. 全域變數 03. public void reduce( 輸入鍵類型 key, Iterable< 輸入值類型 > value, Context context) throws IOException, InterruptedException { 04. Reduce程式碼 05. context.write(ResultKey, ResultValue); 06. } 07. }

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

MapReduce基礎實作範例(1/2) 本範例以一個簡單的maxCPU程式說明如何使用Eclipse開發MapReduce程式 此範例中,系統每小時記錄一次CPU使用率到日誌檔中,而maxCPU程式會分析日誌檔,並透過MapReduce的方式,找出每天最高的CPU使用率。 本範例中將以這個日誌檔做為輸入檔,先在HDFS上新創一個log目錄,再根據上述格式建立一個日誌檔並上傳到log目錄中。

MapReduce基礎實作範例(2/2) 日誌檔中記錄的欄位分別為日期、時段及CPU使用率,目誌檔部份內容如下 2011/01/01 00:00 40 2011/01/01 01:00 30 … 2011/01/02 22:00 40 2011/01/02 23:00 30

新增Mapper類別(1/3)

新增Mapper類別(2/3) 之後在HadoopLab專案中便會新增一個新的package MR_Lab及mymapper.java ,並修改mymapper.java的內容: 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 05. String line = value.toString(); 06. String day = line.substring(0, 10); 07. String data = line.substring(17); 08. tday.set(day); 09. idata.set(Integer.valueOf(data)); 10. context.write(tday, idata); 11. } 12. }

新增Mapper類別(3/3)

新增Reducer類別(1/3)

新增Reducer類別(2/3) 在MR_Lab的package中出現myreducer.java ,並修改myreducer.java的內容為: 01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 04. int maxValue = Integer.MIN_VALUE; 05. for (IntWritable val : values) { 06. maxValue = Math.max(maxValue, val.get()); 07. } 08. cpuUtil.set(maxValue); 09. context.write(key, cpuUtil); 10. } 11. }

新增Reducer類別(3/3)

新增MapReduce Driver類別(1/4)

新增MapReduce Driver類別(2/4) 在MR_Lab的package中出現maxCPU.java ,並修改maxCPU.java的內容為: 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { 03. Configuration conf = new Configuration(); 04. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 05. if (otherArgs.length != 2) { 06. System.err.println("Usage: maxCPU <in> <out>"); 07. System.exit(2); 08. } 09. Job job = new Job(conf, "max CPU"); 10. job.setJarByClass(maxCPU.class); 11. job.setMapperClass(mymapper.class); 12. job.setCombinerClass(myreducer.class); 13. job.setReducerClass(myreducer.class); 14. job.setOutputKeyClass(Text.class); 15. job.setOutputValueClass(IntWritable.class); 16. FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 17. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

新增MapReduce Driver類別(3/4) 18. boolean status = job.waitForCompletion(true); 19. if (status) { 20. System.exit(0); 21. } else { 22. System.err.print("Not Complete!"); 23. System.exit(1); 24. } 25. } 26. }

新增MapReduce Driver類別(4/4)

在Hadoop上執行MapReduce程式(1/3)

在Hadoop上執行MapReduce程式(2/3)

在Hadoop上執行MapReduce程式(3/3) 執行結束後,在HDFS上的output目錄中即可看到最後結果: 2011/01/01 100 2011/01/02 90 2011/01/03 80 2011/01/04 30

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

MapReduce進階實作範例 本範例將所介紹的MapReduce程式 (maxCPU),加入HDFS及HBase的相關操作。 7. 4. 3. Mapper Reducer HBase 6. 2. 5. HDFS 1. Local host

新增maxCPU類別(1/2) 新增一個MapReduce Driver類別並命名為maxCPU,maxCPU類別負責MapReduce相關設定及運作流程 01. public class maxCPU { 02. public static void main(String[] args) throws Exception { 03. Configuration conf = new Configuration(); 04. String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); 05. if (otherArgs.length != 2) { 06. System.err.println("Usage: maxCPU <in> <out>"); 07. System.exit(2); 08. } 09. Job job = new Job(conf, "max CPU"); 10. job.setJarByClass(maxCPU.class); 11. job.setMapperClass(mymapper.class); 12. job.setCombinerClass(myreducer.class); 13. job.setReducerClass(myreducer.class); 14. job.setOutputKeyClass(Text.class); 15. job.setOutputValueClass(IntWritable.class); 16. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

新增maxCPU類別(2/2) 17. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 18. CheckDir.check(otherArgs[0].toString(), conf); 19. LocalToHdfs.localToHdfs(otherArgs[0].toString(), otherArgs[0].toString(), conf); 20. CheckDir.check(otherArgs[1].toString(), conf); 21. CheckTable.check("CPU"); 22. CheckTable.addFamily("CPU", "CPUUtil"); 23. boolean status = job.waitForCompletion(true); 24. if (status) { 25. OutputResult.output(otherArgs[1].toString(), conf); 26. System.exit(0); 27. } else { 28. System.err.print("Not Complete!"); 29. System.exit(1); 30. } 31. } 32. }

新增mymapper類別 新增一個Mapper類別並命名為mymapper,其功能為整理輸入的鍵/值,並在第10行呼叫AddData類別將資料存入HBase 01. public class mymapper extends Mapper<Object, Text, Text, IntWritable> { 02. private Text tday = new Text(); 03. private IntWritable idata = new IntWritable(); 04. public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 05. String line = value.toString(); 06. String day = line.substring(0, 10); 07. String time = line.substring(11, 16); 08. String data = line.substring(17); 09. try { 10. AddData.add("CPU", "CPUUtil", day + " " + time, data); 11. } catch (Exception e) { 12. System.err.print("ERROR! (add data to HBase)"); 13. } 14. tday.set(day); 15. idata.set(Integer.valueOf(data)); 16. context.write(tday, idata); 17. } 18. }

新增AddData類別 此類別負責將資料插入HBase中 01. public class AddData { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. static void add(String table, String family, String dtime, String data) throws Exception { 10. HTable htable = new HTable(configuration, table); 11. Put row = new Put(dtime.getBytes()); 12. row.add(family.getBytes(), new String("data").getBytes(), data.getBytes()); 13. htable.put(row); 14. htable.flushCommits(); 15. } 16. }

新增myreducer類別 新增一個Reducer類別並命名為myreducer,其程式碼如下 01. public class myreducer extends Reducer<Text, IntWritable, Text, IntWritable> { 02. IntWritable cpuUtil = new IntWritable(); 03. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { 04. int maxValue = Integer.MIN_VALUE; 05. for (IntWritable val : values) { 06. maxValue = Math.max(maxValue, val.get()); 07. } 08. cpuUtil.set(maxValue); 09. context.write(key, cpuUtil); 10. } 11. }

新增CheckDir類別 此類別的功能為檢查HDFS上是否有已存在某個檔案或目錄,若有則刪除該檔案或目錄,因此在每次執行程式時,都能自動刪除HDFS上的輸出檔案或目錄 01. public class CheckDir { 02. static void check(final String path, Configuration conf) { 03. Path dstPath = new Path(path); 04. try { 05. FileSystem hdfs = dstPath.getFileSystem(conf); 06. if (hdfs.exists(dstPath)) { 07. hdfs.delete(dstPath, true); 08. } 09. } catch (IOException e) { 10. e.printStackTrace(); 11. } 12. } 13. }

新增LocalToHdfs類別 此類別將本地端檔案系統中路徑為src的檔案或目錄上傳至HDFS中路徑為dst的目錄中 01. public class LocalToHdfs { 02. static void localToHdfs(String src, String dst, Configuration conf) { 03. Path dstPath = new Path(dst); 04. try { 05. FileSystem hdfs = dstPath.getFileSystem(conf); 06. hdfs.copyFromLocalFile(false, new Path(src), new Path(dst)); 07. } catch (IOException e) { 08. e.printStackTrace(); 09. } 10. } 11. }

新增CheckTable類別(1/2) 此類別中的.check() method與CheckDir類別的功能類似;先檢查HBase中是否有相同的表格,若有則刪除該表格 而.addFamily() method的功能為新增表格並設定其column family。 01. public class CheckTable { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void check(String table) throws Exception { 10. HBaseAdmin admin = new HBaseAdmin(configuration); 11. if (admin.tableExists(table)) { 12. System.out.println("delete the table "); 13. admin.disableTable(table);

新增CheckTable類別(2/2) 14. admin.deleteTable(table); 15. } 16. } 15. } 16. } 17. public static void addFamily(String table, String family) throws Exception { 18. HBaseAdmin admin = new HBaseAdmin(configuration); 19. HTableDescriptor tableDescripter = new HTableDescriptor(table .getBytes()); 20. tableDescripter.addFamily(new HColumnDescriptor(family)); 21. admin.createTable(tableDescripter); 22. } 23. }

新增OutputResult類別(1/2) 此類別由HDFS讀取myreducer的執行結果,得知每日最高CPU使用率,再呼叫ScanTable類別並傳送相關資訊,以找出每日CPU使用率最高的所有時段。 01. public class OutputResult { 02. static void output(final String path, Configuration conf) { 03. Path dst_path = new Path(path); 04. String day = null; 05. String value = null; 06. try { 07. FileSystem hdfs = dst_path.getFileSystem(conf); 08. FSDataInputStream in = null; 09. if (hdfs.exists(dst_path)) { 10. in = hdfs.open(new Path(dst_path.toString() + "/part-r-00000")); 11. String messagein = null; 12. while ((messagein = in.readLine()) != null) { 13. StringTokenizer itr = new StringTokenizer(messagein); 14. day = itr.nextToken(); 15. value = itr.nextToken(); 16. ScanTable.setFilter("CPU", day, value); 17. }

新增OutputResult類別(2/2) 18. in.close(); 19. } 19. } 20. } catch (IOException e) { 21. e.printStackTrace(); 22. } 23. } 24. }

新增ScanTable類別(1/2) 類別根據myreducer類別所計算出的每日最高CPU使用率,再到HBase表格中過濾出每日最高使用率的時段並顯示出來 01. public class ScanTable { 02. public static Configuration configuration = null; 03. static { 04. configuration = HBaseConfiguration.create(); 05. configuration.set("hbase.master", "Host01:60000"); 06. configuration.set("hbase.zookeeper.quorum", "Host01,Host02"); 07. configuration.set("hbase.zookeeper.property.clientPort", "2222"); 08. } 09. public static void setFilter(String tablename, String day, String value) throws IOException { 10. HTable table = new HTable(configuration, tablename); 11. Scan scan = new Scan((day + " 00:00").getBytes(), (day + " 23:00").getBytes()); 12. FilterList filterList = new FilterList(); 13. filterList.addFilter(new SingleColumnValueFilter("CPUUtil".getBytes(), "data".getBytes(), CompareOp.EQUAL, value.getBytes())); 14. scan.setFilter(filterList);

新增ScanTable類別(2/2) 15. ResultScanner ResultScannerFilterList = table.getScanner(scan); 16. for (Result rs = ResultScannerFilterList.next(); rs != null; rs = ResultScannerFilterList.next()) { 17. for (KeyValue kv : rs.list()) { 18. System.out.println(new String(kv.getRow()) + " " + new String(kv.getValue())); 19. } 20. } 21. } 22. }

執行結果 最終的輸出結果如下: 2011/01/01 16:00 100 2011/01/01 17:00 100 2011/01/02 15:00 90 2011/01/03 16:00 80 2011/01/03 17:00 80 2011/01/03 18:00 80 2011/01/04 00:00 40

開發環境設定 新增專案 MapReduce程式架構 MapReduce基礎實作範例 MapReduce進階實作範例 Hadoop專題

Hadoop專題 專題內容必須包含下列技術: 可考慮加入Hadoop其他子專案技術 MapReduce HDFS Hbase 考慮網路上大量的資料,分別存放在不同的Node,並建立一組HDFS Hbase 利用Hbase過濾出重要資訊,並顯示出來 可考慮加入Hadoop其他子專案技術 Avro、Pig、Hive、Chukwa