Presentation is loading. Please wait.

Presentation is loading. Please wait.

从TDW-Hive到TDW-SparkSQL

Similar presentations


Presentation on theme: "从TDW-Hive到TDW-SparkSQL"— Presentation transcript:

1 从TDW-Hive到TDW-SparkSQL
沈洪    大家下午好, 我是来自腾讯数据平台海量计算团队的沈洪,今天下午我的分享主题是从TDW-hive到TDW-SparkSQL,腾讯TDW数据引擎演进之路。

2 Agenda 腾讯TDW平台的介绍 SparkSQL的优势与挑战 TDW-SparkSQL的平台建设 上线与效果 未来的计划
今天的分享主要包括一下这几个方面的内容,     首先会介绍腾讯数据平台的主要架构和集群现状,     其次是为什么我们要把SQL引擎从Hive升级到SparkSQL,以及升级到SparkSQL面临的挑战。     然后重点介绍TDW-SparkSQL平台的建设,包括我们在建设平台过程中新增的特色功能,对sparksql的性能优化以及增强稳定性。 然后在介绍平台上线情况和运营效果。     最后会简单介绍我们未来的计划。

3 腾讯TDW大数据平台 数据接入 Angel (PS) Spark Gaia(CPU/ GPU / MEM) HDFS Ceph Tensorflow Mariana(Caffe) MR SQL Lhotse (任务调度) Tesla (机器学习) HBase GraphX IDE ( 即席查询) MLlib Storm 数据应用 黄金眼 META TDBANK FACE+ IDEA ….     腾讯数据平台部门早在2010年就开始基于社区的Hadoop,hive生态构建了分布式数据仓库,简称TDW。     TDW是腾讯内部最大的分布式系统,集中了腾讯内部各个产品的数据,为这些产品提供海量数据的存储和分析服务,包括数据挖掘,经营分析,机器学习等服务。     到目前为止,TDW的架构已经远远超越Hadoop生态,除了包含传统的Hadoop和hive之外,还包含新的Spark和基于Spark的SparkSQL、graphx,MLLib等组件,以及一些机器学习系统TF,Mariana和Angel,面向用户有统一调度系统洛子,查询IDE和机器学习平台特斯拉。

4 集群容量 2w+ 机器总数 8800 最大集群 250P+ 存储总量 1250T+ 24w+ 总内存 总CPU物理核 总硬盘数
   这是集群的规模,截止七月底,腾讯数据平台的机器总数已经超过2W台,这些机器分布在多个机房和集群,其中最大的集群达到了8800的规模。总的存储容量250P,包含24w个CPU物理核,单机内存是64G,总的内存达到1250TB的内存,以及24w块磁盘。

5 最大集群的每天负载 18P+ 200w+ 25w+ 扫描量 作业数 SQL数 75%+ 80% 85% CPU利用率 内存利用率 磁盘利用率
   其中八千八的最大集群一直处于较高的负载,在这个集群上,每天运行的作业数超过200W,扫描数据量达到18P,这其中绝大部分都是SQL query,每日查询分析的SQL还有25W条,另外大集群的CPU的利用率也非常高,平均利用率超过75%,高峰时段已经超过90%,磁盘利用率也超过了85%。

6 WHY 随着业务和集群的扩大,性能和成本的优化压力也就越来越大

7 旧Hive引擎的瓶颈 缺乏DAG,中间结果落地 基于MR的执行计划,不支持资源重用,Cache 社区活跃度不足
HDFS 基于MR的执行计划,不支持资源重用,Cache Query Hive query complier Syntax tree Logical plan Physical MR stages 社区活跃度不足 虽然在Hive上做了大量的优化,但还是存在一些问题,首先是hive不支持DAG,这导致大量SQL的中间数据需要读写HDFS,或者落地到磁盘,这会大大增加网络和磁盘IO的压力,降低SQL的执行效率;    其次hive会被翻译成基于MR的执行计划,Mapreduce不支持资源重用和Cache机制,在大集群中调度可能会耗费更长的时间;    另外Hive现在的社区活跃度比较低。

