Download presentation
Presentation is loading. Please wait.
1
Gearpump 基于Akka的新一代流处理引擎
QCON Beijing 2015 Gearpump 基于Akka的新一代流处理引擎 钟翔 Mail: Weibo: Intel亚太研发中心 | Intel 软件与服务部 Big Data Technology Department 2015/4/24
2
What is Gearpump Akka based lightweight Real time data processing platform. Apache License What is Akka? Simple and Powerful Akka: 通信, 并发,隔离,容错 Message level streaming Long running daemons Gerpump是一个轻量级的实时计算引擎。 为什么用这个名字?我们想强调Gearpump非常简单又强大,看这张图,齿轮帮是工业设计的杰作,只有两个齿轮,就能有效泵水。 Gearppump基于Akka,那什么是Akka呢? simplifies communication, concurrency, isolation, and fault tolerance,
3
What is Akka? Micro-service(Actor) oriented.
Message Driven Lock-free Location-transparent Better performance Fail in-dependently Scales linearly Akka是一个开源库,Akka应用是由许许多多的微型服务即Actor编织而成,每个微型服务粒度很小,只做一件简单的事情;微型服务间通过异步的消息发送驱动对方工作,一起组合实现应用所需的功能。 Reactive架构极像我们人类社会的架构,每个人独立承担自己的工作,通过邮件等消息驱动对方工作,高效并发,而且每个人可以独立容错,人类社会也许是已知的最大的分布式系统。 It is like our human society, driven by message
4
Gearpump in Big Data Stack
store visualization batch stream SQL Catalyst StreamSQL Impala Cluster manager monitor/alert/notify Machine learning Graphx Cloudera Manager Storage Engine Analytics Visualization & management storm Here! Data explore Gearpump 在计算引擎一层, /
5
Why another streaming platform?
The requirements are not fully met. We need a higher abstraction to build software! 近年来,流处理已经有很多年了,每年都有些新东西,为什么又做一个? 需求未满足 软件发展到一定规模会出现瓶颈,需要更高的抽象,才能简单干净的解决问题,软件发展的历史就是抽象层次不断跳跃的过程,我认为对大数据栈,现在到了这个节点。Hive,100万行代码。
6
Streaming requirements
Michael Stonebraker, The 8 Requirements of Real-Time Stream Processing (2006) My summary 活 Flexible 大 Volume 快 Speed 准 Accuracy 见 Visual Easy programing Any time Any where Any size Any source Any use case Dynamic DAG ②StreamSQL High throughput ⑦Scale linearly ①In-Stream Zero latency ⑥HA ⑧Responsive Exactly-once ③Message loss/delay/out of order ④Predictable WYSWYG 图灵奖获得者Michael,在2006年提出了流处理的六个需求,我把它重新总结了一下,放在下面,5个字,活,大,快,准,见。 大部分平台只部分满足了这个要求,Gearpump和他们相比最大的不同在于灵活性,灵活性是指用起来简单,适应性强。
7
Gearpump Highlights 100% Akka Throughput 11 million/s (*) 2ms Latency
performance Exactly-once Dynamic DAG Out of Order Message function Flexible DSL DAG Visualization Internet of Thing usability 性能上。功能上:可用性上,数据源上:端到端延时, 2020年,将有260亿物联网设备 [*] on 4 nodes
8
Using Gearpump 用户接口
9
How to Submit an application
Master Workers Master Workers 1. Submit a Jar AppMaster 2. Create AppMaster 1 2 Ask YARN for Resource 3 4 Master Workers YARN 3. Request Resource Master Workers AppMaster Executor Executor Executor 1.用户提交Jar给集群,创建AppMaster,AppMaster申请申请资源,继而向YARN申请咨询,创建container, 在每个机器上运行副本,分布式程序开始运行。 AppMaster 4. Report Executor to AppMaster
10
DAG representation and API
DAG API Syntax: Graph(A~>B~>C~>D, B~>E~>D) A B C D E Processor DAG Shuffle 这是我们的DAG,对每个节点我们叫Processor,每个Procesor可以并发出多个Task,上下游之间可以shuffle。上图是我们的API,箭头,超级简单。
11
DAG API Example - WordCount
val context = new ClientContext() val split = Processor[Split](splitParallism) val sum = Processor[Sum](sumParallism) val app = StreamApplication("wordCount", Graph(split ~> sum), UserConfig.empty) val appId = context.submit(app) context.close() class Split(taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { override def onNext(msg : Message) : Unit = { /* split the line */ } } class Sum (taskContext : TaskContext, conf: UserConfig) extends Task(taskContext, conf) { override def onNext(msg : Message) : Unit = {/* do aggregation on word*/} Wordcount,两步 第一步:把一行拆解成单词,第二部,单词计数。 这种
12
WordCount with DSL API val context = ClientContext() val app = new StreamApp("dsl", context) val data = "This is a good start, bingo!! bingo!!" app.fromCollection(data.lines) // word => (word, count = 1) .flatMap(line => line.split("[\\s]+")).map((_, 1)) // (word, count1), (word, count2) => (word, count1 + count2) .groupByKey().sum.log val appId = context.submit(app) context.close() 这种DSL API怎么实现的呢?
13
High level DSL API Details
OP Graph Processor Graph Concepts Description StreamApp OP(Operator) Graph Stream path of OP Graph OP Transformer Transformation on Stream source source merge Optimize reduce flatmap flatmap sink Transformer Operators flatMap merge map groupBy filter process reduce map Co-locate X join sink StreamApp,图。Stream,路径,OP,运算节点。在运行时,我们会自下往上做优化,合并不需要网络shuffle的计算到一个Task里面,并不同的task做colocation的优化,减少跨机器的网络传输 Bottom-up optimization
14
DAG Visualization DAG Page Track global min-Clock of all message DAG:
Node size reflect throughput Edge width represents flow rate Red node means something goes wrong DAG,节点大小代表吞吐量,边的粗细代表流量,如果节点编程红色,说明出了故障。整个集群的运行状态一目了然。如果每个节点,点进去,我们可以看到每个并发的task的状态。
15
DAG Visualization Processor Page Skew analysis
Task throughput and latency Executor JVM deployment 最上面这个框是skew 分析,比如wordcount,不同单词的词频不同,如果skew 分析,我们就知道性能瓶颈在哪,右边是这个task的运行时性能,吞吐量,延时等指标。
16
Performance TEST 性能测试:吞吐量、扩展性、容错性
讲了这么多,Gearpump能不能用? 看看性能测试结果。
17
Throughput and Latency
Throughput: 11 million message/second Latency: 17ms on full load SOL Shuffle test 32 tasks->32 tasks 4 nodes 10GbE 32 core E52680 延时 vs. 吞吐量关系 latency随着吞吐量的变化。 17
18
Scalability 100 nodes Test run on 100 nodes and 3000 tasks
Gearpump performance scales: 100 nodes TODO: UI显示多少个executor,多少个task
19
Fault-Tolerance: Recovery time
91 worker nodes, 1000 tasks Failure scenarios Recovery time [*] comment Cluster Master node Down 0 s Master HA take effect Cluster Worker node down ~ 10 seconds timeout detection take a long time Message loss ~ 300 ms Still optimizing Target will be less than 10ms Application AppMaster down timeout detection take a log time Test environment: 91 worker nodes, 1000 tasks (We use 7 machines to simulate 91 worker nodes) [*]: Recovery time is the time interval between: a) failure happen b) all tasks in topology resume processing data.
20
Gearpump Internal 设计原理
TODO: Add a overview
21
Overview and general ideas
MinClock service track the min timestamp of all pending messages in the system now and future Minclock service Replayable Source DAG runtime Message(timestamp) State Every message in the system Has a application timestamp(message birth time) ① High Performance Streaming Normal Flow 正常的数据流, 应用时钟,消息带时间戳, DAG里面有很多long running的daemon进程,做高性能的计算。 上面有一个全局时钟,记录 现在和未来所有pending消息的最小时钟。 21
22
Overview and general ideas
Minclock service ④clock pause at Tp ② Detect Message loss at Tp Replayable Source ⑤ replay from Tp ③ recover DAG executor process DAG runtime Message(timestamp) State Checkpoint Store ⑥Exactly-once State can be reconstructed by message replay: Recovery Flow 如果消息丢失Tp,这个消息的状态在pending中,时钟会停住,错误的原因可能是机器故障,我们需要把计算节点重新启动,完成恢复,然后从Tp开始重放消息。 因为时间是在Tp停住的,Tp之前的消息都处理完了,只Tp之后消息就够了。 这只解决了at-least once, 未实现exactly once,还需要做状态的恢复。恢复的前提是:State可以由前一个时间点的State,加重放间隔内的消息恢复出来。如果这个假定成立,我们只需要阶段性的checkpoint好状态,然后就能实现Exactly-once。 讲到这边,大家可能有很多疑惑,比如怎么检测消息丢失,怎么追踪最小时钟,怎么确保一个state只处理某时间点前的数据? 下面会按照前面的1和2,3,4,5依次展开! 22
23
High performance streaming Detect Message loss and other failures
DAG Executor Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 23
24
Actor Hierarchy 100% Actor: communication, concurrency, isolation, error handling YARN Master Cluster HA Design Client Hook in and query state As general service 这是我们的overall架构图,我们由100%的Actor,也就是前面的微型服务,架构而成。 分成两部分,Master箭头往下是资源管理,AppMaster箭头往下是应用。Worker管理每个机器的资源和应用,master负责调度应用和资源。AppMaster往下,Executor是每个机器上的JVM,执行副本,下面有最小的执行单元Task。 我们怎么解决单点问题的呢?下一图。
25
Master HA Quorum (多数) Conflict free data types(CRDT) for consistency
Worker Akka Cluster Master standby Master Standby Master State Gossip CRDT Data type example: leader 多个Master节点,通过无中心的Gossip协议,(CCTV是中心模式,Gossip,谣言的传播模式,就是无中心的)。Gossip,闲言碎语, 状态,在每个Mater上有一个副本,又同时解决了数据的一致性问题,用CRDT类型。 Decentralized: No central meta server
26
High performance Messaging Layer
Akka remote message has a big overhead, (sender + receiver address) Reduce 95% overhead (400 bytes to ~20 bytes) Effective batching Sync with other executors convert to short address convert from short address Partition & Shuffle, 数据通道多,数据碎! 流内每个节点能力不同,处理速度不匹配,容易内存OOM,IO阻塞,CPU Hang等
27
Effective batching Network Bandwidth Doubled
Network Idle: Flush as fast as we can Network Busy: Smart batching until the network is open again. Network Bandwidth Doubled For 100 byte per message 看这张图,是batch的大小和网络带宽利用率的关系。 对小消息尤其有用。 This feature is ported from Storm-297
28
Flow Control Pass back-pressure level-by-level Back-pressure Task Task
Sliding window Another option(not used): big-loop-feedback flow control
29
High performance streaming Detect Message loss and other failures
DAG Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 29
30
Failure Detection For Message loss: For JVM Crash, Network Down:
AckRequest and Ack For JVM Crash, Network Down: Actor Supervision Master AppMaster Executor Task Failure Failure Failure 30
31
High performance streaming Detect Message loss and other failures
DAG Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 31
32
DAG Recovery: Quarantine and Recover
1. Message loss detected Executor AppMaster Task Store Source Send message Global clock service ① error detected
33
DAG Recovery: Quarantine and Recover
2. Use dynamic session ID to fence zombies Executor AppMaster Task Store Source Global clock service ① error detected ②Fence zombie 33
34
DAG Recovery: Quarantine and Recover
3. Recover the executor JVM, replay message Executor AppMaster Task Store Source Replay Global clock service ① error detected ②isolate zombie ③ Recover 34 Send message
35
High performance streaming Detect Message loss and other failures
DAG Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 35
36
Global Clock Service – track application min-Clock (1)
Report task min-clock B C E D Definition: Task min-clock is Minimum of ( min timestamp of pending-messages in current task Task min-Clock of all upstream tasks ) Min-Clock of D is min-clock of global 有了Clock Service,我们就知道如果消息丢失了,我们知道是什么时候丢失的。 36
37
Global Clock Service – track application min-Clock (2)
One Task can have thousands upstream tasks, how to effectively track all of them? Report task min-clock Task task Source Definition: Task min-clock is Minimum of ( min timestamp of pending-messages in current task Task min-Clock of all upstream tasks ) 37
38
Global Clock Service – track application min-Clock (3)
Level Clock One Task can have thousands upstream tasks, how to effectively track all of them? Ever incremental A 1000 Later Implementation Task task Source B 800 C E 600 D 400 Earlier 38
39
High performance streaming Detect Message loss and other failures
DAG Executor Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 39
40
Source-based Message Replay
Replay from the very-beginning source Source Normal Flow 40
41
Source-based Message Replay
Replay from the very-beginning source Source ②Replay from offset ①Resolve offset Recovery Flow Global Clock Service 41
42
High performance streaming Detect Message loss and other failures
DAG Executor Recovery Clock Service, know when message is lost Message replay from clock Exactly-once, de-duplication 42
43
Exactly-once message processing
How? Key: Ensure State(t) only contains message(timestamp <= t) DAG runtime Checkpoint Store 43
44
Exactly-once message processing
Checkpoint Flow 44
45
Exactly-once message processing
Recovery Flow
46
Dynamic DAG Use Pub-Sub model
Maintain the correct min clock during transition F ② Remove processor E A B C D E Subscribe C to F Un-subscribe E from B Un-subscribe D from E Subscribe F to A 每个Processor(图中的节点)有很多task,每个task都实现了Pub-sub模型,可以动态修改拓扑。 难点在于怎么保证时钟的语义。 ① Add processor F 46
47
Unique Use cases 独特用例
48
IOT Transparent Cloud Data Center
Target Problem Large gap between edge device with data center Location transparent. Same programming model on IOT device log Data Center dag on device side Edge device是越来越强了,,现在手机都8核了,五年后(2020年),有260亿物联网设备 把 edge 设备当成数据中心的服务器用,位置透明的部署计算子图。
49
Unified log ingestion and processing
Target Problem Distributed application is broken into many isolated pieces Single Gearpump Application For All Deploy agents Manage query service Manage processing Kafka queue Agent DAG Query Visualization Collect logs from different places 之前是分离的割裂的应用,生命期分别管理的。Gearpump可以统一管理,当成一个应用程序,有HA的实现。
50
Exactly-once: Financial use cases
Target Problem transactional exactly-once real-time streaming system Admin Portal Service Billing System User Model Bill Streaming System 收钱的应用,数据不能丢,不能重。
51
Transformers: Dynamical DAG
Target Problem No existing way to manipulate the DAG on the fly Manipulate the DAG on the fly Dynamic Attach Dynamic Replace Dynamic Remove Add Sub Graph B Delete 新奇的功能,彩蛋:变形金刚。 51
52
Eve: Online Machine Learning
Target Problem ML train and predict online in real-time for decision support. Decide immediately based online learning result Learn Input sensor Predict Output Decide 彩蛋,扫描,绿色植物,威胁,激光炮。 52
53
Other Applications IOT Mobile security
Google used streaming update for its search index in New pages are indexed in min. search immediate recommendation Ad pricing in search engine ad Tweet analysis, relation analysis social Carrot mobile app for power analysis Crowd sourcing Other Applications Smart city, ITS Power network abnormal detection, terrorism detection。 Public service security Online study capability robot IOT Location based recommendation LBS Telco fraud prevention security Mobile Finance log audit, Online fraud prevention log Load, throughput monitoring QoS IT System
54
Application and DEMO
55
Example1: Stock Drawdown Tracking
Publish Alerts Query Group by Stock id Crawlers More than 3000 index Drawdown Definition 55
56
Example2: Intelligent Traffic System
KV Query 专线 Gateway 3000 cross roads Realtime analysis DAG Travel time Fake Plate Over speed Traffic planning Real time statistics 56
57
Other demos complex Graph Stock data analysis (Drawdown tracking) ITS
Scalability, 100 nodes, 1,000,000 tasks DSL
58
Status & Plan Our goal: Make this an Apache project Plan:
Welcome code contribution! Plan: Connect: IOT Platform: Dynamic DAG, Exactly-once, etc. (release soon) Data: Real-time analysis algorithm 58
59
References 钟翔 大数据时代的软件架构范式:Reactive架构及Akka实践, 程序员期刊2015年2A期
Gearpump whitepaper 吴甘沙 低延迟流处理系统的逆袭, 程序员期刊2013年10期 Stonebraker Gearpump: Sqlstream Gartner report on IOT
60
60
Similar presentations