Cloud Computing MapReduce进阶
用户定制Partitioner和Combiner 主要内容(6学时) 简介 复合键值对的使用 用户定制数据类型 用户定制输入/输出格式 用户定制Partitioner和Combiner 组合式MapReduce计算作业 多数据源的连接 全局参数/数据文件的传递与使用 关系数据库的连接与访问
简介 MPI(Message Passing Interface) 等并行编程方法 缺少对高层并行编程模型和统一计算框架的支持,需要程序员处理许 多底层细节, 为此MapReduce在三个层面上做了系统而巧妙的设计 构思。 在大数据处理的基本方法上,对相互计算依赖不大的数据采取“分 而治之”的处理策略。 借鉴了Lisp语言中的思想,用Map和Reduce两个函数提供了高层 的并行编程抽象模型和接口。 对于诸多的底层实现和处理细节MapReduce提供了一个统一的计 算框架,大大减轻了程序员在编程是的负担。
复合键值对的使用 把小的键值对合并成大的键值对 Map计算过程中所产生的中间结果键值对需要通过网络传输给 Reduce节点,大规模的键值对可能会大幅增大网络通信开销,并 且降低程序执行速度,为此开采用一个基本的优化方法,即把大量 小的键值对合并为较大的键值对。 例如在单词同现矩阵计算中,单词a可能会与多个其他单词共同出 现,因而一个Map可能会产生很多个单词a与其他单词的键值对, 如下:
复合键值对的使用 <a, b> 1 <a, c> 3 a {b:1,c:3, d:5, e:8, f:4} <a, d> 4 <a, e> 8 <a, f> 4
巧用复合键让系统完成排序 Map计算过程中,系统自动按照Map的输出键进行排序, 因此进入Reduce的键值对都是按照key值排序的,但有时 希望value也按一定规则排序。 方法1:在Reduce过程中对{value}列表中的值进行 本地排序,但当{value}列表数据量巨大时 必须使用复杂的外排算法,会很耗时。 方法2:将value中需要排序的部分加入到key中, 形成复合键,这样能利用MapReduce系统 的排序功能自动完成排序。
用户定制数据类型 Hadoop内置的数据类型 BooleanWritable:标准布尔型数值 ByteWritable:单字节数值 DoubleWritable:双字节数 FloatWritable:浮点数 IntWritable:整型数 LongWritable:长整型数 Text:使用UTF8格式存储的文本 NullWritable:当<key, value>中的key或value为空时使用
用户定制数据类型 自定义数据类型的实现 首先实现Writable接口,以便该数据能被序列化后完成 网络传输或文件输入/输出; 其次,如果该数据需要作为key使用,或者要比较数值 大小时,则需要实现 WritableComparable接口。 例如将一个三维坐标P(x,y,z)定制为一个数据类型 pubic class Point3D implements Writable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException }
用户定制数据类型 如果Point3D还需要作为主键值使用,或者需要比 较大小时,还应该实现WritableComparable接口 pubic class Point3D implements WritableComparable<Point3D> { private float x,y,z; public void readFields(DataInput in) throws IOException {……} public void write(DataOutput out) throws IOException public int compareTo(Point3D p) //具体实现比较当前的this(x,y,z)与p(x,y,z)的位置 //并输出-1,0,1 }
用户定制输入/输出格式 Hadoop内置数据输入格式和RecordReader TextInputFormat:是系统默认的数据输入格式,可以文 本文件分块逐行读入,读入一行时,所产生的key为当前 行在整个文件中的字节偏移位置,而value就是行内容。 KeyValueInputFormat:是另一个常用的数据输入格式, 可将一个安照<key, value>格式逐行存放的文件逐行读 出,并自动解析成相应的key和value。
用户定制输入/输出格式 RecordReader:对于一个数据输入格式,都需要有一个 对应的RecordReader。 RecordReader主要用于将一个 文件中的数据记录分拆成具体的键值对,传给Map函数。 例如: TextInputFormat的默认RecordReader为Line RecordReader, KeyValueInputFormat的默认 RecordReader为KeyValueLine RecordReader。 除此之外,系统还提供很多输入格式,例如: AutoInputFormat, CombineFileInputFormat等
用户定制数据输入格式与RecordReader 假设需要定制 FileNameLocInputFormat 与 FileNameLocRecordReader, 以便产生 FileName@LineOffset主键值,则可定制如下代码 pubic class FileNameLocInputFormat extends FileInputFormat<Text, Text> { @Override public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) FileNameLocRecordReader fnrr = new FileNameLocRecordReader(); fnrr.initialize(split, context); ………… }
用户定制数据输入格式与RecordReader pubic class FileNameLocRecordReader extends RecordReader<Text, Text> { String FileName; LineRecordReader lrr = new LineRecordReader(); @Override public Text getCurrnetKey() return new Text(“+FileName+”@”+lrr.getCurrentKey()+”); } public void initialize(InputSplit arg0, TaskAttemptContext arg1) ……………………….
用户定制输入/输出格式 Hadoop内置的数据输出格式与RecordWriter Hadoop提供了丰富的内置数据输出格式。最常用的是 TextOutputFormat,也是系统默认的输出格式,可以 将计算结果以key + \t +value的形式逐行输入到文件 中。 数据输出格式也提供一个对应的RecordWriter,以便系 统明确输出结果写到文件的具体格式。 TextOutputFormat的默认RecordWriter是 LineRecordWriter。
通过定制数据输出格式实现多集合文件输出 默认情况下,MapReduce将产生包含一至多个文件的单个 输出数据文件集合。但有时候需要输出多个文件集合。比如 ,在处理专利数据是,希望根据不同国家,将每个国家的专 利数据记录输出到不同国家的文件目录中。 Hadoop提供了MultipleOutputFormat类帮助完成这一处 理功能。在Reduce进行数据输出前,需要定制 MultioleOutputFormat的一个子类,实现其中的一个重要 方法 protected String generateFileNameForKeyValue(K key, V value, String name) 通过该放过,程序可以根据输入的主键产生并返回一个所期望的输出数据文 件名和路径
用户定制Partitioner Hadoop MapReduce 提供了默认的Partition来完 成Map节点数据的中间结果向Reduce节点的分区 处理。大多数情况下,应用程序仅使用默认的 HashPartitiner即可满足计算要求,但有时候,数 据可能不会被均匀的分配到每个Reduce上,或者 不符合应用程序的分区要求,这时,就需要定制自 己的Partitioner。
用户定制Partitioner 定制一个Partitioner大致方法如下,继承 HashPartitioner,并重载getPartition()方法 class NewPartitioner extends HashPartitioner<K, V> { //override the method getPartition(K key, V value, int numReduceTasks) {………………….} }
用户定制Combiner 用户也可以根据需要定制自己的Combiner,以减 少Map阶段输出中间结果的数量,降低数据的网络 传输开销。 class NewCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throw IOException, InterruptedException {………………………} }
组合式MapReduce计算作业 一些复杂任务难以用一趟MapReduce处理完成, 需要将其拆分为多趟简单的MapReduce子任务进 行处理。 本节将介绍多种不同形式的组合MapReduce任务 ,包括迭代方法, 顺序组合方法, 具有依赖关系 的组合方法,以及链式方法
迭代MapReduce计算任务 当使用MapReduce进行这样的问题求解时,运行 一趟MapReduce过程将无法完成整个求解结果, 因此,需要采用迭代法方循环运行该MapReduce 过程,直到达到一个逼近结果。 例如页面排序算法PageRank就是这一类需要用循 环迭代MapReduce计算进行求解的问题。
顺序组合式MapReduce计算任务 多个MapReduce子任务可以注意手工执行,但更方便的 做法是将这些子任务串起来,前面的输出作为后面的输入 。例如 mapreduce1 mapreduce2 mapreduce3
具有复杂依赖关系的组合式MapReduce作业 例如,一个MapReduce作业有x, y, z三个子任务 ,它们的依赖关系如下: Job Jobx Joby Jobz
具有复杂依赖关系的组合式MapReduce作业 则他们的配置程序如下 //配置jobx Configuration jobxconf = …. //设置JobControl Job jobx = … JobCOntrol jc = new JobControl(); //配置joby jc.addJob(jobx); Configuration jobyconf = …. jc.addJob(joby); Job joby = … jc.addJob(jobz); //配置jobz jc.run(); Configuration jobzconf = …. Job jobz = … //设置依赖关系 jobz.addDependingJob(jobx); jobz.addDependingJob(joby);
MapReduce前处理和后处理步骤的链式执行 一个MapReduce作业可能会有一些前处理和后处理步骤 ,比如文档倒排索引处理前需要去除“停用词”,若将其 设置为一个单独的MapReduce作业,则可能会影响核心 作业的效率。 为此,一个较好的方法是在核心的Map和Reduce过程之 外,把这些前后处理步骤实现为一些辅助的Map过程,将 这些辅助过程与核心Map和Reduce过程合并为一个链式 MapReduce任务。
MapReduce前处理和后处理步骤的链式执行 Hadoop为此专门提供了链式Mapper(ChainMapper) 和 链式Reducer(ChainReducer)来完成这种处理。 ChainMapper允许在单一Map任务中添加和使用多个 Map子任务;而ChainReducer允许在一个单一Reduce 任务执行了Reduce处理后,继续使用多个Map子任务完 成后续处理。
多数据源的连接 一个MapReduce任务可能需要访问多个数据集, 在关系数据库中,这将是两个或多个表的连接 (join)。Hadoop 系统没有关系数据库那样强大的 连接处理功能,大多数时候需要程序员自己实现, 本节介绍基于DataJoin类库实现Reduce端连接的 方法,用全局文件复制实现Map端连接的方法,带 Map端过滤的Reduce端连接方法,以及 MapRedude数据连接方法的限制
用DataJoin类库实现Reduce端连接 首先需要为不同数据源下的每个数据记录定义一个数据源 标签(Tag)。 进一步,为了能准确的标识一个数据源下的每个数据记录 并完成连接处理,需要为每个带连接的数据记录确定一个 连接主键(GroupKey) 最后, DataJoin类库分别在Map和Reduce阶段提供一个 处理框架,并尽可能的帮助程序员完成一些处理工作,剩 余部分必须自己完成。
Map处理过程 根据GroupKey进行分区 Map Map 1 3 2 1 3 2 4 3 数据源Customers 1,王二,025-1111-1111 2,张三,021-2222-2222 3,李四,025-3333-3333 4,孙五,010-4444-4444 数据源Orders 3,订单1,90, 2011.8.1 1,订单2,130,2011.8.6 2,订单3,220,2011.8.10 3,订单4,160,2011.8.18 Map Map 1 Customers 1,王二,025-1111-1111 Orders 3,订单1,90, 2011.8.1 3 Customers 2,张三,021-2222-2222 Orders 1,订单2,130,2011.8.6 2 1 Customers 3,李四,025-3333-3333 Orders 2,订单3,220,2011.8.10 3 2 Customers 4,孙五,010-4444-4444 Orders 3,订单4,160,2011.8.18 4 3
Reduce处理过程 Reduce节点接收到这些带标签的数据记录后, Reduce过程将对不同数据源标签下具有相同 GroupKey的记录进行笛卡尔叉积,自动生成所有 不同的叉积组合。然后对每一个叉积组合,由程序 员实现一个combine()方法,将这些具有相同 GroupKey的记录进行适当的处理,以完成连接。 过程如图:
Reduce过程 3 Reduce Customers 3,李四,025-3333-3333 Orders 3,订单1,90, 2011.8.1 Orders 3,订单4,160,2011.8.18 Reduce Customers 3,李四,025-3333-3333 Customers 3,李四,025-3333-3333 Orders 3,订单1,90, 2011.8.1 Orders 3,订单4,160,2011.8.18 Combine() Combine() 3,李四,025-3333-3333,订单1,90,2011.8.1 3,李四,025-3333-3333,订单4,160,2011.8.18
用全局文件复制方法实现Map端连接 前述用DataJoin类实现的Reduce端连接方法中, 连接操作直到Reduce端才能进行,因为很多无效 的连接组合数据在Reduce阶段才能去除,所以大 量的网络带宽被用来传输不用的数据,因此,这种 方法效率不高。 当数据源的数据量较小时,能够放在单节点的内存 时,可以使用称为“复制连接”的全局文件复制方 法,把较小的数据源复制到每个Map节点上,然后 在Map阶段完成连接操作
用全局文件复制方法实现Map端连接 Hadoop提供了一个Distributed Cache机制用于将一个 或多个文件分布复制到所有节点上。要利用此机制,需要 涉及到以下两部分的设置 (1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中的文件路径
用全局文件复制方法实现Map端连接 当较小的数据源文件也无法放入内存时,可采用以 下办法: (1) 可以首先过滤较小数据源文件中的记录,只保 留需要进行连接的记录 (2) 可以将较小数据源文件分割为能n个小文件,其 中每个小文件都能放入内存处理,然后分别对 这n个小文件用全局文件复制方法进行与较大源 文件的连接,最后把结果合并起来
带Map端过滤的Reduce端连接 如果过滤后数据仍然无法放在内存中处理,可采用 带Map端过滤的Reduce端连接处理。 具体过程为在Map端先生成一个仅包含连接主键的 过滤文件,由于这个文件的数据量大大降低,则可 将这个文件存放在Distributed Cache文件中,然 后在Map端过滤掉主键不在这个列表中的所有记录 ,然后再实现正常的Reduce端连接
全局参数的传递 public void set(String name, String value);//设置字符串属性 为了能让用户灵活设置某些作业参数,一个 MapReduce计算任务可能需要在执行时从命令行 输入这些作业参数,并将这个参数传递给各个节点 Configuration类为此专门提供了用于保存和获取 属性的方法例如: public void set(String name, String value);//设置字符串属性 public void get(String name, String defaultValue) //读取字符串属性 //将一个参数设置为name属性 jobconf.set(“name”, args[0]); //然后可在Mapper或Reducer类的初始化方法setup中从Configuration对象读出该 属性值 jobconf.get(“name”, “”);
全局数据文件的传递 这里同样也用到了Distributed Cache机制。要 利用此机制,需要涉及到以下两部分的设置 (1) Job类中 public void addCacheFile(URI uri): 将一个文件存放到Distributed Cache文件中 (2) Mapper 或Reducer的context类中 public Path[] getLocalCacheFiles(): 获取设置在Distributed Cache中 的文件路径
关系数据库的连接与访问 Hadoop提供了相应的从关系数据库查询和读取数 据的接口。 DBInputFormat:提供从数据库读取数据的格式 DBRecordReader:提供读取数据记录的接口 Hadoop查询和读取关系数据库的处理效率比较低 因此DBInputFormat仅适合读取小量的数据,对于 读取大量的数据,可以用数据库中的工具将数据输 出为文本文件,并上载到HDFS中处理。
关系数据库的连接与访问 Hadoop提供了相应的向关系数据库直接输出计算 结果的编程接口。 DBOutputFormat:提供向数据库输出数据的格式 DBRecordWriter:提供向数据库写入数据记录的接口 DBConfiguration:提供数据库配置和创建连接的接口 //连接DB的静态方法 Public static void configureDB(Job job, String driverClass, String dbUrl, String userName, String password) Job 为当前准备执行的作业, driveClass为数据库厂商提供的访问数据库的驱动程序 dbUrl 为运行数据库主机的地址,userName和password为访问数据库的用户名与密 码
关系数据库的连接与访问 数据库连接完成后,即可完成从MapReduce程序 向关系数据库写入数据的操作,DBOutputFormat 提供了一个静态方法来指定需要写入的数据表和字 段: public static void setOutput(Job job, String tableName, String… fieldNames) 其中tableName指定即将写入数据的表名,后续参数指定哪些字段数据将写入该表
关系数据库的连接与访问 为了能完成向数据库中的写入操作,程序员还需 要实现DBWritable: public class NewDBWritable implements Writable, DBWritable { public void write(DataOutput out) {……} ………. }
完