第5章 Spark SQL (PPT版本号:2019年春季学期) 《Spark编程基础(Python版)》 教材官网: http://dblab.xmu.edu.cn/post/spark-python/ 第5章 Spark SQL (PPT版本号:2019年春季学期) 温馨提示:编辑幻灯片母版,可以修改每页PPT的厦大校徽和底部文字 林子雨 厦门大学计算机科学系 E-mail: ziyulin@xmu.edu.cn 主页: http://dblab.xmu.edu.cn/post/linziyu 扫一扫访问教材官网 厦门大学计算机科学系 2019版
课程教材 Spark入门教程(Python版) http://dblab.xmu.edu.cn/blog/1709-2/ 纸质教材预期在2019年夏天上市销售 扫一扫访问在线教程 本书以Python作为开发Spark应用程序的编程语言,系统介绍了Spark编程的基础知识。全书共8章,内容包括大数据技术概述、Spark的设计与运行原理、Spark环境搭建和使用方法、RDD编程、Spark SQL、Spark Streaming、Structured Streaming、Spark MLlib等。本书每个章节都安排了入门级的编程实践操作,以便读者更好地学习和掌握Spark编程方法。本书官网免费提供了全套的在线教学资源,包括讲义PPT、习题、源代码、软件、数据集、授课视频、上机实验指南等。
提纲 5.1 Spark SQL简介 5.2 DataFrame概述 5.3 DataFrame的创建 5.4 DataFrame的保存 5.6 从RDD转换得到DataFrame 5.7 使用Spark SQL读写数据库 百度搜索厦门大学数据库实验室网站访问平台
5.1 Spark SQL简介 5.1.1 从Shark说起 5.1.2 Spark SQL设计 5.1.3 为什么推出Spark SQL
Hive中SQL查询的MapReduce作业转化过程 5.1.1 从Shark说起 Hive: SQL-on-Hadoop Hive中SQL查询的MapReduce作业转化过程
5.1.1 从Shark说起 Shark即Hive on Spark,为了实现与Hive兼容,Shark在HiveQL方面重用了Hive中HiveQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MapReduce作业替换成了Spark作业,通过Hive的HiveQL解析,把HiveQL翻译成Spark上的RDD操作
5.1.1 从Shark说起 Shark的出现,使得SQL-on-Hadoop的性能比Hive有了10-100倍的提高
5.1.1 从Shark说起 Shark的设计导致了两个问题: 一是执行计划优化完全依赖于Hive,不方便添加新的优化策略 二是因为Spark是线程级并行,而MapReduce是进程级并行,因此,Spark在兼容Hive的实现上存在线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支
5.1.1 从Shark说起 2014年6月1日Shark项目和Spark SQL项目的主持人Reynold Xin宣布:停止对Shark的开发,团队将所有资源放在Spark SQL项目上,至此,Shark的发展画上了句号,但也因此发展出两个分支:Spark SQL和Hive on Spark Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎
5.1.2 Spark SQL设计 Spark SQL在Hive兼容层面仅依赖HiveQL解析、Hive元数据,也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。Spark SQL执行计划生成和优化都由Catalyst(函数式关系查询优化框架)负责 随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于Hive的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的One Stack Rule Them All的既定方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天空”。 l数据兼容方面 不但兼容Hive,还可以从RDD、parquet文件、JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及cassandra等NOSQL数据; l性能优化方面 除了采取In-Memory Columnar Storage、byte-code generation等优化技术外、将会引进Cost Model对查询进行动态评估、获取最佳物理计划等等; l组件扩展方面 无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。 图 Spark SQL架构
图 Spark SQL支持的数据格式和编程语言 Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、HDFS、Cassandra等外部数据源,还可以是JSON格式的数据 Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范 图 Spark SQL支持的数据格式和编程语言
5.1.3 为什么推出Spark SQL
5.1.3 为什么推出Spark SQL
5.1.3 为什么推出Spark SQL 关系数据库已经很流行 关系数据库在大数据时代已经不能满足要求 首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据 其次,用户需要执行高级分析,比如机器学习和图像处理 在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统 Spark SQL填补了这个鸿沟: 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各种关系型操作 其次,可以支持大数据中的大量数据源和数据分析算法 Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学习算法的数据处理能力
5.2 DataFrame概述 DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能 Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询 只不过RDD就像一个空旷的屋子,你要找东西要把这个屋子翻遍才能找到。那我们的这个DataFrame相当于在你的屋子里面打上了货架。那你只要告诉他你是在第几个货架的第几个位置,那不就是二维表吗。那就是我们DataFrame就是在RDD基础上加入了列。实际上我们处理数据就像处理二维表一样。 图 DataFrame与RDD的区别 RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的 DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息
5.3 DataFrame的创建 从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能 SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持 可以通过如下语句创建一个SparkSession对象: from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() 实际上,在启动进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)
5.3 DataFrame的创建 在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如: spark.read.text("people.txt"):读取文本文件people.txt创建DataFrame spark.read.json("people.json"):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径 spark.read.parquet(“people.parquet”):读取people.parquet文件创建DataFrame
5.3 DataFrame的创建 或者也可以使用如下格式的语句: spark.read.format("text").load("people.txt"):读取文本文件people.json创建DataFrame; spark.read.format("json").load("people.json"):读取JSON文件people.json创建DataFrame; spark.read.format("parquet").load("people.parquet"):读取Parquet文件people.parquet创建DataFrame。
5.3 DataFrame的创建 一个实例 在“/usr/local/spark/examples/src/main/resources/”这个目录下,这个目录下有两个样例数据people.json和people.txt。 people.json文件的内容如下: {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} people.txt文件的内容如下: Michael, 29 Andy, 30 Justin, 19
5.3 DataFrame的创建 >>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json") >>> df.show() +----+-------+ | age| name| |null|Michael| | 30| Andy| | 19| Justin| 一、隐式转换介绍 (1) 包括隐式参数、隐式对象、隐式类 (2) scala独有的。 (3) 当调用对象中不存在的方法,系统会扫描上下文和伴对象看是否有implicit方法,如果有隐式方法则调用隐式方法,隐式方法传入原生对象返回包含扩展方法的对象。 (4)原类型和伴生对象都找不到的隐式值,会找手动导入的implicit Import Spark.implicit._
5.4 DataFrame的保存 可以使用spark.write操作,把一个DataFrame保存成不同格式的文件,例如,把一个名称为df的DataFrame保存到不同格式文件中,方法如下: df.write.text("people.txt") df.write.json("people.json“) df.write.parquet("people.parquet“) 或者也可以使用如下格式的语句: df.write.format("text").save("people.txt") df.write.format("json").save("people.json") df.write.format ("parquet").save("people.parquet")
5.4 DataFrame的保存 下面从示例文件people.json中创建一个DataFrame,名称为peopleDF,把peopleDF保存到另外一个JSON文件中,然后,再从peopleDF中选取一个列(即name列),把该列数据保存到一个文本文件中 >>> peopleDF = spark.read.format("json").\ ... load("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopleDF.select("name", "age").write.format("json").\ ... save("file:///usr/local/spark/mycode/sparksql/newpeople.json") >>> peopleDF.select("name").write.format("text").\ ... save("file:///usr/local/spark/mycode/sparksql/newpeople.txt") 会新生成一个名称为newpeople.json的目录(不是文件)和一个名称为newpeople.txt的目录(不是文件) part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json _SUCCESS
5.5 DataFrame的常用操作 可以执行一些常用的DataFrame操作 >>> df=spark.read.json(“people.json”) printSchema() select()
5.5 DataFrame的常用操作 filter() groupBy()
5.5 DataFrame的常用操作 sort()
5.6 从RDD转换得到DataFrame 5.6.1 利用反射机制推断RDD模式 5.6.2 使用编程方式定义RDD模式 Spark官网提供了两种方法来实现从RDD转换得到DataFrame 第一种方法是,利用反射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换 第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上
5.6.1 利用反射机制推断RDD模式 在“/usr/local/spark/examples/src/main/resources/”目录下,有个Spark安装时自带的样例数据people.txt,其内容如下: Michael, 29 Andy, 30 Justin, 19 现在要把people.txt加载到内存中生成一个DataFrame,并查询其中的数据
5.6.1 利用反射机制推断RDD模式 >>> from pyspark.sql import Row >>> people = spark.sparkContext.\ ... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").\ ... map(lambda line: line.split(",")).\ ... map(lambda p: Row(name=p[0], age=int(p[1]))) >>> schemaPeople = spark.createDataFrame(people) #必须注册为临时表才能供下面的查询使用 >>> schemaPeople.createOrReplaceTempView("people") >>> personsDF = spark.sql("select name,age from people where age > 20") #DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值 >>> personsRDD=personsDF.rdd.map(lambda p:"Name: "+p.name+ ","+"Age: "+str(p.age)) >>> personsRDD.foreach(print) Name: Michael,Age: 29 Name: Andy,Age: 30
5.6.2 使用编程方式定义RDD模式 当无法提前获知数据结构时,就需要采用编程方式定义RDD模式。 比如,现在需要通过编程方式把people.txt加载进来生成DataFrame,并完成SQL查询。 图 通过编程方式定义RDD模式的实现过程
5.6.2 使用编程方式定义RDD模式 >>> from pyspark.sql.types import * >>> from pyspark.sql import Row #下面生成“表头” >>> schemaString = "name age" >>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")] >>> schema = StructType(fields) #下面生成“表中的记录” >>> lines = spark.sparkContext.\ ... textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") >>> parts = lines.map(lambda x: x.split(",")) >>> people = parts.map(lambda p: Row(p[0], p[1].strip())) #下面把“表头”和“表中的记录”拼装在一起 >>> schemaPeople = spark.createDataFrame(people, schema) 剩余代码见下一页
5.6.2 使用编程方式定义RDD模式 #注册一个临时表供下面查询使用 >>> schemaPeople.createOrReplaceTempView("people") >>> results = spark.sql("SELECT name,age FROM people") >>> results.show() +-------+---+ | name|age| |Michael| 29| | Andy| 30| | Justin| 19|
5.7 使用Spark SQL读写数据库 5.7.1 准备工作 5.7.2 读取MySQL数据库中的数据 Spark SQL可以支持Parquet、JSON、Hive等数据源,并且可以通过JDBC连接外部数据源 5.7.1 准备工作 5.7.2 读取MySQL数据库中的数据 5.7.3 向MySQL数据库写入数据
5.7.1 准备工作 请参考厦门大学数据库实验室博客教程《 Ubuntu安装MySQL 》,在Linux系统中安装好MySQL数据库 教程地址: http://dblab.xmu.edu.cn/blog/install-mysql/ 平台每年访问量超过100万次
5.7.1 准备工作 在Linux中启动MySQL数据库 $ service mysql start $ mysql -u root -p #屏幕会提示你输入密码 输入下面SQL语句完成数据库和表的创建: mysql> create database spark; mysql> use spark; mysql> create table student (id int(4), name char(20), gender char(4), age int(4)); mysql> insert into student values(1,'Xueqian','F',23); mysql> insert into student values(2,'Weiliang','M',24); mysql> select * from student;
5.7.1 准备工作 下载MySQL的JDBC驱动程序,比如mysql-connector-java-5.1.40.tar.gz 把该驱动程序拷贝到spark的安装目录” /usr/local/spark/jars”下 启动pyspark $ cd /usr/local/spark $ ./bin/pyspark
5.7.2 读取MySQL数据库中的数据 执行以下命令连接数据库,读取数据,并显示: >>> jdbcDF = spark.read \ .format("jdbc") \ .option("driver","com.mysql.jdbc.Driver") \ .option("url", "jdbc:mysql://localhost:3306/spark") \ .option("dbtable", "student") \ .option("user", "root") \ .option("password", "123456") \ .load() >>> jdbcDF.show() +---+--------+------+---+ | id| name|gender|age| | 1| Xueqian| F| 23| | 2|Weiliang| M| 24|
5.7.3 向MySQL数据库写入数据 在MySQL数据库中创建了一个名称为spark的数据库,并创建了一个名称为student的表 创建后,查看一下数据库内容:
5.7.3 向MySQL数据库写入数据 现在开始编写程序,往spark.student表中插入两条记录 #!/usr/bin/env python3 from pyspark.sql import Row from pyspark.sql.types import * from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate() #下面设置模式信息 schema = StructType([StructField("id", IntegerType(), True), \ StructField("name", StringType(), True), \ StructField("gender", StringType(), True), \ StructField("age", IntegerType(), True)])
5.7.3 向MySQL数据库写入数据 #下面设置两条数据,表示两个学生的信息 studentRDD = spark \ .sparkContext \ .parallelize(["3 Rongcheng M 26","4 Guanhua M 27"]) \ .map(lambda x:x.split(" ")) #下面创建Row对象,每个Row对象都是rowRDD中的一行 rowRDD = studentRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip()))) #建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来 studentDF = spark.createDataFrame(rowRDD, schema) #写入数据库 prop = {} prop['user'] = 'root' prop['password'] = '123456' prop['driver'] = "com.mysql.jdbc.Driver" studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
5.7.3 向MySQL数据库写入数据 可以看一下效果,看看MySQL数据库中的spark.student表发生了什么变化 mysql> select * from student; +------+-----------+--------+------+ | id | name | gender | age | | 1 | Xueqian | F | 23 | | 2 | Weiliang | M | 24 | | 3 | Rongcheng | M | 26 | | 4 | Guanhua | M | 27 | 4 rows in set (0.00 sec)
附录A:主讲教师林子雨简介 主讲教师:林子雨 单位:厦门大学计算机科学系 E-mail: ziyulin@xmu.edu.cn 个人网页:http://dblab.xmu.edu.cn/post/linziyu 数据库实验室网站:http://dblab.xmu.edu.cn 扫一扫访问个人主页 林子雨,男,1978年出生,博士(毕业于北京大学),现为厦门大学计算机科学系助理教授(讲师),曾任厦门大学信息科学与技术学院院长助理、晋江市发展和改革局副局长。中国计算机学会数据库专业委员会委员,中国计算机学会信息系统专业委员会委员。国内高校首个“数字教师”提出者和建设者,厦门大学数据库实验室负责人,厦门大学云计算与大数据研究中心主要建设者和骨干成员,2013年度和2017年度厦门大学教学类奖教金获得者,荣获2017年福建省精品在线开放课程、2018年厦门大学高等教育成果特等奖、2018年福建省高等教育教学成果二等奖、2018年国家精品在线开放课程。主要研究方向为数据库、数据仓库、数据挖掘、大数据、云计算和物联网,并以第一作者身份在《软件学报》《计算机学报》和《计算机研究与发展》等国家重点期刊以及国际学术会议上发表多篇学术论文。作为项目负责人主持的科研项目包括1项国家自然科学青年基金项目(No.61303004)、1项福建省自然科学青年基金项目(No.2013J05099)和1项中央高校基本科研业务费项目(No.2011121049),主持的教改课题包括1项2016年福建省教改课题和1项2016年教育部产学协作育人项目,同时,作为课题负责人完成了国家发改委城市信息化重大课题、国家物联网重大应用示范工程区域试点泉州市工作方案、2015泉州市互联网经济调研等课题。中国高校首个“数字教师”提出者和建设者,2009年至今,“数字教师”大平台累计向网络免费发布超过500万字高价值的研究和教学资料,累计网络访问量超过500万次。打造了中国高校大数据教学知名品牌,编著出版了中国高校第一本系统介绍大数据知识的专业教材《大数据技术原理与应用》,并成为京东、当当网等网店畅销书籍;建设了国内高校首个大数据课程公共服务平台,为教师教学和学生学习大数据课程提供全方位、一站式服务,年访问量超过100万次。
附录B:大数据学习路线图 大数据学习路线图访问地址:http://dblab.xmu.edu.cn/post/10164/
附录C:《大数据技术原理与应用》教材 扫一扫访问教材官网 《大数据技术原理与应用——概念、存储、处理、分析与应用(第2版)》,由厦门大学计算机科学系林子雨博士编著,是国内高校第一本系统介绍大数据知识的专业教材。人民邮电出版社 ISBN:978-7-115-44330-4 定价:49.80元 全书共有15章,系统地论述了大数据的基本概念、大数据处理架构Hadoop、分布式文件系统HDFS、分布式数据 库HBase、NoSQL数据库、云数据库、分布式并行编程模型MapReduce、Spark、流计算、图计算、数据可视化以及大数据在互联网、生物医学和物流等各个领域的应用。在Hadoop、HDFS、HBase和MapReduce等重要章节,安排了入门级的实践操作,让读者更好地学习和掌握大数据关键技术。 本书可以作为高等院校计算机专业、信息管理等相关专业的大数据课程教材,也可供相关技术人员参考、学习、培训之用。 扫一扫访问教材官网 欢迎访问《大数据技术原理与应用——概念、存储、处理、分析与应用》教材官方网站:http://dblab.xmu.edu.cn/post/bigdata
附录D:《大数据基础编程、实验和案例教程》 本书是与《大数据技术原理与应用(第2版)》教材配套的唯一指定实验指导书 步步引导,循序渐进,详尽的安装指南为顺利搭建大数据实验环境铺平道路 深入浅出,去粗取精,丰富的代码实例帮助快速掌握大数据基础编程方法 精心设计,巧妙融合,五套大数据实验题目促进理论与编程知识的消化和吸收 结合理论,联系实际,大数据课程综合实验案例精彩呈现大数据分析全流程 清华大学出版社 ISBN:978-7-302-47209-4 定价:59元
附录E:《Spark编程基础(Scala版)》 厦门大学 林子雨,赖永炫,陶继平 编著 披荆斩棘,在大数据丛林中开辟学习捷径 填沟削坎,为快速学习Spark技术铺平道路 深入浅出,有效降低Spark技术学习门槛 资源全面,构建全方位一站式在线服务体系 人民邮电出版社出版发行,ISBN:978-7-115-48816-9 教材官网:http://dblab.xmu.edu.cn/post/spark/ 本书以Scala作为开发Spark应用程序的编程语言,系统介绍了Spark编程的基础知识。全书共8章,内容包括大数据技术概述、Scala语言基础、Spark的设计与运行原理、Spark环境搭建和使用方法、RDD编程、Spark SQL、Spark Streaming、Spark MLlib等。本书每个章节都安排了入门级的编程实践操作,以便读者更好地学习和掌握Spark编程方法。本书官网免费提供了全套的在线教学资源,包括讲义PPT、习题、源代码、软件、数据集、授课视频、上机实验指南等。
附录F:高校大数据课程公共服务平台 http://dblab.xmu.edu.cn/post/bigdata-teaching-platform/ 扫一扫访问平台主页 扫一扫观看3分钟FLASH动画宣传片
Department of Computer Science, Xiamen University, 2019