书接上回,Spark可以说就是RDDs的化身,我们已经看到RDDs的设计方案对于大数据的计算具有诸多优势,但是随着Spark项目的推进,大家也逐渐地发现,在实际的生产领域,Spark RDDs API的编程方式并不是唯一的选择,很多的计算,都很像目前关系型数据库中的关联查询,而所谓的“信息”,其实很大部分都是产生自数据与数据之间的关联,而对于这样的数据模型,RDDs模型的表达能力还是相对有限的。

Spark 1.3.0的发布带来了全新的DataFrame API以及Spark SQL,我们继续追根溯源,找到关于DataFrame与Spark SQL的核心设计稿 – Spark sql: Relational data processing in spark[1]。

从这篇文章的角度来讲,DataFrame API和Spark SQL是配套的,从下图我们就可以看出来新增的这个模块与原有Spark框架的关系:

1

它们只是以不同的角度来诠释Spark在关系型数据的处理能力,可以看到,整个Spark SQL其实可以拆分为两个重量级的部分来描述(1)DataFrame API 以及(2)Catalyst Optimizer,接下来我们也就顺着这个思路来介绍他们。

DataFrame API

文中给出了DataFrame的核心目标:

  1. 关系式数据处理的API
  2. 可以充分利用现有数据库管理系统的优化技术
  3. 具备接纳新数据源的能力
  4. 提供更好的分析类算法的支持

我们从存储和计算两个角度来看一看这个DataFrame。

1 数据存储结构

DataFrame这个名字是取自R等语言中的概念,在这些专用于数据统计、分析的语言中,DataFrame承担了举足轻重的作用,Spark希望能够借助DataFrame实现在大数据计算平台上的同类功能,不过DataFrame究竟是什么呢?它的发明是否有意义呢?为了探究这个问题,我们将DataFrame和RDDs进行对比,首先最直观的就是他们各自的数据存储方式的差异:

2

