基础篇

sparksql 如何加载metadata

任何的SQL引擎都是需要加载元数据的,不然,连执行计划都生成不了。 加载元数据总的来说分为两步:

  1. 加载元数据
  2. 创建会话连接Hive MetaStore

首先,Spark检测到我们没有设置spark.sql.warehouse.dir,然后就开始找我们在hite-site.xml中配置的hive.metastore.warehouse.dir。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://test-3:9083,thrift://test-4:9083</value>
  </property>
  <property>
    <name>hive.metastore.client.socket.timeout</name>
    <value>300</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/data/hive/warehouse</value>
  </property>
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>

然后,SparkSession在HDFS临时位置创建了下面目录。

1
2
Moved: 'hdfs://nn1/data/hive/warehouse/pyspark_test.db/tb_name/part-00000-c46bc573-0d1d-4ac4-8a69-2359dff82485-c000' to trash at: hdfs://nn1/user/hive/.Trash/Current
Moved: 'hdfs://nn1/data/hive/warehouse/pyspark_test.db/tb_name/part-00001-c46bc573-0d1d-4ac4-8a69-2359dff82485-c000' to trash at: hdfs://nn1/user/hive/.Trash/Current

最后,Spark开始通过thrift RPC去连接Hive的MetaStore Server。

进阶篇

Spark为什么这么快

Spark是一个基于内存的,用于大规模数据处理的统一分析引擎,其运算速度可以达到Mapreduce的10-100倍。具有如下特点:

  • 内存计算。Spark优先将数据加载到内存中,数据可以被快速处理,并可启用缓存。
  • shuffle过程优化。和Mapreduce的shuffle过程中间文件频繁落盘不同,Spark对Shuffle机制进行了优化,降低中间文件的数量并保证内存优先。
  • RDD计算模型。Spark具有高效的DAG调度算法,同时将RDD计算结果存储在内存中,避免重复计算。

如何理解DAGScheduler的Stage划分算法

官网的RDD执行流程图: RDD

1
rdd1.join(rdd2).groupBy().filter()

针对一段应用代码(如上),Driver会以Action算子为边界生成DAG调度图。DAGScheduler从DAG末端开始遍历划分Stage,封装成一系列的tasksets移交TaskScheduler,后者根据调度算法, 将taskset分发到相应worker上的Executor中执行。

  1. DAGSchduler的工作原理
  • DAGScheduler是一个面向stage调度机制的高级调度器,为每个job计算stage的DAG(有向无环图),划分stage并提交taskset给TaskScheduler。
  • 追踪每个RDD和stage的物化情况,处理因shuffle过程丢失的RDD,重新计算和提交。
  • 查找rdd partition 是否cache/checkpoint。提供优先位置给TaskScheduler,等待后续TaskScheduler的最佳位置划分
  1. Stage划分算法
  • 从触发action操作的算子开始,从后往前遍历DAG。
  • 为最后一个rdd创建finalStage。
  • 遍历过程中如果发现该rdd是宽依赖,则为其生成一个新的stage,与旧stage分隔而开,此时该rdd是新stage的最后一个rdd。
  • 如果该rdd是窄依赖,将该rdd划分为旧stage内,继续遍历,以此类推,继续遍历直至DAG完成。 stage
  1. 如何理解TaskScheduler的Task分配算法 TaskScheduler负责Spark中的task任务调度工作。TaskScheduler内部使用TasksetPool调度池机制存放task任务。TasksetPool分为FIFO(先进先出调度)和FAIR(公平调度)。
  • FIFO调度: 基于队列思想,使用先进先出原则顺序调度taskset
  • FAIR调度: 根据权重值调度,一般选取资源占用率作为标准,可人为设定 taskScheduler
  1. TaskScheduler的工作原理
  • 负责Application在Cluster Manager上的注册
  • 根据不同策略创建TasksetPool资源调度池,初始化pool大小
  • 根据task分配算法发送Task到Executor上执行
  1. Task分配算法
  • 首先获取所有的executors,包含executors的ip和port等信息
  • 将所有的executors根据shuffle算法进行打散
  • 遍历executors。在程序中依次尝试本地化级别,最终选择每个task的最优位置(结合DAGScheduler优化位置策略)
  • 序列化task分配结果,并发送RPC消息等待Executor响应

Spark的本地化级别有哪几种?怎么调优

移动计算 or 移动数据?这是一个问题。在分布式计算的核心思想中,移动计算永远比移动数据要合算得多,如何合理利用本地化数据计算是值得思考的一个问题。

TaskScheduler在进行task任务分配时,需要根据本地化级别计算最优位置,一般是遵循就近原则,选择最近位置和缓存。Spark中的本地化级别在TaskManager中定义,分为五个级别。

  1. Spark本地化级别
  • PROCESS_LOCAL(进程本地化) partition和task在同一个executor中,task分配到本地Executor进程。 locality

  • NODE_LOCAL(节点本地化) partition和task在同一个节点的不同Executor进程中,可能发生跨进程数据传输 locality

  • NO_PREF(无位置) 没有最佳位置的要求,比如Spark读取JDBC的数据

  • RACK_LOCAL(机架本地化) partition和task在同一个机架的不同worker节点上,可能需要跨机器数据传输 locality

  • ANY(跨机架): 数据在不同机架上,速度最慢

  1. Spark本地化调优 在task最佳位置的选择上,DAGScheduler先判断RDD是否有cache/checkpoint,即缓存优先;否则TaskScheduler进行本地级别选择等待发送task。

