RDD
RDD的全称是:Resilient Distributed Dataset (弹性分布式数据集)
五个关键特性:
所有分区的列表
计算每个split的函数
对其他RDD上依赖的列表
可选的,对kv的RDD的Partitioner
可选的,对每个split计算偏好的位置
为解决资源不能在内存中,并且跨集群重复使用的问题,我们抽象出了RDD的概念,可以支持在大量的应用之间高效的重复利用数据。RDD是一种具备容错性、并行的数据结构,可以在内存中持久化,以便在大量的算子之间操作。
Spark RDD编程接口
在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)调用转换。
定义RDD之后,程序员就可以在动作中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。
程序员还可以从两个方面控制RDD,即缓存和分区。用户可以请求将RDD缓存,这样运行时将已经计算好的RDD分区存储起来,以加速后期的重用?;捍娴腞DD一般存储在内存中,但如果内存不够,可以写到磁盘上。
另一方面,RDD还允许用户根据关键字(key)指定分区顺序,这是一个可选的功能。目前支持哈希分区和范围分区。例如,应用程序请求将两个RDD按照同样的哈希分区方式进行分区(将同一机器上具有相同关键字的记录放在一个分区),以加速它们之间的join操作。
Spark SQL
Join
当前SparkSQL支持三种Join算法-shuffle hash join、broadcast hash join以及sort merge join
1、 确定Build Table以及Probe Table:这个概念比较重要,Build Table使用join key构建Hash Table,而Probe Table使用join key进行探测,探测成功就可以join在一起。通常情况下,小表会作为Build Table,大表作为Probe Table。
2、 构建Hash Table:依次读取Build Table(item)的数据,对于每一行数据根据join key(item.id)进行hash,hash到对应的Bucket,生成hash table中的一条记录。
3、探测:再依次扫描Probe Table(order)的数据,使用相同的hash函数映射Hash Table中的记录,映射成功之后再检查join条件(item.id = order.i_id),如果匹配成功就可以将两者join在一起。
基本流程可以参考上图,这里有两个小问题需要关注:
1、 hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为o(a+b),较之最极端的笛卡尔集运算a*b,不知甩了多少条街。
2、为什么Build Table选择小表?道理很简单,因为构建的Hash Table最好能全部加载在内存,效率最高;这也决定了hash join算法只适合至少一个小表的join场景,对于两个大表的join场景并不适用;
上文说过,hash join是传统数据库中的单机join算法,在分布式环境下需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。hash join分布式改造一般有两种经典方案:
1、broadcast hash join:
? ? 将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行hash join。broadcast适用于小表很小,可以直接广播的场景。
2、shuffle hash join:
? ? 一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据join key相同必然分区相同的原理,将两张表分别按照join key进行重新组织分区,这样就可以将join分而治之,划分为很多小join,充分利用集群资源并行化。(相当于在map端将大小按照key进行拆分重新组织分区,然后根据key分发到reduce端进行分别大小表的处理,最终再将结果进行汇总。)
shuffle hash join
在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时可以按照join key进行分区,根据key相同必然分区相同的原理,就可以将大表join分而治之,划分为很多小表的join,充分利用集群资源并行化。如下图所示,shuffle hash join也可以分为两步:
1、shuffle阶段:分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为shuffle
2、hash join阶段:每个分区节点上的数据单独执行单机hash join算法。(最后应该还要做一个union all的操作将之前处理的内容进行合并)
broadcast hash join
1、broadcast阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给driver,driver再统一分发给所有executor;要不就是基于bittorrete的p2p思路;
基于bittorrete的p2p思路可参考:
https://zhidao.baidu.com/question/9782615.html
https://baike.baidu.com/item/BitTorrent/142795?fr=aladdin
2、hash join阶段:在每个executor上执行单机版hash join,小表映射,大表试探;
sort merge join
1、shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
2、sort阶段:对单个分区节点的两表数据,分别进行排序
3、merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边(两张分区表进行join的过程中,会不断的比较索引的大小,一直以索较小的索引值遍历分区表数据。),见下图示意:
仔细分析的话会发现,sort-merge join的代价并不比shuffle hash join小,反而是多了很多。那为什么SparkSQL还会在两张大表的场景下选择使用sort-merge join算法呢?这和Spark的shuffle实现有关,目前spark的shuffle实现都适用sort-based shuffle算法,因此在经过shuffle之后partition数据都是按照key排序的。因此理论上可以认为数据经过shuffle之后是不需要sort的,可以直接merge(也就是说sort-merge-join实际只需要执行shuffle和merge阶段,而shuffle-hash-join需要执行shuffle和hash-join阶段。而对于大表join大表来说,merge阶段比hash-join阶段更优!
为什么更优:hash-join的复杂度O(a+b);而merge小于O(a+b)。a,b代表数组的长度)。
Catalyst优化器
Spark引擎的Catalyst优化器编译并优化了逻辑计划,而且还有一个能够确保生成最有效的物理计划的成本优化器。
SQL语法树和Rule
相信无论对SQL优化器有无了解,都肯定知道SQL语法树这个概念,不错,SQL语法树就是SQL语句通过编译器之后会被解析成一棵树状结构。这棵树会包含很多节点对象,每个节点都拥有特定的数据类型,同时会有0个或多个孩子节点(节点对象在代码中定义为TreeNode对象),下图是个简单的示例:
如上图所示,箭头左边表达式有3种数据类型(Literal表示常量、Attribute表示变量、Add表示动作),表示x+(1+2)。映射到右边树状结构后,每一种数据类型就会变成一个节点。另外,Tree还有一个非常重要的特性,可以通过一定的规则进行等价变换,如下图:
上图定义了一个等价变换规则(Rule):两个Integer类型的常量相加可以等价转换为一个Integer常量,这个规则其实很简单,对于上文中提到的表达式x+(1+2)来说就可以转变为x+3。对于程序来讲,如何找到两个Integer常量呢?其实就是简单的二叉树遍历算法,每遍历到一个节点,就模式匹配当前节点为Add、左右子节点是Integer常量的结构,定位到之后将此三个节点替换为一个Literal类型的节点。
上面用一个最简单的示例来说明等价变换规则以及如何将规则应用于语法树。在任何一个SQL优化器中,通?;岫ㄒ宕罅康腞ule(后面会讲到),SQL优化器会遍历语法树中每个节点,针对遍历到的节点模式匹配所有给定规则(Rule),如果有匹配成功的,就进行相应转换,如果所有规则都匹配失败,就继续遍历下一个节点。
优化器工作原理
任何一个优化器工作原理都大同小异:
SQL语句首先通过Parser??楸唤馕鑫锓ㄊ?,此棵树称为Unresolved Logical Plan;
Unresolved Logical Plan通过Analyzer??榻柚谠萁馕鑫狶ogical Plan;
此时再通过各种基于规则的优化策略进行深入优化,得到Optimized Logical Plan;
优化后的逻辑执行计划依然是逻辑的,并不能被Spark系统理解,此时需要将此逻辑执行计划转换为Physical Plan;
为了更好的对整个过程进行理解,下文通过一个简单示例进行解释。
Parser
Parser简单来说是将SQL字符串切分成一个一个Token,再根据一定语义规则解析为一棵语法树。Parser模块目前基本都使用第三方类库ANTLR进行实现,比如Hive、 Presto、SparkSQL等。下图是一个示例性的SQL语句(有两张表,其中people表主要存储用户基本信息,score表存储用户的各种成绩),通过Parser解析后的AST语法树如下图所示:
Analyzer
通过解析后的逻辑执行计划基本有了骨架,但是系统并不知道score、sum这些都是些什么鬼,此时需要基本的元数据信息来表达这些词素,最重要的元数据信息主要包括两部分:表的Scheme和基本函数信息,表的scheme主要包括表的基本定义(列名、数据类型)、表的数据格式(Json、Text)、表的物理位置等,基本函数信息主要指类信息。
Analyzer会再次遍历整个语法树,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变量,sum会被解析为特定的聚合函数,如下图所示:
SparkSQL中Analyzer定义了各种解析规则,有兴趣深入了解的童鞋可以查看Analyzer类,其中定义了基本的解析规则,如下:
Optimizer(优化器)
优化器是整个Catalyst的核心,上文提到优化器分为基于规则优化和基于代价优化两种,当前SparkSQL 2.1依然没有很好的支持基于代价优化(下文细讲),此处只介绍基于规则的优化策略,基于规则的优化策略实际上就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换。因此,基于规则优化说到底就是一棵树等价地转换为另一棵树。SQL中经典的优化规则有很多,下文结合示例介绍三种比较常见的规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)。
上图左边是经过Analyzer解析前的语法树,语法树中两个表先做join,之后再使用age>10对结果进行过滤。大家知道join算子通常是一个非常耗时的算子,耗时多少一般取决于参与join的两个表的大小,如果能够减少参与join两表的大小,就可以大大降低join算子所需时间。谓词下推就是这样一种功能,它会将过滤操作下推到join之前进行,上图中过滤条件age>0以及id!=null两个条件就分别下推到了join之前。这样,系统在扫描数据的时候就对数据进行了过滤,参与join的数据量将会得到显著的减少,join耗时必然也会降低。
常量累加其实很简单,就是上文中提到的规则 x+(1+2) -> x+3,虽然是一个很小的改动,但是意义巨大。示例如果没有进行优化的话,每一条结果都需要执行一次100+80的操作,然后再与变量math_score以及english_score相加,而优化后就不需要再执行100+80操作。
列值裁剪是另一个经典的规则,示例中对于people表来说,并不需要扫描它的所有列值,而只需要列值id,所以在扫描people之后需要将其他列进行裁剪,只留下列id。这个优化一方面大幅度减少了网络、内存数据量消耗,另一方面对于列存数据库(Parquet)来说大大提高了扫描效率。
至此,逻辑执行计划已经得到了比较完善的优化,然而,逻辑执行计划依然没办法真正执行,他们只是逻辑上可行,实际上Spark并不知道如何去执行这个东西。比如Join只是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现这个合并,逻辑执行计划并没有说明。
此时就需要将逻辑执行计划转换为物理执行计划,将逻辑上可行的执行计划变为Spark可以真正执行的计划。比如Join算子,Spark根据不同场景为该算子制定了不同的算法策略,有BroadcastHashJoin、ShuffleHashJoin以及SortMergeJoin等(可以将Join理解为一个接口,BroadcastHashJoin是其中一个具体实现),物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,这个过程涉及到基于代价优化策略,后续文章细讲。
PySpark 的原理
Spark运行时架构
首先我们先回顾下Spark的基本运行时架构,如下图所示,其中橙色部分表示为JVM,Spark应用程序运行时主要分为Driver和Executor,Driver负载总体调度及UI展示,Executor负责Task运行,Spark可以部署在多种资源管理系统中,例如Yarn、Mesos等,同时Spark自身也实现了一种简单的Standalone(独立部署)资源管理系统,可以不用借助其他资源管理系统即可运行。
用户的Spark应用程序运行在Driver上(某种程度上说,用户的程序就是Spark Driver程序),经过Spark调度封装成一个个Task,再将这些Task信息发给Executor执行,Task信息包括代码逻辑以及数据信息,Executor不直接运行用户的代码。
PySpark运行时架构
为了不破坏Spark已有的运行时架构,Spark在外围包装一层Python API,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。
其中白色部分是新增的Python进程,在Driver端,通过Py4j实现在Python中调用Java的方法,即将用户写的PySpark程序”映射”到JVM中,例如,用户在PySpark中实例化一个Python的SparkContext对象,最终会在JVM中实例化Scala的SparkContext对象;在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码,虽然里面可能包含有用户定义的Python函数或Lambda表达式,Py4j并不能实现在Java里调用Python的方法,为了能在Executor端运行用户定义的Python函数或Lambda表达式,则需要为每个Task单独启一个Python进程,通过socket通信方式将Python函数或Lambda表达式发给Python进程执行。语言层面的交互总体流程如下图所示,实线表示方法调用,虚线表示结果返回。
下面分别详细剖析PySpark的Driver是如何运行起来的以及Executor是如何运行Task的。
Driver端运行原理
当我们通过spark-submmit提交pyspark程序,首先会上传python脚本及依赖,并申请Driver资源,当申请到Driver资源后,会通过PythonRunner(其中有main方法)拉起JVM,如下图所示。
PythonRunner入口main函数里主要做两件事:
开启Py4j GatewayServer
通过Java Process方式运行用户上传的Python脚本
用户Python脚本起来后,首先会实例化Python版的SparkContext对象,在实例化过程中会做两件事:
实例化Py4j GatewayClient,连接JVM中的Py4j GatewayServer,后续在Python中调用Java的方法都是借助这个Py4j Gateway
通过Py4j Gateway在JVM中实例化SparkContext对象
经过上面两步后,SparkContext对象初始化完毕,Driver已经起来了,开始申请Executor资源,同时开始调度任务。用户Python脚本中定义的一系列处理逻辑最终遇到action方法后会触发Job的提交,提交Job时是直接通过Py4j调用Java的PythonRDD.runJob方法完成,映射到JVM中,会转给sparkContext.runJob方法,Job运行完成后,JVM中会开启一个本地Socket等待Python进程拉取,对应地,Python进程在调用PythonRDD.runJob后就会通过Socket去拉取结果。
把前面运行时架构图中Driver部分单独拉出来,如下图所示,通过PythonRunner入口main函数拉起JVM和Python进程,JVM进程对应下图橙色部分,Python进程对应下图白色部分。Python进程通过Py4j调用Java方法提交Job,Job运行结果通过本地Socket被拉取到Python进程?;褂幸坏闶牵杂诖笫萘?,例如广播变量等,Python进程和JVM进程是通过本地文件系统来交互,以减少进程间的数据传输。
Executor端运行原理
为了方便阐述,以Spark On Yarn为例,当Driver申请到Executor资源时,会通过CoarseGrainedExecutorBackend(其中有main方法)拉起JVM,启动一些必要的服务后等待Driver的Task下发,在还没有Task下发过来时,Executor端是没有Python进程的。当收到Driver下发过来的Task后,Executor的内部运行过程如下图所示。
Executor端收到Task后,会通过launchTask运行Task,最后会调用到PythonRDD的compute方法,来处理一个分区的数据,PythonRDD的compute方法的计算流程大致分三步走:
如果不存在pyspark.deamon后台Python进程,那么通过Java Process的方式启动pyspark.deamon后台进程,注意每个Executor上只会有一个pyspark.deamon后台进程,否则,直接通过Socket连接pyspark.deamon,请求开启一个pyspark.worker进程运行用户定义的Python函数或Lambda表达式。pyspark.deamon是一个典型的多进程服务器,来一个Socket请求,fork一个pyspark.worker进程处理,一个Executor上同时运行多少个Task,就会有多少个对应的pyspark.worker进程。
紧接着会单独开一个线程,给pyspark.worker进程喂数据,pyspark.worker则会调用用户定义的Python函数或Lambda表达式处理计算。
在一边喂数据的过程中,另一边则通过Socket去拉取pyspark.worker的计算结果。
把前面运行时架构图中Executor部分单独拉出来,如下图所示,橙色部分为JVM进程,白色部分为Python进程,每个Executor上有一个公共的pyspark.deamon进程,负责接收Task请求,并fork pyspark.worker进程单独处理每个Task,实际数据处理过程中,pyspark.worker进程和JVM Task会较频繁地进行本地Socket数据通信。
总结
总体上来说,PySpark是借助Py4j实现Python调用Java,来驱动Spark应用程序,本质上主要还是JVM runtime,Java到Python的结果返回是通过本地Socket完成。虽然这种架构保证了Spark核心代码的独立性,但是在大数据场景下,JVM和Python进程间频繁的数据通信导致其性能损耗较多,恶劣时还可能会直接卡死,所以建议对于大规模机器学习或者Streaming应用场景还是慎用PySpark,尽量使用原生的Scala/Java编写应用程序,对于中小规模数据量下的简单离线任务,可以使用PySpark快速部署提交。
参考:
http://sharkdtu.com/posts/pyspark-internal.html
Spark部署模式
Yarn-cluster
在Yarn-cluster模式下,driver运行在Appliaction Master上,Appliaction Master进程同时负责驱动Application和从Yarn中申请资源,该进程运行在Yarn container内,所以启动Application Master的client可以立即关闭而不必持续到Application的生命周期,下图是yarn-cluster模式
Yarn-client
在Yarn-client中,Application Master仅仅从Yarn中申请资源给Executor,之后client会跟container通信进行作业的调度,下图是Yarn-client模式
Spark内存模型
堆内和堆外内存规划
作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
图 1 . 堆内和堆外内存示意图
堆内内存
堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。
堆外内存
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
内存空间分配
静态内存管理
在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置,堆内内存的分配如图 2 所示:
图 2 . 静态内存管理图示——堆内
统一内存管理
Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域,如图 4 和图 5 所示
图 4 . 统一内存管理图示——堆内
图 5 . 统一内存管理图示——堆外
其中最重要的优化在于动态占用机制,其规则如下:
设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围
双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间
存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂[4]
图 6 . 动态占用机制图示
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的[5]。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。
存储内存管理
RDD 的持久化机制
弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换(Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时,Spark 才会创建任务读取 RDD,然后真正触发转换的执行。
Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管 理 (存储内存的其他应用场景,如缓存 broadcast 数据,暂时不在本文的讨论范围之内)。
RDD 的持久化由 Spark 的 Storage ??閇7]负责,实现了 RDD 与物理存储的解耦合。Storage ??楦涸鸸芾?Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage ??楣钩闪酥鞔邮降募芄?,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage ??樵诼呒弦?Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。
图 7 . Storage ??槭疽馔?/p>
RDD 缓存的过程
RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器( Iterator )的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项(Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不连续。
RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。 将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为”展开”(Unroll) 。Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例[6],对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图 8 所示。
图 8. Spark Unroll 示意图
在图 3 和图 5 中可以看到,在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。
淘汰和落盘
由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。
存储内存的淘汰规则为:
被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存
新旧 Block 不能属于同一个 RDD,避免循环淘汰
旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题
遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。
落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其_deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage ??橹懈缕湫畔?。
执行内存管理
多任务间内存分配
Executor 内运行的任务同样共享执行内存,Spark 用一个 HashMap 结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的任务的个数。每个任务在启动之时,要向 MemoryManager 请求申请最少为 1/2N 的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。
Shuffle 的内存占用
执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程,我们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的使用:
Shuffle Write
若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。、
Shuffle Read
在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。
在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。
Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划[9],解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序。Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是这个内存块在系统内存中的 64 位绝对地址。Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。
Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成:
页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。
页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。
有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和 CPU 使用效率带来了明显的提升[10]。
Spark 的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。
Spark Shuffle
Spark Shuffle 的发展
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle
Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort并入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出历史舞台
未优化的 HashShuffle
每一个 ShuffleMapTask 都会为每一个 ReducerTask 创建一个单独的文件,总的文件数是 M * R,其中 M 是 ShuffleMapTask 的数量,R 是 ShuffleReduceTask 的数量。
在处理大数据时,ShuffleMapTask 和 ShuffleReduceTask 的数量很多,创建的磁盘文件数量 M*R 也越多,大量的文件要写磁盘,再从磁盘读出来,不仅会占用大量的时间,而且每个磁盘文件记录的句柄都会保存在内存中(每个人大约 100k),因此也会占用很大的内存空间,频繁的打开和关闭文件,会导致频繁的GC操作,很容易出现 OOM 的情况。
也正是上述原因,该 HashShuffle 如今已退出历史舞台。
优化后 HashShuffle
在 Spark 0.8.1 版本中,引入了 Consolidation 机制,该机制是对 HashShuffle 的一种优化。
可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。
先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。
这样,每个 worker 持有的文件数降为 cores * R。cores 代表核数,R 是 ShuffleReduceTask 数。
Sort-Based Shuffle
由于 HashShuffle 会产生很多的磁盘文件,引入 Consolidation 机制虽然在一定程度上减少了磁盘文件数量,但是不足以有效提高 Shuffle 的性能,适合中小型数据规模的大数据处理。
为了让 Spark 在更大规模的集群上更高性能处理更大规模的数据,因此在 Spark 1.1 版本中,引入了 SortShuffle。
该机制每一个 ShuffleMapTask 都只创建一个文件,将所有的 ShuffleReduceTask 的输入都写入同一个文件,并且对应生成一个索引文件。
以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer 缓存所占用的内存大小,而且同时避免 GC 的风险和频率。
您可以输出按“ reducer” id排序并建立索引的单个文件,这样您就可以轻松地获取块通过仅获取有关文件中相关数据块位置的信息并在读取前进行一次fseek来查找与“ reducer x”相关的数据。但是,当然对于少量的“ reducer”来说,对单独文件进行散列操作显然要比排序更快,因此排序改组有一个“后备”计划:当“ reducers”的数量小于“ spark.shuffle. sort.bypassMergeThreshold”(默认为200)时,我们使用“后备”计划,先对数据进行哈希处理以分离出文件,然后将这些文件合并为一个文件。此逻辑在单独的类BypassMergeSortShuffleWriter中实现。
spark shuffle 过程
两个stage中间结果通过shuffle传递,shuffle的三个步骤如图所示
每一个executor一旦启动就在同一节点的Spark External Shuffle Service(ESS)上面注册,这样的注册让spark ESS知道来自每个注册executor的本地map任务生成的物化shuffle数据的位置,注意ESS是在executor之外的并且由多个spark应用共享。
在 shuffle map stage里面的每个任务处理自己的数据,在每个map任务的末尾,它生成一对文件,一个是shuffle的数据,一个是shuffle block的索引文件。为了这样,map任务需要对所有transform的数据根据partition key的哈希值排序,在这个过程中,map任务可能会因为不能在内存中排序所有数据而将数据溢写到磁盘里。一旦排序完成,shuffle数据文件就会生成,属于同一个shuffle partition的所有记录都会被整合到一个shuffle block里面,与之匹配的shuffle index文件也会生成,它记录了每个block边界的offset。
当下一个stage的reduce的任务要运行时,它们将会向driver查询它们输入的shuffle block的位置。一旦得到这些数据,每个reduce task会建立一个到对应ESS的实例连接以便获取数据。ESS一旦接收到这些请求,会根据shuffle index文件查询跳转到对应的shuffle block文件,读取这个block并将数据发送回reduce task。
将executor和ESS解耦的优点:
ESS在executor GC暂停是仍能提供shuffle block的服务。
即使生成shuffle block的executor销毁仍然能够提供服务。
闲置的executor可以被清空释放资源
对该shuffle的优化可以参考:
https://issues.apache.org/jira/browse/SPARK-30602
任务调度器Scheduler
宽依赖与窄依赖
RDD之间的依赖关系可以分为两类,即宽依赖和窄依赖,宽依赖与窄依赖的区分主要是父partition与子partition的对应关系,区分宽窄依赖主要就是看父RDD的一个partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖(用图的概念即判断父partition的出度)。
stage的划分
Spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成互相依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。
如上图所示,A/B/C/D/E/F代表RDD,当执行算子存在shuffle操作的时候,就划分一个stage,即用宽依赖来划分stage。窄依赖会被划分到同一个stage中,这样他们就可以以管道的方式执行,宽依赖由于依赖的上游不止一个,所以往往需要需要跨节点传输数据。
Scheduler
DagScheduler(是高级的调度)。DAGScheduler负责将Task拆分成不同Stage的具有依赖关系(包含RDD的依赖关系)的多批任务,然后提交给TaskScheduler进行具体处理。DAG全称 Directed Acyclic Graph,有向无环图。简单的来说,就是一个由顶点和有方向性的边构成的图,从任意一个顶点出发,没有任何一条路径会将其带回到出发的顶点。
TaskScheduler(是低级的调度器接口)。TaskScheduler负责实际每个具体Task的物理调度。
Spark的任务调度总体来说分两路进行,一路是Stage级的调度,一路是Task级的调度,总体调度流程如下图所示。
Spark存储系统
三种存储数据的操作
任何一个存储系统要解决的关键问题无非是数据的存与取、收与发,不过,在去探讨 Spark 存储系统如何工作之前,咱们先来搞清楚 Spark 存储系统中“存”的主要是什么内容?总的来说,Spark 存储系统用于存储 3 个方面的数据:
RDD 缓存:RDD 缓存指的是将 DAG 中某些计算成本较高且访问频率较高的数据形态以缓存的形式物化到内存或磁盘的过程。对于血统较长的 DAG 来说,RDD 缓存一来可以通过截断 DAG 从而降低失败重试的开销,二来通过缓存在内存或磁盘中的数据来从整体上提升作业的端到端执行性能。
Shuffle 中间结果:Shuffle writer 按照 Shuffle 分区规则将本节点数据以分片(Splits)的形式写入本地磁盘(或内存)。
广播变量:广播变量的设计初衷是为了解决 Spark 调度系统中任务调度的开销问题。
详解
Spark 存储系统提供了两种存储实现,分别是内存存储 MemoryStore 和磁盘存储 DiskStore。从名字我们就能看出来,MemoryStore 用于存储内存中的数据块,而 DiskStore 则用来存储磁盘中的数据块。
Spark 支持两种数据存储形式,即对象值和字节数组,且两者之间可以相互转化。将对象值压缩为字节数组的过程,称为序列化;相反,将字节数组还原为对象值,称之为反序列化。序列化的字节数组就像是从宜家家具超市购买的待组装板材(外加组装说明书),而对象值则是将板材拆包、并根据说明书组装而成的各种桌椅板凳。对象值的优点是“拿来即用”、所见即所得,缺点是所需的存储空间大、占地儿。相比之下,序列化的字节数组的空间利用率要高得多,不过要是急用桌椅板凳的话,还得根据说明书现装,略麻烦。
由此可见,二者的关系是一种博弈,所谓的“以空间换时间”和“以时间换空间”,具体的取舍还要看使用场景,想省地儿,您就用字节数组,想以最快的速度访问对象,对象值的存储方式还是来的更直接一些。不过像这种选择的烦恼只存在于 MemoryStore 之中,DiskStore 只能存序列化后的字节数组 —— 这里咱们多说两嘴,凡是需要走网络或落盘的数据,都是需要序列化的。
先说 DiskBlockManager,DiskBlockManager 的主要职责是记录逻辑数据块 Block 与磁盘文件系统中物理文件的对应关系。每个 Block 都对应一个磁盘文件,同理,每个磁盘文件都有一个与之对应的 Block ID,这就好比仓库中的每一件货物都有唯一的 ID 标识,显而易见,DiskBlockManager 就是专用仓储基地 DiskStore 的“库管”。
相较于 DiskBlockManager,MemoryStore“库管”BlockInfoManager 就显得没那么“称职”,对于 MemoryStore 中存储的数据,BlockInfoManager 的职责仅包含如下各项:记录数据块大小、维护加持在 Block 上的读锁与写锁、维护任务的读写权限状态。简言之,BlockInfoManager 主要用于保持 MemoryStore 中数据状态的一致性,而不是用于维护逻辑块与物理存储的对应关系。那么问题来了,想要拉取 MemoryStore 中“货物”的卡车司机怎么知道货物存储在哪个货架呢?
要回答这个问题,咱们还要说回 MemoryStore,前文书咱们说到 MemoryStore 可以存储两种形式的数据,即对象值和字节数组,对于这两种数据形式,MemoryStore 统一采用 MemoryEntry 抽象来进行封装。
MemoryEntry 实现为 Scala Trait,主要成员为数据块大小和数据类型,它的两个实现类 DeserializedMemoryEntry 和 SerializedMemoryEntry 分别用于封装对象值和字节数组,其中 DeserializedMemoryEntry 利用 Array[T] 来存储对象值序列,而 SerializedMemoryEntry 利用 ByteBuffer 来存储序列化后的字节序列。
MemoryStore 通过一种高效的数据结构来统一数据块的存储与访问:LinkedHashMap[BlockId, MemoryEntry],即 Key 为 BlockId、Value 是 MemoryEntry 的映射。显然,一个 Block 对应一个 MemoryEntry,MemoryEntry 既可以是 DeserializedMemoryEntry、也可以是 SerializedMemoryEntry。有了这个 LinkedHashMap,通过 BlockId 即可方便地定位 MemoryEntry,从而实现数据块的快速存取。
MemoryStore 中的数据存储流程
先来看数据存储的流程,也即将 RDD 中数据分片(Partitions/Blocks)的 Iterator 物化为数据存储的过程。
逻辑上,RDD 数据分片(也即 Partition)与 Block 是一一对应的,不过需要指出的是,Partition 的编号规则与 Block 的编号规则不见得保持一致。MemoryStore 提供了 putIteratorAsValues 和 putIteratorAsBytes 来将 RDD 数据分片对应的迭代器分别物化为对象值序列和字节序列,具体流程如上图所示。
值得注意的是,在将数据封装为 MemoryEntry 之前,MemoryStore 先利用 ValuesHolder 对 Iterator 进行展开(Unroll),展开的过程实际上就是物化的过程,数据实实在在地存储到 ValuesHolder 封装的数据结构(Vector 或 OutputStream)中,这些物化的数据在之后封装为 MemoryEntry 的过程中,仅仅(通过 toArray、toByteBuffer 等操作)在数据类型上做了转换,并没有带来额外的内存消耗,Spark 源码中将这个过程称之为:从 Unroll memory 到 Storage memory 的“Transfer(转移)”。
理顺了数据存储的流程,数据的读取和访问则一目了然。MemoryStore 提供 getValues 和 getBytes 方法,根据 BlockId 分别访问对象值与字节序列,如下图所示。两种方法首先通过 BlockId 获取到对应的 DeserializedMemoryEntry 或 SerializedMemoryEntry,然后在通过访问各自封装的 Array[T] 和 ByteBuffer 来读取数据内容。
MemoryStore 中的数据访问
说到 MemoryStore 中数据的存与取,有几个重要的角色不得不提,他们分别是:
BlockInfoManager:前文书已有交代,其主要职责是通过锁机制来保证多任务并发情况下数据访问的一致性。
SerializerMananger:顾名思义,自然是负责 MemoryStore 中数据的序列化与反序列化。
MemoryManager:Spark 内存管理器,这可是斯巴克国际建筑集团分公司举足轻重的一位大佬,我们在下一篇《Spark 内存管理》中会有更详细的交代。
如果非要用一句话概括,MemoryManager 的主要职能是维持不同内存区域(Storage memory, Shuffle memory, Runtime memory 等)之间的平衡、以及维持多任务并发下不同线程之间内存消耗的平衡。
BlockEvictionHandler:这个角色比较有意思,他负责把 MemoryStore 中的数据块“驱逐”出内存 —— 通常情况下都会把这些被驱逐的 Block“撵”到 DiskStore 中去,也即把内存中物化的数据转移到磁盘存储中。一个典型的场景是当 RDD 缓存采用 MEMORY_AND_DISK 模式且内存不足以容纳整个 RDD 数据集时,根据 LRU 原则,访问频次较低且访问时间较为久远的 Block 就会被 BlockEvictionHandler“下放”到 DiskStore 中去。
在本篇的开始,咱们说到 SparkContext 在初始化的过程中会创建一系列的对象来分别服务于众多的 Spark 子系统 —— 如调度系统、存储系统、内存管理、Shuffle 管理、RPC 系统等,我们暂且把这些对象称之为“上下文对象”。BlockManager 作为 Spark 存储系统的入口,以组合的设计模式持有多个“上下文对象”的引用,封装了与数据存取有关的所有抽象。BlockManager 的组合对象星罗云布,到目前为止我们接触过的有:
MemoryStore、DiskStore、BlockInfoManager、DiskBlockManager —— 本地数据存取
MemoryManager —— 维护不同内存区域之间的平衡
SerializerManager —— 序列化管理器
除了以上用于访问本地存储的类,Spark 存储系统正是仰仗 BlockTransferService 来提供跨节点的数据存取。
BlockTransferService 抽象主要提供两种方法来支持不同类型的计算任务,即如上图所示的 fetchBlockSync 方法和 uploadBlockSync 方法。
BlockTransferService,顾名思义,既然是 Service,自然就绕不开 Server/Client 的概念。fetchBlockSync 方法和 uploadBlockSync 方法都属于客户端方法,用于向服务端提交“下载数据块”和“上传数据块”的请求。
Spark存储体系设计
BlockManagerMaster:代理BlockManager与Driver上的BlockManagerMasterEndpoint通信。记号①表示Executor节点上的BlockManager通过BockManagerMaster与BlockManagerMasterEndpoint进行通信,记号②表示Driver节点上的BlockManager通过BlockManagerMaster与BlockManagerMasterEndpoint进行通信。这些通信的内容有很多,例如,注册BlockManager、更新Block信息、获取Block的位置(即Block所在的BlockManager)、删除Executor等。BlockManagerMaster之所以能够和BlockManagerMasterEndpoint通信,是因为它持有了BlockManagerMasterEndpoint的RpcEndpointRef。
BlockManagerMasterEndpoint:由Driver上的SparkEnv负责创建和注册到Driver的RpcEnv中。BlockManagerMasterEndpoint只存在于Driver的SparkEnv中,Driver或Executor上BlockManagerMaster的driverEndpoint属性将持有BlockManagerMasterEndpoint的RpcEndpointRef。BlockManagerMasterEndpoint主要对各个节点上的BlockManager、BlockManager与Executor的映射关系及Block位置信息(即Block所在的BlockManager)等进行管理。
BlockManagerSlaveEndpoint:每个Executor或Driver的SparkEnv中都有属于自己的BlockManagerSlaveEndpoint,分别由各自的SparkEnv负责创建和注册到各自的RpcEnv中。Driver或Executor都存在各自的BlockManagerSlaveEndpoint,并由各自BlockManager的slaveEndpoint属性持有各自BlockManagerSlaveEndpoint下发的命令。记号③表示BlockManagerMasterEndpoint向Driver节点上的BlockManagerSlaveEndpoint下发命令,记号④表示BlockManagerMasterEndpoint向Executor节点上的BlockManagerSlaveEndpoint下发命令。例如,删除Block、获取Block状态、获取匹配的BlockId等。
SerializerManager:序列化管理器
MemoryManager:内存管理器。负责对单个节点上内存的分配与回收
MapOutPutTracker:map任务输出跟踪器。
ShuffleManager:Shuffle管理器
BlockTransferService:块传输服务。此组件也与Shuffle相关联,主要用于不同阶段的任务之间的Block数据的传输与读写。
shuffleClinet:Shuffle的客户端。与BlockTransferService配合使用。记号⑤表示Executor上的shuffleClient通过Driver上的BlockTransferService提供的服务上传和下载Block,记号⑥表示Driver上的shuffleClient通过Executor上的BlockTransferService提供的服务上传和下载Block。此外,不同Executor节点上的BlockTransferService和shuffleClient之间也可以互相上传、下载Block。
SecurityManager:安全管理器
DiskBlockManager:磁盘块管理器。对磁盘上的文件及目录的读写操作进行管理
BlockInfoManager:块信息管理器。负责对Block的元数据及锁资源进行管理
MemoryStore:内存存储。依赖于MemoryManager,负责对Block的内存存在
DiskStore:磁盘存储。依赖于DiskBlockManager,负责对Block的磁盘存储
Spark度量系统
Spark的度量系统有以下几部分,也可以参照MetricsSystem类的注释部分
Instance: 数据实例。Spark的Instance有Master、Worker、ApplicationInfo、StreamingContext等,主要用来提供Source数据、启停MetricsSystem
Source: 度量数据输入源。Source采集的数据来源于Instance实例属性
Sink: 度量数据输出源。Spark使用MetricsServlet作为默认Sink
MetricsConfig: 度量需要的配置信息。initialize()方法初始化properties
MetricsSystem: instance粒度的Source、Sink控制中心
Source
Spark将度量数据来源抽象为Source接口。提供了ApplicationSource、MasterSource、WorkerSource、DAGSchedulerSource、StreamingSource、JvmSource等实现
Sink
Spark将度量数据统计输出源抽象为Sink接口。提供了ConsoleSink、CsvSink、MetricsServlet、GraphiteSink、JmxSink、Slf4jSink等实现
MetricsConfig
读取Metrics相关的配置信息
MetricsSystem
负责register Sources、Sinks,并start sinks。MetricsSystem不是系统的控制中心,而是每个instance一个MetricsSystem对象,负责instance粒度的控制
MetricsSystem类三个核心方法: registerSources()、registerSinks()、sinks.foreach(_.start)
使用Graphite Sink监控spark应用
spark 是自带 Graphite Sink 的,这下省事了,只需要配置一把就可以生效了
/path/to/spark/conf/metrics.properties
*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=<metrics_hostname>
*.sink.graphite.port=<metrics_port>
*.sink.graphite.period=5
*.sink.graphite.unit=seconds
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
提交时记得使用 --files /path/to/spark/conf/metrics.properties 参数将配置文件分发到所有的 Executor,否则将采集不到相应的 executor 数据。
具体可以参考
http://rokroskar.github.io/monitoring-spark-on-hadoop-with-prometheus-and-grafana.html
Spark3.0特性
忽略数据本地性
在生产环境中有一个纯Spark集群和一个S3存储系统,其中计算与存储层是分开的。在这种部署模式下,数据局部性永远无法到达。
Spark调度程序中有一些配置可以减少数据本地性的等待时间(例如“ spark.locality.wait”)。而问题在于,在列出文件阶段,所有文件的位置信息以及每个文件内的所有block都从分布式文件系统中获取。实际上,在生产环境中,表可能是如此之大,以至于获取所有这些位置信息都需要花费数十秒的时间。
为了改善这种情况,Spark需要提供一个选项,在该选项中可以完全忽略数据位置,在列表文件阶段,我们需要的是文件位置,而没有任何块位置信息。
参数spark.sql.sources.ignore.datalocality默认为false,如果为true, Spark不会获取列表文件中每个文件的块位置。这可以加快文件列表的速度,但是调度器不能调度任务来利用数据局部性。如果数据是从远程集群读取的,这样调度器无论如何都无法利用局部性,那么这种方法就特别有用。
参考:https://issues.apache.org/jira/browse/SPARK-29189
Barrier Execution Mode
为了在大数据上让Spark支持深度学习而新增的特性,想象这样一个场景:我要建设一个数据流水线,从数据仓库中拿到训练数据并且以数据的并行度去训练深度学习模型。数仓一般是Hive、、Redshift,可以通过Spark拿到数据,而深度学习模型分布式训练一般是用TensorFlow?、Horovod,二者之间有割裂,我们可以让这件事更简单吗?于是便有屏障执行模式了。
这个模式为Spark添加了一个新的调度模型让用户可以适当的将深度学习模型嵌入为Spark的Stage从而简化深度学习任务流。
列如,Horovod?使用MPI实现?all-reduce去加速分布式TensorFlow训练。这种计算模型与Spark的MapReduce?不同。
在Spark中,一个stage中的task不会依赖同一个stage中的任何其他task,因此每个task可以被独立调度。
在MPI中,所有worker同时启动并相互传递信息。为了将这种工作模式带入Spark,便有了Barrier Execution Mode,这种模式使用了新的调度模式----barrier scheduling,这种模式同时启动task并且提供用户足够的信息和工具嵌入深度学习模型。Spark也提供额外的容错层以防中间的task失败,这种情况下Spark会丢弃所有task并重启stage。
参考:https://issues.apache.org/jira/browse/SPARK-24374
动态分区裁剪
使用spark不同的API都可以触发动态分区裁剪特性,spark生成逻辑执行计划后再生成物理执行计划,并在计划层面进行优化,这里从计划层面讨论下动态分区裁剪特性。
从逻辑执行计划考虑动态分区裁剪优化,假设有一个以颜色分区的分区表,一个未必分区的小表,假设我们对小表进行过滤,分区表上只有两个分区与过滤后的小表对应,如果进行join,分区表就只有这两个分区会被保留。因此,在这里可以优化,无需对分区表所有数据取出后join,在取出前按照小表的结果过滤,把过滤后的结果进行join,这样效率更高,我们只要在分区表前加上一个子查询,这个子查询拿到的是小表的结果,这样就可以对大表只取部分数据。
自适应查询执行AQE
自适应查询执行(又称 Adaptive Query Optimisation 或者 Adaptive Optimisation)是对查询执行计划的优化,允许 Spark Planner 在运行时执行可选的执行计划,这些计划将基于运行时统计数据进行优化。
自适应查询执行框架
自适应查询执行最重要的问题之一是何时进行重新优化。Spark 算子通常是以 pipeline 形式进行,并以并行的方式执行。然而,shuffle 或 broadcast exchange 打破了这个管道。我们称它们为物化点,并使用术语“查询阶段”来表示查询中由这些物化点限定的子部分。每个查询阶段都会物化它的中间结果,只有当运行物化的所有并行进程都完成时,才能继续执行下一个阶段。这为重新优化提供了一个绝佳的机会,因为此时所有分区上的数据统计都是可用的,并且后续操作还没有开始。
当查询开始时,自适应查询执行框架首先启动所有叶子阶段(leaf stages)——这些阶段不依赖于任何其他阶段。一旦其中一个或多个阶段完成物化,框架便会在物理查询计划中将它们标记为完成,并相应地更新逻辑查询计划,同时从完成的阶段检索运行时统计信息?;谡庑┬碌耐臣菩畔?,框架将运行优化程序、物理计划程序以及物理优化规则,其中包括常规物理规则(regular physical rules)和自适应执行特定的规则,如合并分区(coalescing partitions)、Join 数据倾斜处理(skew join handling)等。现在我们有了一个新优化的查询计划,其中包含一些已完成的阶段,自适应执行框架将搜索并执行子阶段已全部物化的新查询阶段,并重复上面的 execute-reoptimize-execute 过程,直到完成整个查询。
AQE 框架目前提供了三个功能:
动态合并 shuffle partitions
当在 Spark 中运行查询来处理非常大的数据时,shuffle 通常对查询性能有非常重要的影响。Shuffle 是一个昂贵的操作符,因为它需要在网络中移动数据,因此数据是按照下游操作符所要求的方式重新分布的。
shuffle 的一个关键属性是分区的数量。分区的最佳数量取决于数据,但是数据大小可能在不同的阶段、不同的查询之间有很大的差异,这使得这个数字很难调优:
如果分区数太少,那么每个分区处理的数据大小可能非常大,处理这些大分区的任务可能需要将数据溢写到磁盘(例如,涉及排序或聚合),从而减慢查询速度。
如果分区数太多,那么每个分区处理的数据大小可能非常小,并且将有大量的网络数据获取来读取 shuffle 块,这也会由于低效的 I/O 模式而减慢查询速度。拥有大量的任务也会给 Spark 任务调度程序带来更多的负担。
要解决这个问题,我们可以在开始时设置相对较多的 shuffle 分区数,然后在运行时通过查看 shuffle 文件统计信息将相邻的小分区合并为较大的分区。
假设我们运行 SELECT max(i)FROM tbl GROUP BY j 查询,tbl 表的输入数据相当小,所以在分组之前只有两个分区。我们把初始的 shuffle 分区数设置为 5,因此在 shuffle 的时候数据被打乱到 5 个分区中。如果没有 AQE,Spark 将启动 5 个任务来完成最后的聚合。然而,这里有三个非常小的分区,为每个分区启动一个单独的任务将是一种浪费。
使用 AQE 之后,Spark 将这三个小分区合并为一个,因此,最终的聚合只需要执行三个任务,而不是五个。
动态调整 join 策略
Spark 支持许多 Join 策略,其中 broadcast hash join 通常是性能最好的,前提是参加 join 的一张表的数据能够装入内存。由于这个原因,当 Spark 估计参加 join 的表数据量小于广播大小的阈值时,其会将 Join 策略调整为 broadcast hash join。但是,很多情况都可能导致这种大小估计出错——例如存在一个非常有选择性的过滤器。
为了解决这个问题,AQE 现在根据最精确的连接关系大小在运行时重新规划 join 策略。在下面的示例中可以看到,Join 的右侧比估计值小得多,并且小到足以进行广播,因此在 AQE 重新优化之后,静态计划的 sort merge join 现在被转换为 broadcast hash join。
对于在运行时转换的 broadcast hash join ,我们可以进一步将常规的 shuffle 优化为本地化 shuffle来减少网络流量。
动态优化倾斜的 join(skew joins)
当数据在集群中的分区之间分布不均时,就会发生数据倾斜。严重的倾斜会显著降低查询性能,特别是在进行 Join 操作时。AQE 倾斜 Join 优化从 shuffle 文件统计信息中自动检测到这种倾斜。然后,它将倾斜的分区分割成更小的子分区,这些子分区将分别从另一端连接到相应的分区。
假设表 A join 表B,其中表 A 的分区 A0 里面的数据明显大于其他分区。
skew join optimization 将把分区 A0 分成两个子分区,并将每个子分区 join 表 B 的相应分区 B0。
如果没有这个优化,将有四个任务运行 sort merge join,其中一个任务将花费非常长的时间。在此优化之后,将有5个任务运行 join,但每个任务将花费大致相同的时间,从而获得总体更好的性能。