
[SPARK-7799][SPARK-12786][Streaming]Add "streaming-akka" project #10744

Closed
wants to merge 14 commits
12 changes: 12 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -222,6 +222,18 @@ def contains_file(self, filename):
)


streaming_akka = Module(
name="streaming-akka",
dependencies=[streaming],
source_file_regexes=[
"external/akka",
],
sbt_test_goals=[
"streaming-akka/test",
]
)
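
(Note: with this module registered, dev/run-tests maps changes under external/akka to the streaming-akka/test goal; assuming the standard Spark build layout, build/sbt "streaming-akka/test" runs the same suite directly.)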


streaming_flume = Module(
name="streaming-flume",
dependencies=[streaming],
5 changes: 5 additions & 0 deletions examples/pom.xml
@@ -75,6 +75,11 @@
<artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
@@ -18,7 +18,13 @@
package org.apache.spark.examples.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import akka.actor.ActorSystem;
import com.typesafe.config.ConfigFactory;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.akka.ActorSystemFactory;
import scala.Tuple2;

import akka.actor.ActorSelection;
@@ -31,7 +37,8 @@
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.JavaActorReceiver;
import org.apache.spark.streaming.akka.AkkaUtils;
import org.apache.spark.streaming.akka.JavaActorReceiver;

/**
* A sample actor as receiver is also the simplest. This receiver actor
@@ -110,8 +117,11 @@ public static void main(String[] args) {
* For example: Both actorStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
JavaDStream<String> lines = jssc.actorStream(
Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
JavaDStream<String> lines = AkkaUtils.actorStream(
jssc,
new WordcountActorSystemFactory(),
Props.create(JavaSampleActorReceiver.class, feederActorURI),
"SampleReceiver");
Contributor: indent. nvm. my mistake.


// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
@@ -135,3 +145,16 @@ public Integer call(Integer i1, Integer i2) {
jssc.awaitTermination();
}
}

class WordcountActorSystemFactory implements ActorSystemFactory {

@Override
public ActorSystem create() {
String uniqueSystemName = "actor-wordcount-" + TaskContext.get().taskAttemptId();
Map<String, String> akkaConf = new HashMap<String, String>();
akkaConf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider");
akkaConf.put(
"akka.remote.netty.tcp.transport-class", "akka.remote.transport.netty.NettyTransport");
return ActorSystem.create(uniqueSystemName, ConfigFactory.parseMap(akkaConf));
}
}
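
(Note: the factory is invoked on the executor, so each receiver creates its own remote-enabled ActorSystem instead of borrowing Spark's internal one, and the TaskContext.get().taskAttemptId() suffix keeps system names unique across receiver restarts and co-located receivers.)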
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.util.Random

import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
import akka.actor._
import com.typesafe.config.ConfigFactory

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorReceiver
import org.apache.spark.util.AkkaUtils
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends ActorReceiver {
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) extends ActorReceiver {

lazy private val remotePublisher = context.actorSelection(urlOfPublisher)

@@ -108,9 +107,13 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq

val conf = new SparkConf
val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
securityManager = new SecurityManager(conf))._1
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
Contributor: I believe this change will resolve SPARK-12786.

Contributor: @zsxwing Could you also tag that JIRA in this PR and resolve it when this is closed.

Member Author: Updated the title

|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
Contributor: Please use the standard configurations that are recommended by Akka 2.3.0 for remoting: http://doc.akka.io/docs/akka/2.3.0/scala/remoting.html

|akka.remote.netty.tcp.hostname = "$host"
|akka.remote.netty.tcp.port = $port
|""".stripMargin)
val actorSystem = ActorSystem("test", akkaConf)
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")

println("Feeder started as:" + feeder)
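
For reference, the remoting style recommended by the Akka 2.3 docs linked above enables the transport rather than naming the transport class directly. A minimal sketch of that form (untested here, reusing the host and port values above):

// Sketch only: standard Akka 2.3 remoting configuration.
val akkaConf = ConfigFactory.parseString(
  s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
     |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
     |akka.remote.netty.tcp.hostname = "$host"
     |akka.remote.netty.tcp.port = $port
     |""".stripMargin)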
@@ -150,16 +153,27 @@ object ActorWordCount {
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* to ensure the type safety, i.e type of data received and InputDStream
* should be same.
*
* For example: Both actorStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
def actorSystemCreator(): ActorSystem = {
val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
Contributor: use standard confs

|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}

val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
host, port.toInt))), "SampleReceiver")
val lines = AkkaUtils.actorStream[String](
ssc,
actorSystemCreator _,
Props(new SampleActorReceiver[String](
"akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt))),
"SampleReceiver")

// compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
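
(Note: the creator is passed as a function, actorSystemCreator _, rather than as a ready ActorSystem, because it must run lazily on the executor; TaskContext.get() returns null on the driver, so eager creation would fail.)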
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import akka.zeromq._
import akka.zeromq.Subscribe
import com.typesafe.config.ConfigFactory

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq._

@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
*
* To run this example locally, you may run publisher as
* `$ bin/run-example \
* org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
* org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
* and run the example as
* `$ bin/run-example \
* org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
* org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
@@ -89,8 +90,22 @@

def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator

def actorSystemCreator(): ActorSystem = {
val uniqueSystemName = s"actor-wordcount-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
Contributor: use standard confs

|""".stripMargin)
ActorSystem(uniqueSystemName, akkaConf)
}

// For this stream, a zeroMQ publisher should be running.
val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val lines = ZeroMQUtils.createStream(
ssc,
actorSystemCreator _,
url,
Subscribe(topic),
bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
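
(Note: the ZeroMQ receiver is built on the actor receiver, so ZeroMQUtils.createStream now takes the same ActorSystem creator function as AkkaUtils.actorStream.)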
66 changes: 66 additions & 0 deletions external/akka/pom.xml
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.10</artifactId>
<version>2.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-akka_2.10</artifactId>
<properties>
<sbt.project.name>streaming-akka</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Akka</name>
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-actor_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>
</project>
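
For downstream builds, the new artifact would be consumed like any other external module. A hypothetical sbt line, with coordinates inferred from the pom above:

// Hypothetical coordinates inferred from the pom above (snapshot version).
libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "2.0.0-SNAPSHOT"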
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.streaming.receiver
package org.apache.spark.streaming.akka

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
@@ -27,9 +27,10 @@ import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
* :: DeveloperApi ::
@@ -148,7 +149,7 @@ abstract class JavaActorReceiver extends UntypedActor {
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
* conjunction with `StreamingContext.actorStream` and
* [[org.apache.spark.streaming.receiver.ActorReceiver]].
* [[org.apache.spark.streaming.akka.ActorReceiver]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
@@ -157,10 +158,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)

/** Case class to receive data sent by child actors */
private[streaming] sealed trait ActorReceiverData
private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
private[akka] sealed trait ActorReceiverData
private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData

/**
* Provides Actors as receivers for receiving stream.
@@ -181,14 +182,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
* context.parent ! Props(new Worker, "Worker")
* }}}
*/
private[streaming] class ActorReceiverSupervisor[T: ClassTag](
private[akka] class ActorReceiverSupervisor[T: ClassTag](
actorSystemCreator: () => ActorSystem,
props: Props,
name: String,
storageLevel: StorageLevel,
receiverSupervisorStrategy: SupervisorStrategy
) extends Receiver[T](storageLevel) with Logging {

protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
private lazy val actorSystem = actorSystemCreator()
protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)

class Supervisor extends Actor {
@@ -241,5 +244,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag](

def onStop(): Unit = {
actorSupervisor ! PoisonPill
actorSystem.shutdown()
actorSystem.awaitTermination()
}
}
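
As a usage sketch of the relocated API: a minimal custom receiver, assuming the new org.apache.spark.streaming.akka package and the store() helper that ActorReceiver exposes (the receiver name and wiring below are hypothetical):

import akka.actor.Props
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// Minimal receiver: forwards every String message to Spark Streaming.
class EchoReceiver extends ActorReceiver {
  def receive: Receive = {
    case s: String => store(s)
  }
}

// Wiring, assuming an existing StreamingContext `ssc` and an ActorSystem
// creator like the actorSystemCreator shown in the examples above:
// val lines = AkkaUtils.actorStream[String](
//   ssc, actorSystemCreator _, Props[EchoReceiver], "EchoReceiver")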