1. 概述

Spark应用在yarn运行模式下,其以Executor Container的形式存在,container能申请到的最大内存受yarn.scheduler.maximum-allocation-mb限制。下面说的大部分内容其实与yarn等没有多少直接关系,知识均为通用的。

Spark应用运行过程中的内存可以分为堆内内存堆外内存,其中堆内内存onheap由spark.executor.memory指定,堆外内存offheap由spark.yarn.executor.memoryOverhead参数指定,默认为executorMemory*0.1,最小384M。堆内内存executorMemory是spark使用的主要部分,其大小通过-Xmx参数传给jvm,内部有300M的保留资源不被executor使用。这里的堆外内存部分主要用于JVM自身,如字符串、NIO Buffer等开销,此部分用户代码及spark都无法直接操作。

executor执行的时候,用的内存可能会超过executor-memory,所以会为executor额外预留一部分内存,spark.yarn.executor.memoryOverhead即代表这部分内存。

另外还有部分堆外内存由spark.memory.offHeap.enabled及spark.memory.offHeap.size控制的堆外内存,这部分也归offheap,但主要是供统一内存管理使用的。 img1

2. 堆内内存

1
2
3
4
5
6
7
object UnifiedMemoryManager {

  // Set aside a fixed amount of memory for non-storage, non-execution purposes.
  // This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
  // sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
  // the memory used for execution and storage will be (1024 - 300) * 0.6 = 434MB by default.
  private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024

堆内内存有300M的保留资源,此外的可用内存usableMemory被分为spark管理的内存和用户管理的内存两部分,spark管理的内存通过spark.memory.fraction进行控制,默认0.6。

Spark管理的统一内存:

在设置了executor memory为3G时,debug代码 其各部分值如下:

  1. systemMemory=3087007744 //container的JVM最多可用的内存
  2. reservedMemory=314572800 //保留的300M
  3. minSystemMemory=471859200 //300M*1.5
  4. executorMemory=3221225472 // 通过spark.executor.memory指定的值3g
  5. usableMemory=2772434944 //为systemMemory-reservedMemory 由上,spark可管理的内存大小为
1
2
3
注意 usableMemory 不是User Memory(有些也叫做other Memory)
实际为spark-submit 提交时申请的exector-memory 大小 - reservedMemory
usableMemory * memoryFraction=2772434944 *0.6=1,663,460,966

这块内存在spark中被称为unified region(代号M)或统一内存或可用内存,其进一步被分为执行内存ExecutionMemory和StorageMemory,见上图。其中storage memory(代号R)是M的一个subregion,其的大小占比受spark.memory.storageFraction控制,默认为0.5,即默认占usableMemory的 0.6*0.5=0.3。我们用onHeapStorageRegionSize来表示storage这部分的大小。


  • ExecutionMemory执行内存:主要存储Shuffle、Join、Sort、Aggregation等计算过程中的临时数据;
  • StorageMemory存储内存:主要存储spark的cache数据,如RDD.cache RDD.persist在调用时的数据存储,用户自定义变量及系统的广播变量等

这两块内存在当前默认的UnifiedMemoryManager(Spark1.6引入)下是可以互相动态侵占的,即Execution内存不足时可以占用Storage的内存,反之亦然,其详细规则如下:

  • Execution内存不足且onHeapStorageRegionSize有空闲时,可以向Storage Memory借用内存,- 但借用后storage不能将execution占用的部分驱逐evict出去,只能等着Execution自己释放。
  • Storage内存不足时可以借用Execution的内存,且当Execution又有内存资源需求时可以驱逐Storage占用的部分,但只能驱逐StorageMemory-onHeapStorageRegionSize的大小,原来划定的onHeapStorageRegionSize且在使用的不可被抢占。

在spark的WebUI下,我们会看到Executors的信息如下图所示 我指定的executor-memory=5g,此处显示的StorageMemory其实是Spark的可用内存,包括Execution和Storage部分。(5G - 300M) * 0.6 = 2.7 img2

