Hadoop入门 卢学裕@优酷网 2012.07
Outlines Hadoop Overview HDFS Map-reduce Programming Paradigm Hadoop Map-reduce Job Scheduler Resources
Hadoop, Why? 数据太多了,需要能存储、快速分析Pb级数据集的系统 单机的存储、IO、内存、CPU有限,需要可扩展的集群 使用门槛低,数据分析是个庞杂的问题,MPI太复杂 单点故障问题 –机器多了单点故障成为正常的异常 –节点有增有减 Hadoop就是一个满足易用性、可靠性、可扩展性的存储计算平台,还是开源的!
Hadoop History Dec 2004 – Google GFS paper published July 2005 – Nutch uses MapReduce Feb 2006 – Becomes Lucene subproject Apr 2007 – Yahoo! on 1000-node cluster Apr 2008 – Fastest QuickSort on 1TB Jan 2008 – An Apache Top Level Project Jul 2008 – A 4000 node test cluster
Hadoop-related projects Hive:数据仓库,facebook贡献 PIG:并行计算的一种高级语言,yahoo贡献 Nutch:网页搜索软件,不只是爬虫 Avro:数据序列化系统 Chukwa:用于管理大规模分布式集群的数据收集系统 ZooKeeper:用于分布式应用的高性能协同服务 Hbase:类似于BigTable的,key-value数据库系统 Mahout:分布式机器学习和数据挖掘的Lib Hama:基于BSP的超大规模科学计算框架 ……
Who Uses Hadoop Amazon 著名的AWS Adobe Adknowledge: behavioral targeting, clickstream analytics Alibaba Baidu: 搜索日志分析;每周处理3000TB 数据 Bixo Labs:web mining Datagraph:处理RDF数据,存储、索引 EBay : 532 nodes,搜索优化和研究 ETH Zurich Systems Group:教学《 Massively Parallel Data Analysis with MapReduce 》 Facebook : 1100 nodes, 12PB; 300 nodes, 3PB FOX:3个Cluster 用于日志分析、数据挖掘、机器学习 Freestylers:构建基于图片的推荐系统 Google Gruter. Corp:索引、Link分析、数据挖掘 Hulu: Hbase hosting IBM Krugle :源代码搜索 Last.fm:图表计算、A/B测试,user profile分析,cookie级的报表处理 Lineberger Comprehensive Cancer Center:癌症相关的研究,使用SeqWare LinkedIn :这人你可能认识 The New York Times PARC:分析Wikipedia里的冲突 Pressflip:个性化搜索,训练SVM模型 Yahoo!: 4000 nodes (2*4cpu boxes w 4*1TB disk & 16GB RAM) 优酷土豆 More on http://wiki.apache.org/hadoop/PoweredBy
Goals of HDFS 大数据集存储 – 10K nodes, 100 million files, 10 PB 应付硬件故障 – 用文件多副本应付故障问题 – 故障自动检测和恢复 更适合批量处理 – 搬移计算比搬移数据更廉价 – 数据一次写入,多次读取 – 更注重数据读取的高吞吐量,而不是低延时 适应复杂的硬件及软件平台
The File System 一个集群只有一个Namespace 跟Unix的文件系统Namespace很相似,不过不支持Hard link、Soft link 文件分块存储 一般一块大小为64M,可配置 每块会被复制在多个DataNode上 支持回收站 当一个文件被删除时会先放入用户下的回收站 回收站会被定期清除 恢复的方式是将文件从回收站移出
NameNode Metadata Meta-data 存在内存中 – 整个Meta-data放入主内存 – No demand paging of meta-data Meta-data记录了 – 文件列表信息 – 每个文件的块列表 – 每个块对应的DataNode – 文件属性,如创建时间、创建者、几份副本等 Transaction Log (EditLog ) –记录了文件系统的每个变化,如创建文件、删除文件、修改文件的副本数等 – EditLog会被合并为FsImage并存入磁盘 Meta-data 磁盘故障 – NameNode可以维护多份数据
DataNode A Block Server – 将数据存储本机文件系统(e.g. ext3) – 存储数据块的Meta-data(e.g. CRC, ID) 汇报 – 启动时向NameNode注册本地存储的文件块 –定期向NameNode报告本机存活(心跳) 数据输送 – 接收来自客户端的写数据 – 向客户端发送数据 –将数据传输到指定的 DataNodes
Block Replica Placement 机架感知 NameNode能感知机架,选择较优的方式 假设有3份或以上,目前的策略是 一份放在本地节点上 第二份放在另外一个机架的节点上 第三份放在跟第二份同机架的不同节点上 其他的随机放置 客户端从最近的块读取
Data Correctness – Use CRC32 – 由客户端负责计算CRC –客户端从DataNode读取数据和checksum 文件写入 – 由客户端负责计算CRC – checksum存放在DataNode 文件读取 –客户端从DataNode读取数据和checksum –由客户端校验,如果不通过,则客户端尝试从其他的副本读取数据
FS Shell FS DFSAdmin fsck Balancer hadoop fs -mkdir /foodir hadoop fs -rmr /foodir hadoop fs -cat /foodir/myfile.txt hadoop fs -tail /foodir/myfile.txt Chmod,chown,put,mv,cp,du,dus 更多命令请运行 hadoop fs –help获取 DFSAdmin Safemode,upgradeProgress,refreshNodes,… fsck 文件系统检查 Balancer 集群均衡
Web UI
Map-reduce Programming Paradigm input | map | shuffle | reduce | output 最简单的实现方式 cat * | grep ‘java’| sort | uniq -c | cat > file 实现这种编程范式的有 Google Hadoop Oracle Teradata ……
Hadoop Map/Reduce (input) <k1, v1> -> map -> <k2, v2> -> combine* -> <k2, v2> -> reduce -> <k3, v3> (output) combine 过程可能没有,也可能有多次
WordCount (©周敏@Taobao) the 1 weather 1 is 1 good 1 a 1 The weather is good good 1 a 1 good 4 today 1 is 1 good 1 Today is good guy 1 is 1 guy 1 is 4 man 2 the 1 this 1 guy 1 is 1 a 1 man 1 This guy is a man man 1 the 1 good 1 man 1 is 1 Good man is good this 1 this 1 today 1 weather 1 today 1 weather 1
WordCount Mapper public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable ONE= new IntWritable(1); private Text word = new Text(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, ONE); }
WordCount Reducer public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result);
WordCount Job Setup public static void main(String[] args) throws Exception { String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(new Configuration(), "word count"); job.setJarByClass(WordCount.class); //设置输入 job.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //Map job.setMapperClass(WordCountMapper.class); //Combine job.setCombinerClass(IntSumReducer.class); //Reduce job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置输出 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //提交Job并执行 System.exit(job.waitForCompletion(true) ? 0 : 1);
Inject Points Input Map Combine* Shuffling Sort Grouping Reduce Output job.setInputFormatClass() Map job.setMapperClass() Combine* job.setCombinerClass() Shuffling job.setPartitionerClass() Sort job.setSortComparatorClass() Grouping job.setGroupingComparatorClass() Reduce job.setReducerClass() Output job.setOutputFormatClass()
Job Tracker & Task Tracker Job & Task 一个Job会被分成多个Task执行 一个Task对应一个Map或者Reduce Job Tracker 运行在Master上 ,管理和跟踪每个Job 收集Task的信息状态,并汇总 重新调度失败的任务 Task Tracker 向Job Tracker汇报状态(心跳) 运行在每个计算节点上,管理和跟踪每个Task 收集task的信息,并提供给Job Tracker
Job Sheduler FIFO 先到先得,排队执行 Fair Scheduler(公平调度器) 它的目的是让所有的作业随着时间的推移,都能平均的获取等同的共享资源。 按资源池(pool)来组织作业,并把资源按配置分到这些资源池里 http://hadoop.apache.org/common/docs/r0.20.2/fair_scheduler.html Capacity Scheduler(容量调度器) 支持多个queue,每个Job提交到一个queue里 支持内存调度,对于需要高内存的任务,调度到有足够内存的节点 http://hadoop.apache.org/common/docs/r0.20.2/capacity_scheduler.html
Job Scheduler Web UI
Job Shell hadoop job –kill <job-id> hadoop job –list hadoop job -set-priority <job-id> <priority> hadoop job -status <job-id> hadoop job -kill-task <task-id> hadoop job -fail-task <task-id>
Job Web UI
Select COUNT(DISTINCT guid) as uv from youku_pv; Hive Select COUNT(DISTINCT guid) as uv from youku_pv;
Hadoop Next Generation NameNode单点故障 YARN Resource Manager Job Scheduling/monitoring Application Submission Client ApplicationMaster New Programming Paradigm MPI Master-Worker Iterative models “Customize” by yourself
Hadoop Next Generation Architecture
优酷数据平台V2 分布式实时计算系统 网站 反作弊系统 分布式日志收集系统 广告 分布式实时数据流处理系统 无线 HADOOP HDFS …… 接口系统监控 作弊监测 广告计数 PlayLog记录 播放计数 个性化推荐 计数/监控 准实时计算 网站 反作弊系统 分布式日志收集系统 广告 分布式实时数据流处理系统 无线 HADOOP HDFS HBASE HIVE 搜索 MAP-REDUCE 统计分析 数据挖掘 运营分析 相关推荐 用户分析 个性化推荐 数据开放平台 精准广告 …… 播放器 调度系统 大数据批量计算系统
优酷推荐系统
指数&VideoProfile 演示指数
Resources Hadoop Hadoop World 推荐资料 http://hadoop.apache.org/ http://hive.apache.org/ http://pig.apache.org/ http://hbase.apache.org/ http://zookeeper.apache.org/ Hadoop World http://www.hadoopworld.com/ http://www.cloudera.com/resources/hadoop-world/ 推荐资料 《Hadoop:The.Definitive.Guid 3rd 》 《Hadoop实战》 《Hadoop权威指南》 《hadoop开发者》杂志
Thanks