TaskScheduler首先会根据最高本地化级别发送task,如果在尝试5次并等待3s内还是无法执行,则认为当前资源不足,即降低本地化级别,按照PROCESS->NODE->RACK等顺序。

  • 调优1:加大spark.locality.wait 全局等待时长
  • 调优2:加大spark.locality.wait.xx等待时长(进程、节点、机架)
  • 调优3:加大重试次数(根据实际情况微调) optimization

说说Spark和Mapreduce中Shuffle的区别

Spark中的shuffle很多过程与MapReduce的shuffle类似,都有Map输出端、Reduce端,shuffle过程通过将Map端计算结果分区、排序并发送到Reducer端。

1. Hadoop Mapreduce Shuffle

MapReduce的shuffle需要依赖大量磁盘操作,数据会频繁落盘产生大量IO,同时产生大量小文件冗余。虽然缓存buffer区中启用了缓存机制,但是阈值较低且内存空间小。

  • 读取输入数据,并根据split大小切分为map任务
  • map任务在分布式节点中执行map()计算
  • 每个map task维护一个环形的buffer缓存区,存储map输出结果,分区且排序
  • 当buffer区域达到阈值时,开始溢写到临时文件中。map task任务结束时进行临时文件合并。此时,整合shuffle map端执行完成
  • mapreduce根据partition数启动reduce任务,copy拉取数据
  • merge合并拉取的文件
  • reduce()函数聚合计算,整个过程完成 MR

2. Spark的Shuffle机制

默认的shuffle计算引擎是HashShuffleManager,此种Shuffle产生大量的中间磁盘文件,消耗磁盘IO性能。在Spark1.2后续版本中,默认的ShuffleManager改成了SortShuffleManager,通过索引机制和合并临时文件的优化操作,大幅提高shuffle性能。 hashShuffleManager

HashShuffleManager

HashShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是合并的运行机制。合并机制主要是通过复用buffer来优化Shuffle过程中产生的小文件的数量,Hash shuffle本身不排序。开启合并机制后,同一个Executor共用一组core,文件个数为cores * reducesmerged

SortShuffleManager

SortShuffleManager的运行机制分成两种,普通运行机制bypass运行机制。当shuffletask的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认200),会启用bypass机制。

普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存;如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的 batch 数量是 10000 条,也就是说,排序好的数据,会以每批 1 万条数据的形式分批写入磁盘文件。写入磁盘文件是通过 Java 的 BufferedOutputStream 实现的。BufferedOutputStream 是 Java 的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘 IO 次数,提升性能。

一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就意味着该 task 为下游 stage 的 task 准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的过程,因此大大减少了文件数量。比如第一个 stage 有 50 个 task,总共有 10 个 Executor,每个 Executor 执行 5 个 task,而第二个 stage 有 100 个 task。由于每个 task 最终只有一个磁盘文件,因此此时每个 Executor 上只有 5 个磁盘文件,所有 Executor 只有 50 个磁盘文件。 普通运行机制的 SortShuffleManager 工作原理如下图所示: SortShuffleManager

bypass运行机制

Reducer 端任务数比较少的情况下,基于 Hash Shuffle 实现机制明显比基于 Sort Shuffle 实现机制要快,因此基于 Sort Shuffle 实现机制提供了一个带 Hash 风格的回退方案,就是 bypass 运行机制。对于 Reducer 端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带 Hash 风格的回退计划。

pass 运行机制的触发条件如下: shuffle map task 数量小于spark.shuffle.sort.bypassMergeThreshold=200参数的值。不是聚合类的 shuffle 算子。

此时,每个 task 会为每个下游 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据 key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的 HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager 运行机制的不同在于:

  • 第一,磁盘写机制不同;
  • 第二,不会进行排序。

也就是说,启用该机制的最大好处在于,shuffle write 过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。 bypass

Tungsten Sort Shuffle 运行机制

Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort 会进行排序,但排序的不是内容本身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,实现了直接对序列化后的二进制数据进行排序。由于直接基于二进制数据进行操作,所以在这里面没有序列化和反序列化的过程。内存的消耗大大降低,相应的,会极大的减少的 GC 的开销。

Spark 提供了配置属性,用于选择具体的 Shuffle 实现机制,但需要说明的是,虽然默认情况下 Spark 默认开启的是基于 SortShuffle 实现机制,但实际上,参考 Shuffle 的框架内核部分可知基于 SortShuffle 的实现机制与基于 Tungsten Sort Shuffle 实现机制都是使用

SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的: 对应非基于 Tungsten Sort 时,通过 SortShuffleWriter.shouldBypassMergeSort 方法判断是否需要回退到 Hash 风格的 Shuffle 实现机制,当该方法返回的条件不满足时,则通过 SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于 Tungsten Sort Shuffle 实现机制,而当这两个方法返回都为 false,即都不满足对应的条件时,会自动采用普通运行机制。

因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于 Tungsten Sort 的 Shuffle 实现机制。

要实现 Tungsten Sort Shuffle 机制需要满足以下条件:

  • Shuffle 依赖中不带聚合操作没有对输出进行排序的要求。
  • Shuffle 的序列化器支持序列化值的重定位(当前仅支持 KryoSerializer Spark SQL 框架自定义的序列化器)。
  • Shuffle 过程中的输出分区个数少于 16777216 个。

实际上,使用过程中还有其他一些限制,如引入 Page 形式的内存管理模型后,内部单条记录的长度不能超过 128 MB (具体内存模型可以参考 PackedRecordPointer 类)。另外,分区个数的限制也是该内存模型导致的。

所以,目前使用基于 Tungsten Sort Shuffle 实现机制条件还是比较苛刻的。

3. Spark Shuffle 历史:

shuffle

为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle? 我们可以从 Spark 最根本要优化和迫切要解决的问题中找到答案,使用 HashShuffle 的 Spark 在 Shuffle 时产生大量的文件。当数据量越来越多时,产生的文件量是不可控的,这严重制约了 Spark 的性能及扩展能力,所以 Spark 必须要解决这个问题,减少 Mapper 端 ShuffleWriter 产生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。

但使用 Sorted-Based Shuffle 就完美了吗,答案是否定的,Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须先进行排序,所以导致它排序的速度有点慢。好在出现了 Tungsten-Sort Shuffle ,它对排序算法进行了改进,优化了排序的速度。Tungsten-SortShuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序需要的是 Sorted-BasedShuffle,还是 Tungsten-Sort Shuffle。

Spark SQL和Hive SQL的区别

Hive SQL是Hive提供的SQL查询引擎,底层由MapReduce实现。Hive根据输入的SQL语句执行词法分析、语法树构建、编译、逻辑计划、优化逻辑计划以及物理计划等过程,转化为Map Task和Reduce Task最终交由Mapreduce引擎执行。

  • 执行引擎。具有mapreduce的一切特性,适合大批量数据离线处理,相较于Spark而言,速度较慢且IO操作频繁
  • 有完整的hql语法,支持基本sql语法、函数和udf
  • 对表数据存储格式有要求,不同存储、压缩格式性能不同 flow

Checkpoint 检查点机制

  • 应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。

  • 原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续

  • checkpoint和持久化机制的区别 最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低。

Spark shuffle 参数优化

  1. spark.shuffle.file.buffer:主要是设置的Shuffle过程中写文件的缓冲,默认32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
  2. spark.reducer.maxSizeInFight:主要是设置Shuffle过程中读文件的缓冲区,一次能够读取多少数据,默认48m, 如果内存足够,可以适当扩大,减少整个网络传输次数。
  3. spark.shuffle.io.maxRetries:主要是设置网络连接失败时,重试次数,默认3次, 适当调大能够增加稳定性。
  4. spark.shuffle.io.retryWait:主要设置每次重试之间的间隔时间,可以适当调大,默认5s, 增加程序稳定性。
  5. spark.shuffle.memoryFraction:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。Shuffle过程中的内存占用,如果程序中较多使用了Shuffle操作,那么可以适当调大该区域。 [deprecated], 旧版本的静态内存管理策略生效,新版本统一内存管理此参数无效。用的是 spark.storage.memoryFraction 中的内存
  6. spark.shuffle.manager:Hash和Sort方式,Sort是默认,Hash在reduce数量 比较少的时候,效率会很高。
  7. spark.shuffle.sort.bypassMergeThreshold:设置的是Sort方式中,默认200,启用Hash输出方式的临界值,如果你的程序数据不需要排序,而且reduce数量比较少,那推荐可以适当增大临界值。
  8. spark.shuffle.cosolidateFiles:如果你使用Hash shuffle方式,推荐打开该配置,实现更少的文件输出。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shuffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
  9. spark.sql.adaptive.shuffle.targetPostShuffleInputSize: default 67108864(64M) 动态调整reduce个数的partition大小依据,动态合并reducer的partition。map端多个partition 合并后数据阈值,小于阈值会合并。如设置64MB则reduce阶段每个task最少处理64MB的数据

Spark Locality 参数

参数想默认值参数解释
spark.locality.wait3000(毫秒)数据本地性降级的等待时间
spark.locality.wait.processspark.locality.wait多长时间等不到PROCESS_LOCAL就降
spark.locality.wait.nodespark.locality.wait多长时间等不到NODE_LOCAL就降
spark.locality.wait.rackspark.locality.wait多长时间等不到RACK_LOCAL就降级
1
2
3
4
new SparkConf().set("spark.locality.wait", "10")
spark.locality.wait.process//建议60s
spark.locality.wait.node//建议30s
spark.locality.wait.rack//建议20s