Gearpump: A Next-Generation Stream Processing Engine Based on Akka

Presentation transcript:

Gearpump: A Next-Generation Stream Processing Engine Based on Akka. QCon Beijing 2015. Zhong Xiang (钟翔). Mail: xiang.zhong@intel.com, Weibo: http://weibo.com/clockfly. Intel Asia-Pacific R&D Center | Intel Software and Services Group, Big Data Technology Department. 2015/4/24

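What is Gearpump? An Akka-based, lightweight, real-time data processing platform under the Apache License: http://gearpump.io. Simple and powerful. Akka: communication, concurrency, isolation, fault tolerance. Message-level streaming. Long-running daemons. Speaker notes: Gearpump is a lightweight real-time computing engine. Why this name? We want to stress that Gearpump is very simple yet powerful. Look at this picture: the gear pump is a masterpiece of industrial design; with just two gears it pumps water efficiently. Gearpump is built on Akka, so what is Akka? Akka simplifies communication, concurrency, isolation, and fault tolerance.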

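What is Akka? Micro-service (Actor) oriented. Message-driven. Lock-free. Location-transparent. Better performance. Fails independently. Scales linearly. It is like our human society, driven by messages. Speaker notes: Akka is an open-source library. An Akka application is woven together from many micro-services, namely Actors. Each micro-service is very fine-grained and does one simple thing; micro-services drive each other's work by sending asynchronous messages, and together they implement the functionality the application needs. The Reactive architecture closely resembles human society: each person does their own work independently and drives other people's work through messages such as email, which is highly concurrent, and each person can fail independently. Human society may be the largest distributed system we know of.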
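The actor idea above can be illustrated without Akka. The sketch below is plain Scala with hypothetical names (`ToyActor`, `tell`, `step`, `runAll`); it is not Akka's API, and unlike Akka it runs every mailbox on a single thread. What it does show is the core contract: an actor's state is private and changes only while it processes messages from its own mailbox one at a time, so no locks are needed, and senders never wait for receivers.

```scala
import scala.collection.mutable

// Hypothetical toy actor (NOT Akka's API): private state plus a mailbox.
// Other code can only reach it through tell(), which just enqueues.
final class ToyActor[M](behavior: (ToyActor[M], M) => Unit) {
  private val mailbox = mutable.Queue.empty[M]

  // Fire-and-forget send: the sender never waits for the receiver.
  def tell(msg: M): Unit = mailbox.enqueue(msg)

  // Handle at most one message; false means the mailbox was empty.
  def step(): Boolean =
    if (mailbox.isEmpty) false
    else { behavior(this, mailbox.dequeue()); true }
}

object ToyActorDemo {
  // Single-threaded round-robin "dispatcher": one message per actor per turn,
  // until every mailbox is empty.
  def runAll(actors: Seq[ToyActor[_]]): Unit = {
    var progressed = true
    while (progressed) progressed = actors.map(_.step()).exists(identity)
  }

  // A splitter actor forwards each word to a counter actor; only the
  // counter's behavior ever touches `total`.
  def wordTotal(lines: Seq[String]): Int = {
    var total = 0
    val counter = new ToyActor[String]((_, _) => total += 1)
    val splitter = new ToyActor[String]((_, line) =>
      line.split("\\s+").filter(_.nonEmpty).foreach(counter.tell))
    lines.foreach(splitter.tell)
    runAll(Seq(splitter, counter))
    total
  }
}
```

A real actor system schedules mailboxes on a thread pool and can place actors on other machines; the one-message-at-a-time rule per actor is what carries over.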

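Gearpump in the Big Data Stack. [Figure: layers of the big data stack (storage, engine, analytics, visualization and management) with example components such as Impala, SQL Catalyst, StreamSQL, GraphX, machine learning, Storm, cluster manager, monitoring/alerting/notification, and Cloudera Manager; Gearpump is marked "Here!" in the engine layer.] Speaker notes: Gearpump sits at the computing-engine layer.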

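Why another streaming platform? The requirements are not fully met. We need a higher abstraction to build software! Speaker notes: stream processing has been around for many years, and something new appears every year, so why build another one? First, the requirements are not met. Second, once software grows to a certain scale it hits a bottleneck and needs a higher abstraction to solve problems simply and cleanly. The history of software is a series of leaps in abstraction level, and I believe the big data stack has now reached such a point. Hive, for example, has about 1 million lines of code.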

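Streaming Requirements. Michael Stonebraker, "The 8 Requirements of Real-Time Stream Processing" (2006). My summary (circled numbers refer to Stonebraker's requirements):
活 Flexible: easy programming; any time, anywhere, any size, any source, any use case; dynamic DAG; ② StreamSQL.
大 Volume: high throughput; ⑦ scale linearly.
快 Speed: ① in-stream, zero latency; ⑥ HA; ⑧ responsive.
准 Accuracy: exactly-once; ③ message loss/delay/out of order; ④ predictable.
见 Visual: WYSIWYG.
Speaker notes: Turing Award winner Michael Stonebraker set out eight requirements for stream processing in 2006. I have regrouped them under five characters: 活 (flexible), 大 (big), 快 (fast), 准 (accurate), and 见 (visible). Most platforms satisfy these requirements only partially. What most distinguishes Gearpump from them is flexibility, meaning that it is simple to use and highly adaptable.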

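Gearpump Highlights. 100% Akka. Performance: throughput of 11 million messages/s (*), 2 ms latency. Function: exactly-once, dynamic DAG, out-of-order messages. Usability: flexible DSL, DAG visualization, Internet of Things. (*) On 4 nodes. Speaker notes: highlights in performance, function, usability, and data sources; end-to-end latency. By 2020 there will be 26 billion IoT devices.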

Using Gearpump: User Interfaces

How to Submit an Application. 1. Submit a jar to the Master. 2. The Master creates an AppMaster. 3. The AppMaster requests resources, and the Master asks YARN for them. 4. Executors are launched and report to the AppMaster. Speaker notes: the user submits a jar to the cluster, which creates an AppMaster. The AppMaster requests resources, which are in turn requested from YARN to create containers. A replica runs on each machine, and the distributed program starts running.

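DAG Representation and API. DAG API syntax: Graph(A ~> B ~> C ~> D, B ~> E ~> D). [Figure: a Processor DAG with nodes A, B, C, D, E and shuffles between stages.] Speaker notes: this is our DAG. Each node is called a Processor, and each Processor can run as multiple parallel Tasks; data can be shuffled between upstream and downstream Processors. The API shown above uses arrows and is very simple.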
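One way such an arrow syntax can be built in Scala is an operator method named `~>` that extends a path, with the graph being the union of the paths' edges. This is a hypothetical mini DSL over plain string node names (`Path`, `MiniDag`, `graph`), not Gearpump's actual `Graph` implementation, which connects `Processor` objects.

```scala
// Hypothetical mini DSL (not Gearpump's API): `a ~> b ~> c` builds a path,
// and `graph(...)` unions the edges of all paths, keeping first-seen order.
final case class Path(nodes: List[String]) {
  def ~>(next: String): Path = Path(nodes :+ next)
  def edges: List[(String, String)] = nodes.zip(nodes.tail)
}

object MiniDag {
  // Lets a bare node name start a path: "A" ~> "B".
  implicit class NodeOps(name: String) {
    def ~>(next: String): Path = Path(List(name, next))
  }

  def graph(paths: Path*): List[(String, String)] =
    paths.toList.flatMap(_.edges).distinct

  // The slide's example: Graph(A ~> B ~> C ~> D, B ~> E ~> D).
  // Yields (A,B), (B,C), (C,D), (B,E), (E,D); D has two upstream nodes.
  val example: List[(String, String)] =
    graph("A" ~> "B" ~> "C" ~> "D", "B" ~> "E" ~> "D")
}
```

Since every `~>` has the same precedence and associates to the left, `"A" ~> "B" ~> "C"` reads as `("A" ~> "B") ~> "C"`, which is why a path type with its own `~>` method is enough.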

DAG API Example: WordCount

```scala
// Gearpump imports omitted, as on the slide.
// splitParallelism and sumParallelism: number of parallel tasks per processor.
val context = new ClientContext()
val split = Processor[Split](splitParallelism)
val sum = Processor[Sum](sumParallelism)
// Two-stage DAG: words emitted by Split are shuffled to Sum.
val app = StreamApplication("wordCount", Graph(split ~> sum), UserConfig.empty)
val appId = context.submit(app)
context.close()

class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  override def onNext(msg: Message): Unit = { /* split the line into words */ }
}

class Sum(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
  override def onNext(msg: Message): Unit = { /* aggregate counts per word */ }
}
```

Speaker notes: WordCount takes two steps: first, split each line into words; second, count the words.
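What the two processors do, and why a shuffle sits between them, can be simulated locally. In this sketch (hypothetical names, plain Scala collections, no Gearpump), the Split stage emits words and a hash partitioner routes each word to one of `sumParallelism` Sum tasks. Because the same word always lands on the same task, each Sum task can count its words without coordinating with the others. Hash partitioning is an assumption here; the slide only says that the stages shuffle.

```scala
// Hypothetical local simulation of Split -> shuffle -> Sum.
object ShuffleWordCount {
  // Same word -> same Sum task, so per-task counts never overlap.
  def partition(word: String, sumParallelism: Int): Int =
    Math.floorMod(word.hashCode, sumParallelism)

  // Returns one word-count map per Sum task.
  def run(lines: Seq[String], sumParallelism: Int): Vector[Map[String, Int]] = {
    // Split stage: lines -> words.
    val words = lines.flatMap(_.split("\\s+").filter(_.nonEmpty).toSeq)
    // Shuffle: route every word to its Sum task.
    val routed = words.groupBy(w => partition(w, sumParallelism))
    // Sum stage: each task counts only the words routed to it.
    Vector.tabulate(sumParallelism) { task =>
      routed.getOrElse(task, Seq.empty)
        .groupBy(identity)
        .map { case (w, ws) => w -> ws.size }
    }
  }
}
```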

WordCount with the DSL API

```scala
val context = ClientContext()
val app = new StreamApp("dsl", context)
val data = "This is a good start, bingo!! bingo!!"
app.fromCollection(data.lines)
  // word => (word, count = 1)
  .flatMap(line => line.split("[\\s]+")).map((_, 1))
  // (word, count1), (word, count2) => (word, count1 + count2)
  .groupByKey().sum.log

val appId = context.submit(app)
context.close()
```

Speaker notes: how is a DSL API like this implemented?
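As a reference for what the DSL pipeline computes, the same steps on an ordinary Scala collection look like this. It is a local analogue, not the DSL itself: `groupByKey().sum` becomes `groupBy` followed by a per-key sum. Because the regex splits only on whitespace, punctuation stays attached to words, so "bingo!!" is counted twice and "start," once.

```scala
// Local analogue of the DSL pipeline (plain collections, not Gearpump).
object DslSemantics {
  def wordCounts(data: String): Map[String, Int] =
    data.split("\n").toSeq
      .flatMap(line => line.split("[\\s]+").toSeq) // line => words
      .map((_, 1))                                 // word => (word, 1)
      .groupBy(_._1)                               // like groupByKey()
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // like .sum
}
```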

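High-Level DSL API Details
Concepts: StreamApp is the OP (operator) graph; a Stream is a path in the OP graph; an OP is a transformer, that is, a transformation on a Stream.
Transformer operators: flatMap, map, filter, groupBy, reduce, merge, process, join (marked with an X on the slide).
[Figure: an OP graph (two sources, merge, flatMap, flatMap, reduce, sink) is optimized bottom-up into a Processor graph, with operators co-located where possible.]
Speaker notes: StreamApp is the graph, a Stream is a path, and an OP is a computation node. At runtime we optimize bottom-up: computations that do not need a network shuffle are merged into one Task, and different Tasks are co-located to reduce cross-machine network traffic.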
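The fusion rule in the notes (merge computations that need no network shuffle into one Task) can be sketched for the simplest case, a linear chain of operators. The `Op` model and its `shuffle` flag are assumptions for illustration; the real optimizer works bottom-up over a graph with merges and also handles co-location, which this greedy pass does not.

```scala
object OpFusion {
  // Hypothetical operator model: shuffle = true means the op needs its input
  // re-partitioned over the network (e.g. groupBy), so it must start a new stage.
  final case class Op(name: String, shuffle: Boolean = false)

  // Greedy fusion over a linear chain: consecutive ops that need no shuffle
  // join the current stage (one Task); a shuffle op opens a new stage.
  def fuse(chain: List[Op]): List[List[Op]] =
    chain.foldLeft(List.empty[List[Op]]) {
      case (Nil, op)                  => List(List(op))
      case (stages, op) if op.shuffle => stages :+ List(op)
      case (stages, op)               => stages.init :+ (stages.last :+ op)
    }
}
```

For the WordCount DSL, source, flatMap, and map end up in one stage, and groupBy, sum, and log in a second, so only one network shuffle remains.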

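DAG Visualization: DAG Page. Tracks the global min-clock of all messages. In the DAG, node size reflects throughput, edge width represents flow rate, and a red node means something has gone wrong. Speaker notes: in the DAG, node size shows throughput and edge width shows traffic; a node that turns red indicates a failure. The running state of the whole cluster is visible at a glance. Clicking into a node shows the status of each of its parallel tasks.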

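DAG Visualization: Processor Page. Skew analysis; task throughput and latency; executor JVM deployment. Speaker notes: the top box is the skew analysis. In WordCount, for example, different words have different frequencies; skew analysis tells us where the performance bottleneck is. On the right are the task's runtime metrics, such as throughput and latency.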

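Performance Tests: throughput, scalability, fault tolerance. Speaker notes: after all this, is Gearpump actually usable? Let's look at the performance test results.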

Throughput and Latency. Throughput: 11 million messages/second. Latency: 17 ms at full load. Test setup: SOL shuffle test, 32 tasks -> 32 tasks, 4 nodes, 10GbE, 32-core E5-2680. [Figure: latency vs. throughput, showing how latency changes as throughput increases.]

Scalability: 100 Nodes. Test run on 100 nodes and 3000 tasks; Gearpump performance scales to 100 nodes.

Fault Tolerance: Recovery Time
Test environment: 91 worker nodes, 1000 tasks (we use 7 machines to simulate the 91 worker nodes).
Failure scenarios and recovery times [*]:
- Cluster: Master node down: 0 s. Master HA takes effect.
- Cluster: Worker node down: ~10 s. Timeout detection takes a long time.
- Message loss: ~300 ms. Still optimizing; the target is less than 10 ms.
- Application: AppMaster down: recovery time not given. Timeout detection takes a long time.
[*] Recovery time is the interval between (a) the failure happening and (b) all tasks in the topology resuming data processing.

Gearpump Internals: Design Principles

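Overview and General Ideas: Normal Flow. The MinClock service tracks the minimum timestamp of all pending messages in the system, now and in the future. Components: MinClock service, replayable source, DAG runtime, messages with timestamps, state. Every message in the system has an application timestamp (the message's birth time). ① High-performance streaming. Speaker notes: this is the normal data flow. The clock is an application clock: every message carries a timestamp. The DAG contains many long-running daemon processes that perform high-performance computation. Above them is a global clock that records the minimum timestamp of all pending messages, now and in the future.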
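The MinClock service needs each task's minimum pending timestamp. A minimal per-task sketch, assuming a task simply records when a timestamped message arrives and when it finishes processing it (`PendingClock` and its methods are hypothetical names): a sorted map from timestamp to count gives the minimum directly, and messages sharing a timestamp are counted so that finishing one does not hide another still pending.

```scala
import scala.collection.mutable

// Hypothetical per-task tracker of pending (received, not yet processed) messages.
final class PendingClock {
  private val pending = mutable.TreeMap.empty[Long, Int] // timestamp -> count

  def received(ts: Long): Unit = pending.update(ts, pending.getOrElse(ts, 0) + 1)

  def processed(ts: Long): Unit = pending.get(ts) match {
    case Some(1) => pending -= ts
    case Some(n) => pending.update(ts, n - 1)
    case None    => throw new IllegalStateException(s"no pending message at $ts")
  }

  // Smallest pending timestamp; Long.MaxValue when nothing is pending,
  // i.e. this task places no constraint on the global clock.
  def minPending: Long = pending.headOption.map(_._1).getOrElse(Long.MaxValue)
}
```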

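Overview and General Ideas: Recovery Flow. [Figure: MinClock service, replayable source, DAG runtime, messages with timestamps, state, checkpoint store.] ② Detect message loss at Tp. ③ Recover the DAG executor process. ④ The clock pauses at Tp. ⑤ Replay from Tp. ⑥ Exactly-once: state can be reconstructed by message replay. Speaker notes: if the message at Tp is lost, it stays pending, so the clock stops at Tp. The cause may be, for example, a machine failure. We restart the compute nodes to recover, then replay messages starting from Tp. Because the clock stopped at Tp, all messages before Tp have already been processed, so replaying the messages from Tp onward is enough. This alone gives at-least-once, not exactly-once; we also need to recover the state. Recovery relies on one precondition: a state can be reconstructed from the state at an earlier point in time plus the messages replayed in between. If this holds, we only need to checkpoint the state periodically to achieve exactly-once. At this point you may have many questions: how do we detect message loss, how do we track the min-clock, and how do we ensure a state only includes data before a given point in time? The following slides go through points 1 to 5 in order.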
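The recovery argument in the notes rests on one assumption: state is a deterministic function of the earlier state plus the messages in between. Under that assumption, restoring the checkpoint taken at some time and replaying only the later messages gives the same state as processing everything once, while replaying from the beginning on top of the checkpoint double-counts. The sketch below checks exactly that; the message type and the count-per-value state are arbitrary stand-ins, and messages are assumed to have distinct timestamps so that sorting fixes their order.

```scala
// Hypothetical model: state is a fold over timestamped messages.
object ReplayRecovery {
  final case class Msg(ts: Long, value: Int)
  type State = Map[Int, Int] // value -> count, an arbitrary example state

  def update(s: State, m: Msg): State = s.updated(m.value, s.getOrElse(m.value, 0) + 1)

  // Fold the messages with after < ts <= upTo into `from`, in timestamp order.
  def stateAt(from: State, msgs: Seq[Msg], after: Long, upTo: Long): State =
    msgs.filter(m => m.ts > after && m.ts <= upTo).sortBy(_.ts).foldLeft(from)(update)

  // Recovery: restore the checkpoint taken at `checkpointTs`, replay only later messages.
  def recover(checkpoint: State, checkpointTs: Long, log: Seq[Msg], now: Long): State =
    stateAt(checkpoint, log, checkpointTs, now)
}
```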

Agenda: ① High-performance streaming. ② Detecting message loss and other failures. ③ DAG executor recovery. ④ Clock service: knowing when a message is lost. ⑤ Message replay from the clock. ⑥ Exactly-once and de-duplication.

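Actor Hierarchy. 100% Actors: communication, concurrency, isolation, error handling. [Figure: YARN, the Master with its cluster HA design, and a Client that can hook in and query state, as a general service.] Speaker notes: this is our overall architecture diagram. It is built 100% from Actors, the micro-services described earlier. It has two parts: below the Master is resource management, and below the AppMaster is the application. A Worker manages the resources and applications on each machine, and the Master schedules applications and resources. Below the AppMaster, an Executor is the JVM on each machine that runs a replica, and below that is the smallest execution unit, the Task. How do we avoid a single point of failure? See the next slide.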

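Master HA. Quorum (majority). Conflict-free replicated data types (CRDTs) for consistency. Decentralized: no central meta server. [Figure: an Akka cluster of one Master and standby Masters sharing their state via gossip, with Workers attached; CRDT data type example: leader.] Speaker notes: there are multiple Master nodes, which communicate through the decentralized Gossip protocol. (CCTV broadcasting is a centralized model; gossip spreads the way a rumor does, with no center.) Each Master holds a replica of the state, and CRDT types also solve the data consistency problem.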
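For a feel of why CRDTs avoid conflicts, here is a last-writer-wins register, one of the simplest state-based CRDTs. It is a generic example, not the data type Gearpump or Akka actually uses for the leader. Merge keeps the entry with the larger (timestamp, node) pair, so it is commutative, associative, and idempotent, and replicas that gossip their state in any order converge. It assumes each (timestamp, node) pair identifies a single write.

```scala
// Generic last-writer-wins register (illustrative, not Gearpump's type).
// Ties on timestamp are broken by node id so that merge is deterministic.
final case class LwwRegister[A](value: A, timestamp: Long, node: String) {
  def merge(other: LwwRegister[A]): LwwRegister[A] =
    if (timestamp > other.timestamp ||
        (timestamp == other.timestamp && node.compareTo(other.node) >= 0)) this
    else other
}
```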

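High-Performance Messaging Layer. An Akka remote message has a large overhead because it carries the sender and receiver addresses. We reduce this overhead by 95% (from about 400 bytes to about 20 bytes) by converting addresses to short addresses on send and back on receive, with the mapping synced among executors. Effective batching. Speaker notes: partitioning and shuffling create many data channels and fragmented data. Nodes in a stream have different capacities, so their processing speeds do not match, which easily leads to out-of-memory errors, blocked I/O, hung CPUs, and so on.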
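The short-address idea can be sketched as a table that assigns each full actor path a small integer id, so a message header carries two ints instead of two long path strings. `AddressTable` is hypothetical, the example path is made up, and keeping the table identical across executors (the slide's "sync with other executors") is assumed rather than implemented.

```scala
import scala.collection.mutable

// Hypothetical address table: full actor path <-> compact Int id.
final class AddressTable {
  private val toId = mutable.HashMap.empty[String, Int]
  private val toPath = mutable.ArrayBuffer.empty[String]

  // Returns the existing id, or assigns the next free one.
  def register(path: String): Int =
    toId.getOrElseUpdate(path, { toPath += path; toPath.size - 1 })

  def shortAddress(path: String): Option[Int] = toId.get(path)   // on send
  def fullPath(id: Int): Option[String] = toPath.lift(id)         // on receive
}
```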

Effective Batching. Network idle: flush as fast as we can. Network busy: batch smartly until the network is open again. Network bandwidth is doubled for 100-byte messages. This feature is ported from Storm-297. Speaker notes: this chart shows the relationship between batch size and network bandwidth utilization. Batching is especially useful for small messages.
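The policy on this slide, flush immediately while the network is idle and accumulate while it is busy, can be sketched with a single flag. In this hypothetical `SmartBatcher`, "busy" means a previous write has not completed yet; everything that arrives in the meantime goes out as one batch when `onWriteComplete()` is called. It illustrates the policy only and is not the code ported from Storm-297.

```scala
import scala.collection.mutable

// Hypothetical sender-side batcher: idle -> send now; busy -> accumulate.
final class SmartBatcher[M](send: Seq[M] => Unit) {
  private val buffer = mutable.ArrayBuffer.empty[M]
  private var networkBusy = false

  def offer(msg: M): Unit = {
    buffer += msg
    if (!networkBusy) flush()
  }

  // The previous write finished: send whatever piled up, as one batch.
  def onWriteComplete(): Unit = {
    networkBusy = false
    if (buffer.nonEmpty) flush()
  }

  private def flush(): Unit = {
    send(buffer.toList)
    buffer.clear()
    networkBusy = true // the write is in flight until onWriteComplete()
  }
}
```

Under light load every message is sent alone, so latency stays low; under heavy load batches grow by themselves, which is where the bandwidth gain for small messages comes from.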

Flow Control. Back-pressure is passed level by level between tasks using a sliding window. Another option (not used): big-loop feedback flow control.
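A sliding window between two tasks can be reduced to a count of unacknowledged messages. In this sketch (hypothetical `SlidingWindowSender`, cumulative acks assumed), the sender stops once `window` messages are in flight. When a downstream task is slow its acks slow down, the upstream task stops sending, its own input backs up, and so the pressure moves back one level at a time.

```scala
// Hypothetical sliding-window sender between an upstream and a downstream task.
final class SlidingWindowSender(window: Int) {
  require(window > 0, "window must be positive")
  private var sent = 0L  // messages sent so far
  private var acked = 0L // messages the downstream task has acknowledged

  def inFlight: Long = sent - acked
  def canSend: Boolean = inFlight < window

  // Returns false (back-pressure) when the window is full.
  def send(): Boolean = if (canSend) { sent += 1; true } else false

  // Cumulative ack: "I have processed `count` messages in total".
  def ack(count: Long): Unit = {
    acked = math.max(acked, math.min(count, sent))
  }
}
```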

Failure Detection. For message loss: AckRequest and Ack. For JVM crashes and network outages: Actor supervision. [Figure: the supervision chain Master, AppMaster, Executor, Task, with failures reported up the hierarchy.]
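A minimal model of AckRequest/Ack loss detection follows. It assumes per-pair FIFO delivery, which Akka guarantees for messages between one sender and one receiver, so by the time `AckRequest(n)` arrives every earlier message that survived has already arrived; any gap between the sender's count and the receiver's count is loss. Counting messages rather than tracking individual ids is a simplification, and all names are hypothetical.

```scala
// Hypothetical AckRequest/Ack loss detector (counts, per sender-receiver pair).
object AckProtocol {
  final case class AckRequest(sentSoFar: Long)
  final case class Ack(sentSoFar: Long, receivedSoFar: Long)

  final class Receiver {
    private var received = 0L
    def onData(): Unit = received += 1
    // Echo the sender's count together with what actually arrived.
    def onAckRequest(req: AckRequest): Ack = Ack(req.sentSoFar, received)
  }

  // Sender side: a positive result means messages were lost in between.
  def lost(ack: Ack): Long = ack.sentSoFar - ack.receivedSoFar
}
```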

DAG Recovery: Quarantine and Recover. Step 1: message loss is detected. [Figure: the source sends messages to tasks in an executor; the AppMaster, the store, and the global clock service; ① error detected.]

DAG Recovery: Quarantine and Recover. Step 2: use a dynamic session ID to fence zombies. [Figure: ① error detected; ② fence the zombie.]
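Fencing with a session ID can be sketched as one check on every message: recovery bumps the session, and tasks drop anything stamped with an older one, so a zombie executor that is still running after being declared dead cannot inject stale output. The names below are hypothetical, and how the new session ID reaches every task is left out.

```scala
import scala.collection.mutable

// Hypothetical session-ID fencing on the receiving task.
object SessionFence {
  final case class Msg(sessionId: Int, payload: String)

  final class FencedTask(var currentSession: Int) {
    val accepted = mutable.ArrayBuffer.empty[String]
    // Messages from an older session come from a zombie and are dropped.
    def onMessage(m: Msg): Unit =
      if (m.sessionId == currentSession) accepted += m.payload
  }
}
```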

DAG Recovery: Quarantine and Recover. Step 3: recover the executor JVM and replay messages. [Figure: ① error detected; ② isolate the zombie; ③ recover; the source replays messages.]

Global Clock Service: Tracking the Application Min-Clock (1). Each task reports its min-clock. Definition: a task's min-clock is the minimum of (a) the minimum timestamp of the pending messages in the task itself and (b) the min-clocks of all its upstream tasks. [Figure: a DAG of tasks B, C, E, D.] The min-clock of D is the global min-clock. Speaker notes: with the clock service, when a message is lost we know when it was lost.
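The definition on this slide is recursive and can be evaluated directly on a small DAG. The sketch below (hypothetical `MinClock.compute`, acyclic graph assumed, memoized so that a shared upstream task is computed once) uses the B, C, E, D shape from the figure with made-up pending timestamps. It visits every upstream task, which is exactly the cost raised in part (2).

```scala
import scala.collection.mutable

// Hypothetical direct evaluation of:
// minClock(t) = min(minPending(t), minClock(u) for every upstream task u)
object MinClock {
  def compute(upstream: Map[String, Seq[String]],
              minPending: Map[String, Long]): Map[String, Long] = {
    val memo = mutable.HashMap.empty[String, Long]
    def clock(task: String): Long = memo.get(task) match {
      case Some(c) => c
      case None =>
        val ups = upstream.getOrElse(task, Seq.empty).map(clock)
        // A task with nothing pending contributes Long.MaxValue (no constraint).
        val c = (minPending.getOrElse(task, Long.MaxValue) +: ups).min
        memo.update(task, c)
        c
    }
    (minPending.keySet ++ upstream.keySet).foreach(clock)
    memo.toMap
  }
}
```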

Global Clock Service: Tracking the Application Min-Clock (2). One task can have thousands of upstream tasks; how can we track all of them efficiently? Each task reports its min-clock. [Figure: source and tasks.] Definition: the same task min-clock definition as in (1).

Global Clock Service: Tracking the Application Min-Clock (3). Implementation: level clock. One task can have thousands of upstream tasks; how can we track all of them efficiently? Clocks only ever increase. [Figure: clocks by level, from later to earlier: A 1000, B 800, C and E 600, D 400.]

Source-Based Message Replay: Normal Flow. Replay from the very beginning, at the source.

Source-Based Message Replay: Recovery Flow. Replay from the very beginning, at the source: ① resolve the offset using the global clock service; ② replay from that offset.
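Resolving the offset means mapping the clock Tp back to a position in the source. Assuming a source such as a Kafka partition in which each offset has a timestamp and timestamps never decrease with the offset, a binary search finds the first offset whose timestamp is at or after Tp. `OffsetResolver` is hypothetical; with out-of-order data a real source would need a more conservative choice.

```scala
// Hypothetical offset resolution over (offset, timestamp) pairs sorted by offset,
// assuming timestamps are non-decreasing in offset order.
object OffsetResolver {
  def resolve(index: IndexedSeq[(Long, Long)], tp: Long): Option[Long] = {
    var lo = 0
    var hi = index.length // invariant: the answer lies in [lo, hi]
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (index(mid)._2 < tp) lo = mid + 1 else hi = mid
    }
    // None: every stored message is older than Tp, nothing to replay.
    if (lo < index.length) Some(index(lo)._1) else None
  }
}
```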

Exactly-Once Message Processing. How? The key is to ensure that State(t) only contains messages with timestamp <= t. [Figure: DAG runtime and checkpoint store.]
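One way to honor "State(t) only contains messages with timestamp <= t" is to hold messages back until the task knows that nothing at or before t can still arrive, then fold them in and take the snapshot. In the hypothetical `CheckpointingTask` below, that knowledge is simply assumed whenever `checkpointAt(t)` is called (in Gearpump it would come from the min-clock). The state is a running sum, so arrival order within a checkpoint does not matter; an order-sensitive state would need to sort the ready messages by timestamp first.

```scala
import scala.collection.mutable

// Hypothetical task keeping checkpoints free of messages newer than t.
final class CheckpointingTask {
  private val held = mutable.ArrayBuffer.empty[(Long, Int)] // (timestamp, value)
  private var state = 0L                                    // example state: running sum
  private val checkpoints = mutable.TreeMap.empty[Long, Long]

  def onMessage(ts: Long, value: Int): Unit = held += ((ts, value))

  // Assumption: called only when no message with timestamp <= t can still arrive.
  def checkpointAt(t: Long): Long = {
    val (ready, later) = held.partition(_._1 <= t)
    ready.foreach { case (_, v) => state += v }
    held.clear()
    held ++= later // newer messages wait for a later checkpoint
    checkpoints.update(t, state)
    state
  }

  def checkpoint(t: Long): Option[Long] = checkpoints.get(t)
}
```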

Exactly-Once Message Processing: Checkpoint Flow

Exactly-Once Message Processing: Recovery Flow

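Dynamic DAG. Uses a pub-sub model and maintains the correct min-clock during the transition. [Figure: DAG with processors A, B, C, D, E; ① add processor F; ② remove processor E; subscribe F to A, subscribe C to F, unsubscribe E from B, unsubscribe D from E.] Speaker notes: each Processor (a node in the graph) has many tasks, and each task implements the pub-sub model, so the topology can be modified dynamically. The difficulty lies in preserving the clock semantics.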

Unique Use Cases

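IoT Transparent Cloud. Target problem: a large gap between edge devices and the data center. Solution: location transparency, with the same programming model on IoT devices and in the data center; part of the DAG runs on the device side. [Figure: data center and devices, with logs flowing to the data center.] Speaker notes: edge devices are getting more and more powerful; phones already have 8 cores, and in five years (by 2020) there will be 26 billion IoT devices. We treat edge devices like servers in the data center and deploy computation subgraphs to them transparently, wherever they are.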

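Unified Log Ingestion and Processing. Target problem: a distributed application is broken into many isolated pieces. Solution: a single Gearpump application for everything: deploy agents, manage the query service, manage processing. [Figure: agents collect logs from different places into a Kafka queue, which feeds a processing DAG, with query and visualization.] Speaker notes: previously these were separate, fragmented applications whose lifecycles were managed separately. Gearpump manages them together as one application, with HA.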

Exactly-Once: Financial Use Cases. Target problem: a transactional, exactly-once, real-time streaming system. [Figure: admin portal service, billing system, user model, bill, streaming system.] Speaker notes: in applications that collect money, data must be neither lost nor duplicated.

Transformers: Dynamic DAG. Target problem: there is no existing way to manipulate the DAG on the fly. Solution: manipulate the DAG on the fly with dynamic attach, dynamic replace, and dynamic remove (for example, add a subgraph or delete B). Speaker notes: a novel feature and an Easter egg: Transformers.

Eve: Online Machine Learning. Target problem: train and predict online, in real time, for decision support. Decide immediately based on online learning results. [Figure: input from a sensor, then learn, predict, output, decide.] Speaker notes: an Easter egg: scanning, a green plant, a threat, the laser cannon.

Other Applications. IoT; mobile security; search (Google moved to streaming updates for its search index in 2010, so new pages are indexed in 10 to 15 minutes); advertising (immediate recommendation, ad pricing in search engines); social (tweet analysis, relationship analysis); crowdsourcing (the Carrot mobile app for power analysis); smart city and ITS; public service security (power network anomaly detection, terrorism detection); robotics (online learning capability); location-based services (location-based recommendation); telco fraud prevention; finance (log audit, online fraud prevention); QoS for IT systems (load and throughput monitoring).

Applications and Demos

Example 1: Stock Drawdown Tracking. [Figure: crawlers collect more than 3000 indexes; the stream is grouped by stock ID, drawdown is computed, and alerts are published and can be queried.] Drawdown definition.
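The slide's drawdown definition did not survive extraction, so this sketch assumes the standard one: drawdown at time t is (peak(t) - price(t)) / peak(t), where peak(t) is the highest price seen so far. It needs only the running peak, so one small tracker per stock, after the group-by on stock ID, can evaluate it in a single streaming pass. `DrawdownTracker` is a hypothetical name.

```scala
// Assumes the standard definition: drawdown(t) = (peak(t) - price(t)) / peak(t).
final class DrawdownTracker {
  private var peak = Double.NegativeInfinity
  private var maxDrawdown = 0.0

  // Feed one price tick; returns the current drawdown as a fraction of the peak.
  def onPrice(price: Double): Double = {
    peak = math.max(peak, price)
    val dd = if (peak > 0) (peak - price) / peak else 0.0
    maxDrawdown = math.max(maxDrawdown, dd)
    dd
  }

  // Largest drawdown seen so far.
  def max: Double = maxDrawdown
}
```

An alert rule could then fire whenever `onPrice` returns a value above some threshold; the threshold is not given on the slide.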

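Example 2: Intelligent Traffic System. [Figure: data from 3000 crossroads flows over a dedicated line through a gateway into a real-time analysis DAG computing travel time, fake license plate detection, speeding detection, traffic planning, and real-time statistics, with KV query.]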

Other Demos: complex graph; stock data analysis (drawdown tracking); ITS; scalability (100 nodes, 1,000,000 tasks); DSL.

Status & Plan. Our goal: make this an Apache project. Code contributions are welcome: http://gearpump.io. Plan: Connect: IoT. Platform: dynamic DAG, exactly-once, etc. (release soon). Data: real-time analysis algorithms.

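References
- Zhong Xiang (钟翔), "大数据时代的软件架构范式:Reactive架构及Akka实践" (Software Architecture Paradigms in the Big Data Era: Reactive Architecture and Akka in Practice), 程序员 (Programmer) magazine, 2015, issue 2A.
- Gearpump whitepaper: http://typesafe.com/blog/gearpump-real-time-streaming-engine-using-akka
- Wu Gansha (吴甘沙), "低延迟流处理系统的逆袭" (The Comeback of Low-Latency Stream Processing Systems), 程序员 (Programmer) magazine, 2013, issue 10.
- Stonebraker, "The 8 Requirements of Real-Time Stream Processing": http://cs.brown.edu/~ugur/8rulesSigRec.pdf
- Gearpump: https://github.com/intel-hadoop/gearpump
- In-stream big data processing: http://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/
- Benchmarking Apache Kafka: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
- SQLstream customers: http://www.sqlstream.com/customers/
- A general introduction to stream processing: http://www.statsblogs.com/2014/05/19/a-general-introduction-to-stream-processing/
- Stream processing with messaging systems: http://www.statalgo.com/2014/05/28/stream-processing-with-messaging-systems/
- Gartner report on IoT: http://www.zdnet.com/article/internet-of-things-devices-will-dwarf-number-of-pcs-tablets-and-smartphones/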
