[SPARK-23991][DSTREAMS] Fix data loss when WAL write fails in allocateBlocksToBatch #21430

Closed · wants to merge 2 commits
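When the write-ahead log write fails in `ReceivedBlockTracker.allocateBlocksToBatch`, the blocks have already been removed from the received-block queues by `dequeueAll`, so they are lost even though the batch allocation was never recorded. This change snapshots each queue with `clone()` instead, and clears the queues only after the `BatchAllocationEvent` has been successfully written to the WAL, so a failed write leaves the blocks unallocated and eligible for the next allocation attempt. A new test injects a WAL failure through a Mockito spy and verifies both the failure path and recovery from the WAL.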
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala

@@ -112,10 +112,11 @@ private[streaming] class ReceivedBlockTracker(
   def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
       val streamIdToBlocks = streamIds.map { streamId =>
-        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
+        (streamId, getReceivedBlockQueue(streamId).clone())
       }.toMap
       val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
       if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
+        streamIds.foreach(getReceivedBlockQueue(_).clear())
         timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
         lastAllocatedBatchTime = batchTime
       } else {
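For context, a minimal standalone sketch (plain Scala, not Spark code; the object name and the `fallibleWrite` stand-in are illustrative) of the failure mode this hunk fixes: `dequeueAll` drains the queue before the fallible WAL write, so a failed write loses the blocks, while `clone()` keeps them queued until the write is known to have succeeded.

```scala
import scala.collection.mutable

object QueueDrainDemo {
  // Stand-in for the WAL write; simulate a failure.
  def fallibleWrite(): Boolean = false

  def main(args: Array[String]): Unit = {
    // Old behavior: dequeueAll empties the queue up front, so when the
    // write fails the blocks are gone from the queue and lost to any retry.
    val q1 = mutable.Queue("block-1", "block-2")
    val drained = q1.dequeueAll(_ => true)
    if (fallibleWrite()) { /* record allocation */ }
    assert(q1.isEmpty && drained.size == 2) // data already removed

    // Fixed behavior: snapshot with clone(), clear only after success.
    val q2 = mutable.Queue("block-1", "block-2")
    val snapshot = q2.clone()
    if (fallibleWrite()) q2.clear()
    assert(q2.size == 2 && snapshot.size == 2) // blocks retained for retry
  }
}
```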
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

@@ -26,10 +26,12 @@ import scala.language.{implicitConversions, postfixOps}
 import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{doThrow, reset, spy}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 
-import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@@ -115,6 +117,47 @@ class ReceivedBlockTrackerSuite
     tracker2.stop()
   }
 
+  test("block allocation to batch should not lose blocks from received queue") {
+    val tracker1 = spy(createTracker())
+    tracker1.isWriteAheadLogEnabled should be (true)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+
+    // Add blocks
+    val blockInfos = generateBlockInfos()
+    blockInfos.map(tracker1.addBlock)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+
+    // Try to allocate the blocks to a batch and verify that it fails.
+    // The blocks should stay in the received queue when the WAL write fails.
+    doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
+      .when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
+    val errMsg = intercept[RuntimeException] {
+      tracker1.allocateBlocksToBatch(1)
+    }
+    assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
+    tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
+    tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
+
+    // Allocate the blocks to a batch and verify that all of them have been allocated
+    reset(tracker1)
+    tracker1.allocateBlocksToBatch(2)
+    tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker1.hasUnallocatedReceivedBlocks should be (false)
+    tracker1.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker1.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+
+    tracker1.stop()
+
+    // Recover from the WAL and verify correctness
+    val tracker2 = createTracker(recoverFromWriteAheadLog = true)
+    tracker2.getUnallocatedBlocks(streamId) shouldEqual Seq.empty
+    tracker2.hasUnallocatedReceivedBlocks should be (false)
+    tracker2.getBlocksOfBatch(2) shouldEqual Map(streamId -> blockInfos)
+    tracker2.getBlocksOfBatchAndStream(2, streamId) shouldEqual blockInfos
+    tracker2.stop()
+  }
+
   test("recovery and cleanup with write ahead logs") {
     val manualClock = new ManualClock
     // Set the time increment level to twice the rotation interval so that every increment creates
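The failure injection above hinges on Mockito's `spy`/`doThrow` combination: a spy delegates to the real object by default, and `doThrow(...).when(spy).method(...)` overrides a single method to throw instead. A minimal sketch of that pattern outside Spark, assuming a Mockito 1.x-era API matching the imports in this diff (`SpyDoThrowDemo` and the `ArrayList` target are illustrative):

```scala
import org.mockito.Mockito.{doThrow, spy}

object SpyDoThrowDemo {
  def main(args: Array[String]): Unit = {
    val list = spy(new java.util.ArrayList[String]())
    list.add("a") // real behavior: the element is stored

    // Stub only clear(): from now on it throws instead of clearing.
    doThrow(new RuntimeException("boom")).when(list).clear()
    try {
      list.clear()
    } catch {
      case e: RuntimeException => println("caught: " + e.getMessage)
    }
    println("size after failed clear: " + list.size()) // still 1
  }
}
```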
@@ -312,7 +355,7 @@ class ReceivedBlockTrackerSuite
       recoverFromWriteAheadLog: Boolean = false,
       clock: Clock = new SystemClock): ReceivedBlockTracker = {
     val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
-    val tracker = new ReceivedBlockTracker(
+    var tracker = new ReceivedBlockTracker(
       conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
     allReceivedBlockTrackers += tracker
     tracker