[SPARK-4028][Streaming] ReceivedBlockHandler interface to abstract the functionality of storage of received data #2940

Status: Closed · wants to merge 7 commits
Changes from 4 commits
@@ -0,0 +1,173 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}

import WriteAheadLogBasedBlockHandler._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage._
import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogManager}
import org.apache.spark.util.Utils

private[streaming] sealed trait ReceivedBlock
private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock
private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock
private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock


/** Trait that represents a class that handles the storage of blocks received by a receiver */
private[streaming] trait ReceivedBlockHandler {

  /**
   * Store a received block with the given block id. Returns optional metadata about the
   * stored block (for example, the write ahead log segment the block was written to).
   */
Contributor:
Given that the Option[Any] has provoked so much discussion, maybe we should document the return type in this scaladoc (e.g. say that it's arbitrary metadata or something); currently, it's not clear what's being returned.

  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any]

  /** Clean up blocks older than the given threshold time */
  def cleanupOldBlock(threshTime: Long)
}
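
Since the Option[Any] return value drew discussion above, here is a short, hedged sketch of how a caller might consume it; `handler` and `blockId` are assumed to be in scope and the log message wording is made up:

    // Hypothetical caller-side use of storeBlock's metadata
    val metadataOption = handler.storeBlock(blockId, IteratorBlock(Iterator("a", "b")))
    metadataOption match {
      case Some(walSegment) => logDebug(s"Stored $blockId; WAL segment: $walSegment")
      case None => logDebug(s"Stored $blockId in block manager only")
    }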

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
    val putResult: Seq[(BlockId, BlockStatus)] = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) =>
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
      case IteratorBlock(iterator) =>
        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    None
  }

  def cleanupOldBlock(threshTime: Long) {
    // this is not used, as blocks inserted into the BlockManager are cleared by DStream's
    // clearing of BlockRDDs.
  }
}

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(
    blockManager: BlockManager,
    streamId: Int,
    storageLevel: StorageLevel,
    conf: SparkConf,
    hadoopConf: Configuration,
    checkpointDir: String,
    clock: Clock = new SystemClock
  ) extends ReceivedBlockHandler with Logging {

  private val blockStoreTimeout = conf.getInt(
    "spark.streaming.receiver.blockStoreTimeout", 30).seconds
  private val rollingInterval = conf.getInt(
    "spark.streaming.receiver.writeAheadLog.rollingInterval", 60)
  private val maxFailures = conf.getInt(
    "spark.streaming.receiver.writeAheadLog.maxFailures", 3)

  // Manages rolling log files
  private val logManager = new WriteAheadLogManager(
    checkpointDirToLogDir(checkpointDir, streamId),
    hadoopConf, rollingInterval, maxFailures,
    callerName = this.getClass.getSimpleName,
    clock = clock
  )

  // Thread pool for storing a block into the block manager and the write ahead log in parallel
  implicit private val executionContext = ExecutionContext.fromExecutorService(
    Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
Contributor:
As I mentioned earlier, this might actually end up being a bottleneck. Since you could write using multiple threads in the same receiver, we are basically blocking more than one write from happening at any point in time. Since the BlockManager can handle more writes in parallel, we should probably use a much higher value than 2.

That said, the WAL writer would still be a bottleneck, since the writes to the WAL have to be synchronized. So I am not entirely sure if having more than 2 threads helps a whole lot.

Contributor Author:
There is good scope for future optimization here. We can always create multiple WALManagers and write in parallel to multiple WALs. That would improve performance, depending on where the bottleneck is.
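
To make that concrete, a rough sketch of the sharded-WAL idea, assuming the WriteAheadLogManager constructor and writeToLog signatures stay as in this PR and that checkpointDir, streamId, hadoopConf, rollingInterval, maxFailures, and clock are in scope; the shard directory layout and round-robin choice are hypothetical:

    import java.nio.ByteBuffer
    import java.util.concurrent.atomic.AtomicInteger

    // Hypothetical future optimization, not part of this PR: N parallel WALs
    val numShards = 4
    val logManagers = (0 until numShards).map { i =>
      new WriteAheadLogManager(
        checkpointDirToLogDir(checkpointDir, streamId) + s"/shard-$i", // assumed layout
        hadoopConf, rollingInterval, maxFailures,
        callerName = s"ParallelWAL-$i",
        clock = clock)
    }

    // Round-robin across the WALs so writes no longer serialize on a single writer
    val nextShard = new AtomicInteger(0)
    def writeToShardedLog(serializedBlock: ByteBuffer) =
      logManagers(Math.floorMod(nextShard.getAndIncrement(), numShards)).writeToLog(serializedBlock)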



  /**
   * This implementation stores the block into the block manager as well as a write ahead log.
   * It does this in parallel, using Scala Futures, and returns only after the block has
   * been stored in both places.
   */
  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): Option[Any] = {
    // Serialize the block so that it can be inserted into both
    val serializedBlock = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) =>
        blockManager.dataSerialize(blockId, arrayBuffer.iterator)
      case IteratorBlock(iterator) =>
        blockManager.dataSerialize(blockId, iterator)
      case ByteBufferBlock(byteBuffer) =>
        byteBuffer
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }

    // Store the block in block manager
    val storeInBlockManagerFuture = Future {
      val putResult =
        blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true)
      if (!putResult.map { _._1 }.contains(blockId)) {
        throw new SparkException(
          s"Could not store $blockId to block manager with storage level $storageLevel")
      }
    }

    // Store the block in write ahead log
    val storeInWriteAheadLogFuture = Future {
      logManager.writeToLog(serializedBlock)
    }

    // Combine the futures, wait for both to complete, and return the write ahead log segment
    val combinedFuture = for {
      _ <- storeInBlockManagerFuture
      fileSegment <- storeInWriteAheadLogFuture
    } yield fileSegment
    Some(Await.result(combinedFuture, blockStoreTimeout))
Contributor:
What are the semantics of this if there is a failure... will it throw an exception?

Contributor Author:
Await.result forwards the exception. The unit test actually tests that, by trying to insert a too-large block, which triggers the exception in the failed-put check right above.
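
For readers wondering about those semantics, a minimal standalone illustration (not PR code; the names are made up):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val failingStore = Future[Unit] { throw new RuntimeException("could not store block") }
    val walWrite = Future { "file segment" }

    // Combining mirrors the storeBlock code above; the first failure poisons the result
    val combined = for {
      _ <- failingStore
      segment <- walWrite
    } yield segment

    // Await.result rethrows the future's failure on the calling thread:
    // java.lang.RuntimeException: could not store block
    Await.result(combined, 10.seconds)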

  }

  def cleanupOldBlock(threshTime: Long) {
    logManager.cleanupOldLogs(threshTime)
  }

  def stop() {
    logManager.stop()
  }
}

private[streaming] object WriteAheadLogBasedBlockHandler {
  def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
    new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
  }
}
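
For concreteness, the helper just nests two Hadoop Paths; with made-up arguments:

    // Illustrative values only
    WriteAheadLogBasedBlockHandler.checkpointDirToLogDir("/checkpoints/app", streamId = 0)
    // returns "/checkpoints/app/receivedData/0"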
@@ -25,16 +25,13 @@ import scala.concurrent.Await

import akka.actor.{Actor, Props}
import akka.pattern.ask

import com.google.common.base.Throwables

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Logging, SparkEnv, SparkException}
Contributor:
I think we might want a blank line here to separate the Spark imports from the third-party libraries.

Contributor Author:
Done. Somehow the import sorter is goofing up.

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.DeregisterReceiver
import org.apache.spark.streaming.scheduler.AddBlock
import org.apache.spark.streaming.scheduler.RegisterReceiver
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.util.{AkkaUtils, Utils}

/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
@@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
Contributor:
@pwendell mentioned that we don't usually add opt/Option in the variable name, since that is implied from the type. Rename this to checkpointDir?

Contributor Author:
See my comment on that thread about my preference for having Option in the name. Also, I spoke to @pwendell offline, and he does not seem to have a strong opinion on this.

  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  private val blockManager = env.blockManager
  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
Contributor:
A bit off topic (and we can deal with this later), but should we make the checkpoint directory into a SparkConf setting? That way we could do this type of validation earlier on. Right now, unfortunately, we can't distinguish here whether the user didn't call checkpoint or whether there was just a bug somewhere in Spark code.

Contributor Author:
Good point. This requires changes at both the Spark and Spark Streaming levels, plus further discussion, and hence is deferred to the next release.

"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
} else {
new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
}
}
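
For reference, a hypothetical driver-side setup that satisfies this check; the app name, batch interval, and checkpoint path are all illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("wal-example") // illustrative
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Without this call, receivers fail with the SparkException above
    ssc.checkpoint("/checkpoints/wal-example")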

  private val storageLevel = receiver.storageLevel

  /** Remote Akka actor for the ReceiverTracker */
  private val trackerActor = {
@@ -105,46 +116,50 @@ private[streaming] class ReceiverSupervisorImpl(
  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      optionalMetadata: Option[Any],
      optionalBlockId: Option[StreamBlockId]
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = optionalBlockId.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
    reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }

  /** Store an iterator of received data as a data block into Spark's memory. */
  def pushIterator(
      iterator: Iterator[_],
      optionalMetadata: Option[Any],
      optionalBlockId: Option[StreamBlockId]
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = optionalBlockId.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
    reportPushedBlock(blockId, -1, optionalMetadata)
    pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption)
  }

  /** Store the bytes of received data as a data block into Spark's memory. */
  def pushBytes(
      bytes: ByteBuffer,
      optionalMetadata: Option[Any],
      optionalBlockId: Option[StreamBlockId]
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = optionalBlockId.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true)
    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
    reportPushedBlock(blockId, -1, optionalMetadata)
    pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption)
  }

  /** Report pushed block */
  def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) {
    val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull)
    trackerActor ! AddBlock(blockInfo)
  /** Store a block and report it to the driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val numRecords = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
      case _ => -1
    }

    val time = System.currentTimeMillis
    val persistenceInfoOption = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")

    val blockInfo = ReceivedBlockInfo(streamId,
      blockId, numRecords, metadataOption.orNull, persistenceInfoOption)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)
    logDebug("Reported block " + blockId)
  }
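
For context, these push* methods are what a user-defined receiver's store(...) calls ultimately route into. A hedged sketch of such a receiver; the class and the data it emits are made up:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Hypothetical receiver: each store(...) ends up in the supervisor's push* methods above
    class ConstantReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
      def onStart() {
        new Thread("constant-receiver") {
          override def run() {
            while (!isStopped) {
              store("sample record") // batched by the supervisor, then pushed as a block
            }
          }
        }.start()
      }
      def onStop() { }
    }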

@@ -17,8 +17,8 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.Time
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.streaming.Time

/**
 * :: DeveloperApi ::
@@ -17,7 +17,8 @@

package org.apache.spark.streaming.scheduler

import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.collection.mutable.HashSet

import org.apache.spark.streaming.Time

/** Class representing a set of Jobs
@@ -0,0 +1,32 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.scheduler

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.annotation.DeveloperApi

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
Contributor:
If you do the other change this would look a bit different:

private[streaming] case class ReceivedBlockInfo(
    streamId: Int,
    storedBlockInfo: StoredBlockInfo,
    numRecords: Long,
    metadata: Any

    streamId: Int,
    blockId: StreamBlockId,
    numRecords: Long,
    metadata: Any,
    persistenceInfoOption: Option[Any]
)
