Cloud Computing MapReduce进阶.

Slides:



Advertisements
Similar presentations
大数据基础技术和应用. 大纲 大数据概述 大数据基础技术 工程技术 策略技术 典型应用 我们处于数据爆炸的时代 数据库 文字记录 照片 线下数据信息化 网页数据 用户行为记录 数字图像 互联网 - 移动互联网 设备监控 智能家居 摄像头 传感器 地球上至今总共的数据量: 在 2006 年,个人用户才刚刚迈.
Advertisements

软件编程基础 一、程序的编辑 Java 源程序是以 Java 为后缀的简单的文本文件,可以用各种 Java 集成开发环境中的源代码编辑器来编写,也可以用其他文 本编辑工具,如 Windows 中的记事本或 DOS 中的 EDIT 软件等。 利用文字编辑器编写下列程序 public class Hello.
第四章 类、对象和接口.
3.2 Java的类 Java 类库的概念 语言规则——程序的书写规范 Java语言 类库——已有的有特定功能的Java程序模块
JAVA 编 程 技 术 主编 贾振华 2010年1月.
项目7 面向对象高级.
项目6 通用堆栈.
第16章 代理模式 Website:
四資二甲 第三週作業 物件導向程式設計.
基于Hadoop的Map/Reduce框架研究报告
Ch.8. 基于MapReduce的图算法 MapReduce海量数据并行处理
面向对象的程序设计(一).
MVC Servlet与MVC设计模式.
Hadoop与数据分析 淘宝数据平台及产品部基础研发组 周敏 日期:
第二章 JAVA语言基础.
類別與物件 Class & Object.
Ch07 介面與多重繼承 物件導向程式設計(II).
第5章 进一步讨论对象和类.
Introduction to MapReduce
Ch08 巢狀類別 物件導向程式設計(II).
2.1 基本資料型別 2.2 變數 2.3 運算式與運算子 2.4 輸出與輸入資料 2.5 資料型別轉換 2.6 實例
Hadoop I/O By ShiChaojie.
《大数据技术原理与应用》 第七章 MapReduce (2016春季学期) 林子雨 厦门大学计算机科学系 主页:
Java基础 JavaSE异常.
JSP自定义标签技术的分析与应用 ----Custom Tag 的分析与应用
Classes Lecturer: 曾學文.
程式敘述執行順序的轉移 控制與重複、方法 Lecturer:曾學文.
CHAPTER 6 認識MapReduce.
Java 程式設計 講師:FrankLin.
西南科技大学网络教育系列课程 高级语程序设计(Java) 第五章 继承、接口与范型.
厦门大学数据库实验室 MapReduce 连接
程式設計實作.
Java语言程序设计 第五部分 Java异常处理.
Java程序设计 第9章 继承和多态.
《大数据技术原理与应用》 第十二讲 图计算 (2016春季学期)
Skew Join相关论文 报告人:蔡珉星 厦大数据库实验室
王豐緒 銘傳大學資訊工程學系 問題:JAVA 物件檔輸出入.
基于大数据的物流资源整合 福建师范大学协和学院 沈庆琼.
Homework 1(上交时间:10月14号) 倒排索引.
C#面向对象程序设计 $7 继承和多态性.
SPOTO TM JAVA课程 JAVA中的OO语法
中国矿大计算机学院杨东平 第5章 接口和包 中国矿大计算机学院杨东平
并发机制 结果应该为: 线程 1: 1 线程 1: 2 线程 1: 3 线程 1: 4 线程 1: 5 线程 2: 6 线程 2: 7
9.1 程式偵錯 9.2 捕捉例外 9.3 自行拋出例外 9.4 自定例外類別 9.5 多執行緒
第9讲 Java的继承与多态(一) 类的继承 子类的创建 方法覆盖.
2019/1/16 Java语言程序设计-类与对象 教师:段鹏飞.
Java程序设计 第2章 基本数据类型及操作.
Ch02-基礎語法.
C/C++/Java 哪些值不是头等程序对象
C++语言程序设计 C++语言程序设计 第七章 类与对象 第十一组 C++语言程序设计.
* 單元:電腦與問題解決 主題:Java物件導向程式設計-類別與物件 台南縣國立善化高中 蕭嘉民 老師
Hadoop入门
辅导课程八.
Hadoop与数据分析 淘宝数据平台及产品部基础研发组 周敏 日期:
JAVA 编 程 技 术 主编 贾振华 2010年1月.
C#程序设计基础 $3 成员、变量和常量.
Java程式初體驗大綱 大綱 在學程式之前及本書常用名詞解釋 Hello Java!程式 在Dos下編譯、執行程式
Interfaces and Packages
第三章 数据抽象.
基于MapReduce的Join算法优化
第二章 Java语法基础.
第二章 Java基本语法 讲师:复凡.
基于位置感知和负载均衡 MapReduce的Join算法优化 汇报人:黄梓铭 厦大数据库实验室
第6單元 6-1 類別的繼承 (Class Inheritance) 6-2 抽象類別 (Abstract Class)
辅导课程十二.
JAVA 程式設計與資料結構 第三章 物件的設計.
第2章 Java语言基础.
第二章 Java基础语法 北京传智播客教育
輸出執行結果到螢幕上 如果要將執行結果的文字和數值都「輸出」到電腦螢幕時,程式要怎麼寫? class 類別名稱 {
Summary
Presentation transcript:

Cloud Computing MapReduce进阶

用户定制Partitioner和Combiner 主要内容(6学时) 简介 复合键值对的使用 用户定制数据类型 用户定制输入/输出格式 用户定制Partitioner和Combiner 组合式MapReduce计算作业 多数据源的连接 全局参数/数据文件的传递与使用 关系数据库的连接与访问

简介 MPI(Message Passing Interface) 等并行编程方法 缺少对高层并行编程模型和统一计算框架的支持,需要程序员处理许 多底层细节, 为此MapReduce在三个层面上做了系统而巧妙的设计 构思。 在大数据处理的基本方法上,对相互计算依赖不大的数据采取“分 而治之”的处理策略。 借鉴了Lisp语言中的思想,用Map和Reduce两个函数提供了高层 的并行编程抽象模型和接口。 对于诸多的底层实现和处理细节MapReduce提供了一个统一的计 算框架,大大减轻了程序员在编程是的负担。

复合键值对的使用 把小的键值对合并成大的键值对 Map计算过程中所产生的中间结果键值对需要通过网络传输给 Reduce节点,大规模的键值对可能会大幅增大网络通信开销,并 且降低程序执行速度,为此开采用一个基本的优化方法,即把大量 小的键值对合并为较大的键值对。 例如在单词同现矩阵计算中,单词a可能会与多个其他单词共同出 现,因而一个Map可能会产生很多个单词a与其他单词的键值对, 如下:

复合键值对的使用 <a, b> 1 <a, c> 3 a {b:1,c:3, d:5, e:8, f:4} <a, d> 4 <a, e> 8 <a, f> 4

巧用复合键让系统完成排序 Map计算过程中,系统自动按照Map的输出键进行排序, 因此进入Reduce的键值对都是按照key值排序的,但有时 希望value也按一定规则排序。 方法1:在Reduce过程中对{value}列表中的值进行 本地排序,但当{value}列表数据量巨大时 必须使用复杂的外排算法,会很耗时。 方法2:将value中需要排序的部分加入到key中, 形成复合键,这样能利用MapReduce系统 的排序功能自动完成排序。

用户定制数据类型 Hadoop内置的数据类型 BooleanWritable:标准布尔型数值 ByteWritable:单字节数值 DoubleWritable:双字节数 FloatWritable:浮点数 IntWritable:整型数 LongWritable:长整型数 Text:使用UTF8格式存储的文本 NullWritable:当<key, value>中的key或value为空时使用

用户定制数据类型 自定义数据类型的实现 首先实现Writable接口,以便该数据能被序列化后完成 网络传输或文件输入/输出; 其次,如果该数据需要作为key使用,或者要比较数值 大小时,则需要实现 WritableComparable接口。 例如将一个三维坐标P(x,y,z)定制为一个数据类型 pubic class Point3D implements Writable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException }

