基础篇
sparksql 如何加载metadata
任何的SQL引擎都是需要加载元数据的,不然,连执行计划都生成不了。 加载元数据总的来说分为两步:
- 加载元数据
- 创建会话连接Hive MetaStore
首先,Spark检测到我们没有设置spark.sql.warehouse.dir,然后就开始找我们在hite-site.xml中配置的hive.metastore.warehouse.dir。
|
|
然后,SparkSession在HDFS临时位置创建了下面目录。
|
|
最后,Spark开始通过thrift RPC去连接Hive的MetaStore Server。
进阶篇
Spark为什么这么快
Spark是一个基于内存的,用于大规模数据处理的统一分析引擎,其运算速度可以达到Mapreduce的10-100倍。具有如下特点:
- 内存计算。Spark优先将数据加载到内存中,数据可以被快速处理,并可启用缓存。
- shuffle过程优化。和Mapreduce的shuffle过程中间文件频繁落盘不同,Spark对Shuffle机制进行了优化,降低中间文件的数量并保证内存优先。
- RDD计算模型。Spark具有高效的DAG调度算法,同时将RDD计算结果存储在内存中,避免重复计算。
如何理解DAGScheduler的Stage划分算法
官网的RDD执行流程图:
|
|
针对一段应用代码(如上),Driver会以Action算子为边界生成DAG调度图。DAGScheduler从DAG末端开始遍历划分Stage,封装成一系列的tasksets移交TaskScheduler,后者根据调度算法, 将taskset分发到相应worker上的Executor中执行。
- DAGSchduler的工作原理
- DAGScheduler是一个
面向stage
调度机制的高级调度器,为每个job计算stage的DAG(有向无环图),划分stage并提交taskset给TaskScheduler。 - 追踪每个RDD和stage的物化情况,处理因shuffle过程丢失的RDD,重新计算和提交。
- 查找rdd partition 是否cache/checkpoint。提供
优先位置
给TaskScheduler,等待后续TaskScheduler的最佳位置划分
- Stage划分算法
- 从触发action操作的算子开始,从后往前遍历DAG。
- 为最后一个rdd创建finalStage。
- 遍历过程中如果发现该rdd是宽依赖,则为其生成一个新的stage,与旧stage分隔而开,此时该rdd是新stage的最后一个rdd。
- 如果该rdd是窄依赖,将该rdd划分为旧stage内,继续遍历,以此类推,继续遍历直至DAG完成。
- 如何理解TaskScheduler的Task分配算法
TaskScheduler负责Spark中的task任务调度工作。TaskScheduler内部使用
TasksetPool调度池
机制存放task任务。TasksetPool分为FIFO
(先进先出调度)和FAIR
(公平调度)。
- FIFO调度: 基于队列思想,使用先进先出原则顺序调度taskset
- FAIR调度: 根据权重值调度,一般选取资源占用率作为标准,可人为设定
- TaskScheduler的工作原理
- 负责Application在Cluster Manager上的注册
- 根据不同策略创建TasksetPool资源调度池,初始化pool大小
- 根据task分配算法发送Task到Executor上执行
- Task分配算法
- 首先获取所有的executors,包含executors的ip和port等信息
- 将所有的executors根据shuffle算法进行打散
- 遍历executors。在程序中依次尝试本地化级别,最终选择每个task的最优位置(结合DAGScheduler优化位置策略)
- 序列化task分配结果,并发送RPC消息等待Executor响应
Spark的本地化级别有哪几种?怎么调优
移动计算
or 移动数据
?这是一个问题。在分布式计算的核心思想中,移动计算永远比移动数据要合算得多,如何合理利用本地化数据计算是值得思考的一个问题。
TaskScheduler在进行task任务分配时,需要根据本地化级别
计算最优位置,一般是遵循就近原则
,选择最近位置和缓存。Spark中的本地化级别在TaskManager
中定义,分为五个级别。
- Spark本地化级别
PROCESS_LOCAL(进程本地化) partition和task在同一个executor中,task分配到本地Executor进程。
NODE_LOCAL(节点本地化) partition和task在同一个节点的不同Executor进程中,可能发生跨进程数据传输
NO_PREF(无位置) 没有最佳位置的要求,比如Spark读取JDBC的数据
RACK_LOCAL(机架本地化) partition和task在同一个机架的不同worker节点上,可能需要跨机器数据传输
ANY(跨机架): 数据在不同机架上,速度最慢
- Spark本地化调优
在task最佳位置的选择上,DAGScheduler先判断RDD是否有cache/checkpoint,即
缓存优先
;否则TaskScheduler进行本地级别选择等待发送task。
TaskScheduler首先会根据最高本地化级别
发送task,如果在尝试5次并等待3s内
还是无法执行,则认为当前资源不足,即降低本地化级别,按照PROCESS->NODE->RACK等顺序。
- 调优1:加大spark.locality.wait 全局等待时长
- 调优2:加大spark.locality.wait.xx等待时长(进程、节点、机架)
- 调优3:加大重试次数(根据实际情况微调)
说说Spark和Mapreduce中Shuffle的区别
Spark中的shuffle很多过程与MapReduce的shuffle类似,都有Map输出端、Reduce端,shuffle过程通过将Map端计算结果分区、排序并发送到Reducer端。
1. Hadoop Mapreduce Shuffle
MapReduce的shuffle需要依赖大量磁盘操作,数据会频繁落盘产生大量IO,同时产生大量小文件冗余。虽然缓存buffer区中启用了缓存机制,但是阈值较低且内存空间小。
- 读取输入数据,并根据split大小切分为map任务
- map任务在分布式节点中执行map()计算
- 每个map task维护一个环形的buffer缓存区,存储map输出结果,分区且排序
- 当buffer区域达到阈值时,开始溢写到临时文件中。map task任务结束时进行临时文件合并。此时,整合shuffle map端执行完成
- mapreduce根据partition数启动reduce任务,copy拉取数据
- merge合并拉取的文件
- reduce()函数聚合计算,整个过程完成
2. Spark的Shuffle机制
默认的shuffle计算引擎是HashShuffleManager,此种Shuffle产生大量的中间磁盘文件,消耗磁盘IO性能。在Spark1.2后续版本中,默认的ShuffleManager改成了SortShuffleManager,通过索引机制和合并临时文件的优化操作,大幅提高shuffle性能。
HashShuffleManager
HashShuffleManager的运行机制主要分成两种,一种是普通运行机制
,另一种是合并的运行机制
。合并机制主要是通过复用buffer
来优化Shuffle过程中产生的小文件的数量,Hash shuffle本身不排序
。开启合并机制后,同一个Executor共用一组core,文件个数为cores * reduces
。
SortShuffleManager
SortShuffleManager的运行机制分成两种,普通运行机制
和bypass运行机制
。当shuffletask的数量小于等于spark.shuffle.sort.bypassMergeThreshold
参数的值时(默认200
),会启用bypass机制。
普通运行机制
在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。
在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。
一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。
SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。 普通运行机制的 SortShuffleManager 工作原理如下图所示:
bypass运行机制
Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold
设置的个数时,使用带 Hash 风格的回退计划。
pass 运行机制的触发条件如下:
shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200
参数的值。不是聚合类的 shuffle 算子。
此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。
而该机制与普通 SortShuffleManager 运行机制的不同在于:
- 第一,磁盘写机制不同;
- 第二,不会进行排序。
也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
Tungsten Sort Shuffle 运行机制
Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。
Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用
SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的:
对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort
方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle
方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。
因此,当设置了spark.shuffle.manager=tungsten-sort
时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。
要实现 Tungsten Sort Shuffle 机制需要满足以下条件:
- Shuffle 依赖中
不带聚合操作
或没有对输出进行排序
的要求。 - Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。
- Shuffle 过程中的输出分区个数少于 16777216 个。
实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。
所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。
3. Spark Shuffle 历史:
为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle? 我们可以从 Spark 最根本要优化和迫切要解决的问题中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 时产生大量的文件。当数据量越来越多时,产生的文件量是不可控的,这严重制约了 Spark 的性能及扩展能力,所以 Spark 必须要解决这个问题,减少 Mapper 端 ShuffleWriter 产生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。
但使用 Sorted-Based Shuffle 就完美了吗,答案是否定的,Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须先进行排序,所以导致它排序的速度有点慢。好在出现了 Tungsten-Sort Shuffle ,它对排序算法进行了改进,优化了排序的速度。Tungsten-SortShuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序需要的是 Sorted-BasedShuffle,还是 Tungsten-Sort Shuffle。
Spark SQL和Hive SQL的区别
Hive SQL是Hive提供的SQL查询引擎,底层由MapReduce实现。Hive根据输入的SQL语句执行词法分析、语法树构建、编译、逻辑计划、优化逻辑计划以及物理计划等过程,转化为Map Task和Reduce Task最终交由Mapreduce
引擎执行。
- 执行引擎。具有mapreduce的一切特性,适合大批量数据离线处理,相较于Spark而言,速度较慢且IO操作频繁
- 有完整的hql语法,支持基本sql语法、函数和udf
- 对表数据存储格式有要求,不同存储、压缩格式性能不同
Checkpoint 检查点机制
应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。
原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续
- checkpoint和持久化机制的区别 最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低。
Spark shuffle 参数优化
- spark.shuffle.file.buffer:主要是设置的Shuffle过程中写文件的缓冲,默认32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
- spark.reducer.maxSizeInFight:主要是设置Shuffle过程中读文件的缓冲区,一次能够读取多少数据,默认48m, 如果内存足够,可以适当扩大,减少整个网络传输次数。
- spark.shuffle.io.maxRetries:主要是设置网络连接失败时,重试次数,默认3次, 适当调大能够增加稳定性。
- spark.shuffle.io.retryWait:主要设置每次重试之间的间隔时间,可以适当调大,默认5s, 增加程序稳定性。
- spark.shuffle.memoryFraction:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。Shuffle过程中的内存占用,如果程序中较多使用了Shuffle操作,那么可以适当调大该区域。 [deprecated], 旧版本的静态内存管理策略生效,新版本统一内存管理此参数无效。用的是 spark.storage.memoryFraction 中的内存
- spark.shuffle.manager:Hash和Sort方式,Sort是默认,Hash在reduce数量 比较少的时候,效率会很高。
- spark.shuffle.sort.bypassMergeThreshold:设置的是Sort方式中,默认200,启用Hash输出方式的临界值,如果你的程序数据不需要排序,而且reduce数量比较少,那推荐可以适当增大临界值。
- spark.shuffle.cosolidateFiles:如果你使用Hash shuffle方式,推荐打开该配置,实现更少的文件输出。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shuffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize: default 67108864(64M) 动态调整reduce个数的partition大小依据,动态合并reducer的partition。map端多个partition 合并后数据阈值,小于阈值会合并。如设置64MB则reduce阶段每个task最少处理64MB的数据
Spark Locality 参数
参数想 | 默认值 | 参数解释 |
---|---|---|
spark.locality.wait | 3000(毫秒) | 数据本地性降级的等待时间 |
spark.locality.wait.process | spark.locality.wait | 多长时间等不到PROCESS_LOCAL就降 |
spark.locality.wait.node | spark.locality.wait | 多长时间等不到NODE_LOCAL就降 |
spark.locality.wait.rack | spark.locality.wait | 多长时间等不到RACK_LOCAL就降级 |
|
|