diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 704019a55aff3..3b1233e86c210 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -114,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl( /** Push a single record of received data into block generator. */ def pushSingle(data: Any) { - blockGenerator addData (data) + blockGenerator.addData(data) } /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala index 9bad105f5a83b..e26c0c6859e57 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala @@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator addData count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(10) @@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts { blockGenerator.start() var count = 0 while(System.currentTimeMillis - startTime < waitTime) { - blockGenerator addData count + blockGenerator.addData(count) generatedData += count count += 1 Thread.sleep(1)