Spark Storage Mechanism Explained
We know that Spark can keep the RDDs it has computed in memory and reuse them when they are needed again. How does Spark actually do this? This article walks through the source code to explain how a persisted RDD is reused.
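Before diving into the scheduler, here is a minimal usage sketch of the behaviour in question (the app name, partition count and numbers are only illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-demo").setMaster("local[*]"))

    // Mark the RDD as persistent; nothing is stored until an action runs.
    val squares = sc.parallelize(1 to 10000, numSlices = 4).map(x => x * x).cache()

    squares.count()  // first action: partitions are computed and stored as blocks
    squares.count()  // second action: the blocks are found again via the machinery below

    sc.stop()
  }
}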
The previous article explained Spark's execution mechanism: the DAGScheduler is responsible for breaking an action down into stages. In DAGScheduler.getMissingParentStages, Spark makes its first use of previously computed RDDs, and the function it relies on is DAGScheduler.getCacheLocs:
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
  if (!cacheLocs.contains(rdd.id)) {
    val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
    val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
    cacheLocs(rdd.id) = blockIds.map { id =>
      locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
    }
  }
  cacheLocs(rdd.id)
}
DAGScheduler keeps in cacheLocs only the location information of partitions that have already been stored. Let's look at the path taken when an RDD is not yet in cacheLocs: first the blockIds representing each partition are generated, then BlockManager.blockIdsToBlockManagers is called to turn each blockId into a Seq[BlockManagerId], and a BlockManagerId carries the location information of a partition (each partition is stored as one block; blocks can also hold broadcast variables and other data).
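Each RDDBlockId is simply built from the RDD id and the partition index. As a quick sketch, for a hypothetical RDD with id 3 and four partitions the generated block names look like this (the "rdd_<rddId>_<partition>" string form is how Spark names RDD blocks):

import org.apache.spark.storage.{BlockId, RDDBlockId}

// Hypothetical RDD id and partition count, mirroring rdd.partitions.indices in getCacheLocs.
val rddId = 3
val blockIds = (0 until 4).map(i => RDDBlockId(rddId, i)).toArray[BlockId]
blockIds.foreach(id => println(id.name))
// rdd_3_0
// rdd_3_1
// rdd_3_2
// rdd_3_3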
According to the comments in the source, every node (both the master/driver and the workers) runs a BlockManager that manages all of its storage (RDD blocks, broadcast data and so on). The master and the workers communicate through the Akka actor system (see my other article for an introduction), namely via BlockManagerMasterActor and BlockManagerSlaveActor. Let's continue with BlockManager.blockIdsToBlockManagers:
def blockIdsToBlockManagers(
    blockIds: Array[BlockId],
    env: SparkEnv,
    blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {

  // blockManagerMaster != null is used in tests
  assert(env != null || blockManagerMaster != null)
  val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
    env.blockManager.getLocationBlockIds(blockIds)
  } else {
    blockManagerMaster.getLocations(blockIds)
  }

  val blockManagers = new HashMap[BlockId, Seq[BlockManagerId]]
  for (i <- 0 until blockIds.length) {
    blockManagers(blockIds(i)) = blockLocations(i)
  }
  blockManagers.toMap
}
The BlockManager is created inside SparkEnv, which also runs on every node and is wired differently for the driver and for executors (same class, different contents, just like the BlockManager itself). When SparkEnv is created, the BlockManager on the driver gets an actual BlockManagerMasterActor, while the BlockManager on each executor only receives a reference to that actor. The code above uses sparkEnv.blockManager.blockManagerMaster.getLocations to obtain the BlockManagerIds for each blockId and returns them organised as a Map. Next we come to BlockManagerMaster.getLocations:
def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}
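askDriverWithReply is a thin wrapper around Akka's ask (request/reply) pattern. The toy sketch below shows the same pattern outside of Spark, assuming a classic Akka dependency; ToyMaster and this GetLocations case class are made up for illustration and are not the real Spark classes:

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message, loosely modelled on the real GetLocations.
case class GetLocations(blockId: String)

// A toy stand-in for BlockManagerMasterActor: it answers location queries
// from a private map instead of real cluster state.
class ToyMaster extends Actor {
  private val blockLocations = Map("rdd_3_0" -> Seq("executor-1", "executor-2"))
  def receive = {
    case GetLocations(id) => sender() ! blockLocations.getOrElse(id, Seq.empty)
  }
}

object AskDemo extends App {
  val system = ActorSystem("toy")
  val master = system.actorOf(Props[ToyMaster](), "master")
  implicit val timeout: Timeout = Timeout(5.seconds)

  // The asking side only holds a reference to the master actor and blocks on the
  // reply, which is essentially what askDriverWithReply does.
  val locations = Await.result((master ? GetLocations("rdd_3_0")).mapTo[Seq[String]], 5.seconds)
  println(locations)  // List(executor-1, executor-2)

  system.terminate()
}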
This code simply sends a GetLocations message to BlockManagerMasterActor and waits for the reply. BlockManagerMasterActor holds all of the bookkeeping about storage: blockManagerInfo has the storage information of every executor, blockManagerIdByExecutor maps an executor to the BlockManagerId registered on it, and blockLocations records every storage location of every block (which includes the locations of all partitions). Below is the part of BlockManagerMasterActor that answers location queries:
override def receiveWithLogging = {
  case GetLocations(blockId) =>
    sender ! getLocations(blockId)
  case ... =>
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}
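To make those three maps concrete, here is a toy model that uses plain strings in place of the real BlockId / BlockManagerId / BlockManagerInfo types (illustration only; the real fields live inside BlockManagerMasterActor on the driver):

import scala.collection.mutable

object MasterBookkeepingToy extends App {
  // block id -> every block manager that currently holds a copy of the block
  val blockLocations = mutable.HashMap.empty[String, mutable.HashSet[String]]
  // block manager id -> storage info for that executor (simplified to a string here)
  val blockManagerInfo = mutable.HashMap.empty[String, String]
  // executor id -> the block manager registered on that executor
  val blockManagerIdByExecutor = mutable.HashMap.empty[String, String]

  // A cached partition rdd_3_0 replicated on two executors:
  blockLocations("rdd_3_0") = mutable.HashSet("bm-executor-1", "bm-executor-2")

  // What the GetLocations handler effectively does:
  def getLocations(blockId: String): Seq[String] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  println(getLocations("rdd_3_0"))  // the two block manager ids, in some order
}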
Because BlockManagerMasterActor keeps the locations of every block, it can simply return the answer. At this point we can see that all block location information is held on the master (driver) node. That is the complete path Spark follows to find a persisted RDD, but it does not cover the whole storage mechanism, so let's look at some other code.
Since all block information lives on the master, merely querying block locations does not show how the master interacts with the executors. So let's analyse RDD.unpersist: it calls SparkContext.unpersistRDD, which goes through env.blockManager.master to remove the RDD's blocks (a short usage sketch follows the snippet below):
def removeBlock(blockId: BlockId) {
  askDriverWithReply(RemoveBlock(blockId))
}

private def askDriverWithReply[T](message: Any): T = {
  AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
    timeout)
}
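From user code, this whole removal chain is triggered by a single call; a minimal sketch, reusing the cached squares RDD from the earlier example:

// Remove the blocks of every partition of `squares` from the executors' stores;
// with blocking = true the call waits until the removal has completed.
squares.unpersist(blocking = true)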
Just like the location lookup above, this sends a RemoveBlock message to BlockManagerMasterActor:
override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    removeBlockFromWorkers(blockId)
    sender ! true
  case ... =>
}

private def removeBlockFromWorkers(blockId: BlockId) {
  val locations = blockLocations.get(blockId)
  if (locations != null) {
    locations.foreach { blockManagerId: BlockManagerId =>
      val blockManager = blockManagerInfo.get(blockManagerId)
      if (blockManager.isDefined) {
        // Remove the block from the slave's BlockManager.
        // Doesn't actually wait for a confirmation and the message might get lost.
        // If message loss becomes frequent, we should add retry logic here.
        blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
      }
    }
  }
}
This code first uses blockLocations to find every BlockManagerId holding the block, then looks each one up in blockManagerInfo to obtain the BlockManagerSlaveActor on that executor, and finally sends it a RemoveBlock message:
override def receiveWithLogging = {
  case RemoveBlock(blockId) =>
    doAsync[Boolean]("removing block " + blockId, sender) {
      blockManager.removeBlock(blockId)
      true
    }
  case ... =>
}
On receiving the message, BlockManagerSlaveActor calls blockManager.removeBlock:
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
  logInfo(s"Removing block $blockId")
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
      val removedFromMemory = memoryStore.remove(blockId)
      val removedFromDisk = diskStore.remove(blockId)
      val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
      if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
        logWarning(s"Block $blockId could not be removed as it was not found in either " +
          "the disk, memory, or tachyon store")
      }
      blockInfo.remove(blockId)
      if (tellMaster && info.tellMaster) {
        val status = getCurrentBlockStatus(blockId, info)
        reportBlockStatus(blockId, info, status)
      }
    }
  } else {
    // The block has already been removed; do nothing.
    logWarning(s"Asked to remove block $blockId, which does not exist")
  }
}
This code calls the remove method of the three stores to do the actual work and, when tellMaster is set, reports the updated block status back to the master. Note that memoryStore keeps blocks on the JVM heap and is therefore subject to Java GC; it is tachyonStore that holds data off-heap, outside the JVM and unaffected by GC. That concludes this walk-through of Spark's storage management mechanism.
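Which of the three stores ends up holding a block is decided by the StorageLevel chosen at persist time; a short sketch assuming an existing SparkContext sc:

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

def persistExamples(sc: SparkContext): Unit = {
  // Four independent RDDs, one per level (an RDD's storage level can only be set once).
  sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY)      // memoryStore only (JVM heap)
  sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_AND_DISK)  // memoryStore, spilling to diskStore
  sc.parallelize(1 to 100).persist(StorageLevel.DISK_ONLY)        // diskStore only
  sc.parallelize(1 to 100).persist(StorageLevel.OFF_HEAP)         // tachyonStore in this Spark version
}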