Spark Structured Streaming 流式大数据处理

Slides:



Advertisements
Similar presentations
Copyright©2013 Huawei Technologies Co., Ltd. All Rights Reserved. The information in this document may contain predictive statements including, without.
Advertisements

智能楼宇酒店 GPON 与 LSW 技术选型场景分析. 1 楼宇型酒店接入网络面临挑战 多业务、高带宽  无线上网、电话、电视、视频监控、视频会议、信息发布 等业务独立部署,布线复杂,维护困难  多业务相互融合、高清业务的普及,对网络带宽提出了更 高的要求 楼层高,垂直布线难  双胶线有效传输距离短,高层楼宇需要部署中继设备.
酒店客房 PAD 融合业务方案技术分析. 1 项目背景 客户 ( 龙城花园酒店 / 喜达屋瑞吉酒店 ) 希望 华为提供创新的酒店方案,能够帮助酒店提 升行业内的知名度。如黄龙饭店效果。 酒店 ISV( 日顺等 ) 无法提供整体酒店解决方 案,需要与华为合作,提供有竞争力方案。 我司酒店解决方案缺少创新应用,行业内.
配色参考方案: 建议同一页面内 不超过四种颜色, 以下是 13 组配色 方案,同一页面 内只选择一组使 用。(仅供参考) 客户或者合作 伙伴的标志放 在右上角. 英文标题 :32-35pt 颜色 : R153 G0 B0 内部使用字体 : FrutigerNext LT.
Web Role 的每台虚机运行有 IIS ,用于处理 Web 请求 Worker Role 用于运行后台进程 Cloud Service 是什么? 支持多层架构的应用容器 由多个 Windows 虚拟机集群构成 集群有两种类型: Web 和 Worker Cloud Service 做什么 进行应用的自动化部署.
物联网(IoT)的无线MCU方案 Alex Huang Shenzhen * © 2015 Atmel Corporation.
泛舆情管理平台 ——助力媒体业务创新 新模式 新格局 创新盈利增长点 2/26/2017 1:59 AM 屈伟: 创始人,总裁
中国银行业前置端操作系统移植研究.
3/3/ :01 PM © 2007 Microsoft Corporation. All rights reserved. Microsoft, Windows, Windows Vista and other product names are or may be registered.
Big Data Ecosystem – Hadoop Distribution
银行分析报表展示 SAP Best Practices for Banking(中国)
借助公有云实现游戏的弹性运营 Shaun Fang (方兴) Azure开发技术顾问
Windows Hyper-V与集群共享卷
SAP 最佳业务实践 – 同一集团内子公司整合 集中采购
Database Architecture, not only DBA
请点击以下链接下载WinHEC的演讲材料
1. 设定愿景,确定业务场景 Microsoft Corporation
金融信息安全人才培养的思考与实践 中央财经大学 朱建明 2012年11月24日.
广东省广州市花都区教育局教研室 汤少冰 优化评估方式, 促进中学英语的教与学 广东省广州市花都区教育局教研室 汤少冰
Office 2013 全新功能介紹 台灣微軟 Office 大使 楊承恩 Marcus Microsoft Office
物料帐下的实际成本核算 SAP 半导体及光伏行业最佳业务实践 (中国)
四川省集体林权流转平台 中国西部林权交易网
認識電腦程式著作及合法使用電腦軟體之說明
云实践引导产业升级 沈寓实 博士 教授 MBA 中国云体系产业创新战略联盟秘书长 微软云计算中国区总监 WinHEC 2015
Windows 10 混合现实 Mingfei Yan 高级项目经理
Benjamin Armstrong 高级项目经理 微软
开源软件与Moblin带来的技术创新 Copyright © 2009, Intel Corporation.
關聯式資料庫.
W371 如何使网络设备更好的和Windows Vista工作
基于Hadoop的数据仓库Hive.
(TIA Portal) 学习/培训文档 Siemens Automation Cooperates with Education
最新 Windows Server 徽标 要求和计划
Microsoft Office SharePoint Server 2007 事件追蹤與專案管理
SOLUTIONACCELERATORS Windows Vista Hardware Assessment 1
MSG 321 统一消息架构和PBX集成.
朝雲端專業DBA邁進: 深入剖析 Windows Azure SQL Database 完整資料庫管理、雲端報表建立、建置分散式雲端資料庫
利用最新Hyper-V Replica 功能達成Hyper-V 災難備援機制
Windows Server 2008 NAP整合802.1x網路安全控管
Spark在智慧图书馆建设中的应用探索 2017年12月22日.
互聯網安全資訊 助您達至更安全的網上體驗.
第5章 資料倉儲的資料建置.
服務啟用、導入流程、 郵件移轉步驟簡介 Microsoft Office 12/2/2018
MBL 327 Windows Mobile开发中的异构系统集成
《Spark编程基础》 《 Spark编程基础》课程介绍 (PPT版本号:2018年2月)
Cameron Brodeur Program Manager US-Device & Storage PM
David Edfeldt Senior Program Manager Windows Logo Program
构建 Windows TV Tuner 产业 生态环境的重要观点
微软新一代云计算 面向企业的 Office 365 客户培训大纲
使徒行傳 21:17-23章「保羅的見證(一)」 引言 預言保羅為主的名受許多的苦難的實現
2/24/2019 5:40 AM © 2009 Microsoft Corporation. All rights reserved. Microsoft, Windows, Windows Vista and other product names are or may be registered.
Spark SQL 介绍 付士涛. Spark SQL 介绍 付士涛 大纲 Architecture(架构) 像Hive一样的User Interface(用户操作界面) DataFrame的使用(1.3以前叫做SchemaRDD)
Microsoft SQL Server 2008 報表服務_設計


