Skip to content

Commit

Permalink
Add a default ActorSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jan 19, 2016
1 parent d495ecc commit 0c8bd04
Show file tree
Hide file tree
Showing 10 changed files with 257 additions and 90 deletions.
6 changes: 3 additions & 3 deletions docs/streaming-custom-receivers.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class CustomActor extends ActorReceiver {

// A new input stream can be created with this custom actor as
val ssc: StreamingContext = ...
val lines = AkkaUtils.createStream[String](Props[CustomActor](), "CustomReceiver")
val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")

{% endhighlight %}

Expand All @@ -298,8 +298,8 @@ class CustomActor extends JavaActorReceiver {
}

// A new input stream can be created with this custom actor as
JavaStreamingContext ssc = ...;
JavaDStream<String> lines = AkkaUtils.<String>createStream(Props.create(CustomActor.class), "CustomReceiver");
JavaStreamingContext jssc = ...;
JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");

{% endhighlight %}

Expand Down
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ methods for creating DStreams from files and Akka actors as input sources.
<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.

- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
actors by using `AkkaUtils.createStream(actorProps, actor-name)`. See the [Custom Receiver
actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
Guide](streaming-custom-receivers.html) for more details.

<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@
package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.ConfigFactory;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
Expand Down Expand Up @@ -120,7 +114,6 @@ public static void main(String[] args) {
*/
JavaDStream<String> lines = AkkaUtils.createStream(
jssc,
new WordcountActorSystemCreator(),
Props.create(JavaSampleActorReceiver.class, feederActorURI),
"SampleReceiver");

Expand All @@ -146,15 +139,3 @@ public Integer call(Integer i1, Integer i2) {
jssc.awaitTermination();
}
}

class WordcountActorSystemCreator implements Function0<ActorSystem> {

@Override
public ActorSystem call() {
String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId();
Map<String, Object> akkaConf = new HashMap<String, Object>();
akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
akkaConf.put("akka.remote.enabled-transports", Arrays.asList("akka.remote.netty.tcp"));
return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scala.util.Random
import akka.actor._
import com.typesafe.config.ConfigFactory

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

Expand Down Expand Up @@ -78,7 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) extends ActorReceiver {
class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

Expand Down Expand Up @@ -160,18 +160,8 @@ object ActorWordCount {
* For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
def actorSystemCreator(): ActorSystem = {
val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}

val lines = AkkaUtils.createStream[String](
ssc,
actorSystemCreator _,
Props(classOf[SampleActorReceiver[String]],
"akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
"SampleReceiver")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,9 @@ object ZeroMQWordCount {

def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator

def actorSystemCreator(): ActorSystem = {
val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}

// For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(
ssc,
actorSystemCreator _,
url,
Subscribe(topic),
bytesToStringIterator _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import scala.reflect.ClassTag

import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import com.typesafe.config.ConfigFactory

import org.apache.spark.Logging
import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
Expand All @@ -37,13 +38,22 @@ import org.apache.spark.streaming.receiver.Receiver
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
object ActorSupervisorStrategy {
object ActorReceiver {

val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException => Restart
case _: Exception => Escalate
}

def defaultActorSystemCreator(): ActorSystem = {
val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}
}

/**
Expand All @@ -59,7 +69,7 @@ object ActorSupervisorStrategy {
* }
* }
*
* AkkaUtils.createStream[String](Props[MyActor](),"MyActorReceiver")
* AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
*
* }}}
*
Expand Down Expand Up @@ -109,7 +119,7 @@ abstract class ActorReceiver extends Actor {
* }
* }
*
* AkkaUtils.<String>createStream(Props.create(MyActor.class), "MyActorReceiver");
* AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
*
* }}}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ object AkkaUtils {
* Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
*
* @param ssc The StreamingContext instance
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
* @param supervisorStrategy the supervisor strategy (default:
* ActorSupervisorStrategy.defaultStrategy)
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping (default:
* ActorReceiver.defaultActorSystemCreator)
* @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
Expand All @@ -48,11 +48,11 @@ object AkkaUtils {
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
actorSystemCreator: () => ActorSystem,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy
): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
val cleanF = ssc.sc.clean(actorSystemCreator)
ssc.receiverStream(new ActorReceiverSupervisor[T](
Expand All @@ -67,8 +67,6 @@ object AkkaUtils {
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
Expand All @@ -81,7 +79,6 @@ object AkkaUtils {
*/
def createStream[T](
jssc: JavaStreamingContext,
actorSystemCreator: JFunction0[ActorSystem],
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel,
Expand All @@ -91,19 +88,51 @@ object AkkaUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](
jssc.ssc,
() => actorSystemCreator.call(),
propsForActor,
actorName,
storageLevel,
supervisorStrategy)
supervisorStrategy = supervisorStrategy)
}

/**
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param supervisorStrategy the supervisor strategy
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel,
actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](
jssc.ssc,
propsForActor,
actorName,
storageLevel,
() => actorSystemCreator.call(),
supervisorStrategy)
}

/**
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
Expand All @@ -115,24 +144,48 @@ object AkkaUtils {
*/
def createStream[T](
jssc: JavaStreamingContext,
actorSystemCreator: JFunction0[ActorSystem],
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
}

/**
* Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param storageLevel Storage level to use for storing the received objects
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String,
storageLevel: StorageLevel,
actorSystemCreator: JFunction0[ActorSystem]
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](
jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName, storageLevel)
jssc.ssc, propsForActor, actorName, storageLevel, () => actorSystemCreator.call())
}

/**
* Create an input stream with a user-defined actor. Storage level of the data will be the default
* StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
*
Expand All @@ -143,13 +196,42 @@ object AkkaUtils {
*/
def createStream[T](
jssc: JavaStreamingContext,
actorSystemCreator: JFunction0[ActorSystem],
propsForActor: Props,
actorName: String
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](jssc.ssc, () => actorSystemCreator.call(), propsForActor, actorName)
createStream[T](jssc.ssc, propsForActor, actorName)
}

/**
* Create an input stream with a user-defined actor. Storage level of the data will be the default
* StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
*
* @param jssc The StreamingContext instance
* @param propsForActor Props object defining creation of the actor
* @param actorName Name of the actor
* @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
* be shut down when the receiver is stopping.
*
* @note An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e. parametrized type of data received and createStream
* should be same.
*/
def createStream[T](
jssc: JavaStreamingContext,
propsForActor: Props,
actorName: String,
actorSystemCreator: JFunction0[ActorSystem]
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
createStream[T](
jssc.ssc,
propsForActor,
actorName,
actorSystemCreator = () => actorSystemCreator.call())
}

}
Loading

0 comments on commit 0c8bd04

Please sign in to comment.