厦门大学数据库实验室 MapReduce 连接 报告人:李雨倩 导师:林子雨 2014.07.12
连接简介 MapReduce连接策略
连接 连接是关系运算,可以用于合并关系。 在数据库中,一般是表连接操作; 在MapReduce中,连接可以用于合并两个或多个数据集。 例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。
连接的类型 内连接 外连接 最常用的两个连接类型是内连接和外连接。 内连接比较两个关系中所有的数组,然后生成一个满足连接条件的结果集。 外连接并不需要两个关系的数组都满足连接条件。在连接条件不满足的时候,外连接可以将一方的数据保留在结果集中。 左外连接 右外连接 全连接
连接关系图
连接实例 select * from table1 left join table2 on table1.id=table2.id -------------结果------------- id name id score ------------------------------ 1 lee 1 90 2 zhang 2 100 4 wang NULL NULL ------------------------------ select * from table1 join table2 on table1.id=table2.id -------------结果------------- id name id score ------------------------------ 1 lee 1 90 2 zhang 2 100 select * from table1 full join table2 on table1.id=table2.id -------------结果------------- id name id score ------------------------------ 1 lee 1 90 2 zhang 2 100 4 wang NULL NULL NULL NULL 3 70 select * from table1 right join table2 on table1.id=table2.id -------------结果------------- id name id score ------------------------------ 1 lee 1 90 2 zhang 2 100 NULL NULL 3 70 ------------------------------ Table1 | table2 | ------------------------------------------------- id name | id score | 1 lee 1 90 | 2 zhang 2 100 | 4 wang 3 70 |
连接简介 MapReduce连接策略
连接 连接是关系运算,可以用于合并关系。 在数据库中,一般是表连接操作; 在MapReduce中,连接可以用于合并两个或多个数据集。 例如,用户基本信息和用户活动详情。用户基本信息来自于OLTP数据库。用户活动详情来自于日志文件。
MapReduce的连接 MapReduce连接的应用场景 welcome to use these PowerPoint templates, New Content design, 10 years experience 用户的人口统计信息的聚合操作(例如:青少年和中年人的习惯差异) 当用户超过一定时间没有使用网站后,发邮件提醒他们。 分析用户的浏览习惯,让系统可以提示用户有哪些网站特性还没有使用到,形成一个反馈循环。
MapReduce中的连接策略 重分区连接 复制连接 半连接 ——reduce端连接。 使用场景:连接两个或多个大型数据集。 使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。 半连接 ——map端连接。 使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。
重分区连接 重分区连接利用MapReduce的排序-合并机制来分组数据。它被实现为使用一个单独的MapReduce任务,并支持多路连接(这里的多路指的是多个数据集)。 Map阶段负责从多个数据集中读取数据,决定每个数据的连接值,将连接值作为输出键。输出值则包含将在reduce阶段被合并的值。 Reduce阶段,一个reducer接收map函数传来的一个输出键的所有输出值,并将数据分为多个分区。在此之后,reducer对所有的分区进行笛卡尔积连接运算,并生成全部的结果集。
$ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt 在如下示例中,用户数据中有用户姓名,年龄和所在州 $ cat test-data/ch4/users.txt anne 22 NY joe 39 CO alison 35 NY mike 69 VA marie 27 OR jim 21 OR bob 71 CA mary 53 NY dave 36 VA dude 50 CA 用户活动日志中有用户姓名,进行的动作, 来源IP。这个文件一般都要比用户数据要大得多。 $ cat test-data/ch4/user-logs.txt jim logout 93.24.237.12 mike new_tweet 87.124.79.252 bob new_tweet 58.133.120.100 mike logout 55.237.104.36 jim new_tweet 93.24.237.12 marie view_user 122.158.130.90 $ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt $ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output $ hadoop fs -cat output/part* bob 71 CA new_tweet 58.133.120.100 jim 21 OR logout 93.24.237.12 jim 21 OR new_tweet 93.24.237.12 jim 21 OR login 198.184.237.49 marie 27 OR login 58.133.120.100 marie 27 OR view_user 122.158.130.90 mike 69 VA new_tweet 87.124.79.252 mike 69 VA logout 55.237.104.36
重分区连接 过滤( filter)指的是将map极端的输入数据中不需要的部分丢弃。投影( project)是关系代数的概念。投影用于减少发送给reducer的字段。
优化重分区连接 传统重分区方法的实现空间效率低下。它需要将连接的所有的输出值都读取到内存中,然后进行多路连接。事实上,如果仅仅将小数据集读取到内存中,然后用小数据集来遍历大数据集,进行连接,这样将更加高效。下图是优化后的重分区连接的流程图。
Map输出的组合键和组合值 上图说明了map输出的组合键和组合值。二次排序将会根据连接键(join key)进行分区,但会用整个组合键来进行排序。组合键包括一个标识源数据集(较大或较小)的整形值,因此可以根据这个整形值来保证较小源数据集的值先于较大源数据的值被reducer接收。
优化重分区连接 上图是实现的类图。类图中包含两个部分,一个通用框架和一些类的实现样例。使用这个连接框架需要实现抽象类 OptimizedDataJoinMapperBase 和 OptimizedDataJoinReducerBase。
OptimizedDataJoinMapperBase 这个类的作用是辨认出较小的数据集,并生成输出键和输出值。Configure方法在mapper创建期调用。Configure方法的作用之一是标识每一个数据集,让reducer可以区分数据的源数据集。另一个作用是辨认当前的输入数据是否是较小的数据集。 protected abstract Text generateInputTag(String inputFile); protected abstract boolean isInputSmaller(String inputFile); public void configure(JobConf job) { this.inputFile = job.get("map.input.file"); this.inputTag = generateInputTag(this.inputFile); if(isInputSmaller(this.inputFile)) { smaller = new BooleanWritable(true); outputKey.setOrder(0); } else { smaller = new BooleanWritable(false); outputKey.setOrder(1); }
OptimizedDataJoinMapperBase(续) Map方法首先调用自定义的方法 (generateTaggedMapOutput) 来生成OutputValue对象。这个对象包含了在连接中需要使用的值,和一个标识较大或较小数据集的布尔值。如果map方法可以调用自定义的方法 (generateGroupKey) 来得到可以在连接中使用的键,那么这个键就作为map的输出键。 protected abstract OptimizedTaggedMapOutput generateTaggedMapOutput(Object value); protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord); public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException { OptimizedTaggedMapOutput aRecord = generateTaggedMapOutput(value); if (aRecord == null) { return;} aRecord.setSmaller(smaller); String groupKey = generateGroupKey(aRecord); if (groupKey == null) { outputKey.setKey(groupKey); output.collect(outputKey, aRecord);}
OptimizedDataJoinReducerBase Map端处理后已经可以保证较小源数据集的值将会先于较大源数据集的值被接收。这里就可以将所有的较小源数据集的值放到缓存中。在开始接收较大源数据集的值的时候,就开始和缓存中的值做连接操作。 public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { CompositeKey k = (CompositeKey) key; List smaller = new ArrayList(); while (values.hasNext()) { Object value = values.next(); OptimizedTaggedMapOutput cloned =((OptimizedTaggedMapOutput) value).clone(job); if (cloned.isSmaller().get()) { smaller.add(cloned); } else { joinAndCollect(k, smaller, cloned, output, reporter); }
OptimizedDataJoinRuducerBase(续) protected abstract OptimizedTaggedMapOutput combine(String key, OptimizedTaggedMapOutput value1,OptimizedTaggedMapOutput value2); private void joinAndCollect(CompositeKey key,List smaller, OptimizedTaggedMapOutput value, OutputCollector output, Reporter reporter) throws IOException { if(smaller.size() < 1) { OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value); collect(key, combined, output, reporter); } else { for (OptimizedTaggedMapOutput small : smaller) { OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value); } 方法 joinAndCollect 包含了两个数据集的值,并输出它们。
优化重分区连接实例 例如,需要连接用户详情数据和用户活动日志。 第一步,判断两个数据集中哪一个比较小。对于一般的网站来说,用户详情数据会比较小,用户活动日志会比较大。首先,实现抽象类 OptimizedDataJoinMapperBase。这个将在map端被调用。这个类将创建map的输出键和输出值。同时,它还将提示整个框架,当前处理的文件是不是比较小的那个。
Map端实现代码 public class SampleMap extends OptimizedDataJoinMapperBase { private boolean smaller; @Override protected Text generateInputTag(String inputFile) { // tag the row with input file name (data source) smaller = inputFile.contains("users.txt"); return new Text(inputFile); } protected String genGroupKey(Object key, OutputValue output) { return key.toString(); protected boolean isInputSmaller(String inputFile) { return smaller; protected OutputValue genMapOutputValue(Object o) { return new TextTaggedOutputValue((Text) o);
Reduce端实现代码 第二步,你需要实现抽象类 OptimizedDataJoinReducerBase。它将在reduce端被调用。在这个类中,将从map端传入不同数据集的输出键和输出值,然后返回reduce的输出数组。 public class SampleReduce extends OptimizedDataJoinReducerBase { private TextTaggedOutputValue output = new TextTaggedOutputValue(); private Text textOutput = new Text(); @Override protected OutputValue combine(String key, OutputValue smallValue,OutputValue largeValue) { if(smallValue == null || largeValue == null) { return null; } Object[] values = { smallValue.getData(), largeValue.getData() }; textOutput.set(StringUtils.join(values, "\t")); output.setData(textOutput); return output;
任务的主代码 最后,任务的主代码需要指明InputFormat类,并设置二次排序。 job.setInputFormat(KeyValueTextInputFormat.class); job.setMapOutputKeyClass(CompositeKey.class); job.setMapOutputValueClass(TextTaggedOutputValue.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setPartitionerClass(CompositeKeyPartitioner.class); job.setOutputKeyComparatorClass(CompositeKeyComparator.class); job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class);
MapReduce中的连接策略 重分区连接 复制连接 半连接 ——reduce端连接。 使用场景:连接两个或多个大型数据集。 使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。 半连接 ——map端连接。 使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。
复制连接 复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。 MapReduce的复制连接的工作原理如下: 使用分布式缓存将这个小数据集复制到所有运行map任务的节点。 用各个map任务初始化方法将这个小数据集装载到一个哈希表中。 逐条用大数据集中的记录遍历这个哈希表,逐个判断是否符合连接条件 输出符合连接条件的结果。
复制连接
一个复制连接通用框架 该复制连接框架可以支持任意类型的数据集。这个框架中同样提供了一个优化的小功能:动态监测分布式缓存内容和输入块的大小,并判断哪个更大。如果输入块较小,那么就需要将map的输入块放到内存缓冲中,然后在mapper的cleanup方法中执行连接操作。下图为该框架的类图。并且提供了连接类( GenericReplicatedJoin)的具体实现,假设前提:每个数据文件的第一个标记是连接键。
连接框架的算法 Mapper的setup方法判断在map的输入块和分布式缓存的内容中哪个大。如果分布式缓存的内容比较小,那么它将被装载到内存缓存中。Map函数开始连接操作。如果输入块比较小,map函数将输入块的键\值对装载到内存缓存中。Map的cleanup方法将从分布式缓存中读取记录,逐条记录和在内存缓存中的键\值对进行连接操作。 。
GenericReplicatedJoin 以下代码为GenericReplicatedJoin中的setup方法,它是在map的初始化阶段调用的。这个方法判断分布式缓存中的文件和输入块哪个大。如果文件比较小,则将文件装载到HashMap中。 protected void setup(Context context) throws IOException, InterruptedException { distributedCacheFiles=DistributedCache.getLocalCacheFiles(context.getConfiguration()); int distCacheSizes = 0; for (Path distFile : distributedCacheFiles) { File distributedCacheFile = new File(distFile.toString()); distCacheSizes += distributedCacheFile.length(); } if(context.getInputSplit() instanceof FileSplit) { FileSplit split = (FileSplit) context.getInputSplit(); long inputSplitSize = split.getLength(); distributedCacheIsSmaller = (distCacheSizes < inputSplitSize); } else { distributedCacheIsSmaller = true; if (distributedCacheIsSmaller) { for (Path distFile : distributedCacheFiles) { File distributedCacheFile = new File(distFile.toString()); DistributedCacheFileReader reader = getDistributedCacheReader(); reader.init(distributedCacheFile); for (Pair p : (Iterable<Pair>) reader) {addToCache(p); } reader.close(); }} }
GenericReplicatedJoin(续) 以下代码为GenericReplicatedJoin中的Map方法。它将会根据setup方法是否将了分布式缓存的内容装载到内存的缓存中来选择行为。如果分布式缓存的内容被装载到内存中,那么map方法就将输入块的记录和内存中的缓存做连接操作。如果分布式缓存的内容没有被装载到内存中,那么map方法就将输入块的记录装载到内存中,然后在cleanup方法中使用。 protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { Pair pair = readFromInputFormat(key, value); if(distributedCacheIsSmaller) { joinAndCollect(pair, context); } else { addToCache(pair);}} public void joinAndCollect(Pair p, Context context) throws IOException, InterruptedException { List<Pair> cached = cachedRecords.get(p.getKey()); if (cached != null) { for (Pair cp : cached) {Pair result; if(distributedCacheIsSmaller) { result = join(p, cp); } else {result =join(cp, p); } if (result != null) { context.write(result.getKey(), result.getData()); }}}} public Pair join(Pair inputSplitPair, Pair distCachePair) { StringBuilder sb = new StringBuilder(); if (inputSplitPair.getData() != null) { sb.append(inputSplitPair.getData()); } sb.append("\t"); if (distCachePair.getData() != null) { sb.append(distCachePair.getData()); return new Pair<Text, Text>( new Text(inputSplitPair.getKey().toString()), new Text(sb.toString()));
GenericReplicatedJoin(续) 当所有的记录都被传输给map方法,MapReduce将会调用cleanup方法。如果分布式缓存的内容比输入块大,连接将会在cleanup中进行。连接的对象是map函数的缓存中的输入块的记录和分布式缓存中的记录。 protected void cleanup(Context context) throws IOException, InterruptedException { if (!distributedCacheIsSmaller) { for (Path distFile : distributedCacheFiles) { File distributedCacheFile = new File(distFile.toString()); DistributedCacheFileReader reader = getDistributedCacheReader(); reader.init(distributedCacheFile); for (Pair p : (Iterable<Pair>) reader) { joinAndCollect(p, context); } reader.close();
GenericReplicatedJoin(续) 最后,任务的驱动代码必须指定需要装载到分布式缓存中的文件。以下的代码可以处理一个文件,也可以处理MapReduce输入结果的一个目录。 Configuration conf = new Configuration(); FileSystem fs = smallFilePath.getFileSystem(conf); FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath); if(smallFilePathStatus.isDir()) { for(FileStatus f: fs.listStatus(smallFilePath)) { if(f.getPath().getName().startsWith("part")) { DistributedCache.addCacheFile(f.getPath().toUri(), conf); } } else { DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
MapReduce中的连接策略 重分区连接 复制连接 半连接 ——reduce端连接。 使用场景:连接两个或多个大型数据集。 使用场景:待连接的数据集中有一个数据集小到可以完全放在缓存中。 半连接 ——map端连接。 使用场景:待连接的数据集中有一个数据集非常大,但同时这个数据集可以被过滤成小到可以放在缓存中。
半连接 假设一个场景,需要连接两个很大的数据集,例如用户日记和OLTP的用户数据。任何一个数据集都不是足够小到可以缓存在map job的内存中。如此看来,似乎就不得不应用reduce端的连接了。这时候,可以看一下题目本身:若是一个数据集中有的记录因为无法连接到另一个数据集的记录,将会被移除,还需要将全部数据集放到内存中吗?在这个例子中,在用户日记中的用户仅仅是OLTP用户数据中的用户中的很小的一个项目组。那么就可以从OLTP用户数据中取出存在于用户日记中的那项目组用户的用户数据。如许就可以获得足够小到可以放在内存中的数据集。如许的解决规划就叫做半连接。
例子 ¥ hadoop fs -cat output/result/part* jim logout 93.24.237.12 21 OR ¥ bin/run.sh com.manning.hip.ch4.joins.semijoin .Main users.txt user-logs.txt output ¥ hadoop fs -ls output /user/aholmes/output/filtered /user/aholmes/output/result /user/aholmes/output/unique ¥ hadoop fs -cat output/unique/part* bob jim marie mike ¥ hadoop fs -cat output/filtered/part* mike 69 VA marie 27 OR jim 21 OR bob 71 CA ¥ hadoop fs -cat output/result/part* jim logout 93.24.237.12 21 OR mike new_tweet 87.124.79.252 69 VA bob new_tweet 58.133.120.100 71 CA mike logout 55.237.104.36 69 VA jim new_tweet 93.24.237.12 21 OR marie view_user 122.158.130.90 27 OR jim login 198.184.237.49 21 OR marie login 58.133.120.100 27 OR
半连接的实现——3个MapReduce job
半连接的实现——job1 第一个MapReduce job的功能是从日记文件中提取出用户名,用这些用户名生成一个用户名唯一的凑集(Set)。这经由过程让map函数履行了用户名的投影操作。然后用reducer输出用户名。为了削减在map阶段和reduce简短之间传输的数据量,就在map任务中采取哈希集来保存用户名,在cleanup办法中输出哈希集的值。
Job1的实现代码 public static class Map extends Mapper<Text, Text, Text, NullWritable> { private Set<String> keys = new HashSet<String>(); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { keys.add(key.toString()); } protected void cleanup(Context context) Text outputKey = new Text(); for(String key: keys) { outputKey.set(key); context.write(outputKey, NullWritable.get()); public static class Reduce extends Reducer<Text, NullWritable, Text,NullWritable> { protected void reduce(Text key, Iterable<NullWritable> values, Context context) context.write(key, NullWritable.get()); } }
半连接的实现——job2 第二步是一个进行过滤的MapReduce job。目标是从全部用户的用户数据集中移除不存在于日记文件中的用户。这是一个只包含map的功课。它用到了复制连接来缓存存在于日记文件中的用户名,并把它们和全部用户的数据集连接。来自于job1的唯一的数据集要远远小于全部用户的数据集。很自然的就把小的那个数据集放到缓存中了。
复制连接通用框架 GenericReplicatedJoin 类是履行连接的类。在该类列表中前三个类是可扩展的,相对应的复制连接的行动也是可定制的。readFromInputFormat 方法可以用于任意的输入类型。getDistributedCacheReader 办法可以被重载来支持来自于分布式缓存的任意的文件类型。在这一步中的核心是join方法。Join方法将会生成job的输出键和输出值。在默认的实现中,两个数据集的值将会被归并以生成最后的输出值。这里可以把值输出变成来自于用户表的数据。
Job2的实现代码 重载join方法; @Override public Pair join(Pair inputSplitPair, Pair distCachePair) { return inputSplitPair; } 把来自于job1的文件放到分布式缓存中; for(FileStatus f: fs.listStatus(uniqueUserStatus)) { if(f.getPath().getName().startsWith("part")) { DistributedCache.addCacheFile(f.getPath().toUri(), conf); 在驱动代码中,调用 GenericReplicatedJoin 类; Job job = newJob(conf); job.setJarByClass(ReplicatedFilterJob.class); job.setMapperClass(ReplicatedFilterJob.class); job.setNumReduceTasks(0); job.setInputFormatClass(KeyValueTextInputFormat.class); outputPath.getFileSystem(conf).(outputPath, true); FileInputFormat.setInputPaths(job, usersPath); FileOutputFormat.setOutputPath(job, outputPath);
半连接的实现——job3 在最后一步中,须要将job2生成的已过滤的用户集和原始的用户日记合并了。已过滤的用户集是足够小到可以放到内存中,同样也可以放到分布式缓存中。 这里要再次用到复制连接框架来执行连接。但这次不需调节join方法的行为,因为两个数据集中的数据都要呈现在最后的输出中。
最佳连接策略选择 如果数据集中有一个足够小到可以放到mapper的内存中,那么map端的复制连接就足够了。 如果每个数据集都很大,同时其中一个数据集可以在经过一定条件过滤后大幅度地减小,那么半连接将会很有效。 如果你无法预处理你的数据,并且数据集大到不能够被缓存,那么你就需要在reducer中使用重分区连接了。
Thank you!