8 SparkSQL的优势 DAG 全新的查询优化器,基于RDD的执行计划,内存计算 DataFrame & 钨丝计划
HDFS 全新的查询优化器,基于RDD的执行计划,内存计算 DataFrame & 钨丝计划   从2015年开始,我们开始引入SparkSQL,它吸引我们的地方在于,首先是性能,采用DAG模型的SparkSQL能减少中间结果落地,,对于一些复杂的SQL,性能提升很明显; 第二是sparksql基于全新的查询引擎,最终会生成基于RDD的执行计划,而RDD可以通过内存加速计算。 第三是2015年引入的dataframe能大大扩展sparksql的应用,能和机器学习和sparkstreaming结合。而钨丝计划显著的加速了sparksql的执行性能。 第四是基于函数式编程语言scala,以及高活跃读的活跃,大量开发者在为社区贡献代码,意味着bug和功能的更新都会比较快。 函数式编程语言Scala和高度活跃的社区

9 TB级别的Shuffle,Aggregate,Join
升级引擎面临的挑战 语法,元数据,安全,权限,功能 兼容性 大集群,跨机房,高负载,SQL复杂 TB级别的Shuffle,Aggregate,Join 稳定性 性能   引入SparkSQL引擎面临许多的挑战, 1 虽然社区SparkSQL兼容Hive和Hadoop,但数平的团队为Hive做了大量定制和优化功能,已经和社区Hive版本差距较大,引入Spark可能会导致语法功能不兼容,但是呢我们不可能让用户去修改SQL,因此我们首先遇到的挑战是如何才能让SparkSQL运行在我们的环境中,即能识别我们的语法功能、又能识别底层的元数据结构和数据安全性。 2 我们集群上运行很多很重要的生产任务,举个例子,我们线上有不少对计费和对账任务,加入算错或者失败,就有可能造成直接的经济损失;另外我们还运行了很多报表任务,这些都是给老板们看的,如果失败延迟,老板会跳起来的。因此我们必须要保证生产的稳定性,而SparkSQL还没有在大规模生产集群的应用案例,怎样才保证SparkSQL上线后不会产生任务的失败是必须要解决的问题。 3 虽然SparkSQL测试性能很好,但在大规模的数据量下,其性能提升效果会如何,是否能满足我们的要求也是要考虑的。

10 TDW-SparkSQL的平台建设 功能增强 稳定性提升 性能优化
  对于这些挑战,TDW团队做了许多的开发优化工作,下面我将介绍TDWSpark平台建设和优化工作。我将从以下这三个方面介绍。 第一 第二 第三

11 TDW的SQL框架 TDW-SQLServer Hive Engine Spark-SQL EasyCount TDW-SQL
共享元数据 存储透明 引擎层 SQL Engine Hive Engine Spark-SQL SQL Engine SQL Engine EasyCount 权限控制 分布式执行层 MapReduce Spark Storm ORC File Protobuf File Text File Sequence File 存储层   在此之前,先介绍一下我们的sql框架。在一开始引入SparkSQL时,受限于开发资源,我们不可能快速把SparkSQL全部替换TDWHive,我们的方案是先把SparkSQL跑起来,用户可以根据自己的需要配置SQL执行引擎。为此,我们首先集中精力解决基本语法与功能兼容性问题,通过适配底层的元数据和存储格式,并引入TDW的用户身份验证和表权限控制,保证数据安全。另外,为了集成Hive,改造HiveServer,让同一个HiveServer进程即能提交Hive SQL,又能提交SparkSQL。

12 功能增强 特色语法功能 支持Python的UDAF和UDTF
  虽然是基于社区SparkSQL版本,但TDW-sparkSQL还是有许多自己的特色,主要包括增强版的语法功能和支持Python的UDAF和UDTF。

13 特色语法功能 兼容Oracle语法 窗口函数:ratio_to_report、 first_value、last_value…
Insert into、Update、delete… 支持 hash/range/list分区 窗口函数:ratio_to_report、 first_value、last_value… UDF函数: IpInfo、wm_concat、est_distinct、url_decode… 支持查询PG外表,Aggregate、Join…   语法功能方面,TDW-SparkSQL是兼容Oracle语法,支持插入,修改,删除类型的sql;其次在分区方面,TDW-sparkSQL还支持hash,range,list的分区;还包含丰富的UDF和窗口函数,支持PG外表的读写,比如hdfs上的表和pg中的表做关联查询。

