
Commit

Added ReceivedBlockHandler and its associated tests
tdas committed Oct 25, 2014
1 parent 3a845d3 commit 95a4987
Showing 10 changed files with 460 additions and 59 deletions.
@@ -24,7 +24,7 @@ import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.receiver.ReceivedBlockInfo

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -0,0 +1,144 @@
package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, Logging, SparkConf}
import org.apache.spark.storage.{BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogManager}
import org.apache.spark.util.Utils

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock


/** Trait that represents a class that handles the storage of blocks received by a receiver. */
private[streaming] trait ReceivedBlockHandler {

/** Store a received block with the given block id */
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef]

/** Clean up blocks older than the given threshold time */
def cleanupOldBlock(threshTime: Long)
}
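
For orientation, a minimal sketch of the contract a handler has to satisfy: it accepts one of the three ReceivedBlock shapes and may return an extra handle (such as a write-ahead-log segment) for the supervisor to report. The handler below is purely hypothetical and only logs; it assumes the imports at the top of this file.

// Hypothetical handler, for illustration only -- not part of this commit.
private[streaming] class LoggingBlockHandler extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
    receivedBlock match {
      case ArrayBufferBlock(buffer)    => logInfo(s"Would store ${buffer.size} records as $blockId")
      case IteratorBlock(_)            => logInfo(s"Would store an iterator of records as $blockId")
      case ByteBufferBlock(byteBuffer) => logInfo(s"Would store ${byteBuffer.remaining()} bytes as $blockId")
    }
    None // nothing extra (e.g. a log file segment) to report back to the supervisor
  }

  def cleanupOldBlock(threshTime: Long) {
    // nothing to clean up in this toy handler
  }
}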

/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
*/
private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {

def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {
val putResult = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
case IteratorBlock(iterator) =>
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case _ =>
throw new SparkException(s"Could not store $blockId to block manager, unexpected block type")
}
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
None
}

def cleanupOldBlock(threshTime: Long) {
// This is not used, as blocks inserted into the BlockManager are cleared by the DStream's
// clearing of BlockRDDs.
}
}
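
A rough usage sketch for the block-manager-backed handler, assuming a running SparkEnv; the stream id, block id and records below are made up.

import org.apache.spark.SparkEnv // in addition to the imports at the top of this file

// Hypothetical usage, for illustration only.
val handler = new BlockManagerBasedBlockHandler(SparkEnv.get.blockManager, StorageLevel.MEMORY_AND_DISK)
val blockId = StreamBlockId(0, 1L)

// Returns None on success; throws SparkException if the block manager rejects the block.
handler.storeBlock(blockId, IteratorBlock(Seq("a", "b", "c").iterator))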

/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both a write ahead log and a block manager.
*/
private[streaming] class WriteAheadLogBasedBlockHandler(
blockManager: BlockManager,
streamId: Int,
storageLevel: StorageLevel,
conf: SparkConf,
hadoopConf: Configuration,
checkpointDir: String,
clock: Clock = new SystemClock
) extends ReceivedBlockHandler with Logging {

private val blockStoreTimeout = conf.getInt(
"spark.streaming.receiver.blockStoreTimeout", 30).seconds
private val rollingInterval = conf.getInt(
"spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
private val maxFailures = conf.getInt(
"spark.streaming.receiver.writeAheadLog.maxFailures", 3)

private val logManager = new WriteAheadLogManager(
checkpointDirToLogDir(checkpointDir, streamId),
hadoopConf, rollingInterval, maxFailures,
callerName = "WriteAheadLogBasedBlockHandler",
clock = clock
)

// Execution context for the futures that store a block in the block manager and the write ahead log in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(1, "WriteAheadLogBasedBlockHandler"))

def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[AnyRef] = {

// Serialize the block so that it can be inserted into both the block manager and the write ahead log
val serializedBlock = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
blockManager.dataSerialize(blockId, iterator)
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}

val pushToBlockManagerFuture = Future {
val putResult =
blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
if (!putResult.map { _._1 }.contains(blockId)) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
}
val pushToLogFuture = Future {
logManager.writeToLog(serializedBlock)
}
val combinedFuture = for {
_ <- pushToBlockManagerFuture
fileSegment <- pushToLogFuture
} yield fileSegment

Some(Await.result(combinedFuture, blockStoreTimeout))
}

def cleanupOldBlock(threshTime: Long) {
logManager.cleanupOldLogs(threshTime)
}

def stop() {
logManager.stop()
}
}
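
storeBlock above kicks off the block-manager write and the log write as two futures and then waits on both, keeping the log segment as the result. The same pattern in isolation, with stand-ins for the two stores (a sketch, not code from this commit):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Stand-ins for blockManager.putBytes(...) and logManager.writeToLog(...).
def storeInBlockManager(): Unit = Thread.sleep(10)
def writeToLog(): String = { Thread.sleep(10); "log-segment" }

// Start both writes before combining, so they can run concurrently.
val blockManagerFuture = Future(storeInBlockManager())
val logFuture          = Future(writeToLog())

val combined = for {
  _       <- blockManagerFuture
  segment <- logFuture
} yield segment

// Fail the whole store if either write misses the timeout.
val segment: String = Await.result(combined, 30.seconds)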

private[streaming] object WriteAheadLogBasedBlockHandler {
def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
}
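
The helper above simply nests a per-stream subdirectory under the checkpoint directory; a quick illustration with a made-up checkpoint path:

// Illustrative paths only.
WriteAheadLogBasedBlockHandler.checkpointDirToLogDir("/tmp/checkpoint", streamId = 2)
// => "/tmp/checkpoint/receivedData/2"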
@@ -0,0 +1,14 @@
package org.apache.spark.streaming.receiver

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any,
fileSegmentOption: Option[WriteAheadLogFileSegment]
)
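
This replaces the scheduler-side ReceivedBlockInfo removed further down in this commit; the new fileSegmentOption field carries the write-ahead-log segment, when there is one, so the driver can later recover the block from the log. A hypothetical instance:

// Illustrative values only; a handler that did not write to a log would pass None.
val info = ReceivedBlockInfo(
  streamId = 0,
  blockId = StreamBlockId(0, 1L),
  numRecords = 100L,
  metadata = null,
  fileSegmentOption = None // Some(segment) when the block was also written to the write ahead log
)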

@@ -25,16 +25,13 @@ import scala.concurrent.Await

import akka.actor.{Actor, Props}
import akka.pattern.ask

import com.google.common.base.Throwables

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
import org.apache.spark.streaming.scheduler.RegisterReceiver
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
*/
private[streaming] class ReceiverSupervisorImpl(
receiver: Receiver[_],
env: SparkEnv
env: SparkEnv,
hadoopConf: Configuration,
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {

private val blockManager = env.blockManager
private val receivedBlockHandler: ReceivedBlockHandler = {
if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
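
Which handler gets built is decided entirely by configuration at receiver start-up. A hedged sketch of the driver-side setup that would take the write-ahead-log branch (app name, master and checkpoint path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver setup; the config key and the checkpoint requirement come from the code above.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("wal-enabled-stream")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///tmp/checkpoint") // required, otherwise receiver start-up throws SparkException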

private val storageLevel = receiver.storageLevel

/** Remote Akka actor for the ReceiverTracker */
private val trackerActor = {
@@ -108,11 +119,7 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
pushAndReportBlock(ArrayBufferBlock(arrayBuffer), optionalMetadata, optionalBlockId)
}

/** Store an iterator of received data as a data block into Spark's memory. */
@@ -121,11 +128,7 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
pushAndReportBlock(IteratorBlock(iterator), optionalMetadata, optionalBlockId)
}

/** Store the bytes of received data as a data block into Spark's memory. */
@@ -134,17 +137,32 @@ private[streaming] class ReceiverSupervisorImpl(
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
pushAndReportBlock(ByteBufferBlock(bytes), optionalMetadata, optionalBlockId)
}

/** Store block and report it to the driver */
def pushAndReportBlock(
receivedBlock: ReceivedBlock,
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
}

val time = System.currentTimeMillis
blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
}
val fileSegmentOption = receivedBlockHandler.storeBlock(blockId, receivedBlock) match {
case Some(f: WriteAheadLogFileSegment) => Some(f)
case _ => None
}
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")

/** Report pushed block */
def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
trackerActor ! AddBlock(blockInfo)
val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, optionalMetadata.orNull, fileSegmentOption)
val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
Await.result(future, askTimeout)
logDebug("Reported block " + blockId)
}

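Note that reporting the block is now synchronous: the supervisor asks the tracker actor and waits for the acknowledgement instead of firing AddBlock with !. The ask-and-await pattern in isolation, with a stand-in actor rather than the real tracker (a sketch):

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// Stand-in message and actor, for illustration only.
case class FakeAddBlock(blockId: String)

class FakeTracker extends Actor {
  def receive = {
    case FakeAddBlock(id) => sender() ! true // acknowledge receipt of the block info
  }
}

val system = ActorSystem("sketch")
val tracker = system.actorOf(Props[FakeTracker], "tracker")
val askTimeout = Timeout(30.seconds)

val future = tracker.ask(FakeAddBlock("input-0-1"))(askTimeout) // Future of the reply
Await.result(future, askTimeout.duration)                       // block until acknowledged
system.shutdown()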
@@ -17,8 +17,9 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.Time
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.receiver.ReceivedBlockInfo

/**
* :: DeveloperApi ::
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{ArrayBuffer, HashSet}
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.receiver.ReceivedBlockInfo

/** Class representing a set of Jobs
* that belong to the same batch.
@@ -21,21 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue}
import scala.language.existentials

import akka.actor._
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.{SerializableWritable, Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.streaming.receiver.{ReceivedBlockInfo, Receiver, ReceiverSupervisorImpl, StopReceiver}
import org.apache.spark.util.AkkaUtils

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any
)

/**
* Messages used by the NetworkReceiver and the ReceiverTracker to communicate
* with each other.
@@ -252,16 +243,20 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
ssc.sc.makeRDD(receivers, receivers.size)
}

val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
val supervisor = new ReceiverSupervisorImpl(
receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
supervisor.start()
supervisor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This prevents all the receivers from being scheduled on the same node.
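The receiver start-up closure now carries the Hadoop configuration and checkpoint directory to the workers. Since Configuration is a Writable but not Serializable, it is wrapped in SerializableWritable before being captured. The same wrapping in a small sketch (the configuration value is made up):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SerializableWritable

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020") // illustrative setting

// Wrap before capturing in a closure that Spark will serialize and ship to executors.
val serializableHadoopConf = new SerializableWritable(hadoopConf)

val useOnWorker = (ignored: Iterator[Int]) => {
  val confOnWorker: Configuration = serializableHadoopConf.value // unwrap on the worker side
  // ... e.g. open a FileSystem with confOnWorker ...
}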
@@ -52,4 +52,3 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura
HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.")
}
}

