[SPARK-6627] Some clean-up in shuffle code.
Before diving into review #4450 I took a look through the existing shuffle
code to learn how it works. Unfortunately, there are some very
confusing things in this code. This patch makes a few small changes
to simplify things. The changes are hard to describe concisely
because of how convoluted the issues were, but they are fairly small
logically:

1. There is a trait named `ShuffleBlockManager` that deals with only
   one logical function: retrieving shuffle block data given shuffle
   block coordinates. This trait has two implementors, FileShuffleBlockManager
   and IndexShuffleBlockManager. Confusingly, the vast majority of those
   implementations have nothing to do with this particular functionality.
   So I've renamed the trait to ShuffleBlockResolver and documented it
   (see the sketch after this list).
2. The aforementioned trait had two almost identical methods, for no good
   reason. I removed one method (getBytes) and modified callers to use the
   other one (getBlockData). I think the behavior is preserved in all cases.
3. The sort shuffle code uses the value "0" in the reduce slot of a
   BlockId as a placeholder. I made it into a constant (NOOP_REDUCE_ID) since
   it needs to be consistent across multiple places.

I think for (3) there is actually a better solution that would avoid the
need for this kind of workaround/hack in the first place, but it's more
complex, so I'm punting on it for now.
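
For (3), a condensed sketch of the new constant and a hypothetical helper that
mirrors the SortShuffleWriter call site (simplified from the diff below; the
package declaration is assumed from the surrounding code):

```scala
package org.apache.spark.shuffle

import org.apache.spark.storage.ShuffleBlockId

private[spark] object IndexShuffleBlockManager {
  // No-op reduce ID used when interacting with the disk store: it expects puts keyed by a
  // (map, reduce) pair, but sort shuffle output for all reduces of a map goes into one file.
  val NOOP_REDUCE_ID = 0
}

// Hypothetical helper showing how the placeholder reduce slot is filled with the constant
// instead of a literal 0 (in SortShuffleWriter this is done when building the output BlockId):
object NoopReduceIdExample {
  def consolidatedBlockId(shuffleId: Int, mapId: Int): ShuffleBlockId =
    ShuffleBlockId(shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
}
```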

Author: Patrick Wendell <[email protected]>

Closes #5286 from pwendell/cleanup and squashes the following commits:

c71fbc7 [Patrick Wendell] Open interface back up for testing
f36edd5 [Patrick Wendell] Code review feedback
d1c0494 [Patrick Wendell] Style fix
a406079 [Patrick Wendell] [HOTFIX] Some clean-up in shuffle code.
pwendell committed Apr 2, 2015
1 parent 40df5d4 commit 6562787
Showing 12 changed files with 51 additions and 51 deletions.
@@ -67,7 +67,7 @@ private[spark] trait ShuffleWriterGroup {
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
- extends ShuffleBlockManager with Logging {
+ extends ShuffleBlockResolver with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)

@@ -175,11 +175,6 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}

- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- val segment = getBlockData(blockId)
- Some(segment.nioByteBuffer())
- }

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
if (consolidateShuffleFiles) {
// Search all file groups associated with this shuffle.
@@ -27,6 +27,8 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._

+ import IndexShuffleBlockManager.NOOP_REDUCE_ID

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
* Data of shuffle blocks from the same map task are stored in a single consolidated data file.
@@ -39,25 +41,18 @@ import org.apache.spark.storage._
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
- class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
+ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {

private lazy val blockManager = SparkEnv.get.blockManager

private val transportConf = SparkTransportConf.fromSparkConf(conf)

- /**
- * Mapping to a single shuffleBlockId with reduce ID 0.
- * */
- def consolidateId(shuffleId: Int, mapId: Int): ShuffleBlockId = {
- ShuffleBlockId(shuffleId, mapId, 0)
- }

def getDataFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
- blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, 0))
+ blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

/**
@@ -97,10 +92,6 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
}
}

- override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
- Some(getBlockData(blockId).nioByteBuffer())
- }

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
// The block is actually going to be a range of a single map output file for this map, so
// find out the consolidated file, then the offset within that from our index
@@ -123,3 +114,11 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {

override def stop(): Unit = {}
}

+ private[spark] object IndexShuffleBlockManager {
+ // No-op reduce ID used in interactions with disk store and BlockObjectWriter.
+ // The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
+ // shuffle outputs for several reduces are glommed into a single file.
+ // TODO: Avoid this entirely by having the DiskBlockObjectWriter not require a BlockId.
+ val NOOP_REDUCE_ID = 0
+ }
@@ -22,15 +22,19 @@ import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId

private[spark]
- trait ShuffleBlockManager {
+ /**
+ * Implementers of this trait understand how to retrieve block data for a logical shuffle block
+ * identifier (i.e. map, reduce, and shuffle). Implementations may use files or file segments to
+ * encapsulate shuffle data. This is used by the BlockStore to abstract over different shuffle
+ * implementations when shuffle data is retrieved.
+ */
+ trait ShuffleBlockResolver {
type ShuffleId = Int

/**
- * Get shuffle block data managed by the local ShuffleBlockManager.
- * @return Some(ByteBuffer) if block found, otherwise None.
+ * Retrieve the data for the specified block. If the data for that block is not available,
+ * throws an unspecified exception.
*/
- def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer]

def getBlockData(blockId: ShuffleBlockId): ManagedBuffer

def stop(): Unit
@@ -55,7 +55,10 @@ private[spark] trait ShuffleManager {
*/
def unregisterShuffle(shuffleId: Int): Boolean

- def shuffleBlockManager: ShuffleBlockManager
+ /**
+ * Return a resolver capable of retrieving shuffle block data based on block coordinates.
+ */
+ def shuffleBlockResolver: ShuffleBlockResolver

/** Shut down this ShuffleManager. */
def stop(): Unit
@@ -23,7 +23,7 @@ import org.apache.spark.scheduler.MapStatus
* Obtained inside a map task to write out records to the shuffle system.
*/
private[spark] trait ShuffleWriter[K, V] {
- /** Write a bunch of records to this task's output */
+ /** Write a sequence of records to this task's output */
def write(records: Iterator[_ <: Product2[K, V]]): Unit

/** Close this writer, passing along whether the map completed */
@@ -53,20 +53,20 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
override def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext)
: ShuffleWriter[K, V] = {
new HashShuffleWriter(
- shuffleBlockManager, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
+ shuffleBlockResolver, handle.asInstanceOf[BaseShuffleHandle[K, V, _]], mapId, context)
}

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
- shuffleBlockManager.removeShuffle(shuffleId)
+ shuffleBlockResolver.removeShuffle(shuffleId)
}

- override def shuffleBlockManager: FileShuffleBlockManager = {
+ override def shuffleBlockResolver: FileShuffleBlockManager = {
fileShuffleBlockManager
}

/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}
@@ -58,26 +58,27 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
val baseShuffleHandle = handle.asInstanceOf[BaseShuffleHandle[K, V, _]]
shuffleMapNumber.putIfAbsent(baseShuffleHandle.shuffleId, baseShuffleHandle.numMaps)
new SortShuffleWriter(
- shuffleBlockManager, baseShuffleHandle, mapId, context)
+ shuffleBlockResolver, baseShuffleHandle, mapId, context)
}

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
if (shuffleMapNumber.containsKey(shuffleId)) {
val numMaps = shuffleMapNumber.remove(shuffleId)
(0 until numMaps).map{ mapId =>
- shuffleBlockManager.removeDataByMap(shuffleId, mapId)
+ shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
}
}
true
}

- override def shuffleBlockManager: IndexShuffleBlockManager = {
+ override def shuffleBlockResolver: IndexShuffleBlockManager = {
indexShuffleBlockManager
}

/** Shut down this ShuffleManager. */
override def stop(): Unit = {
- shuffleBlockManager.stop()
+ shuffleBlockResolver.stop()
}
}

@@ -58,16 +58,15 @@ private[spark] class SortShuffleWriter[K, V, C](
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
- sorter = new ExternalSorter[K, V, V](
- None, Some(dep.partitioner), None, dep.serializer)
+ sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
sorter.insertAll(records)
}

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
- val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
+ val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

@@ -100,3 +99,4 @@ private[spark] class SortShuffleWriter[K, V, C](
}
}
}

14 changes: 5 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -301,7 +301,7 @@ private[spark] class BlockManager(
*/
override def getBlockData(blockId: BlockId): ManagedBuffer = {
if (blockId.isShuffle) {
- shuffleManager.shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
+ shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
} else {
val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
.asInstanceOf[Option[ByteBuffer]]
@@ -439,14 +439,10 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
- val shuffleBlockManager = shuffleManager.shuffleBlockManager
- shuffleBlockManager.getBytes(blockId.asInstanceOf[ShuffleBlockId]) match {
- case Some(bytes) =>
- Some(bytes)
- case None =>
- throw new BlockException(
- blockId, s"Block $blockId not found on disk, though it should be")
- }
+ val shuffleBlockManager = shuffleManager.shuffleBlockResolver
+ // TODO: This should gracefully handle case where local block is not available. Currently
+ // downstream code will throw an exception.
+ Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}
@@ -664,6 +664,8 @@ private[spark] class ExternalSorter[K, V, C](
}

/**
+ * Exposed for testing purposes.
+ *
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -673,7 +675,7 @@
* For now, we just merge all the spilled files in once pass, but this can be modified to
* support hierarchical merging.
*/
- def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
+ def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val usingMap = aggregator.isDefined
val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer
if (spills.isEmpty && partitionWriters == null) {
@@ -781,7 +783,7 @@ private[spark] class ExternalSorter[K, V, C](
/**
* Read a partition file back as an iterator (used in our iterator method)
*/
- def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
+ private def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = {
if (writer.isOpen) {
writer.commitAndClose()
}
@@ -54,7 +54,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test", conf)

val shuffleBlockManager =
- SparkEnv.get.shuffleManager.shuffleBlockManager.asInstanceOf[FileShuffleBlockManager]
+ SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]

val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
@@ -59,7 +59,7 @@ object StoragePerfTester {
val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]

def writeOutputBytes(mapId: Int, total: AtomicLong) = {
- val shuffle = hashShuffleManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits,
+ val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
val writers = shuffle.writers
for (i <- 1 to recordsPerMap) {
