Skip to content


Updates based on PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 28, 2014
1 parent 18aec1e commit 2f025b3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, Logging, SparkConf}
import{BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogManager}

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager}
import org.apache.spark.util.Utils
import WriteAheadLogBasedBlockHandler._

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
Expand All @@ -25,7 +26,7 @@ private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends Re
private[streaming] trait ReceivedBlockHandler {

/** Store a received block with the given block id */
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any]

/** Cleanup old blocks older than the given threshold time */
def cleanupOldBlock(threshTime: Long)
Expand All @@ -39,16 +40,17 @@ private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {

def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
val putResult = receivedBlock match {
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
val putResult: Seq[(BlockId, BlockStatus)] = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
case IteratorBlock(iterator) =>
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case _ =>
throw new SparkException(s"Could not store $blockId to block manager, unexpected block type")
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
if (! { _._1 }.contains(blockId)) {
throw new SparkException(
Expand Down Expand Up @@ -84,19 +86,25 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

// Manages rolling log files
private val logManager = new WriteAheadLogManager(
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
callerName = "WriteAheadLogBasedBlockHandler",
callerName = this.getClass.getSimpleName,
clock = clock

// For processing futures used in parallel block storing into block manager and write ahead log
implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(1, "WriteAheadLogBasedBlockHandler"))

def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

* This implementation stores the block into the block manager as well as a write ahead log.
* It does this in parallel, using Scala Futures, and returns only after the block has
* been stored in both places.
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
// Serialize the block so that it can be inserted into both
val serializedBlock = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
Expand All @@ -109,22 +117,26 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")

val pushToBlockManagerFuture = Future {
// Store the block in block manager
val storeInBlockManagerFuture = Future {
val putResult =
blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
if (! { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
val pushToLogFuture = Future {

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {

// Combine the futures, wait for both to complete, and return the write ahead log segment
val combinedFuture = for {
_ <- pushToBlockManagerFuture
fileSegment <- pushToLogFuture
_ <- storeInBlockManagerFuture
fileSegment <- storeInWriteAheadLogFuture
} yield fileSegment

Some(Await.result(combinedFuture, blockStoreTimeout))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,51 +116,48 @@ private[streaming] class ReceiverSupervisorImpl(
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), optionalMetadata, optionalBlockId)
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)

/** Store a iterator of received data as a data block into Spark's memory. */
def pushIterator(
iterator: Iterator[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
pushAndReportBlock(IteratorBlock(iterator), optionalMetadata, optionalBlockId)
pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)

/** Store the bytes of received data as a data block into Spark's memory. */
def pushBytes(
bytes: ByteBuffer,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
pushAndReportBlock(ByteBufferBlock(bytes), optionalMetadata, optionalBlockId)
pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)

/** Store block and report it to driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
metadataOption: Option[Any],
blockIdOption: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1

val time = System.currentTimeMillis
val fileSegmentOption = receivedBlockHandler.storeBlock(blockId, receivedBlock) match {
case Some(f: WriteAheadLogFileSegment) => Some(f)
case _ => None
val persistenceInfoOption = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")

val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, optionalMetadata.orNull, fileSegmentOption)
blockId, numRecords, metadataOption.orNull, persistenceInfoOption)
val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
Await.result(future, askTimeout)
logDebug("Reported block " + blockId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ private[streaming] case class ReceivedBlockInfo(
blockId: StreamBlockId,
numRecords: Long,
metadata: Any,
fileSegmentOption: Option[WriteAheadLogFileSegment]
persistenceInfoOption: Option[Any]


0 comments on commit 2f025b3

Please sign in to comment.