橫跨電腦、手機與軟體的全方位端點管控解決方案
请点击以下链接下载WinHEC的演讲材料
Apache Flink 刘 驰.
使用WPF创建Windows应用和Web应用
4/30/2019 7:40 AM 約翰福音 15:9;17:20-23 加拉太書 6:1-2 © 2007 Microsoft Corporation. All rights reserved. Microsoft, Windows, Windows Vista and other product.
DEV 343 VS2005超快速开发方案/EEP2006控件包.
5/4/2019 4:42 PM © 2009 Microsoft Corporation. All rights reserved. Microsoft, Windows, Windows Vista and other product names are or may be registered.
使徒行傳 24-26章 [ 保羅的見證(二)] 徒9:15 “  主 對 亞 拿 尼 亞 說 、 你 只 管 去 . 他 是 我 所 揀 選 的 器 皿 、 要 在 外 邦 人 和 君 王 並 以 色 列 人 面 前 、 宣 揚 我 的 名 。 ”]
Windows 徽标计划工具:综述与发展趋势
5/5/2019 7:06 PM 两跨框架梁截面配筋图的绘制 © 2007 Microsoft Corporation. All rights reserved. Microsoft, Windows, Windows Vista and other product names are or may.
百万亿次超级计算机诞生记 姓名 Xiangyu Ye 职务 微软中国技术中心资深HPC顾问 公司 微软中国
DEV 343 VS2005超快速开发方案/EEP2006控件包.
MGT 213 System Management Server的昨天,今天和明天
Experimental Analysis of Distributed Graph Systems
Windows Workflow Foundation CON 230
(TIA Portal) 学习/培训文档 Siemens Automation Cooperates with Education
Presentation transcript:

Spark Structured Streaming 流式大数据处理 Zhang, Lubo (lubo.zhang@intel.com) Yucai, Yu (yucai.yu@intel.com) BDT/DPD/SSG Oct, 2017

目录 流式数据 Structured Streaming *的核心概念 Structured Streaming *的高级话题 *Other names and brands may be claimed as the property of others.

什么是流式数据?

流式数据 数据随着时间的推移而动态变化,不断有新的数据产生。 金融:股票价格变动,仓位,外汇价格 互联网: 网站点击量,页面访问量,注册用户数 零售:订单流,物流,商品价格调整 交通:高架路段车流,当前位置 每天我们都在和格种各样的数据打交道,而数据并不是一成不变的,它会随着时间的推移而动态的发生变化,不断有新的数据产生,我们称之为流式数据。大部分的业务逻辑都可以理解为流式数据,比方说零售行业的订单流,物流,商品价格调整等;比方说金融行业股票的价格变动,仓位的变化,外汇的价格等;比方说交通领域上,高架路段车流量变化,某辆车地理位置的变化等;在比方说互联网行业,网站的点击量,页面的访问量,注册用户数等等。