14 Python版本的UDAF/UDTF 支持用户使用Python编写UDF、UDAF和UDTF
NodeManager SQLApplicationMaster SparkServer SQL SQLCompiler PLC PySpark RM SQLRunner SparkClient NodeManager UDFx Executor TaskRunner PythonRunner 支持用户使用Python编写UDF、UDAF和UDTF PySpark嵌入SparkServer中,不再单独起JVM进程 PLC执行多个SQL时支持session共享 SQL自动封装为PySpark,对用户透明 Python版本的UDAF和UDTF     在腾讯内部有不少人喜欢使用Python语言编程,但是社区SparkSQL却不支持使用Python语言编写UDAF和UDTF,而然这些人转移到scala语言成本很大,因此我们开发支持使用Python编写UDAF和UDTF; 其基本原理是把用户的SQL翻译成pyspark执行;如图所示,用户编写的各种udf首先会通过我们上传到SparkServer上,用户提交SQL给SparkServer,SparkServer启动pyspark并加载所需的UDAF,然后编译执行SQL。 其他特点在于,PySpark嵌入SparkServer中,不再单独起JVM进程;多个SQL时支持session的共享;用户使用UDF和和UDAF时,与写普通的SQL一样,完全对用户透明。

15 性能改进 调度策略改进 自动设置Shuffle分区数 SortMergeJoin 优化
TDW-SparkSQL除了有自己的特色,在性能上也做了很多优化,由于时间关系,我这里介绍其中三个,第一是调度策略优化; 第二是自动化设置shuffle分区数;第三是SortMergeJoin的优化。

16 调度策略改进——优化前 ... 问题 Job1长时间占用资源池不释放,其他作业长时间等待资源 公平和FIFO调度,都会导致小作业饿死 原因
Jobs queues Resource pool schueduler task JobN ... Executor Job3 Executor Job2 Executor Job1 问题 Job1长时间占用资源池不释放,其他作业长时间等待资源 公平和FIFO调度,都会导致小作业饿死 原因 Spark的调度以Executor为单位——粗粒度分配资源,释放慢 Mapreduce的调度以Task为单位——细粒度分配资源,释放快   首先是资源调度改进,TDW平台提交的SQL,都会运行TDW Gaia上,一个基于Yarn的资源调度系统。用户提交的SQL需要指定资源池,Gaia集群包含多个资源池,每个资源池包含多个用户,并且都有内存和CPU限制。由于Spark任务在启动后,支持资源重用机制,即一个Executor可以运行多个Task,因此当一个用户提交一个大作业之后,这个作业可能长时间占用资源池的资源,这会导致后续提交的任务长时间等待资源,导致整体等待时间变长。

17 资源池紧张时小作业不会饿死,资源池空闲时大作业能充分利用资源
调度策略改进——优化后 Job1 Job2 Job3 JobN ... Jobs queues Resource pool Executor schueduler task × 资源(Executor)自动释放机制。 释放策略: avg = runContainers/runJobs Job资源数R>avg && remainContainers <=0 时,启动释放逻辑, 释放个数=Math.min( (R/runContainers)*waitContainers, R*ratio) 针对这个问题,我们通过优化调度机制,统筹考虑资源池的剩余资源和等待作业数,当一个资源池没有资源且有等待作业时,占用资源多的任务会启动资源释放,释放合适的数量,保证资源池空闲时大作业能充分利用资源,资源池紧张时小作业不会饿死。 资源池紧张时小作业不会饿死,资源池空闲时大作业能充分利用资源

18 Shuffle分区数自动设置——优化前 HDFS 问题:Spark中Shuffle分区数使用同一设置 15G 30G 200G
Scan t1 Scan t2 Scan t4 HDFS Parallel=200 15G 30G 200G 问题:Spark中Shuffle分区数使用同一设置 不同Job需要不同的设置 单个Job不同的阶段也需要不同分区数   原生的Spark任务需要用户自己设置Shuffle并行度,这存在两种问题,首先 是Spark任务会包含多个Stage,不同的Stage可能需要不同的Shuffle并行度,而spark只支持统一的 shuffle并行度设置,另外如果每条sql都需要用户设置shuffle并行度,这对许多SQL用户来说难度比较 大,设置小了SQL可能运行很慢,设置大了可能会生成大量小文件,会给HDFS带来较大的压力。因此TDW 团队开发了自动Shuffle并行度设置, 2018/12/4

