您的位置 首页 应用

一文读懂 Spark 内存办理

一文读懂 Spark 内存管理-作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

Spark 作为一个依据内存的分布式核算引擎,其内存办理模块在整个体系中扮演着十分重要的人物。了解 Spark 内存办理的底子原理,有助于更好地开发 Spark 运用程序和进行功用调优。本文旨在梳理出 Spark 内存办理的头绪,抛砖引玉,文中论述的原理依据 Spark 2.1 版别,阅览本文需求读者有必定的 Spark 和 Java 根底,了解 RDD、Shuffle、JVM 等相关概念。

在履行 Spark 的运用程序时,Spark 集群会发动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,担任创立 Spark 上下文,提交 Spark 作业(Job),并将作业转化为核算使命(Task),在各个 Executor 进程间和谐使命的调度,后者担任在作业节点上履行详细的核算使命,并将成果回来给 Driver,一起为需求耐久化的 RDD 供给存储功用[1]。因为 Driver 的内存办理相对来说较为简略,本文首要对 Executor 的内存办理进行剖析,下文中的 Spark 内存均特指 Executor 的内存。

一、堆内和堆外内存规划

作为一个 JVM 进程,Executor 的内存办理建立在 JVM 的内存办理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分运用内存。一起,Spark 引进了堆外(Off-heap)内存,使之能够直接在作业节点的体系内存中拓荒空间,进一步优化了内存的运用。

一文读懂 Spark 内存办理

图 1 . 堆内和堆外内存示意图

1堆内内存

