
Spark Notes 15: ShuffleManager, the bridge between the stages/tasks on the two sides of a shuffle map

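  In both Hadoop and Spark, the shuffle is one of the main factors that determine performance. When the amount of shuffling cannot be reduced, choosing a good shuffle manager is another important way to optimize performance.
ShuffleManager's main job is to pass data between tasks, so getWriter and getReader are its main interfaces.
Overall flow:
  1) Consumer side: when a stage depends on the result of a shuffle map stage, the dependency is recognized while the DAG is being split into stages, and it is registered with the ShuffleManager;
  2) Producer side: this is the shuffle map task. When it finishes, it registers its output and reports that it is done (its result goes back to the scheduler as a MapStatus, shown later in these notes).
  3) In this way the ShuffleManager connects the two ends of the shuffle.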
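
The three steps above can be sketched with a toy, in-memory stand-in. This is only an illustration of the register / write / read hand-off: `ToyShuffle` and all of its methods are hypothetical names, not Spark APIs, and the real ShuffleManager works with handles, files and block managers rather than a map in memory.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-in for the register / write / read flow.
// None of these names are Spark APIs.
object ToyShuffle {
  // shuffleId -> number of reduce partitions, recorded when the consumer registers
  private val registered = mutable.Map[Int, Int]()
  // (shuffleId, mapId, reduceId) -> records written by that map task for that reducer
  private val blocks = mutable.Map[(Int, Int, Int), Vector[(String, Int)]]()

  // Step 1: the consuming stage registers its shuffle dependency.
  def registerShuffle(shuffleId: Int, numReducers: Int): Unit =
    registered(shuffleId) = numReducers

  // Step 2: a map task writes its records, bucketed by reduce partition.
  def write(shuffleId: Int, mapId: Int, records: Seq[(String, Int)]): Unit = {
    val numReducers = registered(shuffleId)
    records
      .groupBy { case (k, _) => Math.floorMod(k.hashCode, numReducers) }
      .foreach { case (reduceId, recs) =>
        blocks((shuffleId, mapId, reduceId)) = recs.toVector
      }
  }

  // Step 3: a reduce task reads every map task's block for its partition,
  // in map-id order.
  def read(shuffleId: Int, reduceId: Int): Seq[(String, Int)] =
    blocks.toSeq
      .collect { case ((s, mapId, r), recs) if s == shuffleId && r == reduceId => (mapId, recs) }
      .sortBy(_._1)
      .flatMap(_._2)
}
```

A reducer only ever asks for "its" partition, which is why the reader interface takes a partition range while the writer interface takes a map id.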


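Spark provides two shuffle managers:
  1) HashShuffleManager: provides two classes, HashShuffleReader and HashShuffleWriter. Data is written as key-value pairs, and custom ordering and aggregation are supported.
* A ShuffleManager using hashing, that creates one output file per reduce partition on each
* mapper (possibly reusing these across waves of tasks).
  2) SortShuffleManager: data is written in order. Each map task saves one file named after its blockId plus a blockId.index file. Judging from the comment in SortShuffleWriter.write (quoted later in these notes), the index file is what allows different ranges of the single data file to be served to different reducers.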

 A figure borrowed from another author illustrates this relationship:
(Image source: http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/ )

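Shuffle data is read by HashShuffleReader, which essentially calls straight into the flow below.
This code is written very densely; almost every line does a lot of work, which makes it hard to follow.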
====================== Flow for fetching block data ============================
->BlockStoreShuffleFetcher.fetch[T](shuffleId: Int,reduceId: Int,...) : Iterator[T] -- fetch the data blocks for the given shuffleId and reduceId
->val blockManager = SparkEnv.get.blockManager  -- get the block manager
->val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
//-- get the data 'directory' for this shuffleId and reduceId; statuses is an Array[(BlockManagerId, Long)] with one entry per map output: where it lives and how large it is
-> val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]] -- buffer used to regroup the 'directory'
->for (((address, size), index) <- statuses.zipWithIndex) -- iterate over all entries, attaching an index
->splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size)) -- regroup the 'directory' by address
->val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { -- regroup the 'directory' once more
// the new 'directory' has the form Seq[(BlockManagerId, Seq[(BlockId, Long /* data length */)])], because a BlockId is needed to fetch data
->case (address, splits) => (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
->val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics) -- fetch multiple blocks
-> new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer, readMetrics) -- the netty variant
->iter = BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) -- get an iterator over the blocks
->BasicBlockFetcherIterator initialization
->protected val localBlocksToFetch = new ArrayBuffer[BlockId]()  -- blockIds to fetch locally
->protected val remoteBlocksToFetch = new HashSet[BlockId]()  -- blockIds to fetch remotely
->protected val results = new LinkedBlockingQueue[FetchResult] -- fetched results are put here
->protected val fetchRequests = new Queue[FetchRequest]  -- requests waiting to be sent; queued mainly to throttle the fetch rate
->iter.initialize()  -- initialize the iterator, which starts the fetching
->val remoteRequests = splitLocalRemoteBlocks() -- turn the incoming block list into batched requests, which controls the degree of parallelism
->val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)  -- cap each request at maxBytesInFlight / 5 so that up to five nodes can be fetched from in parallel
// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
->val remoteRequests = new ArrayBuffer[FetchRequest]  -- array of requests
->for ((address, blockInfos) <- blocksByAddress) { -- iterate over all requested blockIds, one address at a time
->if (address == blockManagerId) -- local fetch
->localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)  -- local blocks are all fetched at once, with no throttling
->else -- remote machine
->val iterator = blockInfos.iterator -- iterator over the blockIds of this address
->while (iterator.hasNext) { -- iterate over all blockIds of one address
->val (blockId, size) = iterator.next()
->curBlocks += ((blockId, size)) -- append to the current request batch
->if (curRequestSize >= targetRequestSize)  -- if the current batch has grown large enough
->remoteRequests += new FetchRequest(address, curBlocks) -- emit it as a FetchRequest
->curBlocks = new ArrayBuffer[(BlockId, Long)]  -- and start a new batch
->remoteRequests += new FetchRequest(address, curBlocks)  -- wrap the remaining blocks in a final request
->return remoteRequests -- return all the requests
->fetchRequests ++= Utils.randomize(remoteRequests) -- add the remote requests into our queue in a random order
->while (!fetchRequests.isEmpty &&(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight))
->sendRequest(fetchRequests.dequeue())  -- send the message that requests the data
->val cmId = new ConnectionManagerId(req.address.host, req.address.port) -- wraps the address to connect to
->blockMessageArray = new BlockMessageArray(req.blocks.map {  -- iterate over all blockIds for this address
->case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))  -- the request is wrapped yet again!
->future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) -- executed asynchronously
->future.onComplete { -- callback run when the future completes
->case Success(message) => -- the data was fetched successfully
->val bufferMessage = message.asInstanceOf[BufferMessage] -- the data blocks have arrived
->val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) -- format conversion
->for (blockMessage <- blockMessageArray) -- iterate over all fetched blocks and put them into results
->results.put(new FetchResult(blockId, sizeMap(blockId), () => dataDeserialize(...)))
->logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
->getLocalBlocks()  -- fetch the local data
->for (id <- localBlocksToFetch)
->val iter = getLocalFromDisk(id, serializer).get -- read from local disk
->results.put(new FetchResult(id, 0, () => iter)) -- put into results
->val itr = blockFetcherItr.flatMap(unpackBlock) // handles blocks whose fetch may have failed
->val completionIter = CompletionIterator[T, Iterator[T]](itr, { context.taskMetrics.updateShuffleReadMetrics()})
->new InterruptibleIterator[T](context, completionIter)  -- return the fetched results to the caller as an iterator
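
The batching done by splitLocalRemoteBlocks in the trace above can be sketched as follows. This is a simplified model, not the Spark source: `FetchRequestSketch`, its `FetchRequest` case class and the use of plain strings for addresses and block ids are assumptions made for illustration, and skipping zero-sized remote blocks is likewise an assumption modelled on the local-block filter shown in the trace.

```scala
import scala.collection.mutable.ArrayBuffer

object FetchRequestSketch {
  // Hypothetical simplified types: a block is (blockName, sizeInBytes).
  final case class FetchRequest(address: String, blocks: Seq[(String, Long)]) {
    val size: Long = blocks.map(_._2).sum
  }

  // Group each remote address's blocks into requests of roughly
  // maxBytesInFlight / 5 bytes, so several addresses can be fetched in parallel.
  def splitRemote(
      blocksByAddress: Seq[(String, Seq[(String, Long)])],
      maxBytesInFlight: Long): Seq[FetchRequest] = {
    val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
    val requests = ArrayBuffer[FetchRequest]()
    for ((address, blockInfos) <- blocksByAddress) {
      var curBlocks = ArrayBuffer[(String, Long)]()
      var curRequestSize = 0L
      for ((blockId, size) <- blockInfos if size > 0) { // assumption: skip empty blocks
        curBlocks += ((blockId, size))
        curRequestSize += size
        if (curRequestSize >= targetRequestSize) {
          // The batch is large enough: emit it and start a new one.
          requests += FetchRequest(address, curBlocks.toList)
          curBlocks = ArrayBuffer[(String, Long)]()
          curRequestSize = 0L
        }
      }
      // Whatever is left for this address becomes a final, smaller request.
      if (curBlocks.nonEmpty) requests += FetchRequest(address, curBlocks.toList)
    }
    requests.toList
  }
}
```

Note that a batch is closed only after it reaches the target, so a single request can exceed targetRequestSize by up to one block.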

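The function that looks up the status of the result data by shuffleId and reduceId includes a cache and is a little more complex, so it is shown separately:
->MapOutputTracker::getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)]
->val statuses = mapStatuses.get(shuffleId).orNull
->if (statuses == null) -- cache miss: fetch from the remote tracker, then write the result into the cache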
->if (fetching.contains(shuffleId))
->while (fetching.contains(shuffleId))
->fetching.wait()
->fetching += shuffleId
->val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]
->val future = trackerActor.ask(message)(timeout)
->Await.result(future, timeout)
->fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
->mapStatuses.put(shuffleId, fetchedStatuses)
->fetching -= shuffleId
->fetching.notifyAll()
->result = MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
->statuses.map {status =>
->(status.location, decompressSize(status.compressedSizes(reduceId)))
->decompressSize(compressedSize: Byte)  -- note that the encoding of this data length is rather interesting
->math.pow(LOG_BASE, compressedSize & 0xFF).toLong, -- LOG_BASE = 1.1
->return result
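
The one-byte size encoding used by decompressSize can be tried out in isolation. The decoder below follows the line quoted in the trace; the encoder is an assumption that simply inverts it (ceiling of the base-1.1 logarithm, capped at 255), and `SizeCodec` is a hypothetical name.

```scala
object SizeCodec {
  private val LogBase = 1.1

  // Encoder: assumed to be the inverse of the decoder quoted in the trace.
  // Sizes are stored as ceil(log_1.1(size)), capped at 255, in a single byte.
  def compressSize(size: Long): Byte =
    if (size == 0L) 0.toByte
    else if (size <= 1L) 1.toByte
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte

  // Decoder: the byte is read as an unsigned value (& 0xFF) and used as an exponent.
  def decompressSize(compressed: Byte): Long =
    if (compressed == 0) 0L
    else math.pow(LogBase, (compressed & 0xFF).toDouble).toLong
}
```

Under these assumptions the decoded value is never smaller than the true size and overestimates it by at most about 10%, as long as the size stays below the cap of 1.1^255. That is precise enough for deciding how to batch fetch requests, which is all the reduce side uses it for in the flow above.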


The shuffle interface classes:
/**
 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on both the
 * driver and executors, based on the spark.shuffle.manager setting. The driver registers shuffles
 * with it, and executors (or tasks running locally in the driver) can ask to read and write data.
 *
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.
 */
private[spark] trait ShuffleManager {
  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  /** Remove a shuffle's metadata from the ShuffleManager. */
  def unregisterShuffle(shuffleId: Int)

  /** Shut down this ShuffleManager. */
  def stop(): Unit
}


/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
 *                   the default serializer, as specified by `spark.serializer` config option, will
 *                   be used.
 */
@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

/**
 * A basic ShuffleHandle implementation that just captures registerShuffle's parameters.
 */
private[spark] class BaseShuffleHandle[K, V, C](
    shuffleId: Int,
    val numMaps: Int,
    val dependency: ShuffleDependency[K, V, C])
  extends ShuffleHandle(shuffleId)


/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and worker) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
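The result of a shuffle map task is returned to the scheduler as a MapStatus, which contains the address of the BlockManager on the machine where the map ran and the sizes of its outputs. Note that these sizes are stored in a compressed encoding (one byte per reducer), not as exact byte counts.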
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
extends Externalizable {

private[spark] class BlockMessage() {
  // Un-initialized: typ = 0
  // GetBlock: typ = 1
  // GotBlock: typ = 2
  // PutBlock: typ = 3
  private var typ: Int = BlockMessage.TYPE_NON_INITIALIZED
  private var id: BlockId = null
  private var data: ByteBuffer = null
  private var level: StorageLevel = null
Both the hash and the sort ShuffleManager use this HashShuffleReader as their reader; BlockStoreShuffleFetcher.fetch does most of the work.
private[spark] class HashShuffleReader[K, C](
    handle: BaseShuffleHandle[K, _, C],
    startPartition: Int,
    endPartition: Int,
    context: TaskContext)
  extends ShuffleReader[K, C]
{
  require(endPartition == startPartition + 1,
    "Hash shuffle currently only supports fetching one partition")

  private val dep = handle.dependency

  /** Read the combined key-values for this reduce task */
  override def read(): Iterator[Product2[K, C]] = {
    val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    val ser = Serializer.getSerializer(dep.serializer)
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser,
      readMetrics)

    // The block below applies the aggregator. The dependency's mapSideCombine flag says whether
    // aggregation already happened on the map side (so combiners are merged here) or has to be
    // done entirely on the reduce side (so raw values are combined here).
    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) => // is a custom key ordering defined?
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }

  /** Close this reader */
  override def stop(): Unit = ???
}
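
The aggregation branch in read() above distinguishes two cases that are easy to mix up. The sketch below models them with plain Scala collections; `MiniAggregator` and the two helper functions are hypothetical and only mimic the roles of the Aggregator methods named in the code, without any spilling or task context.

```scala
object AggregationSketch {
  // Hypothetical mini-aggregator (not Spark's Aggregator class).
  final case class MiniAggregator[V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)

  // No map-side combine: the reducer receives raw values and builds combiners itself.
  def combineValuesByKey[K, V, C](
      agg: MiniAggregator[V, C],
      records: Iterator[(K, V)]): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(agg.mergeValue(_, v)).getOrElse(agg.createCombiner(v)))
    }

  // Map-side combine: the reducer receives partial combiners and only merges them.
  def combineCombinersByKey[K, V, C](
      agg: MiniAggregator[V, C],
      records: Iterator[(K, C)]): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(agg.mergeCombiners(_, c)).getOrElse(c))
    }
}
```

For a sum, both paths give the same answer; the difference is how much data crosses the shuffle. With map-side combine each mapper ships one partial sum per key instead of every value.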
HashShuffleWriter: this is just a thin wrapper over the underlying writers that writes one k-v pair at a time, so it is fairly simple.
private[spark] class HashShuffleWriter[K, V](
    handle: BaseShuffleHandle[K, V, _],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  // (excerpt: the declarations of dep, numOutputSplits and writeMetrics are not shown)
  private val blockManager = SparkEnv.get.blockManager
  private val shuffleBlockManager = blockManager.shuffleBlockManager
  private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
  private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
    writeMetrics)

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      records
    }

    // Route every record to the writer of the reduce partition its key belongs to.
    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem)
    }
  }
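
The final loop of write() above, which routes each record to the bucket chosen by the partitioner, can be modelled in a few lines. This is a sketch under assumptions: `HashBucketSketch` is a hypothetical name, in-memory buffers stand in for the per-bucket file writers, and the partitioner is assumed to be a plain non-negative modulo of the key's hashCode.

```scala
import scala.collection.mutable.ArrayBuffer

object HashBucketSketch {
  // Assumed partitioning rule: non-negative modulo of the key's hashCode.
  def getPartition(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // One buffer per reduce partition stands in for one output file per bucket.
  def writeToBuckets[K, V](
      records: Iterator[(K, V)],
      numPartitions: Int): Vector[Vector[(K, V)]] = {
    val writers = Vector.fill(numPartitions)(ArrayBuffer.empty[(K, V)])
    for (elem <- records) {
      writers(getPartition(elem._1, numPartitions)) += elem
    }
    writers.map(_.toVector)
  }
}
```

Because every map task keeps one writer per reduce partition, the number of open outputs grows with mappers times reducers, which is the cost the sort-based writer below avoids by using a single file per map task.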

SortShuffleWriter writes the data in order and keeps both a blockId file and a blockId.index file.
private[spark] class SortShuffleWriter[K, V, C](
    handle: BaseShuffleHandle[K, V, C],
    mapId: Int,
    context: TaskContext)
  extends ShuffleWriter[K, V] with Logging {

  private val dep = handle.dependency
  private val numPartitions = dep.partitioner.numPartitions

  private val blockManager = SparkEnv.get.blockManager
  private val ser = Serializer.getSerializer(dep.serializer.orNull)

  private val conf = SparkEnv.get.conf
  private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

  private var sorter: ExternalSorter[K, V, _] = null
  private var outputFile: File = null
  private var indexFile: File = null

  // Are we in the process of stopping? Because map tasks can call stop() with success = true
  // and then call stop() with success = false if they get an exception, we want to make sure
  // we don't try deleting files, etc twice.
  private var stopping = false

  private var mapStatus: MapStatus = null

  private val writeMetrics = new ShuffleWriteMetrics()
  context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

  /** Write a bunch of records to this task's output */
  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      if (!dep.aggregator.isDefined) {
        throw new IllegalStateException("Aggregator is empty for map-side combine")
      }
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records)
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](
        None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later
    // serve different ranges of this file using an index file that we create at the end.
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0)

    outputFile = blockManager.diskBlockManager.getFile(blockId)
    indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index")

    val partitionLengths = sorter.writePartitionedFile(blockId, context)

    // Register our map output with the ShuffleBlockManager, which handles cleaning it over time
    blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions)

    mapStatus = new MapStatus(blockManager.blockManagerId,
      partitionLengths.map(MapOutputTracker.compressSize))
  }
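
The comment in write() above says that different ranges of the single output file are served through the index file. One plausible way to do that, sketched here as an assumption rather than taken from the Spark source, is to store cumulative byte offsets derived from the per-partition lengths; `IndexFileSketch` and its methods are hypothetical names.

```scala
object IndexFileSketch {
  // Assumed index layout: offsets(i) is where partition i starts and
  // offsets(i + 1) is where it ends, so there are numPartitions + 1 entries.
  def buildOffsets(partitionLengths: Array[Long]): Array[Long] =
    partitionLengths.scanLeft(0L)(_ + _)

  // Look up the (offset, length) of the byte range a given reducer should read.
  def rangeFor(offsets: Array[Long], reduceId: Int): (Long, Long) = {
    val start = offsets(reduceId)
    (start, offsets(reduceId + 1) - start)
  }
}
```

With such a layout a reducer needs only two adjacent index entries to locate its slice, so one data file per map task is enough regardless of how many reducers there are.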

  My understanding of the details of the shuffle, and even of its principles, is still not deep enough; there is a lot left to learn.