用户定制数据类型 如果Point3D还需要作为主键值使用,或者需要比 较大小时,还应该实现WritableComparable接口 pubic class Point3D implements WritableComparable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException public int compareTo(Point3D p) //具体实现比较当前的this(x,y,z)与p(x,y,z)的位置 //并输出-1,0,1 }

用户定制输入/输出格式 Hadoop内置数据输入格式和RecordReader TextInputFormat:是系统默认的数据输入格式,可以文 本文件分块逐行读入,读入一行时,所产生的key为当前 行在整个文件中的字节偏移位置,而value就是行内容。 KeyValueInputFormat:是另一个常用的数据输入格式, 可将一个安照<key, value>格式逐行存放的文件逐行读 出,并自动解析成相应的key和value。

用户定制输入/输出格式 RecordReader:对于一个数据输入格式,都需要有一个 对应的RecordReader。 RecordReader主要用于将一个 文件中的数据记录分拆成具体的键值对,传给Map函数。 例如: TextInputFormat的默认RecordReader为Line RecordReader, KeyValueInputFormat的默认 RecordReader为KeyValueLine RecordReader。 除此之外,系统还提供很多输入格式,例如: AutoInputFormat, CombineFileInputFormat等

用户定制数据输入格式与RecordReader 假设需要定制 FileNameLocInputFormat 与 FileNameLocRecordReader, 以便产生 FileName@LineOffset主键值,则可定制如下代码 pubic class FileNameLocInputFormat extends FileInputFormat<Text, Text> { @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) FileNameLocRecordReader fnrr = new FileNameLocRecordReader(); fnrr.initialize(split, context); ………… }