19 Shuffle分区数自动设置——优化后 HDFS 优化措施 15G 30G 200G 追溯输入,自动计算每个Stage的并行度
Scan t1 Scan t2 15G 30G Scan t4 Parallel=45G/512M=90 200G Parallel=245G/512M=490 HDFS 优化措施 追溯输入,自动计算每个Stage的并行度 自动调整cube,distinct等数据膨胀操作的并行度。 防止输出小文件过多——输出小文件自动合并机制 1.其基本原理是根据输入数据自动计算每个Shuffle的并行度。 2.对部分SQL语法做微调,比如数据膨胀类型的count distinct,cube,rollup等,自动调大shuffle并 行度。 3. 虽然如此,还是有可能出现输出大量小文件的情况,为了彻底解决这个问题,我们在最终写文件前会 判断是否需要增加一个stage,根据规则判断是否需要对最终输出进行合并。虽然增加了部分的开销,但 相对于TDWhive和原生的sparksql,优势还是很明显。

20 SortMergeJoin ——优化前 问题 Shuffle Fetch效率低,网络利用率低 Sort 操作耗时
Map Reduce map Spill Merge Fetch External Sorter Join partition, spill file Memory Buffer Memory Buffer Spill file OutPut Split Sort Map Join .... .... .... other reduces other maps 问题 Shuffle Fetch效率低,网络利用率低 Sort 操作耗时 频繁Spill,序列化/反序列化   在我们上线过程中,发现在数据量大的情况下,社区SparkSQL和SortMergeJoin效果比较差,。其原因是SortMergeJoin在做Join前需要从Map端拉取数据,这些数据需要先排序,然后才能开始Join。数据量大的Join可能会消耗大量时间。其主要的时间花费在拉取数据和排序上。

21 SortMergeJoin ——优化后 优化措施 Fetch 大 Block 直接写磁盘 增加 Fetch线程
Map Reduce map Sort Spill Merge sort Fetch Merge Sort Join Partition, sort, spill file Memory Buffer Memory Buffer OutPut Split Merge sort Map Join .... .... Disk file other reduces other maps 优化措施 Fetch 大 Block 直接写磁盘 增加 Fetch线程 Join Sort排序移到Map端,Reduce端只需要合并排序   对于这个问题,最简单的解决方法是增加并行度,减少每个Reduce任务的处理的数据,但这种方法无法彻底解决Join性能问题,并且在TDW中,包含Join的SQL大约占了50%。因此TDW团队对SortMergeJoin做了深入的优化。主要包含三个部分,首先是Shuffle大的Block直接写磁盘,减少不必要的spill和序列化操作;其次是增加Shuffle的线程数,提高网络利用率;第三是把SortMergeJoin的排序操作提前到Map输出端,Reduce端只需要做合并排序。经过优化后的SortMergeJoin运行时间比社区版本减少70%。 平均运行时间减少70%

22 稳定性提升 SparkServer分散化 运行稳定性

23 SparkServer分散化 Stage1 Stage2 StageN ... SparkServer MapTask ReduceTask
SparkDriver SparkDriver SparkDriver SparkDriver Ask for MapOutStatus Stage1 Stage2 StageN MapTask ReduceTask MapTask ReduceTask ... ReduceTask ReduceTask MapTask ReduceTask ReduceTask 问题 SparkDriver的内存占用大 网络(千兆)卡超过一分钟 SparkServer最大同时运行1000条SQL,影响其他SQL的运行 引入SparkSQL时,Driver需要运行在SparkServer上,task个数多的SQL,其Driver需要占用较大的内存和网络,可能会导致SparkServer的网络卡一分钟,而SparkServer所在的机器最多可同时运行1000条SQL,这会影响其他运行的SQL。 2018/12/4

24 SparkDriver分散化 MetaStore IDE SparkServer Resource Manager Resource
洛子 SparkClient SparkClient Node Manager SparkDriver Node Manager SparkServer Executor Executor Metastore Server Metastore Server Node Manager SparkDriver MetaStore Executor Executor 改造 把SparkDriver的功能迁移到AppMaster上 增加MetastoreServer,确保Metastore Cluster的安全 增加通信协议,SparkServer获取Driver的作业进度 针对这个问题,我们做了SparkDriver的分散化,其基本原理是把SparkDirver的工作移到ApplicationMaster上,另外为了元数据库的安全,增加元数据库代理,SparkDriver通过元数据代理获取元数据信息。 2018/12/4

25 运行稳定性 推测执行改进 调度问题修复 AM经常OOM Tasks 3w+, AM内存OOM
减少Accumulators,降低AM 70%内存 AM内存配置由4G减少到1.5G 推测执行改进 推测成功后kill掉正在跑的Task 修复Commit失败导致无限重试的BUG 调度问题修复 Task不调度:动态资源申请的BUG Task调度到已释放的Executor失败 其他还优划了AM的内存,减少Accumulators,降低70%的内存,4G到1.5G 修复推测执行的BUG 发现许多隐藏的bug