流式数据的应用 实时性 策略模型 股票价格 投资策略 +1.3 $ ······················································································································································· 买入100手 +2.4 $ ······················································································································································· 买入200手 -0.9 $ ······················································································································································· 买入100手 -0.3 $ ······················································································································································· 卖出200手 +2.1 $ ······················································································································································· 卖出300手 -3.1 $ ······················································································································································· 平仓 . 时间 那我们得到这样的流式数据有什么用,这里我们就举两个实际的例子。现在很多金融产品都支持自动程序化交易,策略分析师会根据自身的经验,确定策略模型。股票的实时价格变动会作为一种流式数据,输入到策略模型,再由策略模型去计算投资策略

流式数据的应用 吞吐量 页面访问信息 推荐、清洗… 购买行为 广告策略 机器学习引擎 新的广告策略 其他应用还有: Cleanup, aggregate, and push data to database Anomaly detection on live sensor data Continuously update models for ads, users in online games, etc. Apply machine learning on streaming data 总结来说可以分成三大类: Streaming ETL Use historical data to process live data Advanced analytics with machine learning stuff 机器学习引擎 新的广告策略

(Kafka, S3, Kinesis, RDBMS, …) 流式数据处理与生俱来的复杂性 复杂的数据 复杂多样的数据格式 (json, parquet, avro, …) 脏数据,延迟,乱序 复杂的处理 与批数据互操作 机器学习 数据流上的交互式查询 复杂的系统 复杂多样的存储系统 (Kafka, S3, Kinesis, RDBMS, …) 系统崩溃

Spark如何处理流式数据?

Structured Streaming * 基于Spark SQL*引擎构建的流处理系统 fast, scalable, fault-tolerant 丰富而统一的高级API deal with complex data and complex workloads 丰富的数据源支持 integrate with many storage system *Other names and brands may be claimed as the property of others.

The simplest way to perform streaming analytics is not having to reason about streaming at all

概念模型

概念模型 将输入流看做是一张输入表 在每一个触发间隙(trigger interval), 输入表逐渐增长 当用户在输入表上应用查询的时候, 结果表随之发生变化

概念模型 在每一个触发间隙(trigger interval), 我们可以输出特定的结果。 Output mode 定义了 每次触发需要输出的内容 Append mode:仅输出新行 Complete mode: 输出全部的结果 Update output [2.1.1]:输出自上次触发以来改变的行

概念模型 Spark并不是在每次触发的时候都去处理完整的输入数据。 相反,它会将查询编译成增量式的查询,每次仅仅处理更新的数据。

Streaming word count

数据源 一个简单的流查询 Spark.readStream .format(“kafka”) .option(“subscribe”, “input”) .load() 数据源 指定数据来源 内建多种格式的支持File/Kafka/Socket/Pluggable 使用union操作符将多种输入源合并起来

转换操作 一个简单的流查询 Spark.readStream .format(“kafka”) .option(“subscribe”, “input”) .load() .groupBy(“value”) .agg (count(“*”)) 转换操作 可以使用DataFrames, Datasets or SQL 等编程接口 Catalyst自动推算如何将这些转换增量地执行 内部的处理满足exactly-once语义

输出端 一个简单的流查询 Spark.readStream .format(“kafka”) .option(“subscribe”, “input”) .load() .groupBy(“value”) .agg (count(“*”)) .writeStream .option(“topic”, “output”) 输出端 接受每个batch的输出 当输出是满足事务性的时候,可以保证exactly once语义(比如Files) 使用foreach执行任意用户自定义的操作

触发器 输出模式 一个简单的流查询 Spark.readStream .format(“kafka”) .option(“subscribe”, “input”) .load() .groupBy(“value”) .agg (count(“*”)) .writeStream .option(“topic”, “output”) .trigger(“5 second”) .outputMode(“update”) 触发器 设定触发频率 不指定意味着系统将尽快的处理 输出模式 Complete – 输出全部结果 Update – 输出改变行 Append (默认) – 仅输出新行

Checkpoint 一个简单的流查询 Spark.readStream .format(“kafka”) .option(“subscribe”, “input”) .load() .groupBy(“value”) .agg (count(“*”)) .writeStream .option(“topic”, “output”) .trigger(“5 second”) .outputMode(“update”) .option(“checkpointLocation”, “path”) .start() Checkpoint 跟踪查询执行的进度 失败的时候重启查询

统一的 API – Dataset / Stream Static = bounded data Streaming = unbounded data Spark 2.0版本后,DataFrames和Datasets可以表示静态,有界数据,以及流式传输,无界数据。 与静Datasets/ DataFrames类似,您可以使用SparkSession从源创建流DataFrames / Datasets,并使用类似静态DataFrames / Datasets的操作。 统一的API ! Intel Confidential 12/5/2018

