Hadoop and Data Analysis
Taobao Data Platform & Product Department, Infrastructure R&D Team
Zhou Min
Date: 2010-05-26
Outline: Hadoop basics; where Hadoop is applied; how Hadoop works underneath; Hive and data analysis; Hadoop cluster administration; common problems and solutions
On the philosophy of playing cards
Playing cards as MapReduce: dealing the cards is the input split; each player sorting their own hand is the map; exchanging cards is the shuffle; sorting the hand again after the exchange is the reduce; done (the output).
Word count
Input lines:
  The weather is good
  Today is good
  This guy is a good man
  Good man is good
Map output, one (word, 1) pair per token: (the, 1) (weather, 1) (is, 1) (good, 1) (today, 1) (is, 1) (good, 1) ...
Reduce output, after the shuffle groups identical words:
  a 1, good 5, guy 1, is 4, man 2, the 1, this 1, today 1, weather 1
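The same word count, written against the old (0.20-era) MapReduce API that the case-study slides later use. A minimal sketch; the class names are illustrative:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {
  public static class Map extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();
    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      // Emit (word, 1) for each whitespace-separated token.
      for (String token : value.toString().toLowerCase().split("\\s+")) {
        word.set(token);
        output.collect(word, ONE);
      }
    }
  }

  public static class Reduce extends MapReduceBase
      implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      // The shuffle has grouped all the 1s for this word; sum them.
      int sum = 0;
      while (values.hasNext()) sum += values.next().get();
      output.collect(key, new IntWritable(sum));
    }
  }
}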
Use case: traffic computation
Use case: trend analysis (screenshot from http://www.trendingtopics.org/)
Use case: user recommendation
Use case: distributed indexing
The Hadoop ecosystem
Hadoop core: Hadoop Common, the HDFS distributed file system, and the MapReduce framework
Around the core: Pig (a parallel data-analysis language), HBase (a column-oriented NoSQL database), ZooKeeper (a distributed coordination service), Hive (a data warehouse queried with SQL), and Chukwa (a Hadoop log-analysis tool)
Hadoop implementation
[diagram: input data stored as DFS blocks flows through Map tasks and then Reduce on the Hadoop cluster, producing the results]
Job execution flow
[diagram of job submission and scheduling]
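For reference, a hedged sketch of how a job enters this flow with the old JobConf/JobClient API (the paths and the WordCount classes are placeholders from the earlier sketch): the JobClient computes the input splits and submits the job to the JobTracker, which schedules map and reduce tasks onto the TaskTrackers.

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WordCount.Map.class);
conf.setReducerClass(WordCount.Reduce.class);
FileInputFormat.setInputPaths(conf, new Path("/input"));   // placeholder
FileOutputFormat.setOutputPath(conf, new Path("/output")); // placeholder
JobClient.runJob(conf);  // blocks until the JobTracker reports completion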
Hadoop case study (1)
// the map method of MapClass1
public void map(LongWritable key, Text value,
    OutputCollector<Text, Text> output, Reporter reporter)
    throws IOException {
  String strLine = value.toString();
  String[] strList = strLine.split("\"");
  String mid = strList[3];
  String sid = strList[4];
  String timestr = strList[0];
  try {
    timestr = timestr.substring(0, 10); // keep only the leading date part
  } catch (Exception e) {
    return; // skip malformed records
  }
  timestr += "0000";
  // dozens of lines omitted
  output.collect(new Text(mid + "\"" + sid + "\"" + timestr), ...);
}
Hadoop case study (2)
public static class Reducer1 extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  private Text word = new Text();
  private Text str = new Text();
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String[] t = key.toString().split("\"");
    word.set(t[0]);
    str.set(t[1]);
    output.collect(word, str); // uid, kind
  } // reduce
} // Reducer1
Hadoop case study (3)
public static class MapClass2 extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
  private Text word = new Text();
  private Text str = new Text();
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String strLine = value.toString();
    String[] strList = strLine.split("\\s+");
    word.set(strList[0]); // key produced by the first job
    str.set(strList[1]);
    output.collect(word, str);
  }
}
Hadoop case study (4)
public static class Reducer2 extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  private Text word = new Text();
  private Text str = new Text();
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    while (values.hasNext()) {
      String t = values.next().toString();
      // dozens of lines omitted (mid and sid are computed here)
    }
    output.collect(new Text(mid + "\"" + sid + "\""), ...);
  }
}
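The two stages above form a pipeline: MapClass1/Reducer1 aggregate the raw log records, and MapClass2/Reducer2 consume that output for the second round. A minimal chaining sketch, where job1 and job2 are JobConf objects configured with the classes above and the intermediate path is a placeholder:

Path tmp = new Path("/tmp/stage1");        // placeholder intermediate dir
FileOutputFormat.setOutputPath(job1, tmp);
JobClient.runJob(job1);                    // wait for stage 1 to finish
FileInputFormat.setInputPaths(job2, tmp);  // stage 2 reads stage 1's output
JobClient.runJob(job2);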
Thinking in MapReduce (1)
[dataflow diagram: relational-style operators (Filter, Group, Co-group, Function, Aggregate) applied to datasets A, B, C, and D]
Thinking in MapReduce (2)
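To make the co-group box concrete: a hedged sketch of a reduce-side co-group/join in raw MapReduce (the tab-separated field layout and the "A:"/"B:" tag scheme are assumptions for illustration). Each mapper tags records with their source, the shuffle groups both sides by the join key, and the reducer pairs them up.

// imports: java.util.*, org.apache.hadoop.io.*, org.apache.hadoop.mapred.*
// One mapper per input source; a second mapper would emit "B:" tags.
public static class TagA extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String[] f = value.toString().split("\t"); // assume: key \t payload
    output.collect(new Text(f[0]), new Text("A:" + f[1]));
  }
}

public static class CoGroup extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    List<String> a = new ArrayList<String>();
    List<String> b = new ArrayList<String>();
    while (values.hasNext()) {               // separate the group by tag
      String v = values.next().toString();
      if (v.startsWith("A:")) a.add(v.substring(2));
      else b.add(v.substring(2));
    }
    for (String x : a)                       // inner join: cross product
      for (String y : b)
        output.collect(key, new Text(x + "\t" + y));
  }
}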
The magic of Hive: SELECT COUNT(DISTINCT mid) FROM log_table
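That single line replaces a hand-written job roughly like the sketch below (a simplification for illustration, not Hive's actual query plan; the field layout is assumed from the earlier case study): the map emits each mid as a key, the shuffle deduplicates, and a single reducer counts the distinct keys, emitting the total when it closes.

// imports as in the earlier sketches (org.apache.hadoop.io.*, org.apache.hadoop.mapred.*)
public static class MidMap extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, NullWritable> {
  public void map(LongWritable key, Text value,
      OutputCollector<Text, NullWritable> output, Reporter reporter)
      throws IOException {
    String mid = value.toString().split("\"")[3]; // assumed field layout
    output.collect(new Text(mid), NullWritable.get());
  }
}

public static class DistinctCount extends MapReduceBase
    implements Reducer<Text, NullWritable, Text, LongWritable> {
  private long count = 0;
  private OutputCollector<Text, LongWritable> out;
  public void reduce(Text key, Iterator<NullWritable> values,
      OutputCollector<Text, LongWritable> output, Reporter reporter) {
    out = output;   // each reduce() call sees one distinct mid
    count++;
  }
  public void close() throws IOException {
    // Runs after all keys are seen; requires conf.setNumReduceTasks(1).
    if (out != null)
      out.collect(new Text("count_distinct_mid"), new LongWritable(count));
  }
}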
Why did Taobao adopt Hadoop?
Earlier generations of log analysis: webalizer, awstat, 般若, Atpanel.
Atpanel era: logs peaked at 250 GB/day, up to about 50 jobs, running more than 20 hours every day.
Hadoop era: logs currently at 470 GB/day, 366 jobs, finishing in 6 to 7 hours on average.
Who else is using Hadoop?
Yahoo! Beijing Global Software R&D Center, China Mobile Research Institute, Intel Research, Kingsoft, Baidu, Tencent, Sina, Sohu, IBM, Facebook, Amazon, Yahoo!
A typical Hadoop architecture for a web site
Web Servers → Log Collection Servers → Filers → Data Warehousing on a Cluster → Oracle RAC / Federated MySQL
Hadoop and Hive at Taobao
[architecture diagram: a Scheduler and JobClient in front of the cluster; a Hive Thrift Server and MetaStore Server backed by MySQL; clients including a Rich Client, Client Program, Web Server, and CLI/GUI]
Debugging
stdout and stderr of the tasks
the web UIs (port 50030 JobTracker, 50060 TaskTracker, 50070 NameNode)
NameNode, JobTracker, DataNode, and TaskTracker logs
local reproduction with the LocalJobRunner (see the sketch below)
shipping debug code to the nodes via DistributedCache
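A hedged example of the local-reproduction step (MyJob is a placeholder class name): setting mapred.job.tracker to "local" runs the whole job in a single JVM under the LocalJobRunner, so stdout and a debugger are directly usable.

JobConf conf = new JobConf(MyJob.class);  // MyJob is a placeholder
conf.set("mapred.job.tracker", "local");  // run under the LocalJobRunner
conf.set("fs.default.name", "file:///");  // read and write the local FS
JobClient.runJob(conf);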
Profiling
Goal: find performance bottlenecks, memory leaks, thread deadlocks, and the like
Tools: jmap, jstat, hprof, jconsole, JProfiler, MAT, jstack
Profile the JobTracker
Profile the TaskTracker on each slave node
Profile individual Child (task) processes on the slaves (a single slow task may hold back the whole job); a configuration sketch follows below
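For the per-task profiles, the 0.20-era framework can attach the JVM's hprof agent to a sample of tasks via the mapred.task.profile properties; a sketch (the task ranges are illustrative):

conf.setBoolean("mapred.task.profile", true);
conf.set("mapred.task.profile.maps", "0-2");     // profile map tasks 0..2
conf.set("mapred.task.profile.reduces", "0-2");  // and reduce tasks 0..2
conf.set("mapred.task.profile.params",
    "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s");
// %s is replaced with a per-attempt output file, collected with the task logs.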
Monitoring
Goal: watch I/O, memory, and CPU across the cluster or on a single node
Tool: Ganglia
How do we reduce data movement?
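Two standard answers: HDFS data locality (the JobTracker prefers to schedule a map task on a node that already holds its input block), and a map-side combiner, which pre-aggregates map output so far fewer bytes cross the network during the shuffle. Reusing the earlier word-count sketch:

// Summing is associative and commutative, so the reducer can double as
// the combiner: each map task pre-aggregates its own (word, 1) pairs
// locally before anything is shuffled.
conf.setCombinerClass(WordCount.Reduce.class);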
Data skew
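Data skew: a few hot keys carry most of the records, so one reducer keeps running long after the others finish. A common mitigation, sketched here with illustrative names and a salt width of 10, is to salt hot keys across several reducers and then merge the partial results in a second pass:

// In the map of the first job: spread each hot key over up to 10 reducers.
Random rand = new Random();                       // java.util.Random
String salted = hotKey + "#" + rand.nextInt(10);  // hotKey is illustrative
output.collect(new Text(salted), value);
// The second job strips the "#k" suffix in its map, so the partial
// aggregates for the same original key meet in one reducer and are merged.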