Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Jan 20, 2016
1 parent 5ac46f9 commit 9e36df1
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 3 deletions.
7 changes: 7 additions & 0 deletions external/akka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
<artifactId>akka-remote_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,23 @@ import org.apache.spark.streaming.receiver.Receiver
@DeveloperApi
object ActorReceiver {

val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
/**
* A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
* `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
* others, it just escalates the failure to the supervisor of the supervisor.
*/
val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException => Restart
case _: Exception => Escalate
}

def defaultActorSystemCreator(): ActorSystem = {
/**
* A default ActorSystem creator. It will use a unique system name
* (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
* communication.
*/
val defaultActorSystemCreator: () => ActorSystem = () => {
val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
val akkaConf = ConfigFactory.parseString(
s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object AkkaUtils {
actorName: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultStrategy
supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
val cleanF = ssc.sc.clean(actorSystemCreator)
ssc.receiverStream(new ActorReceiverSupervisor[T](
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.akka;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;

import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

public class JavaAkkaUtilsSuite {

@Test // tests the API, does not actually test data receiving
public void testAkkaUtils() {
JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
try {
JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
jsc, Props.create(JavaTestActor.class), "test");
JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
jsc,
Props.create(JavaTestActor.class),
"test", StorageLevel.MEMORY_AND_DISK_SER_2(),
new ActorSystemCreatorForTest(),
SupervisorStrategy.defaultStrategy());
} finally {
jsc.stop();
}
}
}

class ActorSystemCreatorForTest implements Function0<ActorSystem> {
@Override
public ActorSystem call() {
return null;
}
}


class JavaTestActor extends JavaActorReceiver {
@Override
public void onReceive(Object message) throws Exception {
store((String) message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.akka

import akka.actor.{Props, SupervisorStrategy}

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

class AkkaUtilsSuite extends SparkFunSuite {

test("createStream") {
val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
try {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test")
val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc,
Props[TestActor](),
"test",
StorageLevel.MEMORY_AND_DISK_SER_2,
supervisorStrategy = SupervisorStrategy.defaultStrategy)
val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
ssc,
Props[TestActor](),
"test",
StorageLevel.MEMORY_AND_DISK_SER_2,
() => null,
SupervisorStrategy.defaultStrategy)
} finally {
ssc.stop()
}
}
}

class TestActor extends ActorReceiver {
override def receive: Receive = {
case m: String => store(m)
}
}

0 comments on commit 9e36df1

Please sign in to comment.