1. 概述
Spark应用在yarn运行模式下,其以Executor Container的形式存在,container能申请到的最大内存受yarn.scheduler.maximum-allocation-mb限制。下面说的大部分内容其实与yarn等没有多少直接关系,知识均为通用的。
Spark应用运行过程中的内存可以分为堆内内存
与堆外内存
,其中堆内内存onheap由spark.executor.memory
指定,堆外内存offheap由spark.yarn.executor.memoryOverhead
参数指定,默认为executorMemory*0.1,最小384M
。堆内内存executorMemory是spark使用的主要部分,其大小通过-Xmx参数传给jvm,内部有300M的保留资源不被executor使用。这里的堆外内存部分主要用于JVM自身,如字符串、NIO Buffer等开销,此部分用户代码及spark都无法直接操作。
executor执行的时候,用的内存可能会超过executor-memory,所以会为executor额外预留一部分内存,spark.yarn.executor.memoryOverhead即代表这部分内存。
另外还有部分堆外内存由spark.memory.offHeap.enabled及spark.memory.offHeap.size
控制的堆外内存,这部分也归offheap,但主要是供统一内存管理使用的。
2. 堆内内存
|
|
堆内内存有300M的保留资源,此外的可用内存usableMemory被分为spark管理的内存和用户管理的内存
两部分,spark管理的内存通过spark.memory.fraction进行控制,默认0.6。
Spark管理的统一内存:
在设置了executor memory为3G时,debug代码 其各部分值如下:
- systemMemory=3087007744 //container的JVM最多可用的内存
- reservedMemory=314572800 //保留的300M
- minSystemMemory=471859200 //300M*1.5
- executorMemory=3221225472 // 通过spark.executor.memory指定的值3g
- usableMemory=2772434944 //为systemMemory-reservedMemory 由上,spark可管理的内存大小为
|
|
这块内存在spark中被称为unified region(代号M)或统一内存或可用内存,其进一步被分为执行内存ExecutionMemory和StorageMemory,见上图。其中storage memory(代号R)是M的一个subregion,其的大小占比受spark.memory.storageFraction控制,默认为0.5,即默认占usableMemory的 0.6*0.5=0.3。我们用onHeapStorageRegionSize来表示storage这部分的大小。
- ExecutionMemory执行内存:主要存储Shuffle、Join、Sort、Aggregation等计算过程中的临时数据;
- StorageMemory存储内存:主要存储spark的cache数据,如RDD.cache RDD.persist在调用时的数据存储,用户自定义变量及系统的广播变量等
这两块内存在当前默认的UnifiedMemoryManager(Spark1.6引入)下是可以互相动态侵占的,即Execution内存不足时可以占用Storage的内存,反之亦然,其详细规则如下:
- Execution内存不足且onHeapStorageRegionSize有空闲时,可以向Storage Memory借用内存,- 但借用后storage不能将execution占用的部分驱逐evict出去,只能等着Execution自己释放。
- Storage内存不足时可以借用Execution的内存,且当Execution又有内存资源需求时可以驱逐Storage占用的部分,但只能驱逐StorageMemory-onHeapStorageRegionSize的大小,原来划定的onHeapStorageRegionSize且在使用的不可被抢占。
在spark的WebUI下,我们会看到Executors的信息如下图所示 我指定的executor-memory=5g,此处显示的StorageMemory其实是Spark的可用内存,包括Execution和Storage部分。(5G - 300M) * 0.6 = 2.7
- 用户管理的内存(Other): 上面说了占可用内存spark.memory.fraction(0.6)的spark 统一内存,另外0.4的用户内存用于存储用户代码生成的对象及RDD依赖等,用户在处理partition中的记录时,其遍历到的记录可以看做存储在Other区,当需要将RDD缓存时,将会序列化或不序列化的方式以Block的形式存储到Storage内存中。
3. 堆外内存
前面说了,堆外内存有的是参数spark.yarn.executor.memoryOverhead
控制,有的是参数spark.memory.offHeap.size
控制,这个都算offheap内存,不过前者主要用于JVM运行自身,字符串, NIO Buffer等开销
,而后者主要是供统一内存管理用作Execution Memory及Storage Memory的用途
。
spark.yarn.executor.memoryOverhead设置的内存默认为executor.memory的0.1倍,最低384M,这个始终存在的,在采用yarn时,这块内存是包含在申请的容器内的,即申请容器大小大于
spark.executor.memory+spark.yarn.executor.memoryOverhead。
而通过spark.memory.offHeap.enable/size申请的内存不在JVM内,Spark利用TungSten技术直接操作管理JVM外的原生内存。主要是为了解决Java对象开销大和GC的问题。
|
|
其中MEMORY_OFFHEAP_SIZE为spark.memory.offHeap.size,这部分offHeap内存被spark.memory.storageFraction分为storage与execution用途供统一内存管理使用。
统一内存管理UnifiedMemoryManager会管理堆内堆外的execution和storage内存,定义了四个内存池分别为:onHeapStorageMemoryPool, offHeapStorageMemoryPool, onHeapExecutionMemoryPool, offHeapExecutionMemoryPool,在spark内部申请内存时会指定MemoryMode为ON_HEAP或OFF_HEAP决定从哪部分申请内存。
我们在WebUI看到的executors信息中Storage是包括了统一内存管理控制的堆内堆外区域的。
下面的5.9G中包括了2.7G的堆内和3.2G(3g按1000算为3.221G,非1024算)
对大的几个RDD进行cache并action后,立马看会看到存储占用了堆内2.7G的大部分,即把execution的抢占了,仍然不够时已经有些序列化到磁盘中了。稍等一会execution会将storage抢占的这部分驱逐并序列化到disk中,如上将会变成下面的状况
按前面所说,这种均是在堆内内存存储的,我们查看被缓存的RDD的信息也可看到。
序列化存储级别怎么存到堆外?尤其是那些不希望被GC的长期存在的RDD,例如常驻内存的名单库等。我们可以使用persist时设置level为StorageLevel.OFF_HEAP,此种情况下只能用内存,不能同时存储到其他地方。 注意: 默认情况下Off-heap模式的内存并不启用,可以通过“spark.memory.offHeap.enabled”参数开启,并由spark.memory.offHeap.size指定堆外内存的大小(占用的空间划归JVM OffHeap内存)。
4. 任务内存管理(Task Memory Manager)
Executor中任务以线程的方式执行,各线程共享JVM的资源,任务之间的内存资源没有强隔离(任务没有专用的Heap区域)。因此,可能会出现这样的情况:先到达的任务可能占用较大的内存,而后到的任务因得不到足够的内存而挂起。
在Spark任务内存管理中,使用HashMap存储任务与其消耗内存的映射关系。每个任务可占用的内存大小为潜在可使用计算内存的[1/2n, 1/n], 当剩余内存为小于1/2n时,任务将被挂起,直至有其他任务释放执行内存,而满足内存下限1/2n,任务被唤醒,其中n为当前Executor中活跃的任务数。
任务执行过程中,如果需要更多的内存,则会进行申请,如果,存在空闲内存,则自动扩容成功,否则,将抛出OutOffMemroyError。
5. 相关调优
什么时候需要调节Executor的堆外内存大小?
当出现一下异常时:shuffle file cannot find,executor lost、task lost,out of memory
出现这种问题的现象大致有这么两种情况:
- Executor挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数据
- Executor并没有挂掉,而是在拉取数据的过程出现了问题。
上述情况下,就可以去考虑调节一下executor的堆外内存。也许就可以避免报错;此外,有时,堆外内存调节的比较大的时候,对于性能来说,也会带来一定的提升。这个executor跑着跑着,突然内存不足了,堆外内存不足了,可能会OOM,挂掉。block manager也没有了,数据也丢失掉了。
如果此时,stage0的executor挂了,BlockManager也没有了;此时,stage1的executor的task,虽然通过 Driver的MapOutputTrakcer获取到了自己数据的地址;但是实际上去找对方的BlockManager获取数据的 时候,是获取不到的。
此时,就会在spark-submit运行作业(jar),client(standalone client、yarn client),在本机就会打印出log:shuffle output file not found。。。DAGScheduler,resubmitting task,一直会挂掉。反复挂掉几次,反复报错几次,整个spark作业就崩溃了
|
|
调节等待时长
executor,优先从自己本地关联的BlockManager中获取某份数据
如果本地BlockManager没有的话,那么会通过TransferService,去远程连接其他节点上executor 的BlockManager去获取,尝试建立远程的网络连接,并且去拉取数据,task创建的对象特别大,特别多频繁的让JVM堆内存满溢,进行垃圾回收。正好碰到那个exeuctor的JVM在垃圾回收。
处于垃圾回收过程中,所有的工作线程全部停止;相当于只要一旦进行垃圾回收,spark / executor停止工作,无法提供响应,此时呢,就会没有响应,无法建立网络连接,会卡住;ok,spark默认的网络连接的超时时长,是60s,如果卡住60s都无法建立连接的话,那么就宣告失败了。碰到一种情况,偶尔,偶尔,偶尔!!!没有规律!!!某某file。一串file id。uuid(dsfsfd-2342vs–sdf–sdfsd)。not found。file lost。这种情况下,很有可能是有那份数据的executor在jvm gc。所以拉取数据的时候,建立不了连接。然后超过默认60s以后,直接宣告失败。报错几次,几次都拉取不到数据的话,可能会导致spark作业的崩溃。也可能会导致DAGScheduler,反复提交几次stage。TaskScheduler,反复提交几次task。大大延长我们的spark作业的运行时间。
可以考虑调节连接的超时时长。
|
|
executor-memory 设置建议
如果设置小了,会发生什么:
- 频繁GC,GC超限,CPU大部分时间用来做GC而回首的内存又很少,也就是executor堆内存不足。(通常gc 时间建议不超过task 时间的5%)
如果发生OOM或者GC耗时过长,考虑提高executor-memory或降低executor-core
2. java.lang.OutOfMemoryError内存溢出,这和程序实现强相关,例如内存排序等,通常是要放入内存的数据量太大,内存空间不够引起的。 3. 数据频繁spill到磁盘,如果是I/O密集型的应用,响应时间就会显著延长。
具体怎么样算调整到位呢? TimeLine显示状态合理(通通绿条),GC时长合理(占比很小),系统能够稳定运行。 当然内存给太大了也是浪费资源,合理的临界值是在内存给到一定程度,对运行效率已经没有帮助了的时候,就可以了。
增加executor内存量以后,性能的提升:
- 如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
- 对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
- 对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升。
在给定执行内存 M、线程池大小 N 和数据总量 D 的时候,想要有效地提升 CPU 利用率,我们就要计算出最佳并行度 P,计算方法是让数据分片的平均大小 D/P 坐落在(M/N*2, M/N)区间,让每个Task能够拿到并处理适量的数据。怎么理解适量呢?D/P是原始数据的尺寸,真正到内存里去,是会翻倍的,至于翻多少倍,这个和文件格式有关系。不过,不管他翻多少倍,只要原始的D/P和M/N在一个当量,那么我们大概率就能避开OOM的问题,不至于某些Tasks需要处理的数据分片过大而OOM。Shuffle过后每个Reduce Task也会产生数据分片,spark.sql.shuffle.partitions
控制Joins之中的Shuffle Reduce阶段并行度,spark.sql.shuffle.partitions
= 估算结果文件大小 / [128M,256M],确保shuffle 后的数据分片大小在[128M,256M]区间。PS: 核心思路是,根据“定下来的”,去调整“未定下来的”,就可以去设置每一个参数了。
假定Spark读取分布式文件,总大小512M,HDFS的分片是128M,那么并行度 = 512M / 128M = 4 Executor 并发度=1,那么Executor 内存 M 应在 128M 到 256M 之间。 Executor 并发度=2,那么Executor 内存 M 应在 256M 到 512M 之间。