diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index 7329ac4a98cdc..94b4c0c9aa017 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -48,12 +48,12 @@ class KinesisStreamSuite extends FunSuite testUtils.deleteDynamoDBTable(kinesisAppName) } - if (shouldRunTests) { + if (KinesisStreamSuite.shouldRunTests) { test("basic operation") { val conf = new SparkConf() .setMaster("local[4]") - .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name + .setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name ssc = new StreamingContext(conf, Milliseconds(1000)) //val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl, // Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY) @@ -79,8 +79,10 @@ class KinesisStreamSuite extends FunSuite } } } +} - def shouldRunTests: Boolean = { +object KinesisStreamSuite { + def shouldRunTests(): Boolean = { val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty def isCredentialsAvailable: Boolean = Try { new DefaultAWSCredentialsProviderChain().getCredentials diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 97399036d8fa4..e193add0791da 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -71,7 +71,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend /** * Push data to Kinesis stream and return a map of - * shardId -> seq of (data, seq number) pushed to corresponding shard + * shardId -> seq of (data, seq number) pushed to corresponding shard */ def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = { require(streamCreated, "Stream not yet created, call createStream() to create one") @@ -119,7 +119,6 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend } def deleteDynamoDBTable(tableName: String): Unit = { - try { val table = dynamoDB.getTable(tableName) table.delete() @@ -146,7 +145,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds)) describeStream(streamNameToWaitFor).foreach { description => val streamStatus = description.getStreamStatus() - println(s"\t- current state: $streamStatus\n") + logDebug(s"\t- current state: $streamStatus\n") if ("ACTIVE".equals(streamStatus)) { return }