From 3c23c182fd8655e0f1a64cee64641f1cc803f7c2 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Tue, 10 Jun 2014 16:20:40 -0700
Subject: [PATCH] SPARK-1729. New Spark-Flume integration. Minor formatting
 changes.

---
 .../scala/org/apache/spark/flume/sink/SparkSink.scala       |  1 +
 .../streaming/flume/FlumePollingInputDStream.scala          |  9 ++++++---
 .../streaming/flume/FlumePollingReceiverSuite.scala         | 11 +++++++----
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
index e830b388c12b2..e430c0935e528 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala
@@ -35,6 +35,7 @@ import org.apache.flume.{Channel, Transaction, FlumeException, Context}
 import org.slf4j.LoggerFactory

 import org.apache.spark.flume.{SparkSinkEvent, EventBatch, SparkFlumeProtocol}
+
 /**
  * A sink that uses Avro RPC to run a server that can be polled by Spark's
  * FlumePollingInputDStream. This sink has the following configuration parameters:
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index dea5e0103ee26..3309b15604dd0 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -105,7 +105,9 @@ private[streaming] class FlumePollingReceiver(
         logDebug("Received batch of " + events.size() + " events with sequence number: " + seq)
         try {
           // Convert each Flume event to a serializable SparkPollingEvent
-          events.foreach(event => store(SparkFlumePollingEvent.fromSparkSinkEvent(event)))
+          events.foreach(event => {
+            store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
+          })
           // Send an ack to Flume so that Flume discards the events from its channels.
           client.ack(seq)
         } catch {
@@ -153,7 +155,7 @@ private[streaming] class FlumePollingReceiver(
  * @param client The client that the callbacks are received on.
  */
 private class FlumeConnection(val transceiver: NettyTransceiver,
-  val client: SparkFlumeProtocol.Callback)
+                              val client: SparkFlumeProtocol.Callback)

 private[streaming] object SparkFlumePollingEvent {
   def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = {
@@ -162,13 +164,14 @@ private[streaming] object SparkFlumePollingEvent {
     event
   }
 }
+
 /*
 * Unfortunately Avro does not allow including pre-compiled classes - so even though
 * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper
 * around that to make it externalizable.
 */
 class SparkFlumePollingEvent() extends Externalizable with Logging {
-  var event : SparkSinkEvent = new SparkSinkEvent()
+  var event: SparkSinkEvent = new SparkSinkEvent()

   /* De-serialize from bytes. */
   def readExternal(in: ObjectInput) {
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
index 87a9ae0797e00..7e6fe66052138 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala
@@ -114,18 +114,19 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
   }

   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
-    outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
+                     outputBuffer: ArrayBuffer[Seq[SparkFlumePollingEvent]]) {
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     val executor = Executors.newCachedThreadPool()
     val executorCompletion = new ExecutorCompletionService[Void](executor)
     channels.map(channel => {
       executorCompletion.submit(new TxnSubmitter(channel, clock))
     })
-    for(i <- 0 until channels.size) {
+    for (i <- 0 until channels.size) {
       executorCompletion.take()
     }
     val startTime = System.currentTimeMillis()
-    while (outputBuffer.size < 5 * channels.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+    while (outputBuffer.size < 5 * channels.size &&
+      System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
     }
@@ -164,7 +165,8 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
       val tx = channel.getTransaction
       tx.begin()
       for (j <- 0 until 5) {
-        channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes("utf-8"),
+        channel.put(EventBuilder.withBody((channel.getName + " - " + String.valueOf(t)).getBytes(
+          "utf-8"),
           Map[String, String]("test-" + t.toString -> "header")))
         t += 1
       }
@@ -176,4 +178,5 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
       null
     }
   }
+
 }
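
For context, the receiver touched by this patch pulls events rather than having
Flume push them: FlumePollingReceiver requests a batch from the agent-side
SparkSink over Avro RPC, stores each wrapped event into Spark, and only then
acks the batch's sequence number so Flume can commit its channel transaction.
Below is a minimal driver-side sketch of consuming such a stream. It is an
illustration under assumptions, not part of this patch: the
FlumeUtils.createPollingStream factory, the host name, and the port are
placeholders, so check the FlumeUtils API shipped with your Spark build.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollingExample {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("FlumePollingExample"), Seconds(2))

        // Poll the Avro server that SparkSink runs on the Flume agent.
        // Factory name, host, and port are assumptions for illustration;
        // they are not defined by this patch.
        val stream = FlumeUtils.createPollingStream(
          ssc, "flume-agent-host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

        // Each record wraps a SparkSinkEvent; decode the body and count
        // events per batch. getBody is assumed to return a ByteBuffer,
        // mirroring AvroFlumeEvent.
        stream.map(e => new String(e.event.getBody.array(), "utf-8"))
          .count()
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

On the Flume side, the agent's sink would be configured to use the SparkSink
class added under flume-sink (org.apache.spark.flume.sink.SparkSink) together
with the hostname and port its Avro server should listen on; the exact
parameter names are the ones documented in the SparkSink scaladoc this patch
touches.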