26 上线与效果 ——飞行中换引擎 上线与运营    飞行中换引擎

27 升级过程 380+ 30+ 内部Issue 社区PR 升级覆盖率 1.4.1 1.5.1 1.6.1 2015.07.15
首先简单介绍一下我们对spark解决的问题,我们内部提了超过380个issus,向社区提了超过30个PR,SQL切换后sparksql的覆盖率已经达到80%。        我们最早调研的社区版本是spark1.2版本,后来基于社区1.4开始开发,由于上线SparkSQL的时间跨度很长,因此在我们升级引擎的过程中,社区版本也经历了1.5,1.6到现在最新的2.0版本,每次出现新的版本,我们都会对其性能进测试对比,并决定是否引入。因此在我们在边换引擎的同时,还会边升级SparkSQL的版本。

28 边换引擎边升级 典型的10条线上SQL性能提升
1.4(Seconds) 1.5(Seconds) 1.5 vs 1.4 1.6((Seconds) 1.6 vs 1.5 q3.sql 295 218 -26.10% 150 -23.05% q34.sql 805 313 -61.12% 243 -8.70% q42.sql 253 149 -41.11% 159 3.95% q43.sql 804 259 -67.79% 249 -1.24% q52.sql 390 168 -56.92% 153 -3.85% q55.sql 480 177 -63.13% 124 -11.04% q59.sql 494 204 -58.70% 189 -3.04% q63.sql 131 138 5.34% 8.40% q65.sql 274 229 -16.42% 163 -24.09% q82.sql 315 327 3.81% 358 9.84%        我们最早调研的社区版本是spark1.2版本,后来基于社区1.4开始开发,由于上线SparkSQL的时间跨度很长,因此在我们升级引擎的过程中,社区版本也经历了1.5,1.6到现在最新的2.0版本,每次出现新的版本,我们都会对其性能进测试对比,并决定是否引入。因此在我们在边换引擎的同时,还会边升级SparkSQL的版本。 典型的10条线上SQL性能提升

29 分组灰度上线 升级不停服,用户无影响 灰度前 数据一致性 性能评估 灰度时 双引擎自动切换 灰度后 运行时间 资源对比 异常任务监控
   TDW每天运行25万条sql,从TDWHive切换到TDWSparkSQL也是一件很艰巨的任务,需要保证数据的准确性,稳定性和性能,而我们的测试集群比较小,难以大规模的测试验证。 为此,我们采用分批测试,即把线上任务,分成多批,每次跑一批任务测试,并解决测试过程中出现的问题,当这批任务在数据、稳定性和性能都达到我们的要求后,我们就会灰度上线这批任务; 经过多次的迭代,虽然上线速度比较慢,但上线却很平稳,对用户来是无感知的切换。     测试没问题,上线还是可能出现异常,因此,对于上线以后的任务,我们还会监控其运行效果,包括资源消耗和运行时间的统计;另外还自动监控失败任务的原因,并分类汇总,方便我们对线上问题的修复和优化。

30 引擎升级的效果 每日运行SQL数 20w+ SQL运行速度平均提升 40%+ 成本节约 20%+   截止七月底,TDW平台上每日运行的SparkSQL数量已经超过20w,上图是效率提升的分布图,提升效果还是很明显的,大约一半的SQL任务运行效率提升50%以上,提升运行效率的同时,也能为集群节约成本20%+的成本。 总结一下今天分享的内容,今天首先介绍腾讯数据平台的主要架构和集群现状,其次是为什么我们要把SQL引擎从Hive升级到SparkSQL,主要介绍腾讯TDW-SparkSQL平台建设,包括功能增强,性能优化和稳定性提升,上线SparkSQL的SQL数量超过20w,整体性能提升40%,成本节约20%。

31 未来的计划 多表Join优化 支持处理数据倾斜 机器增加内存 结合Spark Streaming提供流式查询
  最后再简要介绍一下我们未来的计划,后续我们还将继续优化多表Join,智能数据倾斜等性能问题,并使用更大内存的机器,充分利用Spark内存计算的优势,另外还会扩展SparkSQL的应用,和SparkStreaming结合提供流式数据的查询。

32 Q & A


Download ppt "从TDW-Hive到TDW-SparkSQL"

Similar presentations


Ads by Google