From 10b6214c39ba4f41d428ae87cba4bfb00f89a497 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 17 Jul 2014 01:17:02 -0700 Subject: [PATCH] Changed public API, changed sink package, and added java unit test to make sure Java API is callable from Java. --- .../streaming/FlumePollingEventCount.scala | 67 +++++++++ .../flume-sink/src/main/avro/sparkflume.avdl | 3 +- .../{ => streaming}/flume/sink/Logging.scala | 26 ++-- .../flume/sink/SparkAvroCallbackHandler.scala | 15 +- .../flume/sink/SparkSink.scala | 9 +- .../flume/sink/SparkSinkUtils.scala | 6 +- .../flume/sink/TransactionProcessor.scala | 7 +- .../flume/FlumePollingInputDStream.scala | 52 +++---- .../spark/streaming/flume/FlumeUtils.scala | 136 ++++++++++-------- .../flume/JavaFlumePollingStreamSuite.java | 27 ++++ ...te.scala => FlumePollingStreamSuite.scala} | 14 +- 11 files changed, 226 insertions(+), 136 deletions(-) create mode 100644 examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala rename external/flume-sink/src/main/scala/org/apache/spark/{ => streaming}/flume/sink/Logging.scala (82%) rename external/flume-sink/src/main/scala/org/apache/spark/{ => streaming}/flume/sink/SparkAvroCallbackHandler.scala (94%) rename external/flume-sink/src/main/scala/org/apache/spark/{ => streaming}/flume/sink/SparkSink.scala (98%) rename external/flume-sink/src/main/scala/org/apache/spark/{ => streaming}/flume/sink/SparkSinkUtils.scala (91%) rename external/flume-sink/src/main/scala/org/apache/spark/{ => streaming}/flume/sink/TransactionProcessor.scala (97%) create mode 100644 external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java rename external/flume/src/test/scala/org/apache/spark/streaming/flume/{FlumePollingReceiverSuite.scala => FlumePollingStreamSuite.scala} (94%) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala new file mode 100644 index 0000000000000..1cc8c8d5c23b6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam +import java.net.InetSocketAddress + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with the Spark Sink running in a Flume agent. See + * the Spark Streaming programming guide for more details. 
+ * + * Usage: FlumePollingEventCount + * `host` is the host on which the Spark Sink is running. + * `port` is the port at which the Spark Sink is listening. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` + */ +object FlumePollingEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumePollingEventCount ") + System.exit(1) + } + + StreamingExamples.setStreamingLogLevels() + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream that polls the Spark Sink running in a Flume agent + val stream = FlumeUtils.createPollingStream(ssc, host, port) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} diff --git a/external/flume-sink/src/main/avro/sparkflume.avdl b/external/flume-sink/src/main/avro/sparkflume.avdl index f8edd92f67e3e..8806e863ac7c6 100644 --- a/external/flume-sink/src/main/avro/sparkflume.avdl +++ b/external/flume-sink/src/main/avro/sparkflume.avdl @@ -17,7 +17,7 @@ * under the License. */ -@namespace("org.apache.spark.flume") +@namespace("org.apache.spark.streaming.flume.sink") protocol SparkFlumeProtocol { @@ -37,5 +37,4 @@ protocol SparkFlumeProtocol { void ack (string sequenceNumber); void nack (string sequenceNumber); - } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala similarity index 82% rename from external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala rename to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala index 8b6453362a121..81eb25f2ee6d7 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/Logging.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/Logging.scala @@ -14,12 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.flume.sink +package org.apache.spark.streaming.flume.sink + import org.apache.log4j.{LogManager, PropertyConfigurator} import org.slf4j.{Logger, LoggerFactory} import org.slf4j.impl.StaticLoggerBinder -trait Logging { +/** + * Copy of the org.apache.spark.Logging for being used in the Spark Sink. + * The org.apache.spark.Logging is not used so that all of Spark is not brought + * in as a dependency. 
+ */ +private[sink] trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine @transient private var log_ : Logger = null @@ -95,20 +101,6 @@ trait Logging { } private def initializeLogging() { - // If Log4j is being used, but is not initialized, load a default properties file - val binder = StaticLoggerBinder.getSingleton - val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory") - val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements - if (!log4jInitialized && usingLog4j) { - val defaultLogProps = "org/apache/spark/log4j-defaults.properties" - Option(getClass.getClassLoader.getResource(defaultLogProps)) match { - case Some(url) => - PropertyConfigurator.configure(url) - log.info(s"Using Spark's default log4j profile: $defaultLogProps") - case None => - System.err.println(s"Spark was unable to load $defaultLogProps") - } - } Logging.initialized = true // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads @@ -117,7 +109,7 @@ trait Logging { } } -private object Logging { +private[sink] object Logging { @volatile private var initialized = false val initLock = new Object() try { diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala similarity index 94% rename from external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala rename to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala index e4925b85c81ee..6249a3197d07d 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -14,17 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.flume.sink +package org.apache.spark.streaming.flume.sink -import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{ConcurrentHashMap, Executors} +import java.util.concurrent.atomic.AtomicLong -import com.google.common.util.concurrent.ThreadFactoryBuilder - -import org.apache.commons.lang.RandomStringUtils import org.apache.flume.Channel -import org.apache.spark.flume.{EventBatch, SparkFlumeProtocol} -import org.slf4j.LoggerFactory +import org.apache.commons.lang.RandomStringUtils +import com.google.common.util.concurrent.ThreadFactoryBuilder /** * Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process @@ -34,7 +31,7 @@ import org.slf4j.LoggerFactory * @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark * is rolled back. */ -private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, +private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol with Logging { val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, new ThreadFactoryBuilder().setDaemon(true) @@ -109,7 +106,7 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel, * @return The transaction processor for the corresponding batch. 
Note that this instance is no * longer tracked and the caller is responsible for that txn processor. */ - private[flume] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { + private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { processorMap.remove(sequenceNumber.toString) // The toString is required! } diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala similarity index 98% rename from external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala rename to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala index 35b766ff04d9f..d5afde0fae19d 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSink.scala @@ -14,20 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.flume.sink +package org.apache.spark.streaming.flume.sink import java.net.InetSocketAddress import java.util.concurrent._ import org.apache.avro.ipc.NettyServer import org.apache.avro.ipc.specific.SpecificResponder +import org.apache.flume.Context import org.apache.flume.Sink.Status import org.apache.flume.conf.{Configurable, ConfigurationException} import org.apache.flume.sink.AbstractSink -import org.apache.flume.Context -import org.slf4j.LoggerFactory - -import org.apache.spark.flume.SparkFlumeProtocol /** * A sink that uses Avro RPC to run a server that can be polled by Spark's @@ -48,6 +45,7 @@ import org.apache.spark.flume.SparkFlumeProtocol // until an ACK or NACK comes back or the transaction times out (after the specified timeout). // When the response comes, the TransactionProcessor is retrieved and then unblocked, // at which point the transaction is committed or rolled back. +private[flume] class SparkSink extends AbstractSink with Logging with Configurable { // Size of the pool to use for holding transaction processors. @@ -130,6 +128,7 @@ class SparkSink extends AbstractSink with Logging with Configurable { /** * Configuration parameters and their defaults. */ +private[flume] object SparkSinkConfig { val THREADS = "threads" val DEFAULT_THREADS = 10 diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala similarity index 91% rename from external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala rename to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala index 8f16246d495a0..47c0e294d6b52 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkSinkUtils.scala @@ -14,11 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.flume.sink +package org.apache.spark.streaming.flume.sink -import org.apache.spark.flume.EventBatch - -object SparkSinkUtils { +private[flume] object SparkSinkUtils { /** * This method determines if this batch represents an error or not. 
* @param batch - The batch to check diff --git a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala similarity index 97% rename from external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala rename to external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala index 1d2dddfbf7ff0..6f4e50b0f1d63 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/TransactionProcessor.scala @@ -14,18 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.flume.sink +package org.apache.spark.streaming.flume.sink import java.nio.ByteBuffer import java.util -import java.util.concurrent.{TimeUnit, CountDownLatch, Callable} +import java.util.concurrent.{Callable, CountDownLatch, TimeUnit} import scala.util.control.Breaks import org.apache.flume.{Transaction, Channel} -import org.apache.spark.flume.{SparkSinkEvent, EventBatch} -import org.slf4j.LoggerFactory - // Flume forces transactions to be thread-local (horrible, I know!) // So the sink basically spawns a new thread to pull the events out within a transaction. diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 66df20863037b..6d7a74f7a9020 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -22,8 +22,6 @@ import java.net.InetSocketAddress import java.nio.ByteBuffer import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors} -import org.apache.spark.flume.sink.SparkSinkUtils - import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -33,45 +31,44 @@ import org.apache.avro.ipc.specific.SpecificRequestor import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.apache.spark.Logging -import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.flume.sink._ + /** * A [[ReceiverInputDStream]] that can be used to read data from several Flume agents running * [[org.apache.spark.flume.sink.SparkSink]]s. - * @param ssc_ Streaming context that will execute this input stream + * @param _ssc Streaming context that will execute this input stream * @param addresses List of addresses at which SparkSinks are listening * @param maxBatchSize Maximum size of a batch * @param parallelism Number of parallel connections to open * @param storageLevel The storage level to use. 
* @tparam T Class type of the object of this stream */ +private[streaming] class FlumePollingInputDStream[T: ClassTag]( - @transient ssc_ : StreamingContext, - val addresses: Seq[InetSocketAddress], - val maxBatchSize: Int, - val parallelism: Int, - storageLevel: StorageLevel -) extends ReceiverInputDStream[SparkFlumePollingEvent](ssc_) { - /** - * Gets the receiver object that will be sent to the worker nodes - * to receive data. This method needs to defined by any specific implementation - * of a NetworkInputDStream. - */ + @transient _ssc: StreamingContext, + val addresses: Seq[InetSocketAddress], + val maxBatchSize: Int, + val parallelism: Int, + storageLevel: StorageLevel + ) extends ReceiverInputDStream[SparkFlumePollingEvent](_ssc) { + override def getReceiver(): Receiver[SparkFlumePollingEvent] = { new FlumePollingReceiver(addresses, maxBatchSize, parallelism, storageLevel) } } -private[streaming] class FlumePollingReceiver( - addresses: Seq[InetSocketAddress], - maxBatchSize: Int, - parallelism: Int, - storageLevel: StorageLevel -) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging { +private[streaming] +class FlumePollingReceiver( + addresses: Seq[InetSocketAddress], + maxBatchSize: Int, + parallelism: Int, + storageLevel: StorageLevel + ) extends Receiver[SparkFlumePollingEvent](storageLevel) with Logging { lazy val channelFactoryExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true). @@ -150,14 +147,6 @@ private[streaming] class FlumePollingReceiver( } } - override def store(dataItem: SparkFlumePollingEvent) { - // Not entirely sure store is thread-safe for all storage levels - so wrap it in synchronized - // This takes a performance hit, since the parallelism is useful only for pulling data now. - this.synchronized { - super.store(dataItem) - } - } - override def onStop(): Unit = { logInfo("Shutting down Flume Polling Receiver") receiverExecutor.shutdownNow() @@ -176,6 +165,9 @@ private[streaming] class FlumePollingReceiver( private class FlumeConnection(val transceiver: NettyTransceiver, val client: SparkFlumeProtocol.Callback) +/** + * Companion object of [[SparkFlumePollingEvent]] + */ private[streaming] object SparkFlumePollingEvent { def fromSparkSinkEvent(in: SparkSinkEvent): SparkFlumePollingEvent = { val event = new SparkFlumePollingEvent() @@ -189,7 +181,7 @@ private[streaming] object SparkFlumePollingEvent { * SparkSinkEvent is identical to AvroFlumeEvent, we need to create a new class and a wrapper * around that to make it externalizable. */ -class SparkFlumePollingEvent() extends Externalizable with Logging { +class SparkFlumePollingEvent extends Externalizable with Logging { var event: SparkSinkEvent = new SparkSinkEvent() /* De-serialize from bytes. 
*/ diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 4e4fc3a612d4f..c754fe33738b8 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress +import org.apache.spark.annotation.Experimental import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} @@ -26,6 +27,9 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream object FlumeUtils { + private val DEFAULT_POLLING_PARALLELISM = 5 + private val DEFAULT_POLLING_BATCH_SIZE = 1000 + /** * Create a input stream from a Flume source. * @param ssc StreamingContext object @@ -112,54 +116,57 @@ object FlumeUtils { /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 100 events and run 5 threads to pull data. - * @param host The address of the host on which the Spark Sink is running - * @param port The port that the host is listening on + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param host Address of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening * @param storageLevel Storage level to use for storing the received objects */ + @Experimental def createPollingStream( - ssc: StreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel + ssc: StreamingContext, + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[SparkFlumePollingEvent] = { - new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, - Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel) + createPollingStream(ssc, Seq(new InetSocketAddress(hostname, port)), storageLevel) } /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 100 events and run 5 threads to pull data. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. * @param addresses List of InetSocketAddresses representing the hosts to connect to. * @param storageLevel Storage level to use for storing the received objects */ - def createPollingStream ( - ssc: StreamingContext, - addresses: Seq[InetSocketAddress], - storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumePollingEvent] = { - new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, 100, 5, storageLevel) + @Experimental + def createPollingStream( + ssc: StreamingContext, + addresses: Seq[InetSocketAddress], + storageLevel: StorageLevel + ): ReceiverInputDStream[SparkFlumePollingEvent] = { + createPollingStream(ssc, addresses, storageLevel, + DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) } /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. 
* @param addresses List of InetSocketAddresses representing the hosts to connect to. - * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a + * @param maxBatchSize Maximum number of events to be pulled from the Spark sink in a * single RPC call * @param parallelism Number of concurrent requests this stream should send to the sink. Note * that having a higher number of requests concurrently being pulled will * result in this stream using more threads * @param storageLevel Storage level to use for storing the received objects */ - def createPollingStream ( - ssc: StreamingContext, - addresses: Seq[InetSocketAddress], - maxBatchSize: Int, - parallelism: Int, - storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumePollingEvent] = { + @Experimental + def createPollingStream( + ssc: StreamingContext, + addresses: Seq[InetSocketAddress], + storageLevel: StorageLevel, + maxBatchSize: Int, + parallelism: Int + ): ReceiverInputDStream[SparkFlumePollingEvent] = { new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize, parallelism, storageLevel) } @@ -167,56 +174,73 @@ object FlumeUtils { /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 100 events and run 5 threads to pull data. - * @param addresses List of InetSocketAddresses representing the hosts to connect to. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param hostname Hostname of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening + */ + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int + ): JavaReceiverInputDStream[SparkFlumePollingEvent] = { + createPollingStream(jssc, hostname, port, StorageLevel.MEMORY_AND_DISK_SER_2) + } + + /** + * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. + * This stream will poll the sink for data and will pull events as they are available. + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param hostname Hostname of the host on which the Spark Sink is running + * @param port Port of the host at which the Spark Sink is listening * @param storageLevel Storage level to use for storing the received objects */ - def createPollingStream ( - jssc: JavaStreamingContext, - addresses: Seq[InetSocketAddress], - storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumePollingEvent] = { - new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, 100, 5, - StorageLevel.MEMORY_AND_DISK_SER_2) + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + hostname: String, + port: Int, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkFlumePollingEvent] = { + createPollingStream(jssc, Array(new InetSocketAddress(hostname, port)), storageLevel) } /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. - * This stream will use a batch size of 100 events and run 5 threads to pull data. 
- * @param host The address of the host on which the Spark Sink is running - * @param port The port that the host is listening on + * This stream will use a batch size of 1000 events and run 5 threads to pull data. + * @param addresses List of InetSocketAddresses on which the Spark Sink is running. * @param storageLevel Storage level to use for storing the received objects */ + @Experimental def createPollingStream( - jssc: JavaStreamingContext, - host: String, - port: Int, - storageLevel: StorageLevel - ): ReceiverInputDStream[SparkFlumePollingEvent] = { - new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, - Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel) + jssc: JavaStreamingContext, + addresses: Array[InetSocketAddress], + storageLevel: StorageLevel + ): JavaReceiverInputDStream[SparkFlumePollingEvent] = { + createPollingStream(jssc, addresses, storageLevel, + DEFAULT_POLLING_BATCH_SIZE, DEFAULT_POLLING_PARALLELISM) } /** * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent. * This stream will poll the sink for data and will pull events as they are available. - * @param addresses List of InetSocketAddresses representing the hosts to connect to. + * @param addresses List of InetSocketAddresses on which the Spark Sink is running * @param maxBatchSize The maximum number of events to be pulled from the Spark sink in a * single RPC call - * @param parallelism Number of concurrent requests this stream should send to the sink. Note - * that having a higher number of requests concurrently being pulled will - * result in this stream using more threads + * @param parallelism Number of concurrent requests this stream should send to the sink. Note + * that having a higher number of requests concurrently being pulled will + * result in this stream using more threads * @param storageLevel Storage level to use for storing the received objects */ - def createPollingStream ( - jssc: JavaStreamingContext, - addresses: Seq[InetSocketAddress], - maxBatchSize: Int, - parallelism: Int, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[SparkFlumePollingEvent] = { - new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, maxBatchSize, - parallelism, storageLevel) + @Experimental + def createPollingStream( + jssc: JavaStreamingContext, + addresses: Array[InetSocketAddress], + storageLevel: StorageLevel, + maxBatchSize: Int, + parallelism: Int + ): JavaReceiverInputDStream[SparkFlumePollingEvent] = { + createPollingStream(jssc.ssc, addresses, storageLevel, maxBatchSize, parallelism) } } diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java new file mode 100644 index 0000000000000..ef8622481d599 --- /dev/null +++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumePollingStreamSuite.java @@ -0,0 +1,27 @@ +package org.apache.spark.streaming.flume; + +import java.net.InetSocketAddress; + +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.LocalJavaStreamingContext; + +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.junit.Test; + +public class JavaFlumePollingStreamSuite extends LocalJavaStreamingContext { + @Test + public void testFlumeStream() { + // tests the API, does not actually test data receiving + InetSocketAddress[] addresses = new InetSocketAddress[] { + new 
InetSocketAddress("localhost", 12345) + }; + JavaReceiverInputDStream test1 = + FlumeUtils.createPollingStream(ssc, "localhost", 12345); + JavaReceiverInputDStream test2 = FlumeUtils.createPollingStream( + ssc, "localhost", 12345, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test3 = FlumeUtils.createPollingStream( + ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test4 = FlumeUtils.createPollingStream( + ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 5); + } +} diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala similarity index 94% rename from external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala rename to external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index d1c8042c68d1a..3ff0cca523928 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -29,13 +29,13 @@ import org.apache.flume.channel.MemoryChannel import org.apache.flume.conf.Configurables import org.apache.flume.event.EventBuilder -import org.apache.spark.flume.sink.{SparkSinkConfig, SparkSink} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.util.ManualClock import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext} +import org.apache.spark.streaming.flume.sink._ - class FlumePollingReceiverSuite extends TestSuiteBase { + class FlumePollingStreamSuite extends TestSuiteBase { val testPort = 9999 @@ -43,8 +43,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] = - FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 1, - StorageLevel.MEMORY_AND_DISK) + FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), StorageLevel.MEMORY_AND_DISK, 100, 1) val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]] with SynchronizedBuffer[Seq[SparkFlumePollingEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) @@ -65,6 +64,7 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon sink.setChannel(channel) sink.start() ssc.start() + writeAndVerify(Seq(channel), ssc, outputBuffer) assertChannelIsEmpty(channel) sink.stop() @@ -74,10 +74,9 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon test("flume polling test multiple hosts") { // Set up the streaming context and input streams val ssc = new StreamingContext(conf, batchDuration) + val addresses = Seq(testPort, testPort + 1).map(new InetSocketAddress("localhost", _)) val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] = - FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort), - new InetSocketAddress("localhost", testPort + 1)), 100, 5, - StorageLevel.MEMORY_AND_DISK) + FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK, 100, 5) val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]] with 
SynchronizedBuffer[Seq[SparkFlumePollingEvent]] val outputStream = new TestOutputStream(flumeStream, outputBuffer) @@ -187,5 +186,4 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon null } } - }
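
For reference, a minimal end-to-end sketch of how the reordered createPollingStream overloads introduced by this patch would be called from Scala. It mirrors the FlumePollingEventCount example above; the application name, host, and ports are placeholders, and it assumes a SparkSink is configured and listening at those addresses.

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object PollingStreamSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("PollingStreamSketch")
    val ssc = new StreamingContext(sparkConf, Milliseconds(2000))

    // Single-sink overload: the storage level defaults to MEMORY_AND_DISK_SER_2.
    val single = FlumeUtils.createPollingStream(ssc, "localhost", 9999)

    // Multi-sink overload: storageLevel now precedes maxBatchSize and parallelism.
    val addresses = Seq(new InetSocketAddress("localhost", 9999),
      new InetSocketAddress("localhost", 10000))
    val multi = FlumeUtils.createPollingStream(
      ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2, 1000, 5)

    // Count the events received in each batch, as in FlumePollingEventCount.
    single.count().map(cnt => "Received " + cnt + " flume events.").print()
    multi.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}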