From a5ec6444ba8c7824de949d478d9c888e8939248b Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 13 Jan 2016 13:22:48 -0800 Subject: [PATCH 01/13] Add "streaming-akka" project Include the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Update the ActorWordCount example and add the JavaActorWordCount example 4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly --- dev/sparktestsupport/modules.py | 12 ++ examples/pom.xml | 5 + .../streaming/JavaActorWordCount.java | 29 +++- .../examples/streaming/ActorWordCount.scala | 40 +++-- .../examples/streaming/ZeroMQWordCount.scala | 23 ++- external/akka/pom.xml | 66 ++++++++ .../spark/streaming/akka}/ActorReceiver.scala | 23 +-- .../spark/streaming/akka/AkkaUtils.scala | 148 ++++++++++++++++++ external/zeromq/pom.xml | 5 + .../streaming/zeromq/ZeroMQReceiver.scala | 2 +- .../spark/streaming/zeromq/ZeroMQUtils.scala | 64 +++++--- .../zeromq/JavaZeroMQStreamSuite.java | 35 +++-- .../streaming/zeromq/ZeroMQStreamSuite.scala | 8 +- pom.xml | 1 + project/SparkBuild.scala | 9 +- .../spark/streaming/StreamingContext.scala | 24 +-- .../api/java/JavaStreamingContext.scala | 64 -------- 17 files changed, 403 insertions(+), 155 deletions(-) create mode 100644 external/akka/pom.xml rename {streaming/src/main/scala/org/apache/spark/streaming/receiver => external/akka/src/main/scala/org/apache/spark/streaming/akka}/ActorReceiver.scala (90%) create mode 100644 external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 93a8c15e3ec30..efe58ea2e0e78 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -222,6 +222,18 @@ def contains_file(self, filename): ) +streaming_akka = Module( + name="streaming-akka", + dependencies=[streaming], + source_file_regexes=[ + "external/akka", + ], + sbt_test_goals=[ + "streaming-akka/test", + ] +) + + streaming_flume = Module( name="streaming-flume", dependencies=[streaming], diff --git a/examples/pom.xml b/examples/pom.xml index 1a0d5e5854642..9437cee2abfdf 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -75,6 +75,11 @@ spark-streaming-flume_${scala.binary.version} ${project.version} + + org.apache.spark + spark-streaming-akka_${scala.binary.version} + ${project.version} + org.apache.spark spark-streaming-mqtt_${scala.binary.version} diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 2377207779fec..57bbc585de63c 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -18,7 +18,13 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import akka.actor.ActorSystem; +import com.typesafe.config.ConfigFactory; +import org.apache.spark.TaskContext; +import org.apache.spark.streaming.akka.ActorSystemFactory; import scala.Tuple2; import akka.actor.ActorSelection; @@ -31,7 +37,8 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import 
org.apache.spark.streaming.receiver.JavaActorReceiver; +import org.apache.spark.streaming.akka.AkkaUtils; +import org.apache.spark.streaming.akka.JavaActorReceiver; /** * A sample actor as receiver, is also simplest. This receiver actor @@ -110,8 +117,11 @@ public static void main(String[] args) { * For example: Both actorStream and JavaSampleActorReceiver are parameterized * to same type to ensure type safety. */ - JavaDStream lines = jssc.actorStream( - Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver"); + JavaDStream lines = AkkaUtils.actorStream( + jssc, + new WordcountActorSystemFactory(), + Props.create(JavaSampleActorReceiver.class, feederActorURI), + "SampleReceiver"); // compute wordcount lines.flatMap(new FlatMapFunction() { @@ -135,3 +145,16 @@ public Integer call(Integer i1, Integer i2) { jssc.awaitTermination(); } } + +class WordcountActorSystemFactory implements ActorSystemFactory { + + @Override + public ActorSystem create() { + String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId(); + Map akkaConf = new HashMap(); + akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); + akkaConf.put( + "akka.remote.netty.tcp.transport-class", "akka.remote.transport.netty.NettyTransport"); + return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf)); + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 88cdc6bc144e5..faa42d2eabaed 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList import scala.reflect.ClassTag import scala.util.Random -import akka.actor.{actorRef2Scala, Actor, ActorRef, Props} +import akka.actor._ +import com.typesafe.config.ConfigFactory -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.receiver.ActorReceiver -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -78,8 +78,7 @@ class FeederActor extends Actor { * * @see [[org.apache.spark.examples.streaming.FeederActor]] */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends ActorReceiver { +class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) extends ActorReceiver { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) @@ -108,9 +107,13 @@ object FeederActor { } val Seq(host, port) = args.toSeq - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + |""".stripMargin) + val actorSystem = ActorSystem("test", akkaConf) val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) @@ -150,16 +153,27 @@ object ActorWordCount 
{ * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * to ensure the type safety, i.e type of data received and InputDStream * should be same. * * For example: Both actorStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ + def actorSystemCreator(): ActorSystem = { + val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |""".stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + val lines = AkkaUtils.actorStream[String]( + ssc, + actorSystemCreator _, + Props(new SampleActorReceiver[String]( + "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt))), + "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 96448905760fb..76ce3e8e564a7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala import akka.util.ByteString import akka.zeromq._ import akka.zeromq.Subscribe +import com.typesafe.config.ConfigFactory -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.zeromq._ @@ -69,10 +70,10 @@ object SimpleZeroMQPublisher { * * To run this example locally, you may run publisher as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` * and run the example as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { @@ -89,8 +90,22 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator + def actorSystemCreator(): ActorSystem = { + val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |""".stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } + // For this stream, a zeroMQ publisher should be running. 
- val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val lines = ZeroMQUtils.createStream( + ssc, + actorSystemCreator _, + url, + Subscribe(topic), + bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/external/akka/pom.xml b/external/akka/pom.xml new file mode 100644 index 0000000000000..a7f3e83faca10 --- /dev/null +++ b/external/akka/pom.xml @@ -0,0 +1,66 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.10 + 2.0.0-SNAPSHOT + ../../pom.xml + + + org.apache.spark + spark-streaming-akka_2.10 + + streaming-akka + + jar + Spark Project External Akka + http://spark.apache.org/ + + + + org.apache.spark + spark-streaming_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + + + ${akka.group} + akka-actor_${scala.binary.version} + ${akka.version} + + + ${akka.group} + akka-remote_${scala.binary.version} + ${akka.version} + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala similarity index 90% rename from streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala rename to external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 0eabf3d260b26..b8f8c628cb5e2 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.streaming.receiver +package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger @@ -27,9 +27,10 @@ import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} -import org.apache.spark.{Logging, SparkEnv} +import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver /** * :: DeveloperApi :: @@ -148,7 +149,7 @@ abstract class JavaActorReceiver extends UntypedActor { * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.receiver.ActorReceiver]]. + * [[org.apache.spark.streaming.akka.ActorReceiver]]. 
*/ @DeveloperApi case class Statistics(numberOfMsgs: Int, @@ -157,10 +158,10 @@ case class Statistics(numberOfMsgs: Int, otherInfo: String) /** Case class to receive data sent by child actors */ -private[streaming] sealed trait ActorReceiverData -private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData -private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData -private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] sealed trait ActorReceiverData +private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData +private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -181,14 +182,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec * context.parent ! Props(new Worker, "Worker") * }}} */ -private[streaming] class ActorReceiverSupervisor[T: ClassTag]( +private[akka] class ActorReceiverSupervisor[T: ClassTag]( + actorSystemCreator: () => ActorSystem, props: Props, name: String, storageLevel: StorageLevel, receiverSupervisorStrategy: SupervisorStrategy ) extends Receiver[T](storageLevel) with Logging { - protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor), + private lazy val actorSystem = actorSystemCreator() + protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor), "Supervisor" + streamId) class Supervisor extends Actor { @@ -241,5 +244,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag]( def onStop(): Unit = { actorSupervisor ! PoisonPill + actorSystem.shutdown() + actorSystem.awaitTermination() } } diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala new file mode 100644 index 0000000000000..0de2130c353c9 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import scala.reflect.ClassTag + +import akka.actor.{ActorSystem, Props, SupervisorStrategy} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +/** + * Factory interface for creating a new ActorSystem in executors. 
+ */ +trait ActorSystemFactory extends Serializable { + def create(): ActorSystem +} + +object AkkaUtils { + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @param ssc the StreamingContext instance + * @param actorSystemCreator a function to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @param supervisorStrategy the supervisor strategy (default: + * ActorSupervisorStrategy.defaultStrategy) + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T: ClassTag]( + ssc: StreamingContext, + actorSystemCreator: () => ActorSystem, + props: Props, + name: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { + val cleanF = ssc.sc.clean(actorSystemCreator) + ssc.receiverStream( + new ActorReceiverSupervisor[T](cleanF, props, name, storageLevel, supervisorStrategy)) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * @param supervisorStrategy the supervisor strategy + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String, + storageLevel: StorageLevel, + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + actorStream[T]( + jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel, supervisorStrategy) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. 
+ */ + def actorStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + actorStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel) + } + + /** + * Create an input stream with any arbitrary user implemented actor receiver. + * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * + * @param jssc the StreamingContext instance + * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors + * @param props Props object defining creation of the actor + * @param name Name of the actor + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e parametrized type of data received and actorStream + * should be same. + */ + def actorStream[T]( + jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, + props: Props, + name: String + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + actorStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name) + } + +} diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml index a725988449075..7781aaeed9e0c 100644 --- a/external/zeromq/pom.xml +++ b/external/zeromq/pom.xml @@ -41,6 +41,11 @@ ${project.version} provided + + org.apache.spark + spark-streaming-akka_${scala.binary.version} + ${project.version} + org.apache.spark spark-core_${scala.binary.version} diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala index 506ba8782d3d5..dd367cd43b807 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala @@ -23,7 +23,7 @@ import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging -import org.apache.spark.streaming.receiver.ActorReceiver +import org.apache.spark.streaming.akka.ActorReceiver /** * A receiver to subscribe to ZeroMQ stream. 
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 63cd8a2721f0c..9e556ffd3ee8c 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -20,23 +20,24 @@ package org.apache.spark.streaming.zeromq import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import akka.actor.{Props, SupervisorStrategy} +import akka.actor.{ActorSystem, Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, ActorSystemFactory, AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -import org.apache.spark.streaming.receiver.ActorSupervisorStrategy object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param ssc StreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param ssc StreamingContext object + * @param actorSystemCreator A function to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic * and each frame has sequence of byte thus it needs the converter * (which might be deserializer of bytes) to translate from sequence @@ -46,21 +47,28 @@ object ZeroMQUtils { */ def createStream[T: ClassTag]( ssc: StreamingContext, + actorSystemCreator: () => ActorSystem, publisherUrl: String, subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy ): ReceiverInputDStream[T] = { - ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) + AkkaUtils.actorStream( + ssc, + actorSystemCreator, + Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", + storageLevel, + supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param actorSystemFactory A factory to create ActorSystem in executors + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, @@ -69,6 +77,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -79,14 +88,22 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + createStream[T]( + jssc.ssc, + () => actorSystemFactory.create(), + publisherUrl, + subscribe, + fn, + storageLevel, + supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param actorSystemFactory A factory to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, @@ -95,6 +112,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -104,14 +122,21 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) + createStream[T]( + jssc.ssc, + () => actorSystemFactory.create(), + publisherUrl, + subscribe, + fn, + storageLevel) } /** * Create an input stream that receives messages pushed by a zeromq publisher. 
- * @param jssc JavaStreamingContext object - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe Topic to subscribe to + * @param jssc JavaStreamingContext object + * @param actorSystemFactory A factory to create ActorSystem in executors + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might * be deserializer of bytes) to translate from sequence of sequence of @@ -120,6 +145,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, + actorSystemFactory: ActorSystemFactory, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] @@ -128,6 +154,6 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, publisherUrl, subscribe, fn) + createStream[T](jssc.ssc, () => actorSystemFactory.create(), publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 417b91eecb0ee..291bcdd301c27 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,14 +17,17 @@ package org.apache.spark.streaming.zeromq; -import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; -import org.junit.Test; +import akka.actor.ActorSystem; import akka.actor.SupervisorStrategy; import akka.util.ByteString; import akka.zeromq.Subscribe; +import org.junit.Test; + import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.akka.ActorSystemFactory; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { public void testZeroMQStream() { String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); - Function> bytesToObjects = new Function>() { - @Override - public Iterable call(byte[][] bytes) throws Exception { - return null; - } - }; + Function> bytesToObjects = new BytesToObjects(); + ActorSystemFactory actorSystemFactory = new ActorSystemFactoryForTest(); JavaReceiverInputDStream test1 = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects); + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects); JavaReceiverInputDStream test2 = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); JavaReceiverInputDStream test3 = ZeroMQUtils.createStream( - ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); } } + +class BytesToObjects implements Function> { + @Override + public Iterable 
call(byte[][] bytes) throws Exception { + return null; + } +} + +class ActorSystemFactoryForTest implements ActorSystemFactory { + @Override + public ActorSystem create() { + return null; + } +} diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 35d2e62c68480..3f77a62d7644e 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -42,14 +42,14 @@ class ZeroMQStreamSuite extends SparkFunSuite { // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream(ssc, () => null, publishUrl, subscribe, bytesToObjects) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, publishUrl, subscribe, bytesToObjects, + ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) - // TODO: Actually test data receiving + // TODO: Actually test data receiving. A real test needs the native ZeroMQ library ssc.stop() } } diff --git a/pom.xml b/pom.xml index fc5cf970e0601..7f196400fcef7 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ external/flume external/flume-sink external/flume-assembly + external/akka external/mqtt external/mqtt-assembly external/zeromq diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 4c34c888cfd5e..b9c62cbed2770 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -35,11 +35,11 @@ object BuildCommons { private val buildLocation = file(".").getAbsoluteFile.getParentFile val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, - sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka, + sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) = Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl", "sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink", - "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter", + "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter", "streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _)) val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl, @@ -232,8 +232,9 @@ object SparkBuild extends PomBuild { /* Enable tests settings for all projects except examples, assembly and tools */ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings)) + // TODO: remove streamingAkka from this list after 2.0.0 allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl, - networkCommon, networkShuffle, networkYarn, unsafe, testTags).contains(x)).foreach { + networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach { x => 
enable(MimaBuild.mimaSettings(sparkHome, x))(x) } @@ -634,7 +635,7 @@ object Unidoc { "-public", "-group", "Core Java API", packageList("api.java", "api.java.function"), "-group", "Spark Streaming", packageList( - "streaming.api.java", "streaming.flume", "streaming.kafka", + "streaming.api.java", "streaming.flume", "streaming.akka", "streaming.kafka", "streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis" ), "-group", "MLlib", packageList( diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 157ee92fd71b3..c4c3f3ef8128f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.Queue import scala.reflect.ClassTag import scala.util.control.NonFatal -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} @@ -41,7 +40,7 @@ import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver} +import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -294,27 +293,6 @@ class StreamingContext private[streaming] ( } } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T: ClassTag]( - props: Props, - name: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = withNamedScope("actor stream") { - receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy)) - } - /** * Create a input stream from TCP source hostname:port. 
Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 00f9d8a9e8817..7a25ce54b6ff0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} @@ -356,69 +355,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name) - } - /** * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. 
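Usage note for this patch: user code that previously called `ssc.actorStream(...)` must now go
through the new `AkkaUtils` and supply its own `ActorSystem` factory, since receivers no longer
reuse SparkEnv's actor system. Below is a minimal migration sketch under stated assumptions:
`CustomActor` is a hypothetical `ActorReceiver` subclass, `ssc` an existing `StreamingContext`,
and the config keys mirror the ActorWordCount example above (the method is renamed to
`createStream` later in this series):

    import akka.actor.{ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.akka.AkkaUtils

    // Before this patch:
    //   val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
    // After: create one uniquely named ActorSystem per receiver attempt on the executor.
    def actorSystemCreator(): ActorSystem = {
      val uniqueSystemName = s"custom-receiver-${TaskContext.get().taskAttemptId()}"
      val akkaConf = ConfigFactory.parseString(
        s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |""".stripMargin)
      ActorSystem(uniqueSystemName, akkaConf)
    }

    val lines = AkkaUtils.actorStream[String](
      ssc, actorSystemCreator _, Props[CustomActor], "CustomReceiver")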
From c20b51ac6ad57c34b0372fbfb6a7aa85319eae1a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 13 Jan 2016 15:21:11 -0800 Subject: [PATCH 02/13] Fix Mima tests --- project/MimaExcludes.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 4206d1fada421..f008d467943e9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -147,6 +147,16 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.Logging.org$apache$spark$streaming$flume$sink$Logging$$_log_="), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"), ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=") + ) ++ Seq( + // SPARK-7799 Add "streaming-akka" project + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"), + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream") ) case v if v.startsWith("1.6") => Seq( From f2671578b5b28c6ae547c2106b20efb67753eeea Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 12:47:40 -0800 Subject: [PATCH 03/13] actorStream -> createStream --- .../examples/streaming/JavaActorWordCount.java | 2 +- .../spark/examples/streaming/ActorWordCount.scala | 2 +- .../apache/spark/streaming/akka/AkkaUtils.scala | 14 +++++++------- .../spark/streaming/zeromq/ZeroMQUtils.scala | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 57bbc585de63c..1b40996e7f08a 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -117,7 +117,7 @@ public static void main(String[] args) { * For example: Both actorStream and JavaSampleActorReceiver are parameterized * to same type to ensure type safety. 
*/ - JavaDStream lines = AkkaUtils.actorStream( + JavaDStream lines = AkkaUtils.createStream( jssc, new WordcountActorSystemFactory(), Props.create(JavaSampleActorReceiver.class, feederActorURI), diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index faa42d2eabaed..f035c06eea877 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -168,7 +168,7 @@ object ActorWordCount { ActorSystem(uniqueSystemName, akkaConf) } - val lines = AkkaUtils.actorStream[String]( + val lines = AkkaUtils.createStream[String]( ssc, actorSystemCreator _, Props(new SampleActorReceiver[String]( diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala index 0de2130c353c9..f834d1098b11d 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -52,7 +52,7 @@ object AkkaUtils { * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ - def actorStream[T: ClassTag]( + def createStream[T: ClassTag]( ssc: StreamingContext, actorSystemCreator: () => ActorSystem, props: Props, @@ -80,7 +80,7 @@ object AkkaUtils { * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ - def actorStream[T]( + def createStream[T]( jssc: JavaStreamingContext, actorSystemFactory: ActorSystemFactory, props: Props, @@ -90,7 +90,7 @@ object AkkaUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - actorStream[T]( + createStream[T]( jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel, supervisorStrategy) } @@ -108,7 +108,7 @@ object AkkaUtils { * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. */ - def actorStream[T]( + def createStream[T]( jssc: JavaStreamingContext, actorSystemFactory: ActorSystemFactory, props: Props, @@ -117,7 +117,7 @@ object AkkaUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - actorStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel) + createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel) } /** @@ -134,7 +134,7 @@ object AkkaUtils { * to ensure the type safety, i.e parametrized type of data received and actorStream * should be same. 
*/ - def actorStream[T]( + def createStream[T]( jssc: JavaStreamingContext, actorSystemFactory: ActorSystemFactory, props: Props, @@ -142,7 +142,7 @@ object AkkaUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - actorStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name) + createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name) } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 9e556ffd3ee8c..52fc592d3abef 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -54,7 +54,7 @@ object ZeroMQUtils { storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy ): ReceiverInputDStream[T] = { - AkkaUtils.actorStream( + AkkaUtils.createStream( ssc, actorSystemCreator, Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), From f650ff5d90db4e1c8d4451aa8461980c927e02bc Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 13:02:15 -0800 Subject: [PATCH 04/13] Use standard Akka confs --- .../apache/spark/examples/streaming/JavaActorWordCount.java | 5 ++--- .../org/apache/spark/examples/streaming/ActorWordCount.scala | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 1b40996e7f08a..b9ae8b81b3a70 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -151,10 +151,9 @@ class WordcountActorSystemFactory implements ActorSystemFactory { @Override public ActorSystem create() { String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId(); - Map akkaConf = new HashMap(); + Map akkaConf = new HashMap(); akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); - akkaConf.put( - "akka.remote.netty.tcp.transport-class", "akka.remote.transport.netty.NettyTransport"); + akkaConf.put("akka.remote.enabled-transports", Arrays.asList("akka.remote.netty.tcp")); return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf)); } } diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index f035c06eea877..662664cd7b443 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -109,7 +109,7 @@ object FeederActor { val akkaConf = ConfigFactory.parseString( s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |""".stripMargin) @@ -163,7 +163,7 @@ object ActorWordCount { val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" val akkaConf = ConfigFactory.parseString( 
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] |""".stripMargin) ActorSystem(uniqueSystemName, akkaConf) } From f2a07d484dec7231c838695b98b7960e8564de1a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 13:03:09 -0800 Subject: [PATCH 05/13] Use Function0 rather than ActorSystemFactory and update a lot of docs --- docs/streaming-custom-receivers.md | 47 ++++++-- .../streaming/JavaActorWordCount.java | 19 ++-- .../examples/streaming/ActorWordCount.scala | 9 +- .../spark/streaming/akka/ActorReceiver.scala | 16 +-- .../spark/streaming/akka/AkkaUtils.scala | 107 ++++++++++-------- .../spark/streaming/zeromq/ZeroMQUtils.scala | 28 +++-- .../zeromq/JavaZeroMQStreamSuite.java | 14 +-- 7 files changed, 140 insertions(+), 100 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 97db865daa371..5986791fbb77f 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -257,25 +257,52 @@ The following table summarizes the characteristics of both types of receivers ## Implementing and Using a Custom Actor-based Receiver +
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+ Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to -receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper) -trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using - `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc. +receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver) +allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of +this actor can be configured to handle failures, etc. {% highlight scala %} -class CustomActor extends Actor with ActorHelper { + +class CustomActor extends ActorReceiver { def receive = { case data: String => store(data) } } + +// A new input stream can be created with this custom actor as +val ssc: StreamingContext = ... +val lines = AkkaUtils.createStream[String](Props[CustomActor](), "CustomReceiver") + {% endhighlight %} -And a new input stream can be created with this custom actor as +See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example. +
+</div>
+<div data-lang="java" markdown="1">
+
+Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
+receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+{% highlight java %}
+
+class CustomActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object msg) throws Exception {
+    store((String) msg);
+  }
+}
+
+// A new input stream can be created with this custom actor as
+JavaStreamingContext ssc = ...;
+JavaDStream<String> lines = AkkaUtils.<String>createStream(Props.create(CustomActor.class), "CustomReceiver");
+
+{% endhighlight %}
+
+See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
+</div>
+</div>
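A note on the doc snippets above: as of this patch, the `createStream` overloads still take the
`StreamingContext` (or `JavaStreamingContext`) and an `ActorSystem` creator explicitly, which the
snippets elide for brevity. A fuller Scala sketch matching the signatures in this series
(`CustomActor` and `ssc` as assumed above; the JavaActorWordCount diff below shows the equivalent
Java `Function0` version):

    import akka.actor.{ActorSystem, Props}
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.akka.AkkaUtils

    val lines = AkkaUtils.createStream[String](
      ssc,
      // One uniquely named ActorSystem per receiver attempt on the executor.
      () => ActorSystem(s"custom-receiver-${TaskContext.get().taskAttemptId()}"),
      Props[CustomActor](),
      "CustomReceiver")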
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index b9ae8b81b3a70..ebaccdbb15f7f 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -21,17 +21,17 @@ import java.util.HashMap; import java.util.Map; -import akka.actor.ActorSystem; -import com.typesafe.config.ConfigFactory; -import org.apache.spark.TaskContext; -import org.apache.spark.streaming.akka.ActorSystemFactory; import scala.Tuple2; import akka.actor.ActorSelection; +import akka.actor.ActorSystem; import akka.actor.Props; +import com.typesafe.config.ConfigFactory; import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function0; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; @@ -63,6 +63,7 @@ public void preStart() { remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); } + @Override public void onReceive(Object msg) throws Exception { store((T) msg); } @@ -107,19 +108,19 @@ public static void main(String[] args) { String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e type of data received and InputDstream * should be same. * - * For example: Both actorStream and JavaSampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized * to same type to ensure type safety. */ JavaDStream lines = AkkaUtils.createStream( jssc, - new WordcountActorSystemFactory(), + new WordcountActorSystemCreator(), Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver"); @@ -146,10 +147,10 @@ public Integer call(Integer i1, Integer i2) { } } -class WordcountActorSystemFactory implements ActorSystemFactory { +class WordcountActorSystemCreator implements Function0 { @Override - public ActorSystem create() { + public ActorSystem call() { String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId(); Map akkaConf = new HashMap(); akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 662664cd7b443..b07c8a63791f5 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -124,6 +124,7 @@ object FeederActor { /** * A sample word count program demonstrating the use of plugging in + * * Actor as Receiver * Usage: ActorWordCount * and describe the AkkaSystem that Spark Sample feeder is running on. 
@@ -149,14 +150,14 @@ object ActorWordCount { val ssc = new StreamingContext(sparkConf, Seconds(2)) /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e type of data received and InputDStream * should be same. * - * For example: Both actorStream and SampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ def actorSystemCreator(): ActorSystem = { @@ -171,8 +172,8 @@ object ActorWordCount { val lines = AkkaUtils.createStream[String]( ssc, actorSystemCreator _, - Props(new SampleActorReceiver[String]( - "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt))), + Props(classOf[SampleActorReceiver[String]], + "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), "SampleReceiver") // compute wordcount diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index b8f8c628cb5e2..0cb771eaf9908 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -59,13 +59,13 @@ object ActorSupervisorStrategy { * } * } * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * // Can be used with AkkaUtils.createStream as follows + * AkkaUtils.createStream[String](Props[MyActor](),"MyActorReceiver") * * }}} * * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream + * to ensure the type safety, i.e. parametrized type of push block and InputDStream * should be same. */ @DeveloperApi @@ -109,13 +109,13 @@ abstract class ActorReceiver extends Actor { * } * } * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") + * // Can be used with AkkaUtils.createStream as follows + * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver") * * }}} * * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream + * to ensure the type safety, i.e. parametrized type of push block and InputDStream * should be same. */ @DeveloperApi @@ -148,8 +148,8 @@ abstract class JavaActorReceiver extends UntypedActor { /** * :: DeveloperApi :: * Statistics for querying the supervisor about state of workers. Used in - * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.akka.ActorReceiver]]. + * conjunction with `AkkaUtils.createStream` and + * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]]. 
*/ @DeveloperApi case class Statistics(numberOfMsgs: Int, diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala index f834d1098b11d..7b885d09fdc1e 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -21,128 +21,135 @@ import scala.reflect.ClassTag import akka.actor.{ActorSystem, Props, SupervisorStrategy} +import org.apache.spark.api.java.function.{Function0 => JFunction0} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream -/** - * Factory interface for creating a new ActorSystem in executors. - */ -trait ActorSystemFactory extends Serializable { - def create(): ActorSystem -} - object AkkaUtils { /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details. * - * @param ssc the StreamingContext instance - * @param actorSystemCreator a function to create ActorSystem in executors - * @param props Props object defining creation of the actor - * @param name Name of the actor + * @param ssc The StreamingContext instance + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @param supervisorStrategy the supervisor strategy (default: * ActorSupervisorStrategy.defaultStrategy) * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream + * to ensure the type safety, i.e. parametrized type of data received and createStream * should be same. */ def createStream[T: ClassTag]( ssc: StreamingContext, actorSystemCreator: () => ActorSystem, - props: Props, - name: String, + propsForActor: Props, + actorName: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { + ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { val cleanF = ssc.sc.clean(actorSystemCreator) - ssc.receiverStream( - new ActorReceiverSupervisor[T](cleanF, props, name, storageLevel, supervisorStrategy)) + ssc.receiverStream(new ActorReceiverSupervisor[T]( + cleanF, + propsForActor, + actorName, + storageLevel, + supervisorStrategy)) } /** - * Create an input stream with any arbitrary user implemented actor receiver. + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. 
* - * @param jssc the StreamingContext instance - * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors - * @param props Props object defining creation of the actor - * @param name Name of the actor + * @param jssc The StreamingContext instance + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor * @param storageLevel Storage level to use for storing the received objects * @param supervisorStrategy the supervisor strategy * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream + * to ensure the type safety, i.e. parametrized type of data received and createStream * should be same. */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, - props: Props, - name: String, + actorSystemCreator: JFunction0[ActorSystem], + propsForActor: Props, + actorName: String, storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] createStream[T]( - jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel, supervisorStrategy) + jssc.ssc, + () => actorSystemCreator.call(), + propsForActor, + actorName, + storageLevel, + supervisorStrategy) } /** - * Create an input stream with any arbitrary user implemented actor receiver. + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. * - * @param jssc the StreamingContext instance - * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors - * @param props Props object defining creation of the actor - * @param name Name of the actor + * @param jssc The StreamingContext instance + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor * @param storageLevel Storage level to use for storing the received objects * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream + * to ensure the type safety, i.e. parametrized type of data received and createStream * should be same. */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, - props: Props, - name: String, + actorSystemCreator: JFunction0[ActorSystem], + propsForActor: Props, + actorName: String, storageLevel: StorageLevel ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name, storageLevel) + createStream[T]( + jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName, storageLevel) } /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. + * Create an input stream with a user-defined actor. 
Storage level of the data will be the default + * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details. * - * @param jssc the StreamingContext instance - * @param actorSystemFactory an ActorSystemFactory to create ActorSystem in executors - * @param props Props object defining creation of the actor - * @param name Name of the actor + * @param jssc The StreamingContext instance + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream + * to ensure the type safety, i.e. parametrized type of data received and createStream * should be same. */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, - props: Props, - name: String + actorSystemCreator: JFunction0[ActorSystem], + propsForActor: Props, + actorName: String ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - createStream[T](jssc.ssc, () => actorSystemFactory.create(), props, name) + createStream[T](jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName) } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 52fc592d3abef..bb74ef3a0fe8c 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -24,10 +24,10 @@ import akka.actor.{ActorSystem, Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe -import org.apache.spark.api.java.function.{Function => JFunction} +import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, ActorSystemFactory, AkkaUtils} +import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream @@ -35,7 +35,8 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param ssc StreamingContext object - * @param actorSystemCreator A function to create ActorSystem in executors + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. * @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic @@ -66,7 +67,8 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object - * @param actorSystemFactory A factory to create ActorSystem in executors + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. 
* @param publisherUrl Url of remote ZeroMQ publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each @@ -77,7 +79,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, + actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -90,7 +92,7 @@ object ZeroMQUtils { (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T]( jssc.ssc, - () => actorSystemFactory.create(), + () => actorSystemCreator.call(), publisherUrl, subscribe, fn, @@ -101,7 +103,8 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object - * @param actorSystemFactory A factory to create ActorSystem in executors + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. * @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each @@ -112,7 +115,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, + actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -124,7 +127,7 @@ object ZeroMQUtils { (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T]( jssc.ssc, - () => actorSystemFactory.create(), + () => actorSystemCreator.call(), publisherUrl, subscribe, fn, @@ -134,7 +137,8 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object - * @param actorSystemFactory A factory to create ActorSystem in executors + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. 
* @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each @@ -145,7 +149,7 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemFactory: ActorSystemFactory, + actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] @@ -154,6 +158,6 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, () => actorSystemFactory.create(), publisherUrl, subscribe, fn) + createStream[T](jssc.ssc, () => actorSystemCreator.call(), publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 291bcdd301c27..6db5cb77625e7 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -24,9 +24,9 @@ import org.junit.Test; import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function0; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; -import org.apache.spark.streaming.akka.ActorSystemFactory; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @@ -36,14 +36,14 @@ public void testZeroMQStream() { String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); Function> bytesToObjects = new BytesToObjects(); - ActorSystemFactory actorSystemFactory = new ActorSystemFactoryForTest(); + Function0 actorSystemCreator = new ActorSystemCreatorForTest(); JavaReceiverInputDStream test1 = ZeroMQUtils.createStream( - ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects); + ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects); JavaReceiverInputDStream test2 = ZeroMQUtils.createStream( - ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); JavaReceiverInputDStream test3 = ZeroMQUtils.createStream( - ssc, actorSystemFactory, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); } } @@ -55,9 +55,9 @@ public Iterable call(byte[][] bytes) throws Exception { } } -class ActorSystemFactoryForTest implements ActorSystemFactory { +class ActorSystemCreatorForTest implements Function0 { @Override - public ActorSystem create() { + public ActorSystem call() { return null; } } From 463569cd6700734b87f950d204254ce081ff628a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 14:07:51 -0800 Subject: [PATCH 06/13] Update zeromq --- .../org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 76ce3e8e564a7..6826b830c853c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -94,7 +94,7 @@ object ZeroMQWordCount { val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" val akkaConf = ConfigFactory.parseString( s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] |""".stripMargin) ActorSystem(uniqueSystemName, akkaConf) } From 4e252299231d0dbeddee319343ac08603af034ea Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 14:10:10 -0800 Subject: [PATCH 07/13] Minor doc fix --- docs/streaming-custom-receivers.md | 4 ++-- .../scala/org/apache/spark/streaming/akka/ActorReceiver.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 5986791fbb77f..caaa5bd8eb020 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -298,8 +298,8 @@ class CustomActor extends JavaActorReceiver { } // A new input stream can be created with this custom actor as -JavaStreamingContext ssc = ... -JavaDStream lines = AkkaUtils.createStream(Props.create(CustomActor.class), "CustomReceiver") +JavaStreamingContext ssc = ...; +JavaDStream lines = AkkaUtils.createStream(Props.create(CustomActor.class), "CustomReceiver"); {% endhighlight %} diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 0cb771eaf9908..876769466cddc 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -110,7 +110,7 @@ abstract class ActorReceiver extends Actor { * } * * // Can be used with AkkaUtils.createStream as follows - * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver") + * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver"); * * }}} * From 9a1b2759d29a0daed8f06fa3c0eaec6e0a5b5a37 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 17:15:33 -0800 Subject: [PATCH 08/13] doc for Python API not supported --- docs/streaming-custom-receivers.md | 2 ++ docs/streaming-programming-guide.md | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index caaa5bd8eb020..33e54216d7400 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -306,3 +306,5 @@ JavaDStream lines = AkkaUtils.createStream(Props.create(CustomAc See [JavaActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/JavaActorWordCount.scala) for an end-to-end example. + +Python API Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API. 
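Since actor streams are Scala/Java only, the remaining moving part is how each receiver obtains its `ActorSystem`. The transport change above feeds into the "Add a default ActorSystem" patch below; as a standalone sketch (the `ReceiverActorSystems` object and the system-name prefix are hypothetical, mirroring the configuration this series settles on in `ActorReceiver.defaultActorSystemCreator`), a per-receiver creator looks like this:

{% highlight scala %}
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import org.apache.spark.TaskContext

object ReceiverActorSystems {
  // One remote-capable ActorSystem per receiver task; the task attempt id keeps
  // the system name unique, so several receivers can share one executor JVM.
  def remoteActorSystemCreator(): ActorSystem = {
    val uniqueSystemName = s"receiver-system-${TaskContext.get().taskAttemptId()}"
    val akkaConf = ConfigFactory.parseString(
      s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
         |""".stripMargin)
    ActorSystem(uniqueSystemName, akkaConf)
  }
}
{% endhighlight %}

Whether a caller passes such a creator or relies on the default, the receiver owns the resulting system's lifecycle, which is why the scaladoc added throughout this series notes that the `ActorSystem` "will be shut down when the receiver is stopping".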
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 8fd075d02b78e..802effa555eaa 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources. Python API `fileStream` is not available in the Python API, only `textFileStream` is available. - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka - actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver + actors by using `AkkaUtils.createStream(actorProps, actor-name)`. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. Python API Since actors are available only in the Java and Scala - libraries, `actorStream` is not available in the Python API. + libraries, `AkkaUtils.createStream` is not available in the Python API. - **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream. From d495ecc860693b4bfb713d6124be12c5d472e9ce Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 18 Jan 2016 17:21:56 -0800 Subject: [PATCH 09/13] Fix docs --- .../org/apache/spark/streaming/akka/ActorReceiver.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 876769466cddc..6f411cac50a01 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -59,7 +59,6 @@ object ActorSupervisorStrategy { * } * } * - * // Can be used with AkkaUtils.createStream as follows * AkkaUtils.createStream[String](Props[MyActor](),"MyActorReceiver") * * }}} @@ -104,12 +103,12 @@ abstract class ActorReceiver extends Actor { * * @example {{{ * class MyActor extends JavaActorReceiver { - * def receive { - * case anything: String => store(anything) + * @Override + * public void onReceive(Object msg) throws Exception { + * store((String) msg); * } * } * - * // Can be used with AkkaUtils.createStream as follows * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver"); * * }}} From 0c8bd0409e760e4a6e7b52fd662d0d77928650e2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 19 Jan 2016 11:12:42 -0800 Subject: [PATCH 10/13] Add a default ActorSystem --- docs/streaming-custom-receivers.md | 6 +- docs/streaming-programming-guide.md | 2 +- .../streaming/JavaActorWordCount.java | 19 --- .../examples/streaming/ActorWordCount.scala | 14 +- .../examples/streaming/ZeroMQWordCount.scala | 10 -- .../spark/streaming/akka/ActorReceiver.scala | 18 ++- .../spark/streaming/akka/AkkaUtils.scala | 116 ++++++++++++--- .../spark/streaming/zeromq/ZeroMQUtils.scala | 133 +++++++++++++++--- .../zeromq/JavaZeroMQStreamSuite.java | 13 +- .../streaming/zeromq/ZeroMQStreamSuite.scala | 16 ++- 10 files changed, 257 insertions(+), 90 deletions(-) diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md index 33e54216d7400..95b99862ec062 100644 --- a/docs/streaming-custom-receivers.md +++ b/docs/streaming-custom-receivers.md @@ -275,7 
+275,7 @@ class CustomActor extends ActorReceiver { // A new input stream can be created with this custom actor as val ssc: StreamingContext = ... -val lines = AkkaUtils.createStream[String](Props[CustomActor](), "CustomReceiver") +val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver") {% endhighlight %} @@ -298,8 +298,8 @@ class CustomActor extends JavaActorReceiver { } // A new input stream can be created with this custom actor as -JavaStreamingContext ssc = ...; -JavaDStream lines = AkkaUtils.createStream(Props.create(CustomActor.class), "CustomReceiver"); +JavaStreamingContext jssc = ...; +JavaDStream lines = AkkaUtils.createStream(jssc, Props.create(CustomActor.class), "CustomReceiver"); {% endhighlight %} diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 802effa555eaa..93c34efb6662d 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -659,7 +659,7 @@ methods for creating DStreams from files and Akka actors as input sources. Python API `fileStream` is not available in the Python API, only `textFileStream` is available. - **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka - actors by using `AkkaUtils.createStream(actorProps, actor-name)`. See the [Custom Receiver + actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver Guide](streaming-custom-receivers.html) for more details. Python API Since actors are available only in the Java and Scala diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index ebaccdbb15f7f..62e563380a9e7 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -18,20 +18,14 @@ package org.apache.spark.examples.streaming; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; import scala.Tuple2; import akka.actor.ActorSelection; -import akka.actor.ActorSystem; import akka.actor.Props; -import com.typesafe.config.ConfigFactory; import org.apache.spark.SparkConf; -import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function0; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; @@ -120,7 +114,6 @@ public static void main(String[] args) { */ JavaDStream lines = AkkaUtils.createStream( jssc, - new WordcountActorSystemCreator(), Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver"); @@ -146,15 +139,3 @@ public Integer call(Integer i1, Integer i2) { jssc.awaitTermination(); } } - -class WordcountActorSystemCreator implements Function0 { - - @Override - public ActorSystem call() { - String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId(); - Map akkaConf = new HashMap(); - akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider"); - akkaConf.put("akka.remote.enabled-transports", Arrays.asList("akka.remote.netty.tcp")); - return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf)); - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala 
b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index b07c8a63791f5..8e88987439ffc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -25,7 +25,7 @@ import scala.util.Random import akka.actor._ import com.typesafe.config.ConfigFactory -import org.apache.spark.{SparkConf, TaskContext} +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} @@ -78,7 +78,7 @@ class FeederActor extends Actor { * * @see [[org.apache.spark.examples.streaming.FeederActor]] */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) extends ActorReceiver { +class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) @@ -160,18 +160,8 @@ object ActorWordCount { * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ - def actorSystemCreator(): ActorSystem = { - val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" - val akkaConf = ConfigFactory.parseString( - s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] - |""".stripMargin) - ActorSystem(uniqueSystemName, akkaConf) - } - val lines = AkkaUtils.createStream[String]( ssc, - actorSystemCreator _, Props(classOf[SampleActorReceiver[String]], "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), "SampleReceiver") diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 6826b830c853c..f612e508eb78e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -90,19 +90,9 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator - def actorSystemCreator(): ActorSystem = { - val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}" - val akkaConf = ConfigFactory.parseString( - s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] - |""".stripMargin) - ActorSystem(uniqueSystemName, akkaConf) - } - // For this stream, a zeroMQ publisher should be running. 
val lines = ZeroMQUtils.createStream( ssc, - actorSystemCreator _, url, Subscribe(topic), bytesToStringIterator _) diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 6f411cac50a01..4e249bd81fefb 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -26,8 +26,9 @@ import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} +import com.typesafe.config.ConfigFactory -import org.apache.spark.Logging +import org.apache.spark.{Logging, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver @@ -37,13 +38,22 @@ import org.apache.spark.streaming.receiver.Receiver * A helper with set of defaults for supervisor strategy */ @DeveloperApi -object ActorSupervisorStrategy { +object ActorReceiver { val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException => Restart case _: Exception => Escalate } + + def defaultActorSystemCreator(): ActorSystem = { + val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |""".stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } } /** @@ -59,7 +69,7 @@ object ActorSupervisorStrategy { * } * } * - * AkkaUtils.createStream[String](Props[MyActor](),"MyActorReceiver") + * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver") * * }}} * @@ -109,7 +119,7 @@ abstract class ActorReceiver extends Actor { * } * } * - * AkkaUtils.createStream(Props.create(MyActor.class), "MyActorReceiver"); + * AkkaUtils.createStream(jssc, Props.create(MyActor.class), "MyActorReceiver"); * * }}} * diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala index 7b885d09fdc1e..e8025f8cb1adc 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -33,13 +33,13 @@ object AkkaUtils { * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details. * * @param ssc The StreamingContext instance - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. * @param propsForActor Props object defining creation of the actor * @param actorName Name of the actor * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) - * @param supervisorStrategy the supervisor strategy (default: - * ActorSupervisorStrategy.defaultStrategy) + * @param actorSystemCreator A function to create ActorSystem in executors. 
`ActorSystem` will + * be shut down when the receiver is stopping (default: + * ActorReceiver.defaultActorSystemCreator) + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) * * @note An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility @@ -48,11 +48,11 @@ object AkkaUtils { */ def createStream[T: ClassTag]( ssc: StreamingContext, - actorSystemCreator: () => ActorSystem, propsForActor: Props, actorName: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { val cleanF = ssc.sc.clean(actorSystemCreator) ssc.receiverStream(new ActorReceiverSupervisor[T]( @@ -67,8 +67,6 @@ object AkkaUtils { * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. * * @param jssc The StreamingContext instance - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. * @param propsForActor Props object defining creation of the actor * @param actorName Name of the actor * @param storageLevel Storage level to use for storing the received objects @@ -81,7 +79,6 @@ object AkkaUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], propsForActor: Props, actorName: String, storageLevel: StorageLevel, @@ -91,19 +88,51 @@ object AkkaUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] createStream[T]( jssc.ssc, - () => actorSystemCreator.call(), propsForActor, actorName, storageLevel, - supervisorStrategy) + supervisorStrategy = supervisorStrategy) } /** * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. * * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will * be shut down when the receiver is stopping. + * @param supervisorStrategy the supervisor strategy + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem], + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T]( + jssc.ssc, + propsForActor, + actorName, + storageLevel, + () => actorSystemCreator.call(), + supervisorStrategy) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. 
+ * + * @param jssc The StreamingContext instance * @param propsForActor Props object defining creation of the actor * @param actorName Name of the actor * @param storageLevel Storage level to use for storing the received objects @@ -115,15 +144,41 @@ object AkkaUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], propsForActor: Props, actorName: String, storageLevel: StorageLevel ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, propsForActor, actorName, storageLevel) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem] + ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] createStream[T]( - jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName, storageLevel) + jssc.ssc, propsForActor, actorName, storageLevel, () => actorSystemCreator.call()) } /** @@ -131,8 +186,6 @@ object AkkaUtils { * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details. * * @param jssc The StreamingContext instance - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. * @param propsForActor Props object defining creation of the actor * @param actorName Name of the actor * @@ -143,13 +196,42 @@ object AkkaUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], propsForActor: Props, actorName: String ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - createStream[T](jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName) + createStream[T](jssc.ssc, propsForActor, actorName) + } + + /** + * Create an input stream with a user-defined actor. Storage level of the data will be the default + * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. 
+ */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + actorSystemCreator: JFunction0[ActorSystem] + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T]( + jssc.ssc, + propsForActor, + actorName, + actorSystemCreator = () => actorSystemCreator.call()) } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index bb74ef3a0fe8c..5855a34eb63d7 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -27,7 +27,7 @@ import akka.zeromq.Subscribe import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext -import org.apache.spark.streaming.akka.{ActorSupervisorStrategy, AkkaUtils} +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream @@ -35,8 +35,6 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param ssc StreamingContext object - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. * @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic @@ -45,41 +43,43 @@ object ZeroMQUtils { * of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping (default: + * ActorReceiver.defaultActorSystemCreator) + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) */ def createStream[T: ClassTag]( ssc: StreamingContext, - actorSystemCreator: () => ActorSystem, publisherUrl: String, subscribe: Subscribe, bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy + actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy ): ReceiverInputDStream[T] = { AkkaUtils.createStream( ssc, - actorSystemCreator, Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), "ZeroMQReceiver", storageLevel, + actorSystemCreator, supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. 
* @param publisherUrl Url of remote ZeroMQ publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel Storage level to use for storing the received objects + * @param storageLevel Storage level to use for storing the received objects + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -92,30 +92,63 @@ object ZeroMQUtils { (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T]( jssc.ssc, - () => actorSystemCreator.call(), publisherUrl, subscribe, fn, storageLevel, - supervisorStrategy) + supervisorStrategy = supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might be + * deserializer of bytes) to translate from sequence of sequence of bytes, + * where sequence refer to a frame and sub sequence refer to its payload. + * @param storageLevel Storage level to use for storing the received objects * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will * be shut down when the receiver is stopping. + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) + */ + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem], + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn, + storageLevel, + () => actorSystemCreator.call(), + supervisorStrategy) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object * @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each * frame has sequence of byte thus it needs the converter(which might be * deserializer of bytes) to translate from sequence of sequence of bytes, * where sequence refer to a frame and sub sequence refer to its payload. - * @param storageLevel RDD storage level. + * @param storageLevel RDD storage level. 
*/ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -127,7 +160,6 @@ object ZeroMQUtils { (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T]( jssc.ssc, - () => actorSystemCreator.call(), publisherUrl, subscribe, fn, @@ -137,8 +169,40 @@ object ZeroMQUtils { /** * Create an input stream that receives messages pushed by a zeromq publisher. * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might be + * deserializer of bytes) to translate from sequence of sequence of bytes, + * where sequence refer to a frame and sub sequence refer to its payload. + * @param storageLevel RDD storage level. * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will * be shut down when the receiver is stopping. + */ + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem] + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn, + storageLevel, + () => actorSystemCreator.call()) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object * @param publisherUrl Url of remote zeromq publisher * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each @@ -149,7 +213,6 @@ object ZeroMQUtils { */ def createStream[T]( jssc: JavaStreamingContext, - actorSystemCreator: JFunction0[ActorSystem], publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] @@ -158,6 +221,42 @@ object ZeroMQUtils { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala - createStream[T](jssc.ssc, () => actorSystemCreator.call(), publisherUrl, subscribe, fn) + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn) + } + + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each + * frame has sequence of byte thus it needs the converter(which might + * be deserializer of bytes) to translate from sequence of sequence of + * bytes, where sequence refer to a frame and sub sequence refer to its + * payload. + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. 
+ */ + def createStream[T]( + jssc: JavaStreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], + actorSystemCreator: JFunction0[ActorSystem] + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala + createStream[T]( + jssc.ssc, + publisherUrl, + subscribe, + fn, + actorSystemCreator = () => actorSystemCreator.call()) } } diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index 6db5cb77625e7..caddbdb91d631 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -39,12 +39,19 @@ public void testZeroMQStream() { Function0 actorSystemCreator = new ActorSystemCreatorForTest(); JavaReceiverInputDStream test1 = ZeroMQUtils.createStream( - ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects); + ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator); JavaReceiverInputDStream test2 = ZeroMQUtils.createStream( - ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator); JavaReceiverInputDStream test3 = ZeroMQUtils.createStream( - ssc, actorSystemCreator, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator, SupervisorStrategy.defaultStrategy()); + JavaReceiverInputDStream test4 = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects); + JavaReceiverInputDStream test5 = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test6 = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + SupervisorStrategy.defaultStrategy()); } } diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 3f77a62d7644e..bac2679cabae5 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -42,12 +42,20 @@ class ZeroMQStreamSuite extends SparkFunSuite { // tests the API, does not actually test data receiving val test1: ReceiverInputDStream[String] = - ZeroMQUtils.createStream(ssc, () => null, publishUrl, subscribe, bytesToObjects) + ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null) val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, () => null, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null) val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream( - ssc, () => null, publishUrl, subscribe, bytesToObjects, - StorageLevel.MEMORY_AND_DISK_SER_2, 
SupervisorStrategy.defaultStrategy) + ssc, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = + ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, + StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy) // TODO: Actually test data receiving. A real test needs the native ZeroMQ library ssc.stop() From 5ac46f9bfc2a4c83679ff2e66aee5e5b48b48ef0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 19 Jan 2016 14:02:15 -0800 Subject: [PATCH 11/13] Remove some Java overloads --- .../spark/streaming/akka/AkkaUtils.scala | 90 ----------------- .../spark/streaming/zeromq/ZeroMQUtils.scala | 99 ------------------- .../zeromq/JavaZeroMQStreamSuite.java | 11 +-- 3 files changed, 2 insertions(+), 198 deletions(-) diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala index e8025f8cb1adc..a48adef307b4a 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -63,37 +63,6 @@ object AkkaUtils { supervisorStrategy)) } - /** - * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. - * - * @param jssc The StreamingContext instance - * @param propsForActor Props object defining creation of the actor - * @param actorName Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * @param supervisorStrategy the supervisor strategy - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e. parametrized type of data received and createStream - * should be same. - */ - def createStream[T]( - jssc: JavaStreamingContext, - propsForActor: Props, - actorName: String, - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - createStream[T]( - jssc.ssc, - propsForActor, - actorName, - storageLevel, - supervisorStrategy = supervisorStrategy) - } - /** * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. * @@ -153,34 +122,6 @@ object AkkaUtils { createStream[T](jssc.ssc, propsForActor, actorName, storageLevel) } - /** - * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. - * - * @param jssc The StreamingContext instance - * @param propsForActor Props object defining creation of the actor - * @param actorName Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will - * be shut down when the receiver is stopping. - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e. 
-   *       to ensure the type safety, i.e. parametrized type of data received and createStream
-   *       should be same.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      propsForActor: Props,
-      actorName: String,
-      storageLevel: StorageLevel,
-      actorSystemCreator: JFunction0[ActorSystem]
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](
-      jssc.ssc, propsForActor, actorName, storageLevel, () => actorSystemCreator.call())
-  }
-
   /**
    * Create an input stream with a user-defined actor. Storage level of the data will be the default
    * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
    *
@@ -203,35 +144,4 @@ object AkkaUtils {
       implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
     createStream[T](jssc.ssc, propsForActor, actorName)
   }
-
-  /**
-   * Create an input stream with a user-defined actor. Storage level of the data will be the default
-   * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
-   *
-   * @param jssc The StreamingContext instance
-   * @param propsForActor Props object defining creation of the actor
-   * @param actorName Name of the actor
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
-   *
-   * @note An important point to note:
-   *       Since Actor may exist outside the spark framework, It is thus user's responsibility
-   *       to ensure the type safety, i.e. parametrized type of data received and createStream
-   *       should be same.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      propsForActor: Props,
-      actorName: String,
-      actorSystemCreator: JFunction0[ActorSystem]
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    createStream[T](
-      jssc.ssc,
-      propsForActor,
-      actorName,
-      actorSystemCreator = () => actorSystemCreator.call())
-  }
-
 }
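Dropping these Java overloads loses nothing on the Scala side: the single Scala `createStream` keeps every parameter with a default, so each removed combination stays reachable through named arguments, while Java retains the props-only, storage-level, and full-argument entry points. A rough sketch of the named-argument form (assuming `ssc` is in scope and `MyReceiverActor` is a user-defined `ActorReceiver`; neither is part of this patch):

    import akka.actor.Props
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.akka.AkkaUtils

    // Same effect as the removed storage-level-only Java overload;
    // every parameter left unnamed keeps its default.
    val stream = AkkaUtils.createStream[String](
      ssc, Props[MyReceiverActor](), "MyReceiver",
      storageLevel = StorageLevel.MEMORY_ONLY)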
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 5855a34eb63d7..3eb8f6cb75752 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -66,39 +66,6 @@ object ZeroMQUtils {
       supervisorStrategy)
   }

-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote ZeroMQ publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might be
-   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
-   *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel Storage level to use for storing the received objects
-   * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel,
-      supervisorStrategy: SupervisorStrategy
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn,
-      storageLevel,
-      supervisorStrategy = supervisorStrategy)
-  }
-
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param jssc JavaStreamingContext object
@@ -166,40 +133,6 @@ object ZeroMQUtils {
       storageLevel)
   }

-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might be
-   *                       deserializer of bytes) to translate from sequence of sequence of bytes,
-   *                       where sequence refer to a frame and sub sequence refer to its payload.
-   * @param storageLevel RDD storage level.
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      storageLevel: StorageLevel,
-      actorSystemCreator: JFunction0[ActorSystem]
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn,
-      storageLevel,
-      () => actorSystemCreator.call())
-  }
-
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param jssc JavaStreamingContext object
@@ -227,36 +160,4 @@ object ZeroMQUtils {
       subscribe,
       fn)
   }
-
-  /**
-   * Create an input stream that receives messages pushed by a zeromq publisher.
-   * @param jssc JavaStreamingContext object
-   * @param publisherUrl Url of remote zeromq publisher
-   * @param subscribe Topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
-   *                       frame has sequence of byte thus it needs the converter(which might
-   *                       be deserializer of bytes) to translate from sequence of sequence of
-   *                       bytes, where sequence refer to a frame and sub sequence refer to its
-   *                       payload.
-   * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
-   *                           be shut down when the receiver is stopping.
-   */
-  def createStream[T](
-      jssc: JavaStreamingContext,
-      publisherUrl: String,
-      subscribe: Subscribe,
-      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
-      actorSystemCreator: JFunction0[ActorSystem]
-    ): JavaReceiverInputDStream[T] = {
-    implicit val cm: ClassTag[T] =
-      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
-    val fn =
-      (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
-    createStream[T](
-      jssc.ssc,
-      publisherUrl,
-      subscribe,
-      fn,
-      actorSystemCreator = () => actorSystemCreator.call())
-  }
 }

diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index caddbdb91d631..9ff4b41f97d50 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -39,19 +39,12 @@ public void testZeroMQStream() {
     Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();

     JavaReceiverInputDStream<String> test1 = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator);
+      ssc, publishUrl, subscribe, bytesToObjects);
     JavaReceiverInputDStream<String> test2 = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator);
+      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
     JavaReceiverInputDStream<String> test3 = ZeroMQUtils.createStream(
       ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
       SupervisorStrategy.defaultStrategy());
-    JavaReceiverInputDStream<String> test4 = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects);
-    JavaReceiverInputDStream<String> test5 = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
-    JavaReceiverInputDStream<String> test6 = ZeroMQUtils.createStream(
-      ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
-      SupervisorStrategy.defaultStrategy());
   }
 }
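After this cleanup, callers of either API who need failure handling other than the default restart-on-RuntimeException still pass an explicit `supervisorStrategy` through the widest overload or a named argument. A minimal sketch of a custom strategy (the surrounding `ssc`, `publishUrl`, `subscribe` and `bytesToObjects` are illustrative placeholders):

    import scala.concurrent.duration._

    import akka.actor.OneForOneStrategy
    import akka.actor.SupervisorStrategy.Stop
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // Stop the receiver actor on any failure instead of restarting it.
    val stopOnError = OneForOneStrategy(maxNrOfRetries = 0, withinTimeRange = 1.minute) {
      case _: Exception => Stop
    }

    val lines = ZeroMQUtils.createStream(
      ssc, publishUrl, subscribe, bytesToObjects,
      supervisorStrategy = stopOnError)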
From 9e36df118ba21b4f45254f96ec6fdec02b1a5b2d Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 19 Jan 2016 16:28:49 -0800
Subject: [PATCH 12/13] Add unit tests

---
 external/akka/pom.xml                         |  7 ++
 .../spark/streaming/akka/ActorReceiver.scala  | 14 +++-
 .../spark/streaming/akka/AkkaUtils.scala      |  2 +-
 .../streaming/akka/JavaAkkaUtilsSuite.java    | 66 +++++++++++++++++++
 .../spark/streaming/akka/AkkaUtilsSuite.scala | 64 ++++++++++++++++++
 5 files changed, 150 insertions(+), 3 deletions(-)
 create mode 100644 external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
 create mode 100644 external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala

diff --git a/external/akka/pom.xml b/external/akka/pom.xml
index a7f3e83faca10..34de9bae00e49 100644
--- a/external/akka/pom.xml
+++ b/external/akka/pom.xml
@@ -58,6 +58,13 @@
       <artifactId>akka-remote_${scala.binary.version}</artifactId>
       <version>${akka.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index 4e249bd81fefb..c75dc92445b64 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -40,13 +40,23 @@ import org.apache.spark.streaming.receiver.Receiver
 @DeveloperApi
 object ActorReceiver {

-  val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+  /**
+   * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+   * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+   * others, it just escalates the failure to the supervisor of the supervisor.
+   */
+  val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
     15 millis) {
     case _: RuntimeException => Restart
     case _: Exception => Escalate
   }

-  def defaultActorSystemCreator(): ActorSystem = {
+  /**
+   * A default ActorSystem creator. It will use a unique system name
+   * (streaming-actor-system-<taskAttemptId>) to start an ActorSystem that supports remote
+   * communication.
+   */
+  val defaultActorSystemCreator: () => ActorSystem = () => {
     val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
     val akkaConf = ConfigFactory.parseString(
       s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"

diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
index a48adef307b4a..38c35c5ae7a18 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -52,7 +52,7 @@ object AkkaUtils {
       actorName: String,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
       actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
-      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy
+      supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
     ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
     val cleanF = ssc.sc.clean(actorSystemCreator)
     ssc.receiverStream(new ActorReceiverSupervisor[T](
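With `defaultActorSystemCreator` now a `() => ActorSystem` value, a caller-supplied creator slots into the same parameter: it runs on the executor when the receiver starts, and, per the docs earlier in this series, the resulting `ActorSystem` is shut down when the receiver stops. A minimal sketch (`ssc` and `MyReceiverActor` are illustrative placeholders, not part of this patch):

    import akka.actor.{ActorSystem, Props}
    import org.apache.spark.streaming.akka.AkkaUtils

    // Runs lazily on the executor hosting the receiver; keep the closure serializable.
    val mySystemCreator: () => ActorSystem = () => ActorSystem("my-receiver-system")

    val stream = AkkaUtils.createStream[String](
      ssc, Props[MyReceiverActor](), "MyReceiver",
      actorSystemCreator = mySystemCreator)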
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000000000..b732506767154
--- /dev/null
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+  @Test // tests the API, does not actually test data receiving
+  public void testAkkaUtils() {
+    JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    try {
+      JavaReceiverInputDStream<String> test1 = AkkaUtils.createStream(
+        jsc, Props.create(JavaTestActor.class), "test");
+      JavaReceiverInputDStream<String> test2 = AkkaUtils.createStream(
+        jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+      JavaReceiverInputDStream<String> test3 = AkkaUtils.createStream(
+        jsc,
+        Props.create(JavaTestActor.class),
+        "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+        new ActorSystemCreatorForTest(),
+        SupervisorStrategy.defaultStrategy());
+    } finally {
+      jsc.stop();
+    }
+  }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+  @Override
+  public ActorSystem call() {
+    return null;
+  }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+  @Override
+  public void onReceive(Object message) throws Exception {
+    store((String) message);
+  }
+}
+ */ + +package org.apache.spark.streaming.akka + +import akka.actor.{Props, SupervisorStrategy} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +class AkkaUtilsSuite extends SparkFunSuite { + + test("createStream") { + val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000)) + try { + // tests the API, does not actually test data receiving + val test1: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test") + val test2: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2) + val test3: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy = SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test5: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test6: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + () => null, + SupervisorStrategy.defaultStrategy) + } finally { + ssc.stop() + } + } +} + +class TestActor extends ActorReceiver { + override def receive: Receive = { + case m: String => store(m) + } +} From 5e2ac50a9971401312e22a3ddbd683a3d5735862 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 19 Jan 2016 18:02:14 -0800 Subject: [PATCH 13/13] Fix the compiler error --- .../scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 3eb8f6cb75752..1784d6e8623ad 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -55,7 +55,7 @@ object ZeroMQUtils { bytesToObjects: Seq[ByteString] => Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, - supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy ): ReceiverInputDStream[T] = { AkkaUtils.createStream( ssc,