用户定制数据输入格式与RecordReader pubic class FileNameLocRecordReader extends RecordReader<Text, Text> { String FileName; LineRecordReader lrr = new LineRecordReader(); @Override public Text getCurrnetKey() return new Text(“+FileName+”@”+lrr.getCurrentKey()+”); } public void initialize(InputSplit arg0, TaskAttemptContext arg1) ……………………….

用户定制输入/输出格式 Hadoop内置的数据输出格式与RecordWriter Hadoop提供了丰富的内置数据输出格式。最常用的是 TextOutputFormat,也是系统默认的输出格式,可以 将计算结果以key + \t +value的形式逐行输入到文件 中。 数据输出格式也提供一个对应的RecordWriter,以便系 统明确输出结果写到文件的具体格式。 TextOutputFormat的默认RecordWriter是 LineRecordWriter。

通过定制数据输出格式实现多集合文件输出 默认情况下,MapReduce将产生包含一至多个文件的单个 输出数据文件集合。但有时候需要输出多个文件集合。比如 ,在处理专利数据是,希望根据不同国家,将每个国家的专 利数据记录输出到不同国家的文件目录中。 Hadoop提供了MultipleOutputFormat类帮助完成这一处 理功能。在Reduce进行数据输出前,需要定制 MultioleOutputFormat的一个子类,实现其中的一个重要 方法 protected String generateFileNameForKeyValue(K key, V value, String name) 通过该放过,程序可以根据输入的主键产生并返回一个所期望的输出数据文 件名和路径

用户定制Partitioner Hadoop MapReduce 提供了默认的Partition来完 成Map节点数据的中间结果向Reduce节点的分区 处理。大多数情况下,应用程序仅使用默认的 HashPartitiner即可满足计算要求,但有时候,数 据可能不会被均匀的分配到每个Reduce上,或者 不符合应用程序的分区要求,这时,就需要定制自 己的Partitioner。

用户定制Partitioner 定制一个Partitioner大致方法如下,继承 HashPartitioner,并重载getPartition()方法 class NewPartitioner extends HashPartitioner<K, V> { //override the method getPartition(K key, V value, int numReduceTasks) {………………….} }

用户定制Combiner 用户也可以根据需要定制自己的Combiner,以减 少Map阶段输出中间结果的数量,降低数据的网络 传输开销。 class NewCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throw IOException, InterruptedException {………………………} }

组合式MapReduce计算作业 一些复杂任务难以用一趟MapReduce处理完成, 需要将其拆分为多趟简单的MapReduce子任务进 行处理。 本节将介绍多种不同形式的组合MapReduce任务 ,包括迭代方法, 顺序组合方法, 具有依赖关系 的组合方法,以及链式方法

迭代MapReduce计算任务 当使用MapReduce进行这样的问题求解时,运行 一趟MapReduce过程将无法完成整个求解结果, 因此,需要采用迭代法方循环运行该MapReduce 过程,直到达到一个逼近结果。 例如页面排序算法PageRank就是这一类需要用循 环迭代MapReduce计算进行求解的问题。

