
The Way of Spark Mastery (Advanced) - Spark from Beginner to Master, Part 9: Spark SQL Operation Process Analysis


1. The overall operation process

We use the following code to analyze the Spark SQL execution process, so that we can understand the different states of the LogicalPlan and gain an overall picture of how Spark SQL executes a statement.

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql method provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

(1) View the schema information of teenagers

scala> teenagers.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)

(2) View the execution process

scala> teenagers.queryExecution
res3: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true

QueryExecution represents the overall execution process of a SQL statement in Spark SQL. From the output above we can see that a SQL statement goes through the following steps before it is executed (the sketch after this list shows how to inspect each stage individually):

(1) == Parsed Logical Plan ==
'Project [unresolvedalias('name),unresolvedalias('age)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

(2) == Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

(3) == Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

(4) == Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

// Dynamic bytecode generation (code generation, CG) is enabled to improve query efficiency
Code Generation: true
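
Each of these stages can also be inspected on its own through the fields of QueryExecution; the following is a minimal sketch, assuming the teenagers DataFrame defined above and the QueryExecution API of Spark 1.5:

scala> teenagers.queryExecution.logical        // (1) parsed logical plan
scala> teenagers.queryExecution.analyzed       // (2) analyzed logical plan
scala> teenagers.queryExecution.optimizedPlan  // (3) optimized logical plan
scala> teenagers.queryExecution.sparkPlan      // (4) physical plan
scala> teenagers.queryExecution.executedPlan   // physical plan after the preparation rules; this is what actually runs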

2. Full table query operation process

Execute statement:

val all = sqlContext.sql("SELECT * FROM people")

Operation flow:

scala> all.queryExecution
res9: org.apache.spark.sql.SQLContext#QueryExecution =
// * is parsed into unresolvedalias(*)
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
// unresolvedalias(*) is resolved to all the fields in the schema
// UnresolvedRelation [people] is analyzed as Subquery people
name: string, age: int
Project [name#0,age#1]
 Subquery people
  LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Scan PhysicalRDD[name#0,age#1]

Code Generation: true
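
The same four plans can also be printed directly from a DataFrame with explain(true). A minimal, roughly equivalent DataFrame-API sketch, assuming the people DataFrame from section 1 (allDf is just an illustrative name):

// roughly equivalent to "SELECT * FROM people"
val allDf = people.select("*")
// prints the parsed, analyzed, optimized and physical plans to the console
allDf.explain(true)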

3. Filter query operation process

Execute statement:

scala> val filterQuery = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
filterQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

Execution flow:

scala> filterQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter (('age >= 13) && ('age <= 19))
  'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 // an extra Filter appears here, and likewise in the plans below
 Filter ((age#1 >= 13) && (age#1 <= 19))
  Subquery people
   LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:20

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
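
For comparison, a minimal sketch of an equivalent filter written with the DataFrame API, assuming the people DataFrame above (filterDf is an illustrative name). It goes through the same four stages and ends up as the same Filter over the scan:

import org.apache.spark.sql.functions.col

// roughly equivalent to "SELECT * FROM people WHERE age >= 13 AND age <= 19"
val filterDf = people.filter(col("age") >= 13 && col("age") <= 19)
filterDf.explain(true)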

4. Join query operation process

Execute statement:

val joinQuery = sqlContext.sql("SELECT * FROM people a, people b WHERE a.age = b.age")

View the overall execution flow:

scala> joinQuery.queryExecution
res0: org.apache.spark.sql.SQLContext#QueryExecution =
// note the Filter
// and the Join Inner
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age = 'b.age)
  'Join Inner, None
   'UnresolvedRelation [people], Some(a)
   'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#2,age#3]
 Filter (age#1 = age#3)
  Join Inner, None
   Subquery a
    Subquery people
     LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
   Subquery b
    Subquery people
     LogicalRDD [name#2,age#3], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Project [name#0,age#1,name#2,age#3]
 Join Inner, Some((age#1 = age#3))
  LogicalRDD [name#0,age#1], MapPartitionsRDD...

// view its physical plan
scala> joinQuery.queryExecution.sparkPlan
res16: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#2,age#3]
 SortMergeJoin [age#1], [age#3]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#2,age#3]
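
sparkPlan is the physical plan before the physical preparation rules run. A minimal sketch for looking at the plan that is actually executed (assuming the Spark 1.5 QueryExecution API); the exact operators shown depend on the Spark version and configuration:

scala> joinQuery.queryExecution.executedPlan
// compared with sparkPlan, executedPlan has the preparation rules applied, e.g. the
// shuffle and sort operators that SortMergeJoin requires are inserted here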

The previous example is equivalent to the following one; only the way the join is written differs slightly. Execute statement:

scala> val innerQuery = sqlContext.sql("SELECT * FROM people a inner join people b on a.age = b.age")
innerQuery: org.apache.spark.sql.DataFrame = [name: string, age: int, name: string, age: int]

View the overall execution flow:

scala> innerQuery.queryExecution
res2: org.apache.spark.sql.SQLContext#QueryExecution =
// note the Join Inner
// also, there is no Filter here
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Join Inner, Some(('a.age = 'b.age))
  'UnresolvedRelation [people], Some(a)
  'UnresolvedRelation [people], Some(b)

== Analyzed Logical Plan ==
name: string, age: int, name: string, age: int
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  Subquery a
   Subquery people
    LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22
  Subquery b
   Subquery people
    LogicalRDD [name#4,age#5], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// note that the Optimized Logical Plan is essentially the same as the Analyzed Logical Plan;
// no special optimization is applied here. This is highlighted so that it can be compared
// with the subquery case later, to see the difference the optimizer makes
== Optimized Logical Plan ==
Project [name#0,age#1,name#4,age#5]
 Join Inner, Some((age#1 = age#5))
  LogicalRDD [name#0,age#1], MapPartitionsRDD...

// view its physical plan
scala> innerQuery.queryExecution.sparkPlan
res14: org.apache.spark.sql.execution.SparkPlan =
TungstenProject [name#0,age#1,name#6,age#7]
 SortMergeJoin [age#1], [age#7]
  Scan PhysicalRDD[name#0,age#1]
  Scan PhysicalRDD[name#6,age#7]
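
A minimal DataFrame-API sketch of the same self join, assuming the people DataFrame above (a, b and joinedDf are illustrative names). The aliases play the role of the a and b table aliases in the SQL text:

import org.apache.spark.sql.functions.col

// roughly equivalent to the inner-join statement above
val a = people.as("a")
val b = people.as("b")
val joinedDf = a.join(b, col("a.age") === col("b.age"), "inner")
joinedDf.queryExecution.sparkPlan // also plans a SortMergeJoin over the two scans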

5. Subquery operation process

Execute statement:

scala> val subQuery = sqlContext.sql("SELECT * FROM (SELECT * FROM people WHERE age >= 13) a WHERE a.age <= 19")
subQuery: org.apache.spark.sql.DataFrame = [name: string, age: int]

View the overall execution flow:


scala> subQuery.queryExecution
res4: org.apache.spark.sql.SQLContext#QueryExecution =
== Parsed Logical Plan ==
'Project [unresolvedalias(*)]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, age: int
Project [name#0,age#1]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

// note the difference between the Analyzed and Optimized Logical Plans here:
// the two Filters have been combined and optimized
== Optimized Logical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Physical Plan ==
Filter ((age#1 >= 13) && (age#1 <= 19))
 Scan PhysicalRDD[name#0,age#1]

Code Generation: true
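
The same optimization can be observed with two chained DataFrame filters; a minimal sketch, assuming the people DataFrame above (chained is an illustrative name). In the optimized plan the two predicates are again collapsed into a single Filter over the scan:

import org.apache.spark.sql.functions.col

// two separate filters, analogous to the nested subquery above
val chained = people.filter(col("age") >= 13).filter(col("age") <= 19)
// the optimizer combines both predicates into one Filter
chained.queryExecution.optimizedPlan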

6. Aggregate SQL operation process

Execute statement:

scala> val aggregateQuery = sqlContext.sql("SELECT a.name, sum(a.age) FROM (SELECT * FROM people WHERE age >= 13) a WHERE a.age <= 19 GROUP BY a.name")
aggregateQuery: org.apache.spark.sql.DataFrame = [name: string, _c1: bigint]

View the execution flow:


scala> aggregateQuery.queryExecution
res6: org.apache.spark.sql.SQLContext#QueryExecution =
// note 'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
// i.e. a.name in the GROUP BY is parsed as unresolvedalias('a.name)
== Parsed Logical Plan ==
'Aggregate ['a.name], [unresolvedalias('a.name),unresolvedalias('sum('a.age))]
 'Filter ('a.age <= 19)
  'Subquery a
   'Project [unresolvedalias(*)]
    'Filter ('age >= 13)
     'UnresolvedRelation [people], None

== Analyzed Logical Plan ==
name: string, _c1: bigint
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter (age#1 <= 19)
  Subquery a
   Project [name#0,age#1]
    Filter (age#1 >= 13)
     Subquery people
      LogicalRDD [name#0,age#1], MapPartitionsRDD[4] at rddToDataFrameHolder at <console>:22

== Optimized Logical Plan ==
Aggregate [name#0], [name#0,sum(cast(age#1 as bigint)) AS _c1#9L]
 Filter ((age#1 >= 13) && (age#1 <= 19))
  LogicalRDD [name#0,age#1], MapPartitions...

// view its physical plan
scala> aggregateQuery.queryExecution.sparkPlan
res10: org.apache.spark.sql.execution.SparkPlan =
TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Final,isDistinct=false)], output=[name#0,_c1#14L])
 TungstenAggregate(key=[name#0], functions=[(sum(cast(age#1 as bigint)),mode=Partial,isDistinct=false)], output=[name#0,currentSum#17L])
  Filter ((age#1 >= 13) && (age#1 <= 19))
   Scan PhysicalRDD[name#0,age#1]
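
A minimal DataFrame-API sketch of the same aggregation, assuming the people DataFrame above (aggDf is an illustrative name); it should plan a similar partial/final TungstenAggregate pair:

import org.apache.spark.sql.functions.{col, sum}

// roughly equivalent to the aggregate statement above
val aggDf = people.filter(col("age") >= 13 && col("age") <= 19)
  .groupBy("name")
  .agg(sum("age"))
aggDf.queryExecution.sparkPlan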

For other SQL statements, you can use the same method to examine the execution flow and so grasp the basic ideas behind Spark SQL's implementation.
