From 9e36df118ba21b4f45254f96ec6fdec02b1a5b2d Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 19 Jan 2016 16:28:49 -0800 Subject: [PATCH] Add unit tests --- external/akka/pom.xml | 7 ++ .../spark/streaming/akka/ActorReceiver.scala | 14 +++- .../spark/streaming/akka/AkkaUtils.scala | 2 +- .../streaming/akka/JavaAkkaUtilsSuite.java | 66 +++++++++++++++++++ .../spark/streaming/akka/AkkaUtilsSuite.scala | 64 ++++++++++++++++++ 5 files changed, 150 insertions(+), 3 deletions(-) create mode 100644 external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java create mode 100644 external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala diff --git a/external/akka/pom.xml b/external/akka/pom.xml index a7f3e83faca10..34de9bae00e49 100644 --- a/external/akka/pom.xml +++ b/external/akka/pom.xml @@ -58,6 +58,13 @@ akka-remote_${scala.binary.version} ${akka.version} + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + test-jar + test + target/scala-${scala.binary.version}/classes diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index 4e249bd81fefb..c75dc92445b64 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -40,13 +40,23 @@ import org.apache.spark.streaming.receiver.Receiver @DeveloperApi object ActorReceiver { - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + /** + * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and + * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for + * others, it just escalates the failure to the supervisor of the supervisor. + */ + val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException => Restart case _: Exception => Escalate } - def defaultActorSystemCreator(): ActorSystem = { + /** + * A default ActorSystem creator. It will use a unique system name + * (streaming-actor-system-) to start an ActorSystem that supports remote + * communication. + */ + val defaultActorSystemCreator: () => ActorSystem = () => { val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}" val akkaConf = ConfigFactory.parseString( s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala index a48adef307b4a..38c35c5ae7a18 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -52,7 +52,7 @@ object AkkaUtils { actorName: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, - supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { val cleanF = ssc.sc.clean(actorSystemCreator) ssc.receiverStream(new ActorReceiverSupervisor[T]( diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java new file mode 100644 index 0000000000000..b732506767154 --- /dev/null +++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.Test; + +import org.apache.spark.api.java.function.Function0; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; + +public class JavaAkkaUtilsSuite { + + @Test // tests the API, does not actually test data receiving + public void testAkkaUtils() { + JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + try { + JavaReceiverInputDStream test1 = AkkaUtils.createStream( + jsc, Props.create(JavaTestActor.class), "test"); + JavaReceiverInputDStream test2 = AkkaUtils.createStream( + jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream test3 = AkkaUtils.createStream( + jsc, + Props.create(JavaTestActor.class), + "test", StorageLevel.MEMORY_AND_DISK_SER_2(), + new ActorSystemCreatorForTest(), + SupervisorStrategy.defaultStrategy()); + } finally { + jsc.stop(); + } + } +} + +class ActorSystemCreatorForTest implements Function0 { + @Override + public ActorSystem call() { + return null; + } +} + + +class JavaTestActor extends JavaActorReceiver { + @Override + public void onReceive(Object message) throws Exception { + store((String) message); + } +} diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala new file mode 100644 index 0000000000000..f437585a98e4f --- /dev/null +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import akka.actor.{Props, SupervisorStrategy} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +class AkkaUtilsSuite extends SparkFunSuite { + + test("createStream") { + val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000)) + try { + // tests the API, does not actually test data receiving + val test1: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test") + val test2: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2) + val test3: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy = SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test5: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test6: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + () => null, + SupervisorStrategy.defaultStrategy) + } finally { + ssc.stop() + } + } +} + +class TestActor extends ActorReceiver { + override def receive: Receive = { + case m: String => store(m) + } +}