这篇文章中,我根据Spark SQL的论文,介绍了Spark SQL的一个关键模块:DataFrame API。我们现在已经知道,DataFrame和RDDs之间的关系应当是:DataFrame可以转化为RDDs,而RDDs也可以被映射为DataFrame。同时我们也知道,DataFrame API实质上是一套DSL,而最终在Spark计算框架中被执行的,应当是DataFrame最终转化后的RDDs。显而易见,人肉编写的DF所对应的DSL,存在着巨大的优化空间。这也就是本次文章所有讲述的Spark SQL的后半部分内容——Catalyst.

*这里额外补一句,根据目前最新的代码,Spark已经把DataFrame这个东西去掉了,取而代之的则是Dataset,为了保持兼容,原来的DataFrame被定义为了Dataset[Row]的别名。关于这个Dataset我先观察观察,如果有必要的话,再单独拎出来说说,目前您可以假装认为它和DataFrame没有啥区别。

 

Catalyst Optimizer

 

1 运作方式

Catalyst是Spark SQL配套提供的SQL/DF优化器,我们还是先搬出这张图:

6

不管是DF还是SQL查询语句,经过Parser,都会变成逻辑执行计划Logical Plan,而Catalyst则会针对这个逻辑计划进行优化。其实过程都很容易理解,我们不妨就着上面这张图解释一下:

  1. Catalyst对Unresolved Logical Plan进行分析,与注册DF时声明的Schema进行对比,如果发现不一致(比如你列名打错了),那么就会在这一阶段跳出,并给出相应的异常提示。如果通过了分析,那么就成功晋升为Logical Plan
  2. 之后逻辑优化器Logical Optimization就要开始干活了,它会遍历整个Plan,并对当前的Plan应用第一种优化方法“Rules”,这些Rules是预先设定好的一些优化规则,例如对常数的合并或是对执行顺序进行调整。应用这些规则后的Plan虽然已经不是原来的模样,但是通过它执行并运算出的结果一定是和无优化的版本直接执行后的结果等价的,这也是优化器的原则之一:保证程序正确性的前提下,尽可能地提高程序的性能。
  3. 物理计划阶段,Catalyst会根据优化好的逻辑执行计划生成N种物理执行计划Physical Plans,所谓物理计划,也就是和最终的Spark计算框架的语义能够相互对应(你基本可以理解Spark计算框架的语义就是RDDs API)。当然,条条大路通罗马,这也是为何在这一阶段Catalyst要生成N份物理计划的原因。当然生成了之后也不能不管他们,我们必须要选择一种方案作为最终的执行方案,那么这个时候就会启用第二种优化方法“Cost”,顾名思义,就是通过计算预估各个方案的开销,然后选择开销最小的一个方案最为最终的物理计划。
  4. 最终的物理计划经过代码生成阶段就变成了RDDs(表现形式为可以被执行的代码),通常来讲,可能需要做一套专用的编译器来把这些执行计划编译为可以塞进JVM的Java bytecode,不过Spark团队为了简化这一阶段的开发,启用了一项名为“Quasiquotes”的Scala语言特色技术,让技术人员可以通过简单的拼字符串的方式拼出可执行程序(看上去有点像JavaScript的eval()函数,不过我觉得更像C#语言当中的“表达式树”,只不过功能更全,使用更简便)。通过代码生成,SQL/DF的语义最终被转换为了RDDs,而RDDs将会被Spark的执行引擎接收并计算。

2 执行计划优化

根据上面的流程,我们可以稍微总结一下Catalyst的优化方法:(1)对逻辑执行计划应用“Rule”优化,(2)随后对物理执行计划应用“Cost”优化,最后把物理执行计划生成为RDDs。

我们都知道,编译器在工作的时候,会产生一棵“语法数”,语法数的叶子节点是数据,其余节点是操作,比如下图:

1

就可以用来表示a+b+c这一表达式,在编译器将其翻译为机器代码的时候就可以通过遍历这棵树来完成。实际上DF/SQL在经过Spark SQL的Parser转换为Logical Plan过后,这个“执行计划”就是以一棵树形式被呈现的。而我们的各种优化方案实质上就是在对这棵树进行各种等价变换。

 

2.1 基于Rule的优化

在这里,Spark应用了为数不多的Scala语言特性:模式匹配,这一语言特性使得对Plan的优化变得更简单,论文中举了个简单的例子:

如果你比较习惯传统语言的代码写法,可能会觉得这个写法太黑魔法了(反正我是这么觉得的……),区别于一般的switch case语句,在Scala当中,你不止可以匹配常量,还可以匹配函数,并且可以在接下来的操作中,根据匹配到具体函数类型,你可以返回一个新的函数,在上面的示例代码中,返回的新函数将会用于替换掉匹配项。这个例子有点像编译原理中,代码优化部分的常量表达式优化,对于一个常量表达式,我们可以直接在编译期计算出它的值,从而提升运行期的效率。

接下来我们来看一个具体的例子,下面是一个Spark SQL的应用程序,是我从范例程序修改而来的,我们关注高亮的这一行代码:

代码很容易理解,就是对records表做了两次过滤操作,但是很显然我们可以发现,这段代码并不是最高效的,因为这种写法就意味着我们要对records表扫描两次,第一次先得出 key > 90 的项目,随后再在这些项目中找 value > val_9” 的。而我们都知道,其实对于这样的多重过滤,只需要one pass即可完成,也就是说这里是一个可以优化的地方。我们运行一下这段代码,然后先来看一看Analyzed Logical Plan是什么样子:

作为最忠实反映原始DSL的逻辑计划,我们可以很清晰的看到该计划中,需要对LogicalRDD进行两次Filter操作。而接下来Catalyst就会对其进行优化,产生的Optimized Logical Plan就变成了下面这个样子:

由于value字段是字符串类型,所以额外对该字段进行了判空,而之前的两个Filter操作,已经被正确地合并为了一个。这就是一个典型的基于Rule进行优化的例子,进行这种优化的Rule,我们可以在源码中找到:

这条Rule做了这样几件事:

  1. 在树中匹配两个相邻的Filter节点
  2. 把两个Filter节点所包含的所有Condition表达式的字句都抽取出来(以AND操作作为分割依据),可以记作c1, c2
  3. 然后就是去除重复的条件(比如你写了key > 90 && key > 90),记c3 = c1 – (c1 ∩ c2)
  4. 最后将两个Filter合并为一个新的Filter(如果去重操作后发现c3为空,那么就直接返回c2的那个过滤器即可),也就是把c3和c1用“AND”操作连接起来,并生成一个新的过滤器

是不是还挺好理解的?事实上,Spark SQL已经内置了大量常用的优化规则,我会在未来的文章中,对这些规则以及他们的应用方法进行详细的介绍。

 

2.2 基于Cost的优化

在产生Logical Optimization之后,就进入物理执行计划的生成阶段,Spark SQL会依据逻辑执行计划生成至少一个物理执行计划,随后通过Cost Model对每个物理执行计划进行开销评估,并选择预估开销最小的一个作为最终的物理执行计划送去做代码生成。在论文中提到,目前Cost-based优化只应用于存在select join algorithms的执行计划,由于我还没有深入地看这一部分,所以还不知晓如今最新的Spark有没有将Cost-based优化应用于其他操作,关于这点,也有望在未来的文章再来详谈,容我先去挖挖文档和源码:P

 

3 扩展数据源优化

在前一篇文章中,我提到了Spark SQL支持多种数据源以及对数据源的扩展能力(你可以把你自定义的数据格式作为Spark SQL的输入源),这项能力看上去似乎并没有什么新奇的,毕竟从工程调度来讲,无非是你的数据源提供一个适配Spark SQL输入流的适配器即可,但实际上对于大量的数据处理而言,如果只是简单的给出一个提供数据的接口,那么很可能会造成很严重的性能问题,下面我举个例子:

假设我们需要混入来自于MySQL数据库的数据,于是我们写下了下面这样的Spark SQL语句:

然后有意思的事情就发生了,如果是直接按照这个DSL生成执行的计划的话,多半是这样的:

虽然看上去没什么逻辑问题,但是实际上,我们本可以做得更好。考虑一下,如果MySQL数据表中的数据量非常巨大,在现有的这个执行逻辑下,就会导致MySQL先将该数据表的所有数据导出给Spark SQL,然后再由Spark SQL执行Filter操作,而导出数据表的所有数据的代价是非常高的,那么如何避免呢?相信你也想到了,既然MySQL本身就支持数据过滤,而且作为一个数据库,有索引有优化,那让MySQL直接给出筛选后的结果不就可以了?

也就是说,由MySQL先执行过滤操作,Spark SQL直接接收过滤后的数据即可。Spark SQL就会针对这一情况进行优化,把一部分工作交给数据源适配器去做:

2

Spark SQL已经针对常用的数据源(例如JDBC所支持的RDBMS、CSV之类的)实现了这种具备优化能力的接口,如果你有自己的数据格式,并且你对这个格式的数据已经建立了高性能的索引,那么通过实现这种可优化的数据输入接口,就可以使Spark SQL在读取你的自定义数据源的时候,更智能更高效。


 

到这里,Spark SQL这篇论文所阐述的内容基本上就算是完结了,但是我们对Spark系统的探索才刚刚开始,在本文中,我尝试加入了一点点Spark的源代码来辅助介绍,在未来的文章中,我将会一步一步深入到Spark的源代码中,看清Spark,为优化Spark打下基础。

[1] Armbrust M, Xin R S, Lian C, et al. Spark sql: Relational data processing in spark[C]//Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015: 1383-1394.

说点什么

2 评论 在 "初窥Spark SQL Catalyst"

提醒
排序:   最新 | 最旧 | 得票最多

赞一个!!!

受益匪浅,期待后面的文章…

wpDiscuz