Skip to content

Commit

Permalink
Add test cases to BlockManagerSuite for SPARK-2583
Browse files Browse the repository at this point in the history
  • Loading branch information
sarutak committed Jul 24, 2014
1 parent e579302 commit 22d7ebd
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers {
bufferMessage.buffers.foreach{ b =>
buffer.put(b)
}
buffer.flip
buffer.flip()
arrayBuffer += buffer

val someMessage = Some(Message.createBufferMessage(arrayBuffer))
Expand Down
112 changes: 111 additions & 1 deletion core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import akka.actor._
import org.apache.spark.SparkConf
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
import org.mockito.Mockito.{mock, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.Mockito.{doAnswer, mock, spy, when}
import org.mockito.stubbing.Answer
import org.mockito.Matchers.any
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts._
Expand All @@ -33,8 +36,11 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.network.{BufferMessage, ConnectionManagerId, Message}
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.storage.{BlockMessage, BlockMessageArray}
import org.apache.spark.storage.BlockMessage._
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}

import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -1015,4 +1021,108 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}

test("return error message when error occurred in BlockManagerWorker#onBlockMessageReceive") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)

val worker = spy(new BlockManagerWorker(store))
val connManagerId = mock(classOf[ConnectionManagerId])

// setup request block messages
val reqBlId1 = ShuffleBlockId(0,0,0)
val reqBlId2 = ShuffleBlockId(0,1,0)
val reqBlockMessage1 = BlockMessage.fromGetBlock(GetBlock(reqBlId1))
val reqBlockMessage2 = BlockMessage.fromGetBlock(GetBlock(reqBlId2))
val reqBlockMessages = new BlockMessageArray(
Seq(reqBlockMessage1, reqBlockMessage2))
val reqBufferMessage = reqBlockMessages.toBufferMessage

val answer = new Answer[Option[BlockMessage]] {
override def answer(invocation: InvocationOnMock)
:Option[BlockMessage]= {
throw new Exception
}
}

doAnswer(answer).when(worker).processBlockMessage(any())

// Test when exception was thrown during processing block messages
var ackMessage = worker.onBlockMessageReceive(reqBufferMessage, connManagerId)

assert(ackMessage.isDefined, "When Exception was thrown in " +
"BlockManagerWorker#processBlockMessage, " +
"ackMessage should be defined")
assert(ackMessage.get.hasError, "When Exception was thown in " +
"BlockManagerWorker#processBlockMessage, " +
"ackMessage should have error")

val notBufferMessage = mock(classOf[Message])

// Test when not BufferMessage was received
ackMessage = worker.onBlockMessageReceive(notBufferMessage, connManagerId)
assert(ackMessage.isDefined, "When not BufferMessage was passed to " +
"BlockManagerWorker#onBlockMessageReceive, " +
"ackMessage should be defined")
assert(ackMessage.get.hasError, "When not BufferMessage was passed to " +
"BlockManagerWorker#onBlockMessageReceive, " +
"ackMessage should have error")
}

test("return ack message when no error occurred in BlocManagerWorker#onBlockMessageReceive") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)

val worker = spy(new BlockManagerWorker(store))
val connManagerId = mock(classOf[ConnectionManagerId])

// setup request block messages
val reqBlId1 = ShuffleBlockId(0,0,0)
val reqBlId2 = ShuffleBlockId(0,1,0)
val reqBlockMessage1 = BlockMessage.fromGetBlock(GetBlock(reqBlId1))
val reqBlockMessage2 = BlockMessage.fromGetBlock(GetBlock(reqBlId2))
val reqBlockMessages = new BlockMessageArray(
Seq(reqBlockMessage1, reqBlockMessage2))

val tmpBufferMessage = reqBlockMessages.toBufferMessage
val buffer = ByteBuffer.allocate(tmpBufferMessage.size)
val arrayBuffer = new ArrayBuffer[ByteBuffer]
tmpBufferMessage.buffers.foreach{ b =>
buffer.put(b)
}
buffer.flip()
arrayBuffer += buffer
val reqBufferMessage = Message.createBufferMessage(arrayBuffer)

// setup ack block messages
val buf1 = ByteBuffer.allocate(4)
val buf2 = ByteBuffer.allocate(4)
buf1.putInt(1)
buf1.flip()
buf2.putInt(1)
buf2.flip()
val ackBlockMessage1 = BlockMessage.fromGotBlock(GotBlock(reqBlId1, buf1))
val ackBlockMessage2 = BlockMessage.fromGotBlock(GotBlock(reqBlId2, buf2))

val answer = new Answer[Option[BlockMessage]] {
override def answer(invocation: InvocationOnMock)
:Option[BlockMessage]= {
if (invocation.getArguments()(0).asInstanceOf[BlockMessage].eq(
reqBlockMessage1)) {
return Some(ackBlockMessage1)
} else {
return Some(ackBlockMessage2)
}
}
}

doAnswer(answer).when(worker).processBlockMessage(any())

val ackMessage = worker.onBlockMessageReceive(reqBufferMessage, connManagerId)
assert(ackMessage.isDefined, "When BlockManagerWorker#onBlockMessageReceive " +
"was executed successfully, ackMessage should be defined")
assert(!ackMessage.get.hasError, "When BlockManagerWorker#onBlockMessageReceive " +
"was executed successfully, ackMessage should not have error")
}

}

0 comments on commit 22d7ebd

Please sign in to comment.