Skip to content

Commit

Permalink
Review fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed May 28, 2018
1 parent 2d35dfa commit 7233e80
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ private[streaming] class ReceivedBlockTracker(
}.toMap
val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
streamIds.foreach {
getReceivedBlockQueue(_).clear()
}
streamIds.foreach(getReceivedBlockQueue(_).clear())
timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
lastAllocatedBatchTime = batchTime
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class ReceivedBlockTrackerSuite
}

test("block allocation to batch should not loose blocks from received queue") {
val tracker1 = createTracker(createSpyTracker = true)
val tracker1 = spy(createTracker())
tracker1.isWriteAheadLogEnabled should be (true)
tracker1.getUnallocatedBlocks(streamId) shouldEqual Seq.empty

Expand All @@ -131,13 +131,10 @@ class ReceivedBlockTrackerSuite
// The blocks should stay in the received queue when WAL write failing
doThrow(new RuntimeException("Not able to write BatchAllocationEvent"))
.when(tracker1).writeToLog(any(classOf[BatchAllocationEvent]))
try {
val errMsg = intercept[RuntimeException] {
tracker1.allocateBlocksToBatch(1)
assert(false)
} catch {
case _: RuntimeException =>
// Nothing to do here
}
assert(errMsg.getMessage === "Not able to write BatchAllocationEvent")
tracker1.getUnallocatedBlocks(streamId) shouldEqual blockInfos
tracker1.getBlocksOfBatch(1) shouldEqual Map.empty
tracker1.getBlocksOfBatchAndStream(1, streamId) shouldEqual Seq.empty
Expand Down Expand Up @@ -354,16 +351,12 @@ class ReceivedBlockTrackerSuite
* want to control time by manually incrementing it to test log clean.
*/
def createTracker(
createSpyTracker: Boolean = false,
setCheckpointDir: Boolean = true,
recoverFromWriteAheadLog: Boolean = false,
clock: Clock = new SystemClock): ReceivedBlockTracker = {
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
var tracker = new ReceivedBlockTracker(
conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
if (createSpyTracker) {
tracker = spy(tracker)
}
allReceivedBlockTrackers += tracker
tracker
}
Expand Down

0 comments on commit 7233e80

Please sign in to comment.