diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala new file mode 100644 index 0000000000000..dba4fc4140f99 --- /dev/null +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -0,0 +1,17 @@ +package org.apache.spark.streaming.kinesis + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.SparkFunSuite + +abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll { + import KinesisTestUtils._ + + def testOrIgnore(testName: String)(testBody: => Unit) { + if (shouldRunTests) { + test(testName)(testBody) + } else { + ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody) + } + } +} \ No newline at end of file diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 5f50e842cfd51..00b64542c9935 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -29,14 +29,14 @@ import org.apache.hadoop.fs.FileSystem import org.scalatest.concurrent.Eventually import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite} -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkFunSuite, SparkContext, SparkConf} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.util.Utils -class KinesisStreamSuite extends FunSuite +class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { private val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" @@ -46,6 +46,7 @@ class KinesisStreamSuite extends FunSuite private var ssc: StreamingContext = _ override def beforeAll(): Unit = { + super.beforeAll() testUtils.createStream() } @@ -65,45 +66,29 @@ class KinesisStreamSuite extends FunSuite testUtils.deleteDynamoDBTable(kinesisAppName) } - if (KinesisStreamSuite.shouldRunTests) { - - test("basic operation") { - val conf = new SparkConf() - .setMaster("local[4]") - .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name - ssc = new StreamingContext(conf, Milliseconds(1000)) - //val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl, - // Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY) - - val credentials = new DefaultAWSCredentialsProviderChain().getCredentials() - val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, - testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) - - - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - println("Collected = " + rdd.collect().toSeq.mkString(", ")) - } - ssc.start() + testOrIgnore("basic operation") { + val conf = new SparkConf() + .setMaster("local[4]") + .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name + ssc = new StreamingContext(conf, Milliseconds(1000)) + val credentials = KinesisTestUtils.getAWSCredentials() + val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName, + testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + credentials.getAWSAccessKeyId, credentials.getAWSSecretKey) + + val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => + collected ++= rdd.collect() + logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) + } + ssc.start() - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - testUtils.pushData(testData) - assert(collected === testData.toSet, "\nData received does not match data sent") - } + val testData = 1 to 10 + eventually(timeout(120 seconds), interval(10 second)) { + testUtils.pushData(testData) + assert(collected === testData.toSet, "\nData received does not match data sent") } + ssc.stop() } } - -object KinesisStreamSuite { - def shouldRunTests(): Boolean = { - val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty - def isCredentialsAvailable: Boolean = Try { - new DefaultAWSCredentialsProviderChain().getCredentials - }.isSuccess - isSystemVariableSet && isCredentialsAvailable - } -} \ No newline at end of file diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 8bacdbe680f71..00b0937df7a9b 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import scala.util.Random +import scala.util.{Failure, Random, Success, Try} -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain +import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.regions.RegionUtils import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB @@ -49,11 +49,8 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend private var streamCreated = false private var _streamName: String = _ - - private val credentialsProvider = new DefaultAWSCredentialsProviderChain() - private lazy val kinesisClient = { - val client = new AmazonKinesisClient(credentialsProvider) + val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials()) client.setEndpoint(endpointUrl) client } @@ -70,7 +67,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend } def createStream(): Unit = { - println("Creating stream") + logInfo("Creating stream") require(!streamCreated, "Stream already created") _streamName = findNonExistentStreamName() @@ -83,7 +80,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend // The stream is now being created. Wait for it to become active. waitForStreamToBeActive(_streamName) streamCreated = true - println("Created stream") + logInfo("Created stream") } /** @@ -108,7 +105,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend sentSeqNumbers += ((num, seqNumber)) } - println(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") + logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}") shardIdToSeqNumbers.toMap } @@ -171,3 +168,25 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend require(false, s"Stream $streamName never became active") } } + +object KinesisTestUtils { + + + val envVarName = "RUN_KINESIS_TESTS" + + val shouldRunTests = sys.env.get(envVarName).nonEmpty + + def isAWSCredentialsPresent: Boolean = { + Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess + } + + def getAWSCredentials(): AWSCredentials = { + assert(shouldRunTests, + "Kinesis test not enabled, should not attempt to get AWS credentials") + Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match { + case Success(cred) => cred + case Failure(e) => + throw new Exception("Kinesis tests enabled, but could get not AWS credentials") + } + } +}