  • 用户管理的内存(Other): 上面说了占可用内存spark.memory.fraction(0.6)的spark 统一内存,另外0.4的用户内存用于存储用户代码生成的对象及RDD依赖等,用户在处理partition中的记录时,其遍历到的记录可以看做存储在Other区,当需要将RDD缓存时,将会序列化或不序列化的方式以Block的形式存储到Storage内存中。

3. 堆外内存

前面说了,堆外内存有的是参数spark.yarn.executor.memoryOverhead控制,有的是参数spark.memory.offHeap.size控制,这个都算offheap内存,不过前者主要用于JVM运行自身,字符串, NIO Buffer等开销,而后者主要是供统一内存管理用作Execution Memory及Storage Memory的用途

spark.yarn.executor.memoryOverhead设置的内存默认为executor.memory的0.1倍,最低384M,这个始终存在的,在采用yarn时,这块内存是包含在申请的容器内的,即申请容器大小大于spark.executor.memory+spark.yarn.executor.memoryOverhead。

而通过spark.memory.offHeap.enable/size申请的内存不在JVM内,Spark利用TungSten技术直接操作管理JVM外的原生内存。主要是为了解决Java对象开销大和GC的问题。 img3

1
2
3
4
5
6
protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)

其中MEMORY_OFFHEAP_SIZE为spark.memory.offHeap.size,这部分offHeap内存被spark.memory.storageFraction分为storage与execution用途供统一内存管理使用。

统一内存管理UnifiedMemoryManager会管理堆内堆外的execution和storage内存,定义了四个内存池分别为:onHeapStorageMemoryPool, offHeapStorageMemoryPool, onHeapExecutionMemoryPool, offHeapExecutionMemoryPool,在spark内部申请内存时会指定MemoryMode为ON_HEAP或OFF_HEAP决定从哪部分申请内存。

我们在WebUI看到的executors信息中Storage是包括了统一内存管理控制的堆内堆外区域的。

下面的5.9G中包括了2.7G的堆内和3.2G(3g按1000算为3.221G,非1024算) img4

对大的几个RDD进行cache并action后,立马看会看到存储占用了堆内2.7G的大部分,即把execution的抢占了,仍然不够时已经有些序列化到磁盘中了。稍等一会execution会将storage抢占的这部分驱逐并序列化到disk中,如上将会变成下面的状况 img5

按前面所说,这种均是在堆内内存存储的,我们查看被缓存的RDD的信息也可看到。 img6

序列化存储级别怎么存到堆外?尤其是那些不希望被GC的长期存在的RDD,例如常驻内存的名单库等。我们可以使用persist时设置level为StorageLevel.OFF_HEAP,此种情况下只能用内存,不能同时存储到其他地方。 img7 注意: 默认情况下Off-heap模式的内存并不启用,可以通过“spark.memory.offHeap.enabled”参数开启,并由spark.memory.offHeap.size指定堆外内存的大小(占用的空间划归JVM OffHeap内存)。

4. 任务内存管理(Task Memory Manager)

Executor中任务以线程的方式执行,各线程共享JVM的资源,任务之间的内存资源没有强隔离(任务没有专用的Heap区域)。因此,可能会出现这样的情况:先到达的任务可能占用较大的内存,而后到的任务因得不到足够的内存而挂起。

在Spark任务内存管理中,使用HashMap存储任务与其消耗内存的映射关系。每个任务可占用的内存大小为潜在可使用计算内存的[1/2n, 1/n], 当剩余内存为小于1/2n时,任务将被挂起,直至有其他任务释放执行内存,而满足内存下限1/2n,任务被唤醒,其中n为当前Executor中活跃的任务数。

任务执行过程中,如果需要更多的内存,则会进行申请,如果,存在空闲内存,则自动扩容成功,否则,将抛出OutOffMemroyError。

5. 相关调优

什么时候需要调节Executor的堆外内存大小?

当出现一下异常时:shuffle file cannot find,executor lost、task lost,out of memory

