Updated code to use ReceivedBlockStoreResult as the return type for handler's storeBlock
tdas committed Oct 30, 2014
1 parent 33c30c9 commit df5f320
Showing 7 changed files with 102 additions and 38 deletions.
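
At its core, the commit replaces the untyped Option[Any] returned by a handler's storeBlock with a small typed trait hierarchy. A minimal before/after sketch of the contract, distilled from the diff below (other members elided):

// Before: callers received an opaque, optional value.
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any]

// After: callers always receive a typed result that carries at least the block id;
// the write-ahead-log-based handler additionally returns the log file segment.
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult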
@@ -23,8 +23,9 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver}
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.SparkException

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] }
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
Some(new BlockRDD[T](ssc.sc, Array.empty))
}
}

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.language.existentials

/** Trait representing a received block */
private[streaming] sealed trait ReceivedBlock

/** class representing a block received as an ArrayBuffer */
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock

/** class representing a block received as an Iterator */
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock

/** class representing a block received as a ByteBuffer */
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock
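
For illustration, a hedged sketch (not part of the commit) of how incoming data would be wrapped into these case classes; the sample values are made up:

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

val fromBuffer: ReceivedBlock = ArrayBufferBlock(ArrayBuffer("a", "b", "c"))
val fromIterator: ReceivedBlock = IteratorBlock(Iterator("x", "y", "z"))
val fromBytes: ReceivedBlock = ByteBufferBlock(ByteBuffer.wrap(Array[Byte](1, 2, 3)))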
@@ -17,9 +17,6 @@

package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
@@ -30,25 +27,36 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogManager}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, Clock, SystemClock, WriteAheadLogManager}
import org.apache.spark.util.Utils

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock


/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
def blockId: StreamBlockId // Any implementation of this trait will store a block id
}

/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {

/** Store a received block with the given block id */
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any]
/** Store a received block with the given block id and return related metadata */
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult

/** Cleanup old blocks older than the given threshold time */
def cleanupOldBlock(threshTime: Long)
}


/**
* Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
* that stores the metadata related to storage of blocks using
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
*/
private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
extends ReceivedBlockStoreResult


/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks into a block manager with the specified storage level.
@@ -57,8 +65,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
blockManager: BlockManager, storageLevel: StorageLevel)
extends ReceivedBlockHandler with Logging {

def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
val putResult: Seq[(BlockId, BlockStatus)] = receivedBlock match {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
case IteratorBlock(iterator) =>
@@ -73,7 +81,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
None
BlockManagerBasedStoreResult(blockId)
}

def cleanupOldBlock(threshTime: Long) {
@@ -82,6 +90,18 @@
}
}


/**
* Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]]
* that stores the metadata related to storage of blocks using
* [[org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler]]
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
segment: WriteAheadLogFileSegment
) extends ReceivedBlockStoreResult
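
With both result types defined, a hedged sketch (not part of this diff) of how code in the receiver package could recover the write ahead log segment by matching on the concrete result type; describe is a hypothetical helper shown only for illustration:

def describe(result: ReceivedBlockStoreResult): String = result match {
  case WriteAheadLogBasedStoreResult(blockId, segment) =>
    s"block $blockId stored in the block manager and in WAL segment ${segment.path}"
  case BlockManagerBasedStoreResult(blockId) =>
    s"block $blockId stored in the block manager only"
}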


/**
* Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
* stores the received blocks in both a write ahead log and a block manager.
@@ -112,18 +132,19 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
)

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))


/**
* This implementation stores the block into the block manager as well as a write ahead log.
* It does this in parallel, using Scala Futures, and returns only after the block has
* been stored in both places.
*/
def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

// Serialize the block so that it can be inserted into both
val serializedBlock = receivedBlock match {
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
@@ -154,7 +175,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
_ <- storeInBlockManagerFuture
fileSegment <- storeInWriteAheadLogFuture
} yield fileSegment
Some(Await.result(combinedFuture, blockStoreTimeout))
val segment = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, segment)
}

def cleanupOldBlock(threshTime: Long) {
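
The write-ahead-log handler above stores each block in the block manager and in the log in parallel and waits for both writes before returning. A hedged, self-contained sketch of that pattern (writeToBlockManager and writeToLog are hypothetical stand-ins, not the handler's actual members):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

def writeToBlockManager(): Unit = ()        // stand-in for the block manager write
def writeToLog(): String = "dummy-segment"  // stand-in for the write ahead log write

// Kick off both writes so they run concurrently.
val bmFuture  = Future { writeToBlockManager() }
val walFuture = Future { writeToLog() }

// Wait for both to finish, keeping only the write-ahead-log result.
val combined = for {
  _       <- bmFuture
  segment <- walFuture
} yield segment

val segment = Await.result(combined, 30.seconds)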
@@ -153,14 +153,13 @@ private[streaming] class ReceiverSupervisorImpl(
}

val time = System.currentTimeMillis
val persistenceInfoOption = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

val blockInfo = ReceivedBlockInfo(streamId,
blockId, numRecords, metadataOption.orNull, persistenceInfoOption)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
Await.result(future, askTimeout)
logDebug("Reported block " + blockId)
logDebug(s"Reported block $blockId")
}

/** Report error to the receiver tracker */
@@ -17,16 +17,12 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any,
persistenceInfoOption: Option[Any]
blockStoreResult: ReceivedBlockStoreResult
)
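
A hedged illustration (not part of the diff; the stream id, record count, and block id values are made up) of constructing the slimmed-down ReceivedBlockInfo, with the block id now travelling inside the typed store result:

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult

val exampleInfo = ReceivedBlockInfo(
  streamId = 0,
  numRecords = 100L,
  blockStoreResult = BlockManagerBasedStoreResult(StreamBlockId(0, 1L)))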

@@ -144,7 +144,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
def addBlocks(receivedBlockInfo: ReceivedBlockInfo) {
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " +
receivedBlockInfo.blockId)
receivedBlockInfo.blockStoreResult.blockId)
}

/** Report error sent by a receiver */
@@ -107,8 +107,11 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
}.toList
storedData shouldEqual data

// Verify the store results were None
storeResults.foreach { _ shouldEqual None }
// Verify that the store results are instances of BlockManagerBasedStoreResult
assert(
storeResults.forall { _.isInstanceOf[BlockManagerBasedStoreResult] },
"Unexpected store result type"
)
}
}
}
@@ -128,9 +131,13 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
}.toList
storedData shouldEqual data

// Verify that the store results are instances of WriteAheadLogBasedStoreResult
assert(
storeResults.forall { _.isInstanceOf[WriteAheadLogBasedStoreResult] },
"Unexpected store result type"
)
// Verify the data in write ahead log files is correct
storeResults.foreach { s => assert(s.nonEmpty) }
val fileSegments = storeResults.map { _.asInstanceOf[Some[WriteAheadLogFileSegment]].get }
val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment }
val loggedData = fileSegments.flatMap { segment =>
val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf)
val bytes = reader.read(segment)
@@ -172,13 +179,17 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
* using the given verification function
*/
private def testBlockStoring(receivedBlockHandler: ReceivedBlockHandler)
(verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[Option[Any]]) => Unit) {
(verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) {
val data = Seq.tabulate(100) { _.toString }

def storeAndVerify(blocks: Seq[ReceivedBlock]) {
blocks should not be empty
val (blockIds, storeResults) = storeBlocks(receivedBlockHandler, blocks)
withClue(s"Testing with ${blocks.head.getClass.getSimpleName}s:") {
// Verify that the returned store results have the correct block ids
(storeResults.map { _.blockId }) shouldEqual blockIds

// Call handler-specific verification function
verifyFunc(data, blockIds, storeResults)
}
}
@@ -227,7 +238,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
private def storeBlocks(
receivedBlockHandler: ReceivedBlockHandler,
blocks: Seq[ReceivedBlock]
): (Seq[StreamBlockId], Seq[Option[Any]]) = {
): (Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) = {
val blockIds = Seq.fill(blocks.size)(generateBlockId())
val storeResults = blocks.zip(blockIds).map {
case (block, id) =>
