From 0c8bd0409e760e4a6e7b52fd662d0d77928650e2 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 19 Jan 2016 11:12:42 -0800
Subject: [PATCH] Add a default ActorSystem

---
 docs/streaming-custom-receivers.md            |   6 +-
 docs/streaming-programming-guide.md           |   2 +-
 .../streaming/JavaActorWordCount.java         |  19 ---
 .../examples/streaming/ActorWordCount.scala   |  14 +-
 .../examples/streaming/ZeroMQWordCount.scala  |  10 --
 .../spark/streaming/akka/ActorReceiver.scala  |  18 ++-
 .../spark/streaming/akka/AkkaUtils.scala      | 116 ++++++++++++---
 .../spark/streaming/zeromq/ZeroMQUtils.scala  | 133 +++++++++++++++---
 .../zeromq/JavaZeroMQStreamSuite.java         |  13 +-
 .../streaming/zeromq/ZeroMQStreamSuite.scala  |  16 ++-
 10 files changed, 257 insertions(+), 90 deletions(-)

diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 33e54216d7400..95b99862ec062 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -275,7 +275,7 @@ class CustomActor extends ActorReceiver {
 
 // A new input stream can be created with this custom actor as
 val ssc: StreamingContext = ...
-val lines = AkkaUtils.createStream[String](Props[CustomActor](), "CustomReceiver")
+val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
 
 {% endhighlight %}
 
@@ -298,8 +298,8 @@ class CustomActor extends JavaActorReceiver {
 }
 
 // A new input stream can be created with this custom actor as
-JavaStreamingContext ssc = ...;
-JavaDStream<String> lines = AkkaUtils.createStream(Props.create(CustomActor.class), "CustomReceiver");
+JavaStreamingContext jssc = ...;
+JavaDStream<String> lines = AkkaUtils.createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
 
 {% endhighlight %}
 
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 802effa555eaa..93c34efb6662d 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -659,7 +659,7 @@ methods for creating DStreams from files and Akka actors as input sources.
     Python API `fileStream` is not available in the Python API, only `textFileStream` is available.
 
 - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
-  actors by using `AkkaUtils.createStream(actorProps, actor-name)`. See the [Custom Receiver
+  actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
   Guide](streaming-custom-receivers.html) for more details.
 
     Python API Since actors are available only in the Java and Scala
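The doc change above removes the explicit `ActorSystem` plumbing from the user-facing snippet: `createStream` now needs only the context, the `Props`, and an actor name. For reference, a self-contained sketch of the updated Scala usage; the `CustomActor` body, app name, and batch interval here are illustrative, not part of the patch:

```scala
import akka.actor.Props
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// A trivial receiver actor: every String message sent to it is stored into Spark.
class CustomActor extends ActorReceiver {
  def receive: Receive = {
    case data: String => store(data)
  }
}

object CustomActorStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomActorStream")
    val ssc = new StreamingContext(conf, Seconds(1))
    // No actorSystemCreator argument: the receiver builds a default ActorSystem on the executor.
    val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```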
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index ebaccdbb15f7f..62e563380a9e7 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -18,20 +18,14 @@
 package org.apache.spark.examples.streaming;
 
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 
 import scala.Tuple2;
 
 import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.typesafe.config.ConfigFactory;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function0;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.Duration;
@@ -120,7 +114,6 @@ public static void main(String[] args) {
      */
     JavaDStream<String> lines = AkkaUtils.createStream(
         jssc,
-        new WordcountActorSystemCreator(),
         Props.create(JavaSampleActorReceiver.class, feederActorURI),
         "SampleReceiver");
 
@@ -146,15 +139,3 @@ public Integer call(Integer i1, Integer i2) {
     jssc.awaitTermination();
   }
 }
-
-class WordcountActorSystemCreator implements Function0<ActorSystem> {
-
-  @Override
-  public ActorSystem call() {
-    String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId();
-    Map<String, Object> akkaConf = new HashMap<String, Object>();
-    akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
-    akkaConf.put("akka.remote.enabled-transports", Arrays.asList("akka.remote.netty.tcp"));
-    return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf));
-  }
-}
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index b07c8a63791f5..8e88987439ffc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -25,7 +25,7 @@ import scala.util.Random
 import akka.actor._
 import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.{SparkConf, TaskContext}
+import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 
@@ -78,7 +78,7 @@ class FeederActor extends Actor {
  *
  * @see [[org.apache.spark.examples.streaming.FeederActor]]
  */
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
 
   lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
 
@@ -160,18 +160,8 @@ object ActorWordCount {
      * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
      * to same type to ensure type safety.
      */
-    def actorSystemCreator(): ActorSystem = {
-      val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
-      val akkaConf = ConfigFactory.parseString(
-        s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-           |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-           |""".stripMargin)
-      ActorSystem(uniqueSystemName, akkaConf)
-    }
-
     val lines = AkkaUtils.createStream[String](
       ssc,
-      actorSystemCreator _,
       Props(classOf[SampleActorReceiver[String]],
         "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
       "SampleReceiver")
 
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 6826b830c853c..f612e508eb78e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -90,19 +90,9 @@ object ZeroMQWordCount {
 
     def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
 
-    def actorSystemCreator(): ActorSystem = {
-      val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
-      val akkaConf = ConfigFactory.parseString(
-        s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-           |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
-           |""".stripMargin)
-      ActorSystem(uniqueSystemName, akkaConf)
-    }
-
     // For this stream, a zeroMQ publisher should be running.
     val lines = ZeroMQUtils.createStream(
       ssc,
-      actorSystemCreator _,
       url,
       Subscribe(topic),
       bytesToStringIterator _)
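Both examples above simply drop their hand-rolled `actorSystemCreator` helpers and rely on the new default. A caller that still needs custom Akka settings can keep the old behavior by passing a creator through the now-optional trailing parameter. A sketch under that assumption, reusing the same config the examples used to build by hand (`ssc`, `host`, `port`, and `SampleActorReceiver` as in ActorWordCount):

```scala
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.spark.TaskContext
import org.apache.spark.streaming.akka.AkkaUtils

// Build a uniquely named, remoting-enabled ActorSystem on the executor,
// exactly what the deleted example helpers did.
def remoteActorSystemCreator(): ActorSystem = {
  val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
  val akkaConf = ConfigFactory.parseString(
    s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
       |""".stripMargin)
  ActorSystem(uniqueSystemName, akkaConf)
}

// actorSystemCreator is now an optional parameter after actorName and
// storageLevel, so it is passed by name here.
val lines = AkkaUtils.createStream[String](
  ssc,
  Props(classOf[SampleActorReceiver[String]],
    "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
  "SampleReceiver",
  actorSystemCreator = remoteActorSystemCreator _)
```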
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index 6f411cac50a01..4e249bd81fefb 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -26,8 +26,9 @@ import scala.reflect.ClassTag
 
 import akka.actor._
 import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.receiver.Receiver
@@ -37,13 +38,22 @@ import org.apache.spark.streaming.receiver.Receiver
  * A helper with set of defaults for supervisor strategy
  */
 @DeveloperApi
-object ActorSupervisorStrategy {
+object ActorReceiver {
 
   val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
     15 millis) {
     case _: RuntimeException => Restart
     case _: Exception => Escalate
   }
+
+  def defaultActorSystemCreator(): ActorSystem = {
+    val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+    val akkaConf = ConfigFactory.parseString(
+      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+         |""".stripMargin)
+    ActorSystem(uniqueSystemName, akkaConf)
+  }
 }
 
 /**
@@ -59,7 +69,7 @@ object ActorSupervisorStrategy {
  *       }
  *     }
 *
- * AkkaUtils.createStream[String](Props[MyActor](),"MyActorReceiver")
+ * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
 *
 * }}}
 *
@@ -109,7 +119,7 @@ abstract class ActorReceiver extends Actor {
 *       }
 *     }
 *
- * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver");
+ * AkkaUtils.createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
 *
 * }}}
 *
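With the rename above, both the default supervisor strategy and the new default `ActorSystem` factory live on the `ActorReceiver` object, so a receiver that wants to adjust supervision can still start from the shipped default. A minimal sketch; the actor body is illustrative:

```scala
import akka.actor.SupervisorStrategy
import org.apache.spark.streaming.akka.ActorReceiver

class MyActor extends ActorReceiver {
  // Reuse the patched default: restart on RuntimeException, escalate anything else.
  override val supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy

  def receive: Receive = {
    case s: String => store(s)
  }
}
```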
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
index 7b885d09fdc1e..e8025f8cb1adc 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -33,13 +33,13 @@ object AkkaUtils {
    * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
    *
    * @param ssc The StreamingContext instance
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
    * @param propsForActor Props object defining creation of the actor
    * @param actorName Name of the actor
    * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
-   * @param supervisorStrategy the supervisor strategy (default:
-   *                           ActorSupervisorStrategy.defaultStrategy)
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
    *
    * @note An important point to note:
    *       Since Actor may exist outside the spark framework, It is thus user's responsibility
    *       to ensure the type safety, i.e. parametrized type of data received and createStream
    *       should be same.
    */
   def createStream[T: ClassTag](
       ssc: StreamingContext,
-      actorSystemCreator: () => ActorSystem,
       propsForActor: Props,
       actorName: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy
     ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
     val cleanF = ssc.sc.clean(actorSystemCreator)
     ssc.receiverStream(new ActorReceiverSupervisor[T](
@@ -67,8 +67,6 @@
    * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
    *
    * @param jssc The StreamingContext instance
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
    * @param propsForActor Props object defining creation of the actor
    * @param actorName Name of the actor
    * @param storageLevel Storage level to use for storing the received objects
@@ -81,7 +79,6 @@
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       propsForActor: Props,
       actorName: String,
       storageLevel: StorageLevel,
@@ -91,19 +88,51 @@
     implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     createStream[T](
       jssc.ssc,
-      () => actorSystemCreator.call(),
       propsForActor,
       actorName,
       storageLevel,
-      supervisorStrategy)
+      supervisorStrategy = supervisorStrategy)
   }
 
   /**
    * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
    *
    * @param jssc The StreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
+      supervisorStrategy: SupervisorStrategy
+  ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](
+      jssc.ssc,
+      propsForActor,
+      actorName,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The StreamingContext instance
    * @param propsForActor Props object defining creation of the actor
    * @param actorName Name of the actor
    * @param storageLevel Storage level to use for storing the received objects
@@ -115,15 +144,41 @@
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       propsForActor: Props,
       actorName: String,
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The StreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem]
+  ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     createStream[T](
-      jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName, storageLevel)
+      jssc.ssc, propsForActor, actorName, storageLevel, () => actorSystemCreator.call())
   }
 
   /**
@@ -131,8 +186,6 @@
    * Create an input stream with a user-defined actor. Storage level of the data will be the default
    * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
    *
    * @param jssc The StreamingContext instance
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
    * @param propsForActor Props object defining creation of the actor
    * @param actorName Name of the actor
    *
@@ -143,13 +196,42 @@
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       propsForActor: Props,
       actorName: String
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName)
+    createStream[T](jssc.ssc, propsForActor, actorName)
+  }
+
+  /**
+   * Create an input stream with a user-defined actor. Storage level of the data will be the default
+   * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+   *
+   * @param jssc The StreamingContext instance
+   * @param propsForActor Props object defining creation of the actor
+   * @param actorName Name of the actor
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   *
+   * @note An important point to note:
+   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
+   *       to ensure the type safety, i.e. parametrized type of data received and createStream
+   *       should be same.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      propsForActor: Props,
+      actorName: String,
+      actorSystemCreator: JFunction0[ActorSystem]
+  ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    createStream[T](
+      jssc.ssc,
+      propsForActor,
+      actorName,
+      actorSystemCreator = () => actorSystemCreator.call())
   }
 }
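Because the Scala method now takes `actorSystemCreator` and `supervisorStrategy` as defaulted trailing parameters (while Java callers get the explicit overloads above), Scala code can opt into any subset by name. A sketch of the call shapes this enables, assuming a `StreamingContext` and a `Props` instance are already in scope:

```scala
import akka.actor.Props
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

def callShapes(ssc: StreamingContext, props: Props): Unit = {
  // Everything defaulted: storage level, ActorSystem creator, supervisor strategy.
  val s1 = AkkaUtils.createStream[String](ssc, props, "a1")
  // Override only the storage level (still positional).
  val s2 = AkkaUtils.createStream[String](ssc, props, "a2", StorageLevel.MEMORY_ONLY)
  // Skip storage level and override only the supervisor strategy by name.
  val s3 = AkkaUtils.createStream[String](ssc, props, "a3",
    supervisorStrategy = ActorReceiver.defaultStrategy)
}
```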
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index bb74ef3a0fe8c..5855a34eb63d7 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -27,7 +27,7 @@ import akka.zeromq.Subscribe
 import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, AkkaUtils}
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
 import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 
@@ -35,8 +35,6 @@ object ZeroMQUtils {
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param ssc StreamingContext object
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
    *                       and each frame has sequence of byte thus it needs the converter
    *                       (which might be deserializer of bytes) to translate from sequence
    *                       of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping (default:
+   *                           ActorReceiver.defaultActorSystemCreator)
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
    */
   def createStream[T: ClassTag](
       ssc: StreamingContext,
-      actorSystemCreator: () => ActorSystem,
       publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: Seq[ByteString] => Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
-      supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+      actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy
     ): ReceiverInputDStream[T] = {
     AkkaUtils.createStream(
       ssc,
-      actorSystemCreator,
       Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
       "ZeroMQReceiver",
       storageLevel,
+      actorSystemCreator,
       supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param jssc JavaStreamingContext object
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
    * @param publisherUrl Url of remote ZeroMQ publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel Storage level to use for storing the received objects
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -92,30 +92,63 @@
       storageLevel: StorageLevel,
       supervisorStrategy: SupervisorStrategy
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
     createStream[T](
       jssc.ssc,
-      () => actorSystemCreator.call(),
       publisherUrl,
       subscribe,
       fn,
       storageLevel,
-      supervisorStrategy)
+      supervisorStrategy = supervisorStrategy)
   }
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote ZeroMQ publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem],
+      supervisorStrategy: SupervisorStrategy
+  ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn =
+      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel,
+      () => actorSystemCreator.call(),
+      supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc JavaStreamingContext object
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level.
+   * @param storageLevel  RDD storage level.
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -127,7 +160,6 @@
       storageLevel: StorageLevel
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
     createStream[T](
       jssc.ssc,
-      () => actorSystemCreator.call(),
       publisherUrl,
       subscribe,
       fn,
@@ -137,8 +169,40 @@
 
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might be
+   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
+   *                       where sequence refer to a frame and sub sequence refer to its payload.
+   * @param storageLevel RDD storage level.
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel,
+      actorSystemCreator: JFunction0[ActorSystem]
+  ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn =
+      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      storageLevel,
+      () => actorSystemCreator.call())
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc JavaStreamingContext object
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe Topic to subscribe to
    * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
    *                       frame has sequence of byte thus it needs the converter(which might be
    *                       deserializer of bytes) to translate from sequence of sequence of bytes,
    *                       where sequence refer to a frame and sub sequence refer to its payload.
    */
   def createStream[T](
       jssc: JavaStreamingContext,
-      actorSystemCreator: JFunction0[ActorSystem],
       publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
@@ -149,7 +213,6 @@
     ): JavaReceiverInputDStream[T] = {
     implicit val cm: ClassTag[T] =
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     val fn =
       (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](jssc.ssc, () => actorSystemCreator.call(), publisherUrl, subscribe, fn)
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn)
+  }
@@ -158,6 +221,42 @@
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param jssc JavaStreamingContext object
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe Topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
+   *                       frame has sequence of byte thus it needs the converter(which might
+   *                       be deserializer of bytes) to translate from sequence of sequence of
+   *                       bytes, where sequence refer to a frame and sub sequence refer to its
+   *                       payload.
+   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+   *                           be shut down when the receiver is stopping.
+   */
+  def createStream[T](
+      jssc: JavaStreamingContext,
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      actorSystemCreator: JFunction0[ActorSystem]
+  ): JavaReceiverInputDStream[T] = {
+    implicit val cm: ClassTag[T] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+    val fn =
+      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
+    createStream[T](
+      jssc.ssc,
+      publisherUrl,
+      subscribe,
+      fn,
+      actorSystemCreator = () => actorSystemCreator.call())
   }
 }
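The ZeroMQ helper follows the same pattern as `AkkaUtils`: the creator argument moves behind `storageLevel` and gains the same default, so the common case needs only the context, publisher URL, topic, and converter. A sketch assuming a publisher at a placeholder address and topic:

```scala
import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.zeromq.ZeroMQUtils

def zeroMQLines(ssc: StreamingContext): Unit = {
  // Each ZeroMQ frame arrives as a ByteString; decode every frame as UTF-8 text.
  def toStrings(frames: Seq[ByteString]): Iterator[String] =
    frames.map(_.utf8String).iterator

  // Storage level, ActorSystem creator, and supervisor strategy all use the new defaults.
  val lines = ZeroMQUtils.createStream(
    ssc,
    "tcp://127.0.0.1:1234",
    Subscribe("wordcount"),
    toStrings _)
  lines.print()
}
```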
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 6db5cb77625e7..caddbdb91d631 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -39,12 +39,19 @@ public void testZeroMQStream() {
     Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
 
     JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createStream(
-      ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects);
+      ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator);
     JavaReceiverInputDStream<String> test2 = ZeroMQUtils.createStream(
-      ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator);
     JavaReceiverInputDStream<String> test3 = ZeroMQUtils.createStream(
-      ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
       SupervisorStrategy.defaultStrategy());
+    JavaReceiverInputDStream<String> test4 = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects);
+    JavaReceiverInputDStream<String> test5 = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+    JavaReceiverInputDStream<String> test6 = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+      SupervisorStrategy.defaultStrategy());
   }
 }
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 3f77a62d7644e..bac2679cabae5 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,12 +42,20 @@ class ZeroMQStreamSuite extends SparkFunSuite {
 
     // tests the API, does not actually test data receiving
     val test1: ReceiverInputDStream[String] =
-      ZeroMQUtils.createStream(ssc, () => null, publishUrl, subscribe, bytesToObjects)
+      ZeroMQUtils.createStream(
+        ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
     val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
     val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
-      ssc, () => null, publishUrl, subscribe, bytesToObjects,
-      StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+      ssc, publishUrl, subscribe, bytesToObjects,
+      StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+    val test4: ReceiverInputDStream[String] =
+      ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+    val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+    val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+      ssc, publishUrl, subscribe, bytesToObjects,
+      StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
 
     // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
     ssc.stop()