出现这种问题的现象大致有这么两种情况:

  • Executor挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数据
  • Executor并没有挂掉,而是在拉取数据的过程出现了问题。

上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。这个executor跑着跑着,突然内存不足了,堆外内存不足了,可能会OOM,挂掉。block manager也没有了,数据也丢失掉了。

如果此时,stage0的executor挂了,BlockManager也没有了;此时,stage1的executor的task,虽然通过 Driver的MapOutputTrakcer获取到了自己数据的地址;但是实际上去找对方的BlockManager获取数据的 时候,是获取不到的。

此时,就会在spark-submit运行作业(jar),client(standalone client、yarn client),在本机就会打印出log:shuffle output file not found。。。DAGScheduler,resubmitting task,一直会挂掉。反复挂掉几次,反复报错几次,整个spark作业就崩溃了

1
2
3
4
5
--conf spark.yarn.executor.memoryOverhead=2048

spark-submit脚本里面去用--conf的方式去添加配置一定要注意!!!切记
不是在你的spark作业代码中用new SparkConf().set()这种方式去设置不要这样去设置是没有用的
一定要在spark-submit脚本中去设置

调节等待时长

img8 executor,优先从自己本地关联的BlockManager中获取某份数据

如果本地BlockManager没有的话,那么会通过TransferService,去远程连接其他节点上executor 的BlockManager去获取,尝试建立远程的网络连接,并且去拉取数据,task创建的对象特别大,特别多频繁的让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。

处于垃圾回收过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作,无法提供响应,此时呢,就会没有响应,无法建立网络连接,会卡住;ok,spark默认的网络连接的超时时长,是60s,如果卡住60s都无法建立连接的话,那么就宣告失败了。碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs–sdf–sdfsd)。not found。file lost。这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。

可以考虑调节连接的超时时长。

1
2
--conf spark.core.connection.ack.wait.timeout=300
spark-submit脚本切记不是在new SparkConf().set()这种方式来设置的spark.core.connection.ack.wait.timeoutspark coreconnection连接ackwait timeout建立不上连接的时候超时等待时长调节这个值比较大以后通常来说可以避免部分的偶尔出现的某某文件拉取失败某某文件lost掉了。。。

executor-memory 设置建议

如果设置小了,会发生什么:

  1. 频繁GC,GC超限,CPU大部分时间用来做GC而回首的内存又很少,也就是executor堆内存不足。(通常gc 时间建议不超过task 时间的5%)

如果发生OOM或者GC耗时过长,考虑提高executor-memory或降低executor-core

img9 2. java.lang.OutOfMemoryError内存溢出,这和程序实现强相关,例如内存排序等,通常是要放入内存的数据量太大,内存空间不够引起的。 3. 数据频繁spill到磁盘,如果是I/O密集型的应用,响应时间就会显著延长。

具体怎么样算调整到位呢? TimeLine显示状态合理(通通绿条),GC时长合理(占比很小),系统能够稳定运行。 当然内存给太大了也是浪费资源,合理的临界值是在内存给到一定程度,对运行效率已经没有帮助了的时候,就可以了。

增加executor内存量以后,性能的提升:

  • 如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
  • 对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
  • 对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升。

在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N*2, M/N)区间,让每个Task能够拿到并处理适量的数据。怎么理解适量呢?D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。Shuffle过后每个Reduce Task也会产生数据分片,spark.sql.shuffle.partitions 控制Joins之中的Shuffle Reduce阶段并行度,spark.sql.shuffle.partitions = 估算结果文件大小 / [128M,256M],确保shuffle 后的数据分片大小在[128M,256M]区间。PS: 核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。

假定Spark读取分布式文件,总大小512M,HDFS的分片是128M,那么并行度 = 512M / 128M = 4 Executor 并发度=1,那么Executor 内存 M 应在 128M 到 256M 之间。 Executor 并发度=2,那么Executor 内存 M 应在 256M 到 512M 之间。