顺序组合式MapReduce计算任务 多个MapReduce子任务可以注意手工执行,但更方便的 做法是将这些子任务串起来,前面的输出作为后面的输入 。例如 mapreduce1 mapreduce2 mapreduce3

具有复杂依赖关系的组合式MapReduce作业 例如,一个MapReduce作业有x, y, z三个子任务 ,它们的依赖关系如下: Job Jobx Joby Jobz

具有复杂依赖关系的组合式MapReduce作业 则他们的配置程序如下 //配置jobx Configuration jobxconf = …. //设置JobControl Job jobx = … JobCOntrol jc = new JobControl(); //配置joby jc.addJob(jobx); Configuration jobyconf = …. jc.addJob(joby); Job joby = … jc.addJob(jobz); //配置jobz jc.run(); Configuration jobzconf = …. Job jobz = … //设置依赖关系 jobz.addDependingJob(jobx); jobz.addDependingJob(joby);

MapReduce前处理和后处理步骤的链式执行 一个MapReduce作业可能会有一些前处理和后处理步骤 ,比如文档倒排索引处理前需要去除“停用词”,若将其 设置为一个单独的MapReduce作业,则可能会影响核心 作业的效率。 为此,一个较好的方法是在核心的Map和Reduce过程之 外,把这些前后处理步骤实现为一些辅助的Map过程,将 这些辅助过程与核心Map和Reduce过程合并为一个链式 MapReduce任务。

MapReduce前处理和后处理步骤的链式执行 Hadoop为此专门提供了链式Mapper(ChainMapper) 和 链式Reducer(ChainReducer)来完成这种处理。 ChainMapper允许在单一Map任务中添加和使用多个 Map子任务;而ChainReducer允许在一个单一Reduce 任务执行了Reduce处理后,继续使用多个Map子任务完 成后续处理。

多数据源的连接 一个MapReduce任务可能需要访问多个数据集, 在关系数据库中,这将是两个或多个表的连接 (join)。Hadoop 系统没有关系数据库那样强大的 连接处理功能,大多数时候需要程序员自己实现, 本节介绍基于DataJoin类库实现Reduce端连接的 方法,用全局文件复制实现Map端连接的方法,带 Map端过滤的Reduce端连接方法,以及 MapRedude数据连接方法的限制

用DataJoin类库实现Reduce端连接 首先需要为不同数据源下的每个数据记录定义一个数据源 标签(Tag)。 进一步,为了能准确的标识一个数据源下的每个数据记录 并完成连接处理,需要为每个带连接的数据记录确定一个 连接主键(GroupKey) 最后, DataJoin类库分别在Map和Reduce阶段提供一个 处理框架,并尽可能的帮助程序员完成一些处理工作,剩 余部分必须自己完成。

Map处理过程 根据GroupKey进行分区 Map Map 1 3 2 1 3 2 4 3 数据源Customers 1,王二,025-1111-1111 2,张三,021-2222-2222 3,李四,025-3333-3333 4,孙五,010-4444-4444 数据源Orders 3,订单1,90, 2011.8.1 1,订单2,130,2011.8.6 2,订单3,220,2011.8.10 3,订单4,160,2011.8.18 Map Map 1 Customers 1,王二,025-1111-1111 Orders 3,订单1,90, 2011.8.1 3 Customers 2,张三,021-2222-2222 Orders 1,订单2,130,2011.8.6 2 1 Customers 3,李四,025-3333-3333 Orders 2,订单3,220,2011.8.10 3 2 Customers 4,孙五,010-4444-4444 Orders 3,订单4,160,2011.8.18 4 3

Reduce处理过程 Reduce节点接收到这些带标签的数据记录后, Reduce过程将对不同数据源标签下具有相同 GroupKey的记录进行笛卡尔叉积,自动生成所有 不同的叉积组合。然后对每一个叉积组合,由程序 员实现一个combine()方法,将这些具有相同 GroupKey的记录进行适当的处理,以完成连接。 过程如图:

