软件工程基础 Hadoop生态系统 刘 驰.

Slides:



Advertisements
Similar presentations
云计算辅助教学风云录 黎加厚 上海师范大学教育技术系 2010年8月9日.
Advertisements

DATE: 14/10/2009 陳威宇 格網技術組 雲端運算相關應用 (Based on Hadoop)
Foundations of Computer Science
穆公(朱金清 微博:淘穆公 阿里HBase业务设计实践 穆公(朱金清 微博:淘穆公
基于Hadoop的Map/Reduce框架研究报告
对存储系统发挥特殊作用的文件系统 2006年5月 - 北京.
加快数据中心运转速度 — 加速业务发展 约翰•福勒 甲骨文公司系统事业部执行副总裁. 加快数据中心运转速度 — 加速业务发展 约翰•福勒 甲骨文公司系统事业部执行副总裁.
人工智能 Artificial Intelligence 第十一章
台灣雲端運算應用實驗中心研發計畫 計 畫 期 間:自98年7月1日至99年6月30日止 執行單位名稱 :財團法人資訊工業策進會 國立中山大學.
操作系统结构.
HADOOP的高能物理分析平台 孙功星 高能物理研究所/计算中心
数据采集与Hadoop框架 报告人:黄文君 导 师:王华忠 BEA Confidential.
基于hadoop的数据仓库技术.
Routing Protocols and Concepts – Chapter 3
分布式系统 Distributed Systems 第 2 讲 系统模型
大数据在医疗行业的应用.
天文望远镜集成建模研究 杨德华 南京天文光学技术研究所 30 NOV, 年中国虚拟天文台年会 广西师范大学 桂林
Made by Feng Nie 开源机器学习库&Hadoop介绍 Made by Feng Nie
云实践引导产业升级 沈寓实 博士 教授 MBA 中国云体系产业创新战略联盟秘书长 微软云计算中国区总监 WinHEC 2015
Introduction to MapReduce
Frontiers of Software Engineering
Leftmost Longest Regular Expression Matching in Reconfigurable Logic
System Administration Practice Homework 2: Shell Programming
YARN & MapReduce 2.0 Boyu Diao
王耀聰 陳威宇 國家高速網路與計算中心(NCHC)
第五讲 数据的分组、合并与转换.
線上英檢測驗系統 Copyright © 2012 Cengage Learning Asia Pte. Ltd.,
中国散裂中子源小角谱仪 的实验数据格式与处理算法 报告人:张晟恺 中国科学院高能物理研究所 SCE 年8月18日
CHAPTER 6 認識MapReduce.
Logistics 物流 昭安國際物流園區 總經理 曾玉勤.
Flash数据管理 Zhou da
Decision Support System (靜宜資管楊子青)
第4章(2) 空间数据库 —关系数据库 北京建筑工程学院 王文宇.
HLA - Time Management 陳昱豪.
Chapter7 全球資訊網與瀏覽器介紹 網路應用入門(一) Chapter7 全球資訊網與瀏覽器介紹
微程序控制器 刘鹏 Dept. ISEE Zhejiang University
创建型设计模式.
重點 資料結構之選定會影響演算法 選擇對的資料結構讓您上天堂 程式.
软件工程 Software Engineering
校園網路架構介紹與資源利用 主講人:趙志宏 圖書資訊館網路通訊組.
第4章(1) 空间数据库 —数据库理论基础 北京建筑工程学院 王文宇.
大数据介绍及应用案例分享 2016年7月 华信咨询设计研究院有限公司.
Decision Support System (靜宜資管楊子青)
TinyOS 石万兵 2019/4/6 mice.
Westmont College 网络应用软件 第一讲 (客户-服务器 概念, 协议端口的使用, 套接字API)
資料庫 靜宜大學資管系 楊子青.
Version Control System Based DSNs
高性能计算与天文技术联合实验室 智能与计算学部 天津大学
Guide to a successful PowerPoint design – simple is best
高正宗 System Consultant Manager
Unit 05 雲端分散式Hadoop實驗 -I M. S. Jian
中国科学技术大学计算机系 陈香兰 2013Fall 第七讲 存储器管理 中国科学技术大学计算机系 陈香兰 2013Fall.
虚 拟 仪 器 virtual instrument
中国科学技术大学计算机系 陈香兰 Fall 2013 第三讲 线程 中国科学技术大学计算机系 陈香兰 Fall 2013.
Common Qs Regarding Earnings
從 ER 到 Logical Schema ──兼談Schema Integration
Google Local Search API Research and Implementation
NASA雜談+電腦網路簡介 Prof. Michael Tsai 2015/03/02.
Distance Vector vs Link State
计算机问题求解 – 论题1-5 - 数据与数据结构 2018年10月16日.
百万亿次超级计算机诞生记 姓名 Xiangyu Ye 职务 微软中国技术中心资深HPC顾问 公司 微软中国
严肃游戏设计—— Lab-Adventure
Cloud Computing Google云计算原理.
Distance Vector vs Link State Routing Protocols
11 Overview Cloud Computing 2012 NTHU. CS Che-Rung Lee
怎樣把同一評估 給與在不同班級的學生 How to administer the Same assessment to students from Different classes and groups.
MGT 213 System Management Server的昨天,今天和明天
Experimental Analysis of Distributed Graph Systems
ppt宝藏提供 中国银行业信息化系统建设研讨会
Section 1 Basic concepts of web page
Presentation transcript:

软件工程基础 Hadoop生态系统 刘 驰

An Ecosystem for Cloud Computing 2

Problem Batch (offline) processing of huge data set using commodity hardware is not enough for real-time applications Strong desire for linear scalability Need infrastructure to handle all mechanics allow developers to focus on the processing logic/algorithms 3

Explosive Data! – Storage New York Stock Exchange: 1 TB data per day Facebook: 100 billion photos, 1 PB (1000 TB) Internet Archive: 2 PB data, growing by 20 TB per month Can’t put data on a SINGLE node Strong needs for distributed file systems

5 Java/Python/C interfaces 需要处理Petabyte级别的数据集 为每个应用程序的建立可靠的平台代价较大 每天都有节点坏掉 – 节点坏掉是可以预料到的 – 集群中节点的数量不是稳定的 只需要普通的基础设施 –高效、可信、开源 Java/Python/C interfaces 5

Commercial Hardware 7 典型的2层构架 – 节点是普通的商业PC机 – 30-40 节点/Rack – 顶层到Rack 带宽3-4Gbps – Rack到节点带宽1Gbps 7

Who is (was) Using Hadoop? 8

Example: Facebook的Hadoop集群 产品集群 4800个内核,600个机器,每个机器16GB—2009年4月 8000个内核,1000个机器,每个机器32GB—2009年7月 每个机器拥有4个1TB大小的SATA硬盘 两层网络结构,每个Rack有40个机器 整个集群大小为2PB,未来还会不断增加 测试集群 800 个内核, 每个16GB 9

A Distributed File System

Single-Node Architecture CPU Machine Learning, Statistics Memory “Classical” Data Mining Disk Even with Lucene, we had to tweak parameters to get proper performance for XML Index and Search because of its usage of disk for B-Trees implementation for DBLP dataset (337 MB) 11 11

Commodity Clusters Web data sets can be very large Tens to hundreds of TB Cannot mine on a single server Standard architecture emerging: Cluster of commodity Linux nodes Gigabit Ethernet interconnect How to organize computations on this architecture? Mask issues such as hardware failure On the software side: what is the programming model? On the hardware side: how to deal with failures – hardware and data corruption? 12 12

Cluster Architecture … … 2-10 Gbps backbone between racks 1 Gbps between any pair of nodes in a rack Switch Switch Switch Mem Disk CPU Mem Disk CPU Mem Disk CPU Mem Disk CPU … … Each rack contains 16-64 nodes 13 13

Stable storage First order problem: if nodes can fail, how can we store data persistently? Answer: Distributed File System Provides global file namespace Google GFS; Hadoop HDFS; Kosmix KFS Typical usage pattern Huge files (100s of GB to TB) Data is rarely updated in place Reads and appends are common DFS: provides a unified view of the file system and hides the details of replication and consistency management 14 14

15

16

Namenode and Datanodes Master/slave architecture 1 Namenode, a master server that manages the file system namespace and regulates access to files by clients. many DataNodes usually one per node in a cluster. manage storage serves read, write requests, performs block creation, deletion, and replication upon instruction from Namenode. HDFS exposes a file system namespace and allows user data to be stored in files. A file is split into one or more blocks and set of blocks are stored in DataNodes. 2018/9/10 17

Namespace Hierarchical file system with directories and files Create, remove, move, rename etc. Namenode maintains the file system Any meta information changes to the file system recorded by the Namenode. An application can specify the number of replicas of the file needed: replication factor of the file. This information is stored in the Namenode. 9/10/2018 18

Data Replication Store very large files across machines in a large cluster. Each file is a sequence of blocks of same size. Blocks are replicated 2-3 times. Block size and replicas are configurable per file. Namenode receives a Heartbeat and a BlockReport from each DataNode in the cluster. BlockReport contains all the blocks on a Datanode. 9/10/2018 19

Replica Placement Rack-aware: Goal: improve reliability, availability and network bandwidth utilization Research topic Namenode determines the rack id for each DataNode. Replicas are placed: 1 in a local rack, 1 on a different node in the local rack and 1 on a node in a different rack. 1/3 of the replica on a node, 2/3 on a rack and 1/3 distributed evenly across remaining racks. 9/10/2018 20

HDFS: Data Node Distance 21

Replication Pipelining When the client receives response from Namenode, it flushes its block in small pieces (4K) to the first replica, that in turn copies it to the next replica and so on. Thus data is pipelined from Datanode to the next. 9/10/2018 22

Replica Selection Replica selection for READ operation: HDFS tries to minimize the bandwidth consumption and latency. If there is a replica on the Reader node then that is preferred. HDFS cluster may span multiple data centers: replica in the local data center is preferred over the remote one. 9/10/2018 23

Datanode A Datanode stores data in files in its local file system. Datanode has no knowledge about HDFS filesystem It stores each block of HDFS data in a separate file. Datanode does not create all files in the same directory. It uses heuristics to determine optimal number of files per directory and creates directories appropriately: Research issue? When the filesystem starts up it generates a list of all HDFS blocks and send this report to Namenode: Blockreport. 9/10/2018 24

HDFS: File Read 25

HDFS: File Write 26

Communication Protocol All protocols are layered on top of the TCP/IP protocol A client establishes a connection to a configurable TCP port on the Namenode machine. It talks ClientProtocol with the Namenode. Datanodes talk to the Namenode using Datanode protocol. RPC abstraction wraps both ClientProtocol and Datanode protocol. Namenode is simply a server and never initiates a request; it only responds to RPC requests issued by DataNodes or clients. 9/10/2018 27

DataNode Failure and Heartbeat Datanodes lose connectivity with Namenode. Namenode detects this condition by the absence of a Heartbeat message. Namenode marks Datanodes without Hearbeat and does not send any IO requests to them. Any data registered to the failed Datanode is not available to the HDFS. 9/10/2018 28

Cluster Rebalancing HDFS architecture is compatible with data rebalancing schemes. A scheme might move data from one Datanode to another if the free space on a Datanode falls below a certain threshold. In the event of a sudden high demand for a particular file, a scheme might dynamically create additional replicas and rebalance other data in the cluster. These types of data rebalancing are not yet implemented: research issue. 9/10/2018 29

APIs HDFS provides Java API for application to use. Python access is also used in many applications. A C language wrapper for Java API is also available. A HTTP browser can be used to browse the files of a HDFS instance. 9/10/2018 30

A Distributed Computation Framework for Batch Processing

What is Map/Reduce? A Programming Model Decompose a processing job into Map and Reduce stages Developer need to provide codes for Map and Reduce functions configure the job let Hadoop handle the rest 33

MapReduce Model 34

Distributed Execution Overview User Program Worker Master fork assign map reduce Split 0 Split 1 Split 2 Input Data local write Output File 0 File 1 write read remote read, sort 1.MapReduce库先把user program的输入文件划分为M份(M为用户定义),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内其它机器上。 2.user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是可以由用户指定的。 3.被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split一一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。 4.缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户定义的,将来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。 5.master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map 作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个 Reduce作业(谁让分区少呢),所以排序是必须的。 6.reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。 7.当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。 所有执行完毕后,MapReduce输出放在了R个分区的输出文件中(分别对应一个Reduce作业)。用户通常并不需要合并这R个文件,而是将其作为输入交给另一个MapReduce程序处理。整个过程中,输入数据是来自底层分布式文件系统(GFS) 的,中间数据是放在本地文件系统的,最终输出数据是写入底层分布式文件系统(GFS)的。而且我们要注意Map/Reduce作业和map/reduce 函数的区别:Map作业处理一个输入数据的分片,可能需要调用多次map函数来处理每个输入键值对;Reduce作业处理一个分区的中间键值对,期间要对 每个不同的键调用一次reduce函数,Reduce作业最终也对应一个输出文件。 35

Example: Word Count We have a large file of words, one word to a line Count the number of appearances for each distinct word Sample application: analyze web server logs to find popular URLs 36

Pseudo-Code: Word Count map(key, value): // key: document name; value: text of document for each word w in value: emit(w, 1) reduce(key, values): // key: a word; values: an iterator over counts result = 0 for each count v in values: result += v emit(key,result) map(String input_key, String input_value): // input_key: document name // input_value: document contents for each word w in input_value: EmitIntermediate(w, "1"); reduce(String output_key, Iterator intermediate_values): // output_key: a word // output_values: a list of counts int result = 0; for each v in intermediate_values: result += ParseInt(v); Emit(AsString(result)); 37

Word Count see 1 bob 1 bob 1 run 1 see bob run run 1 see 2 map(key=url, val=contents): For each word w in contents, emit (w, “1”) reduce(key=word, values=uniq_counts): Sum all “1”s in values list Emit result “(word, sum)” see 1 bob 1 run 1 see 1 spot 1 throw 1 bob 1 run 1 see 2 spot 1 throw 1 see bob run see spot throw 38 38

MapReduce Input: a set of key/value pairs User supplies two functions: map(k,v)  list(k1,v1) reduce(k1, list(v1))  v2 (k1,v1) is an intermediate key/value pair Output is the set of (k1,v2) pairs 39

What is MAP? Map each data entry into a pair Examples <key, value> Examples Map each log file entry into <URL,1> Map day stock trading record into <STOCK, Price> map类似于SQL聚集请求中的group-by子句 40

What is Shuffle/Merge phase? Hadoop merges(shuffles) output of the MAP stage into: <key, valulue1, value2, value3> Examples <URL, 1 ,1 ,1 ,1 ,1 1> <STOCK, Price On day 1, Price On day 2,…...> 41

What is Reduce? Reduce entries produces by Hadoop merging processing into <key, value> pair Examples Map <URL, 1,1,1> into <URL, 3> Map <Stock, 3,2,10> into <Stock, 10> 42

Implementation Overview Master Node user Job tracker Slave node 1 Slave node 2 Slave node N Task tracker Task tracker Task tracker 45 Workers Workers Workers

How It Works? 46

Data Flow Input, final output are stored on HDFS Scheduler tries to schedule map tasks “close” to physical storage location of input data Intermediate results are stored on local FS of map and reduce workers Output is often input to another map reduce task 47

Coordination Master data structures Task status: (idle, in-progress, completed) Idle tasks get scheduled as workers become available When a map task completes, it sends the master the location and sizes of its R intermediate files, one for each reducer Master pushes this info to reducers Master pings workers periodically to detect failures 48

Failures Map worker failure Reduce worker failure Master failure Map tasks completed or in-progress at worker are reset to idle Reduce workers are notified when task is rescheduled on another worker Reduce worker failure Only in-progress tasks are reset to idle Master failure MapReduce task is aborted and client is notified Why is completed map task discarded? 49 49

Execution View from task perspective 50 50

Parallel Execution View from scheduled m/c perspective 51 51

How Many Map and Reduce Jobs? M map tasks, R reduce tasks Rule of thumb: M, R >> (# of nodes) in cluster One DFS chunk per map is common Improves dynamic load balancing and speeds recovery from worker failure Usually R is smaller than M, because output is spread across R files 52

Combiners Often a map task will produce many pairs of the form (k,v1), (k,v2), … for the same key k e.g., popular words in Word Count Can save network time by pre-aggregating at mapper combine(k1, list(v1))  v2 same as reduce function 53

Partition Function Inputs to map tasks are created by contiguous splits of input file For reduce, we need to ensure that records with the same intermediate key end up at the same worker System can use a default partition function e.g., hash(key) mod R Sometimes useful to override e.g., hash(hostname(URL)) mod R ensures URLs from a host end up in the same output file 54

Execution Summary How is this distributed? Partition input key/value pairs into chunks, run map() tasks in parallel After all map()s are complete, consolidate all emitted values for each unique emitted key Now partition space of output map keys, and run reduce() in parallel If map() or reduce() fails, re-execute! 55 55

Example: Trading Data Processing Input: Historical Stock Data Records are CSV (comma separated values) text file Each line : stock_symbol, low_price, high_price 1987-2009 data for all stocks one record per stock per day Output: Maximum interday delta for each stock 56

Map Function: Part I 57

Map Function: Part II 58

Reduce Function 59

Running the Job : Part I 60

Running the Job: Part II 61