Download presentation
Presentation is loading. Please wait.
1
Cloud Computing MapReduce进阶
2
用户定制Partitioner和Combiner
主要内容(6学时) 简介 复合键值对的使用 用户定制数据类型 用户定制输入/输出格式 用户定制Partitioner和Combiner 组合式MapReduce计算作业 多数据源的连接 全局参数/数据文件的传递与使用 关系数据库的连接与访问
3
简介 MPI(Message Passing Interface) 等并行编程方法 缺少对高层并行编程模型和统一计算框架的支持,需要程序员处理许 多底层细节, 为此MapReduce在三个层面上做了系统而巧妙的设计 构思。 在大数据处理的基本方法上,对相互计算依赖不大的数据采取“分 而治之”的处理策略。 借鉴了Lisp语言中的思想,用Map和Reduce两个函数提供了高层 的并行编程抽象模型和接口。 对于诸多的底层实现和处理细节MapReduce提供了一个统一的计 算框架,大大减轻了程序员在编程是的负担。
4
复合键值对的使用 把小的键值对合并成大的键值对
Map计算过程中所产生的中间结果键值对需要通过网络传输给 Reduce节点,大规模的键值对可能会大幅增大网络通信开销,并 且降低程序执行速度,为此开采用一个基本的优化方法,即把大量 小的键值对合并为较大的键值对。 例如在单词同现矩阵计算中,单词a可能会与多个其他单词共同出 现,因而一个Map可能会产生很多个单词a与其他单词的键值对, 如下:
5
复合键值对的使用 <a, b> 1 <a, c> 3 a {b:1,c:3, d:5, e:8, f:4}
<a, d> <a, e> <a, f>
6
巧用复合键让系统完成排序 Map计算过程中,系统自动按照Map的输出键进行排序, 因此进入Reduce的键值对都是按照key值排序的,但有时 希望value也按一定规则排序。 方法1:在Reduce过程中对{value}列表中的值进行 本地排序,但当{value}列表数据量巨大时 必须使用复杂的外排算法,会很耗时。 方法2:将value中需要排序的部分加入到key中, 形成复合键,这样能利用MapReduce系统 的排序功能自动完成排序。
7
用户定制数据类型 Hadoop内置的数据类型 BooleanWritable:标准布尔型数值 ByteWritable:单字节数值
DoubleWritable:双字节数 FloatWritable:浮点数 IntWritable:整型数 LongWritable:长整型数 Text:使用UTF8格式存储的文本 NullWritable:当<key, value>中的key或value为空时使用
8
用户定制数据类型 自定义数据类型的实现 首先实现Writable接口,以便该数据能被序列化后完成 网络传输或文件输入/输出;
其次,如果该数据需要作为key使用,或者要比较数值 大小时,则需要实现 WritableComparable接口。 例如将一个三维坐标P(x,y,z)定制为一个数据类型 pubic class Point3D implements Writable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException }
9
用户定制数据类型 如果Point3D还需要作为主键值使用,或者需要比 较大小时,还应该实现WritableComparable接口
pubic class Point3D implements WritableComparable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException public int compareTo(Point3D p) //具体实现比较当前的this(x,y,z)与p(x,y,z)的位置 //并输出-1,0,1 }
10
用户定制输入/输出格式 Hadoop内置数据输入格式和RecordReader
TextInputFormat:是系统默认的数据输入格式,可以文 本文件分块逐行读入,读入一行时,所产生的key为当前 行在整个文件中的字节偏移位置,而value就是行内容。 KeyValueInputFormat:是另一个常用的数据输入格式, 可将一个安照<key, value>格式逐行存放的文件逐行读 出,并自动解析成相应的key和value。
11
用户定制输入/输出格式 RecordReader:对于一个数据输入格式,都需要有一个 对应的RecordReader。 RecordReader主要用于将一个 文件中的数据记录分拆成具体的键值对,传给Map函数。 例如: TextInputFormat的默认RecordReader为Line RecordReader, KeyValueInputFormat的默认 RecordReader为KeyValueLine RecordReader。 除此之外,系统还提供很多输入格式,例如: AutoInputFormat, CombineFileInputFormat等
12
用户定制数据输入格式与RecordReader
假设需要定制 FileNameLocInputFormat 与 FileNameLocRecordReader, 以便产生 pubic class FileNameLocInputFormat extends FileInputFormat<Text, Text> { @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) FileNameLocRecordReader fnrr = new FileNameLocRecordReader(); fnrr.initialize(split, context); ………… }
13
用户定制数据输入格式与RecordReader
pubic class FileNameLocRecordReader extends RecordReader<Text, Text> { String FileName; LineRecordReader lrr = new public Text getCurrnetKey() return new } public void initialize(InputSplit arg0, TaskAttemptContext arg1) ……………………….
14
用户定制输入/输出格式 Hadoop内置的数据输出格式与RecordWriter
Hadoop提供了丰富的内置数据输出格式。最常用的是 TextOutputFormat,也是系统默认的输出格式,可以 将计算结果以key + \t +value的形式逐行输入到文件 中。 数据输出格式也提供一个对应的RecordWriter,以便系 统明确输出结果写到文件的具体格式。 TextOutputFormat的默认RecordWriter是 LineRecordWriter。
15
通过定制数据输出格式实现多集合文件输出 默认情况下,MapReduce将产生包含一至多个文件的单个 输出数据文件集合。但有时候需要输出多个文件集合。比如 ,在处理专利数据是,希望根据不同国家,将每个国家的专 利数据记录输出到不同国家的文件目录中。 Hadoop提供了MultipleOutputFormat类帮助完成这一处 理功能。在Reduce进行数据输出前,需要定制 MultioleOutputFormat的一个子类,实现其中的一个重要 方法 protected String generateFileNameForKeyValue(K key, V value, String name) 通过该放过,程序可以根据输入的主键产生并返回一个所期望的输出数据文 件名和路径
16
用户定制Partitioner Hadoop MapReduce 提供了默认的Partition来完 成Map节点数据的中间结果向Reduce节点的分区 处理。大多数情况下,应用程序仅使用默认的 HashPartitiner即可满足计算要求,但有时候,数 据可能不会被均匀的分配到每个Reduce上,或者 不符合应用程序的分区要求,这时,就需要定制自 己的Partitioner。
17
用户定制Partitioner 定制一个Partitioner大致方法如下,继承 HashPartitioner,并重载getPartition()方法 class NewPartitioner extends HashPartitioner<K, V> { //override the method getPartition(K key, V value, int numReduceTasks) {………………….} }
18
用户定制Combiner 用户也可以根据需要定制自己的Combiner,以减 少Map阶段输出中间结果的数量,降低数据的网络 传输开销。
class NewCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throw IOException, InterruptedException {………………………} }
19
组合式MapReduce计算作业 一些复杂任务难以用一趟MapReduce处理完成, 需要将其拆分为多趟简单的MapReduce子任务进 行处理。 本节将介绍多种不同形式的组合MapReduce任务 ,包括迭代方法, 顺序组合方法, 具有依赖关系 的组合方法,以及链式方法
20
迭代MapReduce计算任务 当使用MapReduce进行这样的问题求解时,运行 一趟MapReduce过程将无法完成整个求解结果, 因此,需要采用迭代法方循环运行该MapReduce 过程,直到达到一个逼近结果。 例如页面排序算法PageRank就是这一类需要用循 环迭代MapReduce计算进行求解的问题。
21
顺序组合式MapReduce计算任务 多个MapReduce子任务可以注意手工执行,但更方便的 做法是将这些子任务串起来,前面的输出作为后面的输入 。例如 mapreduce mapreduce mapreduce3
22
具有复杂依赖关系的组合式MapReduce作业
例如,一个MapReduce作业有x, y, z三个子任务 ,它们的依赖关系如下: Job Jobx Joby Jobz
23
具有复杂依赖关系的组合式MapReduce作业
则他们的配置程序如下 //配置jobx Configuration jobxconf = … //设置JobControl Job jobx = … JobCOntrol jc = new JobControl(); //配置joby jc.addJob(jobx); Configuration jobyconf = … jc.addJob(joby); Job joby = … jc.addJob(jobz); //配置jobz jc.run(); Configuration jobzconf = …. Job jobz = … //设置依赖关系 jobz.addDependingJob(jobx); jobz.addDependingJob(joby);
24
MapReduce前处理和后处理步骤的链式执行
一个MapReduce作业可能会有一些前处理和后处理步骤 ,比如文档倒排索引处理前需要去除“停用词”,若将其 设置为一个单独的MapReduce作业,则可能会影响核心 作业的效率。 为此,一个较好的方法是在核心的Map和Reduce过程之 外,把这些前后处理步骤实现为一些辅助的Map过程,将 这些辅助过程与核心Map和Reduce过程合并为一个链式 MapReduce任务。
25
MapReduce前处理和后处理步骤的链式执行
Hadoop为此专门提供了链式Mapper(ChainMapper) 和 链式Reducer(ChainReducer)来完成这种处理。 ChainMapper允许在单一Map任务中添加和使用多个 Map子任务;而ChainReducer允许在一个单一Reduce 任务执行了Reduce处理后,继续使用多个Map子任务完 成后续处理。
26
多数据源的连接 一个MapReduce任务可能需要访问多个数据集, 在关系数据库中,这将是两个或多个表的连接 (join)。Hadoop 系统没有关系数据库那样强大的 连接处理功能,大多数时候需要程序员自己实现, 本节介绍基于DataJoin类库实现Reduce端连接的 方法,用全局文件复制实现Map端连接的方法,带 Map端过滤的Reduce端连接方法,以及 MapRedude数据连接方法的限制
27
用DataJoin类库实现Reduce端连接
首先需要为不同数据源下的每个数据记录定义一个数据源 标签(Tag)。 进一步,为了能准确的标识一个数据源下的每个数据记录 并完成连接处理,需要为每个带连接的数据记录确定一个 连接主键(GroupKey) 最后, DataJoin类库分别在Map和Reduce阶段提供一个 处理框架,并尽可能的帮助程序员完成一些处理工作,剩 余部分必须自己完成。
28
Map处理过程 根据GroupKey进行分区 Map Map 1 3 2 1 3 2 4 3 数据源Customers
1,王二, 2,张三, 3,李四, 4,孙五, 数据源Orders 3,订单1,90, 1,订单2,130, 2,订单3,220, 3,订单4,160, Map Map 1 Customers 1,王二, Orders 3,订单1,90, 3 Customers 2,张三, Orders 1,订单2,130, 2 1 Customers 3,李四, Orders 2,订单3,220, 3 2 Customers 4,孙五, Orders 3,订单4,160, 4 3
29
Reduce处理过程 Reduce节点接收到这些带标签的数据记录后, Reduce过程将对不同数据源标签下具有相同 GroupKey的记录进行笛卡尔叉积,自动生成所有 不同的叉积组合。然后对每一个叉积组合,由程序 员实现一个combine()方法,将这些具有相同 GroupKey的记录进行适当的处理,以完成连接。 过程如图:
30
Reduce过程 3 Reduce Customers 3,李四,025-3333-3333 Orders
3,订单1,90, Orders 3,订单4,160, Reduce Customers 3,李四, Customers 3,李四, Orders 3,订单1,90, Orders 3,订单4,160, Combine() Combine() 3,李四, ,订单1,90, 3,李四, ,订单4,160,
31
用全局文件复制方法实现Map端连接 前述用DataJoin类实现的Reduce端连接方法中, 连接操作直到Reduce端才能进行,因为很多无效 的连接组合数据在Reduce阶段才能去除,所以大 量的网络带宽被用来传输不用的数据,因此,这种 方法效率不高。 当数据源的数据量较小时,能够放在单节点的内存 时,可以使用称为“复制连接”的全局文件复制方 法,把较小的数据源复制到每个Map节点上,然后 在Map阶段完成连接操作
32
用全局文件复制方法实现Map端连接 Hadoop提供了一个Distributed Cache机制用于将一个 或多个文件分布复制到所有节点上。要利用此机制,需要 涉及到以下两部分的设置 (1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中的文件路径
33
用全局文件复制方法实现Map端连接 当较小的数据源文件也无法放入内存时,可采用以 下办法: (1) 可以首先过滤较小数据源文件中的记录,只保
留需要进行连接的记录 (2) 可以将较小数据源文件分割为能n个小文件,其 中每个小文件都能放入内存处理,然后分别对 这n个小文件用全局文件复制方法进行与较大源 文件的连接,最后把结果合并起来
34
带Map端过滤的Reduce端连接 如果过滤后数据仍然无法放在内存中处理,可采用 带Map端过滤的Reduce端连接处理。
具体过程为在Map端先生成一个仅包含连接主键的 过滤文件,由于这个文件的数据量大大降低,则可 将这个文件存放在Distributed Cache文件中,然 后在Map端过滤掉主键不在这个列表中的所有记录 ,然后再实现正常的Reduce端连接
35
全局参数的传递 public void set(String name, String value);//设置字符串属性
为了能让用户灵活设置某些作业参数,一个 MapReduce计算任务可能需要在执行时从命令行 输入这些作业参数,并将这个参数传递给各个节点 Configuration类为此专门提供了用于保存和获取 属性的方法例如: public void set(String name, String value);//设置字符串属性 public void get(String name, String defaultValue) //读取字符串属性 //将一个参数设置为name属性 jobconf.set(“name”, args[0]); //然后可在Mapper或Reducer类的初始化方法setup中从Configuration对象读出该 属性值 jobconf.get(“name”, “”);
36
全局数据文件的传递 这里同样也用到了Distributed Cache机制。要 利用此机制,需要涉及到以下两部分的设置 (1) Job类中
public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中 的文件路径
37
关系数据库的连接与访问 Hadoop提供了相应的从关系数据库查询和读取数 据的接口。
DBInputFormat:提供从数据库读取数据的格式 DBRecordReader:提供读取数据记录的接口 Hadoop查询和读取关系数据库的处理效率比较低 因此DBInputFormat仅适合读取小量的数据,对于 读取大量的数据,可以用数据库中的工具将数据输 出为文本文件,并上载到HDFS中处理。
38
关系数据库的连接与访问 Hadoop提供了相应的向关系数据库直接输出计算 结果的编程接口。
DBOutputFormat:提供向数据库输出数据的格式 DBRecordWriter:提供向数据库写入数据记录的接口 DBConfiguration:提供数据库配置和创建连接的接口 //连接DB的静态方法 Public static void configureDB(Job job, String driverClass, String dbUrl, String userName, String password) Job 为当前准备执行的作业, driveClass为数据库厂商提供的访问数据库的驱动程序 dbUrl 为运行数据库主机的地址,userName和password为访问数据库的用户名与密 码
39
关系数据库的连接与访问 数据库连接完成后,即可完成从MapReduce程序 向关系数据库写入数据的操作,DBOutputFormat 提供了一个静态方法来指定需要写入的数据表和字 段: public static void setOutput(Job job, String tableName, String… fieldNames) 其中tableName指定即将写入数据的表名,后续参数指定哪些字段数据将写入该表
40
关系数据库的连接与访问 为了能完成向数据库中的写入操作,程序员还需 要实现DBWritable:
public class NewDBWritable implements Writable, DBWritable { public void write(DataOutput out) {……} ………. }
41
完
Similar presentations