
Spark Storage Mechanism Explained

  We know that Spark can keep the RDDs it has computed in memory and reuse them when needed. How does Spark actually do this? This article walks through the source code to explain how RDDs are reused.
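
  Before diving into the internals, here is a minimal, self-contained sketch of the behaviour this article analyses (local[*] and the numbers are only for illustration):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.storage.StorageLevel

  object CacheExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("cache-example").setMaster("local[*]"))
      val doubled = sc.parallelize(1 to 1000).map(_ * 2).persist(StorageLevel.MEMORY_ONLY)
      println(doubled.count())        // first action: computes the partitions and stores them as blocks
      println(doubled.reduce(_ + _))  // second action: reads the cached blocks instead of recomputing
      doubled.unpersist()             // removes the blocks again (analysed later in this article)
      sc.stop()
    }
  }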

  The previous article explained Spark's execution mechanism: the DAGScheduler is responsible for breaking an action down into stages. It is in DAGScheduler.getMissingParentStages that Spark first tries to reuse previously computed RDDs, and the function it relies on is DAGScheduler.getCacheLocs.

  private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]

  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
    if (!cacheLocs.contains(rdd.id)) {
      val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
      cacheLocs(rdd.id) = blockIds.map { id =>
        locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id)
  }

  The DAGScheduler stores partition location information in cacheLocs only for RDDs it has already looked up. Let's follow the path taken when an RDD is not in cacheLocs yet: it first generates the BlockIds that represent each partition, then calls BlockManager.blockIdsToBlockManagers to translate those BlockIds into Seq[BlockManagerId]; a BlockManagerId carries the location information of a partition (each partition is stored as one block, and blocks can also hold other data such as broadcast variables).
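
  As a concrete illustration of the naming above, this tiny sketch (rdd id 3 with 4 partitions is just an example) builds the same per-partition block ids:

  import org.apache.spark.storage.{BlockId, RDDBlockId}

  // One RDDBlockId per partition; its name encodes "rdd_<rddId>_<partitionIndex>".
  val rddId = 3
  val blockIds: Seq[BlockId] = (0 until 4).map(index => RDDBlockId(rddId, index))
  blockIds.foreach(id => println(id.name))  // rdd_3_0, rdd_3_1, rdd_3_2, rdd_3_3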

  According to the comments in the source, every node (the master as well as the workers) runs a BlockManager that manages all storage on that node (RDD blocks, broadcast data, and so on). The master and the workers communicate through the Akka actor system (see my other article for an introduction), namely via the BlockManagerMasterActor and the BlockManagerSlaveActor. Let's continue with BlockManager.blockIdsToBlockManagers.

  def blockIdsToBlockManagers(
      blockIds: Array[BlockId],
      env: SparkEnv,
      blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[BlockManagerId]] = {

    // blockManagerMaster != null is used in tests
    assert(env != null || blockManagerMaster != null)
    val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
      env.blockManager.getLocationBlockIds(blockIds)
    } else {
      blockManagerMaster.getLocations(blockIds)
    }

    val blockManagers = new HashMap[BlockId, Seq[BlockManagerId]]
    for (i <- 0 until blockIds.length) {
      blockManagers(blockIds(i)) = blockLocations(i)
    }
    blockManagers.toMap
  }

  The BlockManager is created inside SparkEnv, which likewise runs on every node and, at creation time, is specialised into a driver env or an executor env (the same class, just wired differently, as with BlockManager). When a SparkEnv is built, the driver's BlockManager gets a newly created BlockManagerMasterActor, while each executor's BlockManager only gets a reference to that actor. The code above uses sparkEnv.blockManager.blockManagerMaster.getLocations to resolve the BlockManagerIds of each BlockId and returns them organised as a Map.
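
  How does an executor's BlockManager get hold of the master actor in the first place? Roughly, SparkEnv.create wires it up like this (a simplified sketch paraphrased from the Spark 1.x source; names such as registerOrLookup follow that source, but details differ between versions):

  def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
      // On the driver: actually create the actor (e.g. the BlockManagerMasterActor).
      actorSystem.actorOf(Props(newActor), name = name)
    } else {
      // On an executor: only obtain a remote reference to the driver's actor.
      AkkaUtils.makeDriverRef(name, conf, actorSystem)
    }
  }

  val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
    "BlockManagerMaster",
    new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

  With that wiring in mind, let's move on to BlockManagerMaster.getLocations.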

  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
  }

  private def askDriverWithReply[T](message: Any): T = {
    AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
      timeout)
  }

  This code simply sends a GetLocations message to the BlockManagerMasterActor and waits for the reply. The BlockManagerMasterActor holds all of the storage metadata: blockManagerInfo holds the storage information of every executor, blockManagerIdByExecutor maps each executor to the BlockManagerId of the BlockManager running on it, and blockLocations records every storage location of every block (including the locations of all partitions). Here is the part of the BlockManagerMasterActor that answers location queries:

  override def receiveWithLogging = {
    case GetLocations(blockId) =>
      sender ! getLocations(blockId)
    case ... =>
  }

  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
    if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
  }
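
  For reference, the three data structures mentioned above are declared roughly as follows in the Spark 1.x BlockManagerMasterActor (paraphrased from the source; JHashMap is an alias for java.util.HashMap there):

  // Mapping from block manager id to the block manager's information.
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor id to the id of the BlockManager running on that executor.
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that hold the block.
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]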

  Since the BlockManagerMasterActor holds the locations of every block, it can answer the query directly. By now it is clear that all block location information is kept on the master node. That is the complete path Spark follows to find a persisted RDD, but it does not cover the whole storage mechanism, so let's look at some more code.

  Because all of the block metadata lives on the master, merely querying block locations does not show us how the master interacts with the executors. So let's analyse what happens when an RDD is unpersisted: RDD.unpersist calls SparkContext.unpersistRDD, which in turn calls env.blockManager.master.removeRdd.
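
  For reference, those two entry points look roughly like this, abridged from the Spark 1.x source (signatures may vary slightly between versions):

  // RDD.unpersist: the user-facing call; it delegates to SparkContext.
  def unpersist(blocking: Boolean = true): this.type = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.unpersistRDD(id, blocking)
    storageLevel = StorageLevel.NONE
    this
  }

  // SparkContext.unpersistRDD: forwards the request to the driver-side BlockManagerMaster.
  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
    env.blockManager.master.removeRdd(rddId, blocking)
    persistentRdds.remove(rddId)
    listenerBus.post(SparkListenerUnpersistRDD(rddId))
  }

  On the BlockManagerMaster side, removeRdd and the closely related removeBlock follow the same ask-the-driver pattern we saw for getLocations; removeBlock is shown here: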

  def removeBlock(blockId: BlockId) {
    askDriverWithReply(RemoveBlock(blockId))
  }

  private def askDriverWithReply[T](message: Any): T = {
    AkkaUtils.askWithReply(message, driverActor, AKKA_RETRY_ATTEMPTS, AKKA_RETRY_INTERVAL_MS,
      timeout)
  }

  Just like the example above, this sends a RemoveBlock message to the BlockManagerMasterActor.

  override def receiveWithLogging = {
    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      sender ! true
    case ... =>
  }

  private def removeBlockFromWorkers(blockId: BlockId) {
    val locations = blockLocations.get(blockId)
    if (locations != null) {
      locations.foreach { blockManagerId: BlockManagerId =>
        val blockManager = blockManagerInfo.get(blockManagerId)
        if (blockManager.isDefined) {
          // Remove the block from the slave's BlockManager.
          // Doesn't actually wait for a confirmation and the message might get lost.
          // If message loss becomes frequent, we should add retry logic here.
          blockManager.get.slaveActor.ask(RemoveBlock(blockId))(akkaTimeout)
        }
      }
    }
  }

  This code first looks up all the BlockManagerIds holding the block via blockLocations, then uses each BlockManagerId to find its BlockManagerInfo, which exposes the BlockManagerSlaveActor on that executor, and finally sends that actor a RemoveBlock message.

  override def receiveWithLogging = {
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, sender) {
        blockManager.removeBlock(blockId)
        true
      }
    case ... =>
  }

  When the BlockManagerSlaveActor receives the message, it calls blockManager.removeBlock.

  def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
    logInfo(s"Removing block $blockId")
    val info = blockInfo.get(blockId).orNull
    if (info != null) {
      info.synchronized {
        // Removals are idempotent in disk store and memory store. At worst, we get a warning.
        val removedFromMemory = memoryStore.remove(blockId)
        val removedFromDisk = diskStore.remove(blockId)
        val removedFromTachyon = if (tachyonInitialized) tachyonStore.remove(blockId) else false
        if (!removedFromMemory && !removedFromDisk && !removedFromTachyon) {
          logWarning(s"Block $blockId could not be removed as it was not found in either " +
            "the disk, memory, or tachyon store")
        }
        blockInfo.remove(blockId)
        if (tellMaster && info.tellMaster) {
          val status = getCurrentBlockStatus(blockId, info)
          reportBlockStatus(blockId, info, status)
        }
      }
    } else {
      // The block has already been removed; do nothing.
      logWarning(s"Asked to remove block $blockId, which does not exist")
    }
  }

  This code calls remove on each of the three stores (memoryStore, diskStore, and tachyonStore) and, if requested, reports the updated block status back to the master. Of the three, memoryStore and diskStore keep blocks in JVM heap memory and on local disk respectively, while tachyonStore keeps them off-heap in Tachyon, outside the JVM and therefore unaffected by Java GC. That concludes this walk through Spark's storage management mechanism.
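
  Which of the three stores ends up holding a block is decided by the StorageLevel chosen at persist time. A small illustration (assuming a spark-shell session where sc is already defined):

  import org.apache.spark.storage.StorageLevel

  val data = sc.parallelize(1 to 1000)
  val inMemory  = data.map(_ + 1).persist(StorageLevel.MEMORY_ONLY)      // memoryStore only
  val spillable = data.map(_ * 2).persist(StorageLevel.MEMORY_AND_DISK)  // memoryStore, spilling to diskStore
  val offHeap   = data.map(_ - 1).persist(StorageLevel.OFF_HEAP)         // tachyonStore (Tachyon) in Spark 1.x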

 