Reduce过程 3 Reduce Customers 3,李四,025-3333-3333 Orders 3,订单1,90, 2011.8.1 Orders 3,订单4,160,2011.8.18 Reduce Customers 3,李四,025-3333-3333 Customers 3,李四,025-3333-3333 Orders 3,订单1,90, 2011.8.1 Orders 3,订单4,160,2011.8.18 Combine() Combine() 3,李四,025-3333-3333,订单1,90,2011.8.1 3,李四,025-3333-3333,订单4,160,2011.8.18

用全局文件复制方法实现Map端连接 前述用DataJoin类实现的Reduce端连接方法中, 连接操作直到Reduce端才能进行,因为很多无效 的连接组合数据在Reduce阶段才能去除,所以大 量的网络带宽被用来传输不用的数据,因此,这种 方法效率不高。 当数据源的数据量较小时,能够放在单节点的内存 时,可以使用称为“复制连接”的全局文件复制方 法,把较小的数据源复制到每个Map节点上,然后 在Map阶段完成连接操作

用全局文件复制方法实现Map端连接 Hadoop提供了一个Distributed Cache机制用于将一个 或多个文件分布复制到所有节点上。要利用此机制,需要 涉及到以下两部分的设置 (1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中的文件路径

用全局文件复制方法实现Map端连接 当较小的数据源文件也无法放入内存时,可采用以 下办法: (1) 可以首先过滤较小数据源文件中的记录,只保 留需要进行连接的记录 (2) 可以将较小数据源文件分割为能n个小文件,其 中每个小文件都能放入内存处理,然后分别对 这n个小文件用全局文件复制方法进行与较大源 文件的连接,最后把结果合并起来

带Map端过滤的Reduce端连接 如果过滤后数据仍然无法放在内存中处理,可采用 带Map端过滤的Reduce端连接处理。 具体过程为在Map端先生成一个仅包含连接主键的 过滤文件,由于这个文件的数据量大大降低,则可 将这个文件存放在Distributed Cache文件中,然 后在Map端过滤掉主键不在这个列表中的所有记录 ,然后再实现正常的Reduce端连接

全局参数的传递 public void set(String name, String value);//设置字符串属性 为了能让用户灵活设置某些作业参数,一个 MapReduce计算任务可能需要在执行时从命令行 输入这些作业参数,并将这个参数传递给各个节点 Configuration类为此专门提供了用于保存和获取 属性的方法例如: public void set(String name, String value);//设置字符串属性 public void get(String name, String defaultValue) //读取字符串属性 //将一个参数设置为name属性 jobconf.set(“name”, args[0]); //然后可在Mapper或Reducer类的初始化方法setup中从Configuration对象读出该 属性值 jobconf.get(“name”, “”);

全局数据文件的传递 这里同样也用到了Distributed Cache机制。要 利用此机制,需要涉及到以下两部分的设置 (1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中 的文件路径

关系数据库的连接与访问 Hadoop提供了相应的从关系数据库查询和读取数 据的接口。 DBInputFormat:提供从数据库读取数据的格式 DBRecordReader:提供读取数据记录的接口 Hadoop查询和读取关系数据库的处理效率比较低 因此DBInputFormat仅适合读取小量的数据,对于 读取大量的数据,可以用数据库中的工具将数据输 出为文本文件,并上载到HDFS中处理。

关系数据库的连接与访问 Hadoop提供了相应的向关系数据库直接输出计算 结果的编程接口。 DBOutputFormat:提供向数据库输出数据的格式 DBRecordWriter:提供向数据库写入数据记录的接口 DBConfiguration:提供数据库配置和创建连接的接口 //连接DB的静态方法 Public static void configureDB(Job job, String driverClass, String dbUrl, String userName, String password) Job 为当前准备执行的作业, driveClass为数据库厂商提供的访问数据库的驱动程序 dbUrl 为运行数据库主机的地址,userName和password为访问数据库的用户名与密 码

关系数据库的连接与访问 数据库连接完成后,即可完成从MapReduce程序 向关系数据库写入数据的操作,DBOutputFormat 提供了一个静态方法来指定需要写入的数据表和字 段: public static void setOutput(Job job, String tableName, String… fieldNames) 其中tableName指定即将写入数据的表名,后续参数指定哪些字段数据将写入该表

关系数据库的连接与访问 为了能完成向数据库中的写入操作,程序员还需 要实现DBWritable: public class NewDBWritable implements Writable, DBWritable { public void write(DataOutput out) {……} ………. }