Download presentation
Presentation is loading. Please wait.
Published byCharles Nelson Modified 6年之前
2
Spark SQL 介绍 付士涛
3
大纲 Architecture(架构) 像Hive一样的User Interface(用户操作界面)
DataFrame的使用(1.3以前叫做SchemaRDD)
4
Architecture 优化 传统的SQL SparkSQL Parsing Analysis Binding & Analyzing
Sql Text DataFrame Hive AST Parsing Unresolved Logical Plan Unresolved Logical Plan 传统的SQL SparkSQL Analysis Binding & Analyzing Metadata Resolved Logical Plan Logical Plan 优化 Optimize Optimizing Optimized Logical Plan Optimized Logical Plan Query Planning Query Planning Physical Plans Physical Plan Execution RDD&Execution
5
SparkSQL和Hive-On-Spark对比
功能上 实现上 覆盖了大部分的HiveQL特征 /HiveServer / CLI 提供了Spark的引擎特征(如缓存表) 提供给开发者的API(DataFrame) 为高级用户提供可插拔的组件(分析器/优化器等) 借用了HIVE上词汇分析的jars 借用了Hive Metastore和数据访问的API 重写了自己的分析器、执行器、优化器,脱离了hive的大部分依赖,提高了执行的效率
6
开启SparkSQL Thrift Service /CLI
Start the ThriftService $SPARK_HOME/sbin/start-thriftserver.sh Start the CLI $SPARK_HOME/bin/spark-sql 注:几乎在不改动HIVE任何配置的情况下重用HIVE的代码
7
SparkSQL的三个核心部分 1. 可以加载各种结构化数据源(e.g., JSON, Hive, and Parquet).
2. 可以让你通过SQL ,Spark 内部程序或者外部工具,通过标准的数据库连接(JDBC/ODBC)连接Spark,比如一个商业智能的工具Tableau 3.SparkSQL 提供丰富又智能的SQL或者 regular Python/Java/Scala code,包括 join RDDS ,SQL tables ,使用SQL自定义用户函数UDF
8
结构化数据的统一接口
9
DataFrame 容易使用的API 支持Pydata Pandas 内置很多常用的功能和数据源
多种开发语言的支持(Scala/Java/Python/R/SQL) 感觉不到是运行在分布式集群之上的 只有在真正去执行的时候才会真实进行操作(lazy) 对逻辑计划和表达式进行了优化 可以和hive共享同一个metastore 和spark进行了无缝的集成 可扩展性
10
DataFrame & Column Column(表达式) DataFrame(操作) 聚合函数 基本操作
printSchema / schema / explain / dtypes / cache/ persist etc. Actions head / first / take / count / collect etc. 像RDD一样的操作 map / flatMap / mapPartitions / foreach etc 集成性语言查询(SQL) filter / join / union / select / orderBy / groupBy / cube / rollup / sample / randomSplit etc. Column(表达式) 聚合函数 sum/avg/max/min … 一般函数操作(UDF) and / or / not / rand / lower / upper ... 数学函数 + / - / * /cos/sin/asin .. 统计函数 pearsonCorrelation / crossTabulate / calculateCov …
11
开始:DataFrames SparkSQL切入点
Spark SQL 中所有相关功能的入口点是 SQLContext 类或者它的子类, 创建一个 SQLContext 需要一个 SparkContext。(在此以scala代码为例) 使用 Scala 创建方式如下: val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ 注:HiveContext继承了SQLContext,使用它即可对hive进行操作,可以直接从hive表读取数据,而且提供了UDF的功能。
12
创建DataFrame的形式 使用 SQLContext,应用可以从一个存在的 RDD、Hive 表或者数据源中创建 DataFrame。
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) val df = sqlContext.read.json("file:///home/data/people.json") // Displays the content of the DataFrame to stdout df.show()
13
DataFrame操作 DataFrame提供了一种DSL语言,例如scala,java以及python //引用上页的代码
// Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select everybody, but increment the age by 1 df.select(df("name"), df("age") + 1).show() // name (age + 1) // Andy 31 // Justin 20 // Select people older than 21 df.filter(df("age") > 21).show() // age name // 30 Andy
14
RDD转化为DataFrame 利用反射推断模式
Spark SQL的 Scala 接口支持将包含样本类的 RDD 自动转换为 DataFrame。这个样本类定义了表的模式。样本类的参数名字通过反射来读取,然后作为列的名字。样本类可以嵌套或者包含复杂的类型如序列或者数组。这个 RDD 可以隐式转化为一个 DataFrame,然后注册为一个表,表可以在后续的 sql 语句中使用。 编程指定模式 当样本类不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。 从原来的 RDD 创建一个行的 RDD 创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配 在行 RDD 上通过 createDataFrame 方法应用模式
15
反射推断模式 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._ case class Person(name: String, age: Int) val people = sc.textFile("file:///resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
16
编程指定模式 从原来的 RDD 创建一个行的 RDD
val sqlContext = new org.apache.spark.sql.SQLContext(sc) val people = sc.textFile("file:///resources/people.txt") import org.apache.spark.sql.Row; val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) 创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配 val schemaString = "name age" import org.apache.spark.sql.types.{StructType,StructField,StringType}; val schema =StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 在行 RDD 上通过 createDataFrame 方法应用模式 val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) peopleDataFrame.registerTempTable("people") val results = sqlContext.sql("SELECT name FROM people") results.map(t => "Name: " + t(0)).collect().foreach(println)
17
Data Source APIs // Scala
val df1 = Seq(("Justin", 31, 1), ("Hao", 33, 2)). toDF("name","age", "deptId") df1.write.mode( SaveMode.Overwrite).parquet("/data/people.parquet")
18
用Hive集成 // Scala SparkContext sc = new SparkContext(new SparkConf()
. setMaster("local"). setAppName("test")); HiveContext hc = new HiveContext(sc); val hc = new org.apache.spark.sql.HiveContext(sc) val employee = hc.json("/user/data/employee") val dept = hc.json("/user/data/department") val result = employee.filter("age > 30") .join(dept, employee("deptId") === dept("id")) . groupBy(dept("name"), "gender") result.orderBy("age").select("gender").head(4)
19
// Assume we have a Hive Table Name "salary"
employee.registerTempTable("employee") val result: DataFrame = hc.sql("SELECT avg(s.amount), e.deptId FROM employee e JOIN salary s ON e.id=s.id GROUP BY e.deptId") result.write(). saveAsTable("myresult") // Do query against table "myresult" in Hive
20
自定义UDF // Register the function in Scala
sqlContext.udf.register("myUdf", (arg1: Int, arg2: String) => arg2 + arg1) // Query in SQL using the function sqlContext.sql("SELECT myUDF(age, name) from people")
21
内存中的列式缓存表 Spark Caching DataFrame Columnar Caching
22
内存中的列式缓存表(续) 更少的分配对象(更少的内存,更少的GC) 扫描/反序列化所需的列 更好的压缩 开发人员code形式:
df.cache() / df.persist(StorageLevel) / df.unpersist() 对于CLI/ThriftService用户 "CACHE [LAZY]TABLE result AS SELECT * FROM employee WHERE joinDate>’ ’;" "UNCACHE TABLE result;"
23
SQL自动的优化执行计划 SparkSQL优化逻辑计划和表达式树 // Scala
val t1 = hc.table("a") val t2 = hc.table("b") val result1 = t1.join(t2, (t1("key")===t2("key")) && (t1("key") < 10)).select(t1("value"), t2("value")) SELECT a.age, b. FROM aINNER JOIN b ON a.key=b.key AND a.key<10 result1.explain(true) // print logical plan (before / after optimization)
24
谢谢
Similar presentations