可以看到,DataFrame直观上很像是RDDs的加强版,它和RDDs在数据存储上最大的区别就在于,DataFrame是有Schema的,通俗的讲,就是上图中蓝色框住的那个表头。不要小看这一点,对于复杂的数据类型,DataFrame的这种结构可以使编程大大简化,比如下面的这段示意性代码(虽然我用了C#的语法,但希望你可以把object看成万能且不会发生slicing的值类型):

在RDDs中,数据集中的每项数据都是一个整体,因为你无法得知其内部的结构,这也就使得你对数据项的操作能力很弱,当你想获得数据项内部的部分信息的时候,你需要手动将object按照你预先设定的数据格式进行分割,麻烦,且容易出错。而使用DataFrame,意味着你可以直接获得数据项的内部数据结构,并且由于DataFrame的Schema的存在(在上面这段代码中,Schema就相当于是struct),数据项的转换也都将是类型安全的,这对于较为复杂的数据计算程序的调试是十分有利的,很多数据类型不匹配的问题都可以在编译阶段就被检查出来,而对于不合法的数据文件,DataFrame也具备一定分辨能力。

而另一方面,我们注意到,当RDD被切割出“列”并加上“表头”变成DataFrame之后,就意味着DataFrame要支持比RDD更加细粒度的查询,而这种Table式的结构,很容易就可以让我们联想到数据库中数据表,同时DataFrame API也支持使用者对DataFrame进行数据库那样的关联、聚合、筛选等查询操作。有些人可能会因此把DataFrame和ORM放在一起对比,诚然,他们之间确实具备诸多相似之处,这些相似之处很容易让人产生混淆,所以我在此简单的对他们的区别加以解释,ORM大致如下图所示:

3

Database中的数据需要被提取出来(通过ORM框架执行SQL语句来进行),并转换为程序可操作的Objects,然后才能被开发者使用,用户对Objects做得任何修改,都不会直接影响数据库中的数据,而且像一些关联表,在ORM中甚至于根本不会被映射为Objects,他们会被隐含在Objects内部的关联关系中。总而言之,ORM会把原有的数据以不同的形式进行转换后才呈现出来。而DataFrame不一样,它自己就扮演了数据库的角色,对DataFrame的查询,都是在RDDs数据集上做得直接映射,原汁原味,就地取材。你可以把DataFrame中的数据,理解为“逻辑数据”,而“物理数据”实际上在RDDs中。

4

因此,为了能够更快更好的定位到数据,甚至于更好的利用内存与磁盘中的存储空间,DataFrame中的数据在内存和磁盘中的排列也必须更为考究,才能够在不损失性能的前提下提供这些操作。Spark SQL团队给出的方案是:按列压缩存储。

列式存储是近年来出现频率越来越多的一个概念,其实它本身很早就已经被提出了,我们首先来看下列式存储和行式存储,在存储介质上的区别,还以之前的图为例:

5

传统的关系型数据库通常都采用行式存储,可以看到,数据一条一条的紧密排列着,通常情况下,行式存储中的每行数据都是紧密排列的。而列式存储则是将行拆开,将一列的数据放在一起,同时不同列可以存放在不同的位置(由于天然利于纵向分表,所以在超大数据集的存储上,列式存储也具有一定优势)。通常情况下,我们查询一个数据并不需要检查一行数据中的每个列条目,但是在行式存储中,必须要扫描全部数据集才能够筛选出我们想要的那条数据,既然我们检索的项目很可能只是“Id”一项而已,那为什么要去管其他列呢?特别是在磁盘上,磁头访问数据的方式是线性的,如果只想根据“Id”进行筛选,即便只是上面那个只有两列的数据表,磁头移动的距离也要超过列式存储的好几倍。不过相应的,列式存储中“更新”“插入“”查询“等操作会比较麻烦,但是由于DataFrame和RDDs一样都是Immutable的,所以恰好规避了这一问题。

虽然DataFrame是可以根据情况存储到磁盘上的,但是讲道理,我们用Spark最看中的其实就是在内存中进行计算以及中间结果的转换的速度优势,而上面所说的列式存储的访存优势,对于支持随机访问的内存介质而言似乎并没有明显的优势,那么为什么DataFrame仍然会采用这种方式呢?那就是列式存储的另外一个优势:利于数据压缩。我们都知道,越“随机”的数据,越难以压缩,越“相似”的数据,越容易压缩,数据表既然分了列,那么通常来讲他们就是独立的数据项,毫无关联的数据摆在一起,你说要怎么压缩嘛。而压缩本身,对于渴望大量内存的Spark而言,能够带来巨大的实惠。

而最终,DataFrame API中这种支持对列进行访问的形式,要比RDDs API的数据访问粒度更为细腻,这也就意味着数据工程师可以根据“列”的性质,来为列建立索引,从而避免遍历所有的数据项,这项数据访问优化技术在传统的关系型数据库中也是广泛使用的,但凡需要查询的关系型数据表,都可以通过建立索引来大大增加查询效率,DataFrame将这项技术引入进来,也算是对传统数据库管理系统的技术的一种借鉴。

另一方面,Spark SQL还支持通过UDT(User-Defined Type)来扩展DataFrame中“列”的数据类型。UDT是对通过基础数据类型(String, Int, Double…)的组合来扩展数据库的类型系统,这个概念来自于面向对象数据库(其实像SQL Server\DB2等数据库都是支持的)。有些关系型数据库(例如MySQL)的数据表的字段就只能从内置的N种数据类型中进行挑选(varchar/char/int/bigint…),而支持UDT其实对于处理和存储复杂数据类型是有益的,特别在大数据场景,有些带嵌套的JSON数据格式,存成数据表的形式就比较麻烦,有了UDF,只需要定义一套符合这种数据结构的新数据类型,就可以直接应用在DataFrame的列上面,不仅可以增加DataFrame数据项的表达能力,同时也仍然享有运行时的数据类型安全监测。

UDT虽然看上去好用,能让列的数据类型更直观更具表达力,但其实也会造成一些麻烦,前文也说了,DataFrame采用列式压缩存储,而这种存储形式是针对“内置类型”的一种优化方案。为了让UDT也能享受这种优化带来的好处,Spark SQL通过把UDT映射到内置类型上(内置类型除了那些耳熟能详的基本类型,还包含Array等集合性质的数据类型),来使Spark SQL可以对其进行正确的列式压缩存储。

2 运作方式

说了“数据结构”,那么接下来自然就是“算法”了。DataFrame API在使用的时候和RDDs API具有诸多相似之处,先看论文中的介绍:

6

可以看到,在上图中出现频率最高的词莫过于“Plan”了,其实Plan和RDDs中的“Lineage”的概念是有一定相似性的。如前文所说,DataFrame也是Immutable的,它通过接收“转换”操作来产生新的DataFrame,而且它的执行同样是Lazy的,只有当遇到“Output OP”的时候才会开始执行上面那一套动作。而DataFrame和RDDs在执行过程中最大的不同就是,DataFrame API的使用实质上是一种DSL(Domain Specific Language),这个词如果你比较陌生的话,举个例子来说,其实你用过的SQL其实也是一种DSL。使用DSL就意味着Spark SQL可以对查询进行深入的观察和优化,DataFrame的一系列转换以Plan的形式,经过验证、分析、优化才形成最终的执行代码,而RDDs则是开发者写成什么样子,最后就以什么样子去执行。

Spark SQL为其DSL增加了定制能力。熟悉数据库的读者可能用过,查询语句中可以使用一些针对聚合数据的操作(例如MAX(), COUNT())或是日期函数(DATA()),并且有部分数据库支持用户自己定义这样函数并在查询语句中使用,这样的自定义函数就被称为UDF(User-Defined Function),Spark SQL支持UDF,也就意味着你可以对数据进行更为自由、方便的处理,使用起来就像这样:

除了UDF和以及前文提到的UDT之外,Spark SQL还支持扩展数据源,你几乎把任何数据源的数据放到Spark SQL里进行查询、连接……这对于数据格式多样的大数据意义非凡,Spark SQL对于其他数据源的数据获取有特殊的优化,这部分内容我将放到下一篇文章中,蘸着Catalyst来介绍。

现在,你可以把DataFrame类比为数据库视图(被Cache的DataFrame则很像传统数据库中的“物化视图”),然后把RDDs理解为存储介质上的生肉。DataFrame API则作为一套通用的数据处理工具,把一切的一切变成RDDs,然后交给作业执行引擎去跑。从上图也可以看到,优化到脚趾头的Plan最终会生成为RDDs然后在Spark上执行(DataFrame API支持开发者在使用它的过程中随时与RDD进行互换)。

其实在RDDs和DataFrame之间,有很多相似的概念,为作区分 ,我把他们稍微总结一下,避免发生不必要的困惑:

RDDs DataFrame
延迟执行 Action Operation -> Run Output Operation -> Catalyst -> Run
执行计划 Lineage – DAG Task Plan -> RDDs -> DAG Task
数据缓存 Cache Columnar Cache
执行方式 Runtime DSL
数据源 手动转换 手动转换/反射/JDBC/…

没想到这篇文章会写这么长,本来打算一并介绍Catalyst的,但是由于现在篇幅规模已经过于巨大……所以还是另开新篇单独来说吧。

[1] Armbrust M, Xin R S, Lian C, et al. Spark sql: Relational data processing in spark[C]//Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015: 1383-1394.

说点什么

2 评论 在 "进化的Spark, 从DataFrame说起"

提醒
排序:   最新 | 最旧 | 得票最多

原汁原味,读起来是享受…

[…] 在这篇文章中,我根据Spark SQL的论文,介绍了Spark SQL的一个关键模块:DataFrame API。我们现在已经知道,DataFrame和RDDs之间的关系应当是:DataFrame可以转化为RDDs,而RDDs也可以被映射为DataFrame。同时我们也知道,DataFrame API实质上是一套DSL,而最终在Spark计算框架中被执行的,应当是DataFrame最终转化后的RDDs。显而易见,人肉编写的DF所对应的DSL,存在着巨大的优化空间。这也就是本次文章所有讲述的Spark SQL的后半部分内容——Catalyst. […]

wpDiscuz