使用DataFrames做批处理 从json文件中创建Input DF 通过查询特定的设备创建 Result DF Input = spark.read .format(“json”) .load(“path”) Result = input .select(“device”, ”signal”) .where(“signal > 15”) Result.write .format(“parquet”) .save(“path”) 从json文件中创建Input DF 通过查询特定的设备创建 Result DF 将结果写入到parquet文件

使用DataFrames做流处理 从json文件中创建Input DF 通过查询特定的设备创建 Result DF Query没有任何改变 Input = spark.readStream .format(“json”) .load(“path”) Result = input .select(“device”, ”signal”) .where(“signal > 15”) Result.writeStream .format(“parquet”) .start(“path”) 从json文件中创建Input DF 通过查询特定的设备创建 Result DF Query没有任何改变 将结果写入到parquet文件

Continuous Aggregations input. agg(avg(“signal”)) input. groupBy(“device-type”) .avg(“signal”) 持续不断的计算所有设备信号的均值 持续不断的计算每种设备信号的均值

Continuous Windowed Aggregations input.groupBy( $”device-type”, window($”event-time-col”), “10 min”) .avg(“signal”) 使用事件时间机制持续不断的计算过去10分钟每种设备信号的均值 基于事件时间的处理可以同样地应用在流或者批处理任务中

Query Management query 对象用来监控和管理流式查询 停止查询 获取状态 获取错误信息 val query = df.writeStream .format(“console”) .outputMode(“append”) .start() query.stop() query.awaitTermination() query.exception() query.explain query 对象用来监控和管理流式查询 停止查询 获取状态 获取错误信息 系统中可以有多个处于活动状态的查询过程 每个Query对象有一个唯一的名字用来跟踪对应状态

