From 70bcc2ad5b117324652e41f0331eb974ab696966 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Mon, 9 Jun 2014 22:34:40 -0700
Subject: [PATCH] SPARK-1729. New Flume-Spark integration.

Renamed SparkPollingEvent to SparkFlumePollingEvent.
---
 .../flume/FlumePollingInputDStream.scala        | 18 +++++++++---------
 .../spark/streaming/flume/FlumeUtils.scala      |  8 ++++----
 .../flume/FlumePollingReceiverSuite.scala       | 14 +++++++-------
 3 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 324f9551287b1..dea5e0103ee26 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -53,13 +53,13 @@ class FlumePollingInputDStream[T: ClassTag](
     val maxBatchSize: Int,
     val parallelism: Int,
     storageLevel: StorageLevel
-) extends ReceiverInputDStream[SparkPollingEvent](ssc_) {
+) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) {
   /**
    * Gets the receiver object that will be sent to the worker nodes
    * to receive data. This method needs to defined by any specific implementation
    * of a NetworkInputDStream.
    */
-  override def getReceiver(): Receiver[SparkPollingEvent] = {
+  override def getReceiver(): Receiver[SparkFlumePollingEvent] = {
     new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel)
   }
 }
@@ -69,7 +69,7 @@ private[streaming] class FlumePollingReceiver(
     maxBatchSize: Int,
     parallelism: Int,
     storageLevel: StorageLevel
-) extends Receiver[SparkPollingEvent](storageLevel) with Logging {
+) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging {
 
   lazy val channelFactoryExecutor =
     Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).
@@ -105,7 +105,7 @@ private[streaming] class FlumePollingReceiver(
             logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
             try {
               // Convert each Flume event to a serializable SparkPollingEvent
-              events.foreach(event => store(SparkPollingEvent.fromSparkSinkEvent(event)))
+              events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
               // Send an ack to Flume so that Flume discards the events from its channels.
               client.ack(seq)
             } catch {
@@ -129,7 +129,7 @@ private[streaming] class FlumePollingReceiver(
     }
   }
 
-  override def store(dataItem: SparkPollingEvent) {
+  override def store(dataItem: SparkFlumePollingEvent) {
     // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized
     // This takes a performance hit, since the parallelism is useful only for pulling data now.
     this.synchronized {
@@ -155,9 +155,9 @@ private[streaming] class FlumePollingReceiver(
 private class FlumeConnection(val transceiver: NettyTransceiver,
   val client: SparkFlumeProtocol.Callback)
 
-private[streaming] object SparkPollingEvent {
-  def fromSparkSinkEvent(in: SparkSinkEvent): SparkPollingEvent = {
-    val event = new SparkPollingEvent()
+private[streaming] object SparkFlumePollingEvent {
+  def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
+    val event = new SparkFlumePollingEvent()
     event.event = in
     event
   }
@@ -167,7 +167,7 @@ private[streaming] object SparkPollingEvent {
  * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
  * around that to make it externalizable.
  */
-class SparkPollingEvent() extends Externalizable with Logging {
+class SparkFlumePollingEvent() extends Externalizable with Logging {
   var event : SparkSinkEvent = new SparkSinkEvent()
 
   /* De-serialize from bytes. */
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 2ff8ce2a77d2d..71bc364114f2e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -89,8 +89,8 @@ object FlumeUtils {
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): ReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
@@ -111,8 +111,8 @@ object FlumeUtils {
       maxBatchSize: Int = 100,
       parallelism: Int = 5,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
-    ): JavaReceiverInputDStream[SparkPollingEvent] = {
-    new FlumePollingInputDStream[SparkPollingEvent](ssc, addresses, maxBatchSize,
+    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 }
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
index dc64405872a2d..87a9ae0797e00 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
@@ -42,11 +42,11 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)),
         100, 5, StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
     val outputStream = new TestOutputStream(flumeStream, outputBuffer)
     outputStream.register()
 
@@ -73,12 +73,12 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   test("flume polling test multiple hosts") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
-    val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
+    val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
         new InetSocketAddress("localhost", testPort + 1)), 100, 5,
         StorageLevel.MEMORY_AND_DISK)
-    val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
-      with SynchronizedBuffer[Seq[SparkPollingEvent]]
+    val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
+      with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
     val outputStream = new TestOutputStream(flumeStream,
       outputBuffer)
     outputStream.register()
 
@@ -114,7 +114,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkPollingEvent]]) {
+    outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
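
Note for reviewers (not part of the patch): below is a minimal driver sketch showing how the renamed polling API would be consumed. Only FlumeUtils.createPollingStream and SparkFlumePollingEvent come from this change; the application name, batch interval, sink port (9999), and body decoding are assumptions for illustration. The parameter order mirrors the test suite: addresses, maxBatchSize, parallelism, storage level.

// Hypothetical driver program exercising the renamed API; a Spark sink is
// assumed to be listening on localhost:9999 (port chosen for illustration).
import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumePollingEvent}

object FlumePollingExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("FlumePollingExample"), Seconds(2))

    // Pull up to 100 events per batch from the sink, with 5 parallel
    // connections, matching the defaults declared in FlumeUtils.
    val stream: ReceiverInputDStream[SparkFlumePollingEvent] =
      FlumeUtils.createPollingStream(ssc,
        Seq(new InetSocketAddress("localhost", 9999)),
        100, 5, StorageLevel.MEMORY_AND_DISK_SER_2)

    // Each SparkFlumePollingEvent wraps a SparkSinkEvent (identical in shape
    // to AvroFlumeEvent), so the payload is exposed as an Avro ByteBuffer body.
    stream.map(e => new String(e.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}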