堆内内存的巨细,由 Spark 运用程序发动时的 –executor-memory 或 spark.executor.memory 参数装备。Executor 内运转的并发使命同享 JVM 堆内内存,这些使命在缓存 RDD 数据和播送(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些使命在履行 Shuffle 时占用的内存被规划为履行(ExecuTIon)内存,剩下的部分不做特别规划,那些 Spark 内部的目标实例,或许用户界说的 Spark 运用程序中的目标实例,均占用剩下的空间。不同的办理办法下,这三部分占用的空间巨细各不相同(下面第二节会进行介绍)。

Spark 对堆内内存的办理是一种逻辑上的“规划式”的办理,因为目标实例占用内存的恳求和开释都由 JVM 完结,Spark 只能在恳求后和开释前记载这些内存,咱们来看其详细流程:

恳求内存:

Spark 在代码中 new 一个目标实例

JVM 从堆内内存分配空间,创立目标并回来目标引证

Spark 保存该目标的引证,记载该目标占用的内存

开释内存:

Spark 记载该目标开释的内存,删去该目标的引证

等候 JVM 的废物收回机制开释该目标占用的堆内内存

咱们知道,JVM 的目标能够以序列化的办法存储,序列化的进程是将目标转化为二进制字节约,本质上能够了解为将非接连空间的链式存储转化为接连空间或块存储,在拜访时则需求进行序列化的逆进程——反序列化,将字节约转化为目标,序列化的办法能够节约存储空间,但增加了存储和读取时分的核算开支。

关于 Spark 中序列化的目标,由所以字节约的办法,其占用的内存巨细可直接核算,而关于非序列化的目标,其占用的内存是经过周期性地采样近似预算而得,即并不是每次新增的数据项都会核算一次占用的内存巨细,这种办法下降了时间开支可是有或许差错较大,导致某一时间的实践内存有或许远远超出预期[2]。此外,在被 Spark 标记为开释的目标实例,很有或许在实践上并没有被 JVM 收回,导致实践可用的内存小于 Spark 记载的可用内存。所以 Spark 并不能准确记载实践可用的堆内内存,然后也就无法完全防止内存溢出(OOM, Out of Memory)的反常。

尽管不能精准操控堆内内存的恳求和开释,但 Spark 经过对存储内存和履行内存各自独立的规划办理,能够决议是否要在存储内存里缓存新的 RDD,以及是否为新的使命分配履行内存,在必定程度上能够进步内存的运用率,削减反常的呈现。

2堆外内存

为了进一步优化内存的运用以及进步 Shuffle 时排序的功率,Spark 引进了堆外(Off-heap)内存,使之能够直接在作业节点的体系内存中拓荒空间,存储经过序列化的二进制数据。运用 JDK Unsafe API(从 Spark 2.0 开端,在办理堆外的存储内存时不再依据 Tachyon,而是与堆外的履行内存相同,依据 JDK Unsafe API 完结[3]),Spark 能够直接操作体系堆外内存,削减了不必要的内存开支,以及频频的 GC 扫描和收回,进步了处理功用。堆外内存能够被准确地恳求和开释,而且序列化的数据占用的空间能够被准确核算,所以比较堆内内存来说下降了办理的难度,也下降了差错。

在默许状况下堆外内存并不启用,可经过装备 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的巨细。除了没有 other 空间,堆外内存与堆内内存的区别办法相同,一切运转中的并发使命同享存储内存和履行内存。

3内存办理接口

Spark 为存储内存和履行内存的办理供给了共同的接口——MemoryManager,同一个 Executor 内的使命都调用这个接口的办法来恳求或开释内存:

  清单 1 。 内存办理接口的首要办法

  //恳求存储内存

  def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  //恳求打开内存

  def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean

  //恳求履行内存

  def acquireExecuTIonMemory(numBytes: Long, taskAttempTId: Long, memoryMode: MemoryMode): Long

  //开释存储内存

  def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit

  //开释履行内存

  def releaseExecuTIonMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit

  //开释打开内存

  def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit

  咱们看到,在调用这些办法时都需求指定其内存办法(MemoryMode),这个参数决议了是在堆内仍是堆外完结这次操作。

MemoryManager 的详细完结上,Spark 1.6 之后默许为共同办理(Unified Memory Manager)办法,1.6 之前选用的静态办理(Static Memory Manager)办法仍被保存,可经过装备 spark.memory.useLegacyMode 参数启用。两种办法的差异在于对空间分配的办法,下面的第二节会分别对这两种办法进行介绍。

二、内存空间分配

1静态内存办理

在 Spark 开始选用的静态内存办理机制下,存储内存、履行内存和其他内存的巨细在 Spark 运用程序运转期间均为固定的,但用户能够运用程序发动前进行装备,堆内内存的分配如图 2 所示:

一文读懂 Spark 内存办理

图 2 . 静态内存办理图示——堆内

能够看到,可用的堆内内存的巨细需求依照下面的办法核算:

清单 2 . 可用堆内内存空间

可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction

可用的履行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

其间 systemMaxMemory 取决于当时 JVM 堆内内存的巨细,终究可用的履行内存或许存储内存要在此根底上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述核算公式中的两个 safetyFraction 参数,其含义在于在逻辑上预留出 1-safetyFraction 这么一块稳妥区域,下降因实践内存超出当时预设规划而导致 OOM 的危险(上文说到,关于非序列化目标的内存采样预算会发生差错)。值得注意的是,这个预留的稳妥区域仅仅是一种逻辑上的规划,在详细运用时 Spark 并没有差异对待,和“其它内存”相同交给了 JVM 去办理。

堆外的空间分配较为简略,只要存储内存和履行内存,如图 3 所示。可用的履行内存和存储内存占用的空间巨细直接由参数 spark.memory.storageFraction 决议,因为堆外内存占用的空间能够被准确核算,所以无需再设定稳妥区域。

一文读懂 Spark 内存办理

图 3 . 静态内存办理图示——堆外

静态内存办理机制完结起来较为简略,但假如用户不熟悉 Spark 的存储机制,或没有依据详细的数据规划和核算使命或做相应的装备,很简单形成“一半海水,一半火焰”的局势,即存储内存和履行内存中的一方剩下许多的空间,而另一方却早早被占满,不得不筛选或移出旧的内容以存储新的内容。因为新的内存办理机制的呈现,这种办法现在现已很少有开发者运用,出于兼容旧版别的运用程序的意图,Spark 依然保存了它的完结。

2共同内存办理

Spark 1.6 之后引进的共同内存办理机制,与静态内存办理的差异在于存储内存和履行内存同享同一块空间,能够动态占用对方的闲暇区域,如图 4 和图 5 所示。

一文读懂 Spark 内存办理

图 4 . 共同内存办理图示——堆内 

一文读懂 Spark 内存办理

图 5 . 共同内存办理图示——堆外

其间最重要的优化在于动态占用机制,其规矩如下:

设定底子的存储内存和履行内存区域(spark.storage.storageFraction 参数),该设定确认了两边各自具有的空间的规划;

两边的空间都缺乏时,则存储到硬盘;若己方空间缺乏而对方空余时,可借用对方的空间(存储空间缺乏是指缺乏以放下一个完好的 Block);

履行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后“偿还”借用的空间;

存储内存的空间被对方占用后,无法让对方“偿还”,因为需求考虑 Shuffle 进程中的许多要素,完结起来较为杂乱[4]。 

一文读懂 Spark 内存办理

图 6 . 动态占用机制图示

凭仗共同内存办理机制,Spark 在必定程度上进步了堆内和堆外内存资源的运用率,下降了开发者保护 Spark 内存的难度,但并不意味着开发者能够无忧无虑。比如,所以假如存储内存的空间太大或许说缓存的数据过多,反而会导致频频的全量废物收回,下降使命履行时的功用,因为缓存的 RDD 数据一般都是长时间驻留内存的 [5] 。所以要想充分发挥 Spark 的功用,需求开发者进一步了解存储内存和履行内存各自的办理办法和完结原理。

三、存储内存办理

1RDD 的耐久化机制

弹性分布式数据集(RDD)作为 Spark 最底子的数据笼统,是只读的分区记载(Partition)的调集,只能依据在安稳物理存储中的数据集上创立,或许在其他已有的 RDD 上履行转化(Transformation)操作发生一个新的 RDD。转化后的 RDD 与原始的 RDD 之间发生的依靠联系,构成了血缘(Lineage)。凭仗血缘,Spark 确保了每一个 RDD 都能够被从头康复。但 RDD 的一切转化都是慵懒的,即只要当一个回来成果给 Driver 的举动(Action)发生时,Spark 才会创立使命读取 RDD,然后真实触发转化的履行。

Task 在发动之初读取一个分区时,会先判别这个分区是否现已被耐久化,假如没有则需求查看 Checkpoint 或依照血缘从头核算。所以假如一个 RDD 上要履行屡次举动,能够在第一次举动中运用 persist 或 cache 办法,在内存或磁盘中耐久化或缓存这个 RDD,然后在后面的举动时进步核算速度。事实上,cache 办法是运用默许的 MEMORY_ONLY 的存储等级将 RDD 耐久化到内存,故缓存是一种特别的耐久化。 堆内和堆外存储内存的规划,便能够对缓存 RDD 时运用的内存做共同的规划和管 理 (存储内存的其他运用场景,如缓存 broadcast 数据,暂时不在本文的评论规划之内)。

RDD 的耐久化由 Spark 的 Storage 模块 [7] 担任,完结了 RDD 与物理存储的解耦合。Storage 模块担任办理 Spark 在核算进程中发生的数据,将那些在内存或磁盘、在本地或长途存取数据的功用封装了起来。在详细完结时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage 模块在逻辑上以 Block 为底子存储单位,RDD 的每个 Partition 经过处理后仅有对应一个 Block(BlockId 的格局为 rdd_RDD-ID_PARTITION-ID )。Master 担任整个 Spark 运用程序的 Block 的元数据信息的办理和保护,而 Slave 需求将 Block 的更新等状况上签到 Master,一起接纳 Master 的指令,例如新增或删去一个 RDD。

一文读懂 Spark 内存办理

图 7 . Storage 模块示意图

在对 RDD 耐久化时,Spark 规矩了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的 存储等级 ,而存储等级是以下 5 个变量的组合:

清单 3 . 存储等级

class StorageLevel private( 

private var _useDisk: Boolean, //磁盘 

private var _useMemory: Boolean, //这儿其实是指堆内内存 

private var _useOffHeap: Boolean, //堆外内存 

private var _deserialized: Boolean, //是否为非序列化 

private var _replication: Int = 1 //副本个数 

经过对数据结构的剖析,能够看出存储等级从三个维度界说了 RDD 的 Partition(一起也便是 Block)的存储办法:

存储方位:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是一起在磁盘和堆内内存上存储,完结了冗余备份。OFF_HEAP 则是只在堆外内存存储,现在挑选堆外内存时不能一起存储到其他方位;

存储办法:Block 缓存到存储内存后,是否为非序列化的办法。如 MEMORY_ONLY 对错序列化办法存储,OFF_HEAP 是序列化办法存储;

副本数量:大于 1 时需求长途冗余备份到其他节点。如 DISK_ONLY_2 需求长途备份 1 个副本。

2RDD 缓存的进程

RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来拜访,这是 Scala 言语中一种遍历数据调集的办法。经过 Iterator 能够获取分区中每一条序列化或许非序列化的数据项(Record),这些 Record 的目标实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不接连。

RDD 在缓存到存储内存之后,Partition 被转化成 Block,Record 在堆内或堆外存储内存中占用一块接连的空间。将Partition由不接连的存储空间转化为接连存储空间的进程,Spark称之为“打开”(Unroll)。Block 有序列化和非序列化两种存储格局,详细以哪种办法取决于该 RDD 的存储等级。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构界说,用一个数组存储一切的目标实例,序列化的 Block 则以 SerializedMemoryEntry的数据结构界说,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来办理堆内和堆外存储内存中一切的 Block 目标的实例[6],对这个 LinkedHashMap 新增和删去直接记载了内存的恳求和开释。

因为不能确保存储空间能够一次包容 Iterator 中的一切数据,当时的核算使命在 Unroll 时要向 MemoryManager 恳求满意的 Unroll 空间来暂时占位,空间缺乏则 Unroll 失利,空间满意时能够继续进行。关于序列化的 Partition,其所需的 Unroll 空间能够直接累加核算,一次恳求。而非序列化的 Partition 则要在遍历 Record 的进程中顺次恳求,即每读取一条 Record,采样预算其所需的 Unroll 空间并进行恳求,空间缺乏时能够中止,开释已占用的 Unroll 空间。假如终究 Unroll 成功,当时 Partition 所占用的 Unroll 空间被转化为正常的缓存 RDD 的存储空间,如下图 8 所示。

一文读懂 Spark 内存办理

图 8. Spark Unroll 示意图

在图 3 和图 5 中能够看到,在静态内存办理时,Spark 在存储内存中专门区别了一块 Unroll 空间,其巨细是固定的,共同内存办理时则没有对 Unroll 空间进行特别区别,当存储空间缺乏时会依据动态占用机制进行处理。

3筛选和落盘

因为同一个 Executor 的一切的核算使命同享有限的存储内存空间,当有新的 Block 需求缓存可是剩下空间缺乏且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行筛选(Eviction),而被筛选的 Block 假如其存储等级中一起包括存储到磁盘的要求,则要对其进行落盘(Drop),不然直接删去该 Block。

存储内存的筛选规矩为:

被筛选的旧 Block 要与新 Block 的 MemoryMode 相同,即同归于堆外或堆内内存;

新旧 Block 不能归于同一个 RDD,防止循环筛选;

旧 Block 所属 RDD 不能处于被读状况,防止引发共同性问题;

遍历 LinkedHashMap 中 Block,依照最近最少运用(LRU)的次序筛选,直到满意新 Block 所需的空间。其间 LRU 是 LinkedHashMap 的特性。

落盘的流程则比较简略,假如其存储等级契合_useDisk 为 true 的条件,再依据其_deserialized 判别是否对错序列化的办法,若是则对其进行序列化,终究将数据存储到磁盘,在 Storage 模块中更新其信息。

四、履行内存办理

1多使命间内存分配

Executor 内运转的使命相同同享履行内存,Spark 用一个 HashMap 结构保存了使命到内存消耗的映射。每个使命可占用的履行内存巨细的规划为 1/2N ~ 1/N,其间 N 为当时 Executor 内正在运转的使命的个数。每个使命在发动之时,要向 MemoryManager 恳求恳求最少为 1/2N 的履行内存,假如不能被满意要求则该使命被堵塞,直到有其他使命开释了满意的履行内存,该使命才能够被唤醒。

2Shuffle 的内存占用

履行内存首要用来存储使命在履行 Shuffle 时占用的内存,Shuffle 是依照必定规矩对 RDD 数据从头分区的进程,咱们来看 Shuffle 的 Write 和 Read 两阶段对履行内存的运用:

Shuffle Write:

若在 map 端挑选一般的排序办法,会选用 ExternalSorter 进行外排,在内存中存储数据时首要占用堆内履行空间;

若在 map 端挑选 Tungsten 的排序办法,则选用 ShuffleExternalSorter 直接对以序列化办法存储的数据排序,在内存中存储数据时能够占用堆外或堆内履行空间,取决于用户是否敞开了堆外内存以及堆外履行内存是否满意。

Shuffle Read:

在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内履行空间;

假如需求进行终究成果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内履行空间。

在 ExternalSorter 和 Aggregator 中,Spark 会运用一种叫 AppendOnlyMap 的哈希表在堆内履行内存中存储数据,但在 Shuffle 进程中一切数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样预算,当其大到必定程度,无法再从 MemoryManager 恳求到新的履行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个进程被称为溢存(Spill),溢存到磁盘的文件终究会被归并(Merge)。

Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 运用的方案[9],处理了一些 JVM 在功用上的约束和坏处。Spark 会依据 Shuffle 的状况来主动挑选是否选用 Tungsten 排序。Tungsten 选用的页式内存办理机制建立在 MemoryManager 之上,即 Tungsten 对履行内存的运用进行了一步的笼统,这样在 Shuffle 进程中无需关怀数据详细存储在堆内仍是堆外。

每个内存页用一个 MemoryBlock 来界说,并用 Object obj 和 long offset 这两个变量共同标识一个内存页在体系内存中的地址。堆内的 MemoryBlock 是以 long 型数组的办法分配的内存,其 obj 的值为是这个数组的目标引证,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者合作运用能够定位这个数组在堆内的肯定地址;堆外的 MemoryBlock 是直接恳求到的内存块,其 obj 为 null,offset 是这个内存块在体系内存中的 64 位肯定地址。Spark 用 MemoryBlock 奇妙地将堆内和堆外内存页共同笼统封装,并用页表(pageTable)办理每个 Task 恳求到的内存页。

Tungsten 页式办理下的一切内存用 64 位的逻辑地址表明,由页号和页内偏移量组成:

页号:占 13 位,仅有标识一个内存页,Spark 在恳求内存页之前要先恳求闲暇页号;

页内偏移量:占 51 位,是在运用内存页存储数据时,数据在页内的偏移地址。

有了共同的寻址办法,Spark 能够用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的进程只需求对指针进行排序,而且无需反序列化,整个进程十分高效,关于内存拜访功率和 CPU 运用功率带来了显着的进步[10]。

Spark 的存储内存和履行内存有着天壤之别的办理办法:关于存储内存来说,Spark 用一个 LinkedHashMap 来会集办理一切的 Block,Block 由需求缓存的 RDD 的 Partition 转化而成;而关于履行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 进程中的数据,在 Tungsten 排序中乃至笼统成为页式内存办理,拓荒了全新的 JVM 内存办理机制。

五、结束语

Spark 的内存办理是一套杂乱的机制,且 Spark 的版别更新比较快,笔者水平有限,不免有叙说不清、过错的当地,若读者有好的建议和更深的了解,还望不吝赐教。

声明:本文内容来自网络转载或用户投稿,文章版权归原作者和原出处所有。文中观点,不代表本站立场。若有侵权请联系本站删除(kf@86ic.com)https://www.86ic.net/yingyong/102929.html

为您推荐

联系我们

联系我们

在线咨询: QQ交谈

邮箱: kf@86ic.com

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部