Joining streams with static data val streamDS = spark .readStream .json("s3://logs") val staticDS = spark .read .jdbc(“jdbc://”, “history-info”) streamDS.join(staticDS, "customer_id") .groupBy($"customer_name", hour($"time")) .count() Structured Streaming 使用DataFrame 接口, 可以直接连接静态的数据表

Query Execution Logically:将流看成是对表的操作 Physically:Spark自动地将Query按流的方式执行

Structured Streaming 中的一些高级话题

Event Time 许多应用案例需要使用event time来聚合统计信息 E.g. 一小时内的各系统错误数量 多种挑战 DStream APIs 并不能完美地支持event-time

基于Event time 的窗口操作 完全支持UDAFs! 时间窗口是只是group的一种特殊情况 df.groupBy(window(“timestamp”, “1 hour”)) .count() 每小时记录的数量 df.groupBy( $“device”, window(“timestamp”, “10 minutes”)) .avg(“signal”) 每10分钟设备信号的均值 完全支持UDAFs! 31

带状态的聚合 在每次触发操作之间, 聚合必须被保存为分布式的状态 触发器会首先读取之前的状态并保存更新后的状态 状态被保存在内存里并备份到HDFS/S3(ahead log) 具有容错性,exactly-once guarantee

迟到数据的处理 保存状态以允许迟到的数据 去更新旧窗口的count red: 迟到数据的状态更新

迟到数据的处理 保存状态以允许迟到的数据 去更新就窗口的count 但是,如果旧的窗口不被丢弃 状态的大小会无穷地增长 red: 迟到数据的状态更新

Watermarking (水线) Watermark [Spark 2.1] - 定义了最大事件时间后的一个阈值, 规定了允许处理的最晚数据,并丢弃过时的中间状态 根据可见的最大event time计算 时间间隔用户可以配置

Watermarking (水线) 位于watermark之前到来的数据可能有一定延迟性,但允许进行聚合操作

Watermarking (水线)

Watermarking (水线) 仅在有状态的操作中使用 在无状态的流式查询和批查询中将被忽略 (streaming aggs, dropDuplicates, mapGroupsWithState, …) 在无状态的流式查询和批查询中将被忽略 val windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( window($"timestamp", "10 minutes"), $"word") .count()

再谈时间有关的几个概念 ssm.withWatermark(“eventTime”, “10 minutes”) .groupBy(window(“eventTime”, “5 minutes”)) .count() .writeStream .trigger(“10 seconds”) .start()

再谈时间有关的几个概念 ssm.withWatermark(“eventTime”, “10 minutes”) .groupBy(window(“eventTime”, “5 minutes”)) .count() .writeStream .trigger(“10 seconds”) .start() 在时间窗口内统计数据, 流式处理与批处理完全一样

再谈时间有关的几个概念 指定数据的延迟性 ssm.withWatermark(“eventTime”, “10 minutes”) .groupBy(window(“eventTime”, “5 minutes”)) .count() .writeStream .trigger(“10 seconds”) .start()

再谈时间有关的几个概念 更新的频率 ssm.withWatermark(“eventTime”, “10 minutes”) .groupBy(window(“eventTime”, “5 minutes”)) .count() .writeStream .trigger(“10 seconds”) .start() 更新的频率

任意的状态操作 [Spark 2.2] df. groupByKey(_.sessionId) .mapGroupsWithState (timeoutConf) (statefunc) def statefunc( key: K, values: Iterator [V], state: GroupState [S]): U = { // update or remove state // set timeouts // return mapped values } mapGroupsWithState/flatMapGroupsWithState允许用户在自定义的状态上进行用户自定义的操作 支持Processing Time和Event Time的timeout 支持Scala和java 接口

自定义状态函数的例子 case class SessionInfo(numEvents: Int, startTimestampMs: Long, endTimestampMs: Long) def stateFunc(sessionId: String, value: Iterator[Event], state: GroupState[SessionInfo]) = { if (state.hasTimedOut) { // 调用的时候如果超时,则删掉该状态 state.remove() Iterator[SessionUpdate] = ... } else if (state.exists) { // 状态存在的话进行后续处理 val newState: SessionInfo = ... // 用户定义如何更新状态 state.update(newState) // 设置新的状态 state.setTimeoutDuration(“1 hour”) // 配置超时时间 Iterator.empty } else { val initialState: SessionInfo = ... state.update(initialState) // 初始化状态 state.setTimeoutDuration("1 hour") //配置超时时间 }

Structured Streaming *的原理与高可用 *Other names and brands may be claimed as the property of others.

执行原理 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() .as[String] .filter($"value" === "Strata Hadoop") .explain

高可用 Structured Streaming *需要 24x7的情况下稳定运行,并且能够在主机出现问题的 情况下自动恢复。 Worker 出现问题: Spark Core的架构可以原生的处理 Driver 出现问题: 由Cluster Manager负责Driver的重启 (Standalone, YARN, MESOS) 从checkpoint(WAL)读取进度,重新计算

高可用 系统中的数据和元数据都需要是可恢复,可重放的

在执行前,数据offset会被写入具有容错性的WAL 具有容错性的Planner 在执行前,数据offset会被写入具有容错性的WAL fails

具有容错性的Planner 从WAL中读取数据offset, 重新计算

具有容错性的数据源 Structured streaming 要求数据源是可重放的(e.g. Kafka,Kinesis,files),并且能够根据planer提供的offset,生成完全一样的数据

具有容错性的状态 Spark 工作节点会在HDFS中保存数据处理的中间状态,包括版本,KV映射 Planner会在查询失败后,确保应用正确的状态版本以进行恢复

具有容错性的输出端 具有幂等写入特性,在重新执行查询后,能避免结果的重复输出

高可用性 数据offset保存到WAL + 状态管理 容错性的数据源和输出端 = end to end exactly once !

Reference Structured streaming model picture https://spark.apache.org/docs/latest/structured-streaming- programming-guide.html Intel Confidential 12/5/2018

Legal Disclaimer No license (express or implied, by estoppel or otherwise) to any intellectual property rights is granted by this document. Intel disclaims all express and implied warranties, including without limitation, the implied warranties of merchantability, fitness for a particular purpose, and non-infringement, as well as any warranty arising from course of performance, course of dealing, or usage in trade. This document contains information on products, services and/or processes in development.  All information provided here is subject to change without notice. Contact your Intel representative to obtain the latest forecast, schedule, specifications and roadmaps. The products and services described may contain defects or errors known as errata which may cause deviations from published specifications. Current characterized errata are available on request. Copies of documents which have an order number and are referenced in this document may be obtained by calling 1-800-548-4725 or by visiting www.intel.com/design/literature.htm. Intel, the Intel logo, Atom, Core, Iris, VTune, Xeon, and Xeon Phi are trademarks of Intel Corporation in the U.S. and/or other countries. * Other names and brands may be claimed as the property of others © 2017 Intel Corporation.

Thank You ! Intel Confidential 12/5/2018