Skip to content

Commit

Permalink
Made unit tests optional in a nice way
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jul 16, 2015
1 parent 4d70703 commit 465b55d
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.spark.streaming.kinesis

import org.scalatest.BeforeAndAfterAll

import org.apache.spark.SparkFunSuite

abstract class KinesisFunSuite extends SparkFunSuite with BeforeAndAfterAll {
import KinesisTestUtils._

def testOrIgnore(testName: String)(testBody: => Unit) {
if (shouldRunTests) {
test(testName)(testBody)
} else {
ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import org.apache.hadoop.fs.FileSystem
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FunSuite}

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.{SparkFunSuite, SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.util.Utils

class KinesisStreamSuite extends FunSuite
class KinesisStreamSuite extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {

private val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
Expand All @@ -46,6 +46,7 @@ class KinesisStreamSuite extends FunSuite
private var ssc: StreamingContext = _

override def beforeAll(): Unit = {
super.beforeAll()
testUtils.createStream()
}

Expand All @@ -65,45 +66,29 @@ class KinesisStreamSuite extends FunSuite
testUtils.deleteDynamoDBTable(kinesisAppName)
}

if (KinesisStreamSuite.shouldRunTests) {

test("basic operation") {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name
ssc = new StreamingContext(conf, Milliseconds(1000))
//val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl,
// Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY)

val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
credentials.getAWSAccessKeyId, credentials.getAWSSecretKey)


val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
println("Collected = " + rdd.collect().toSeq.mkString(", "))
}
ssc.start()
testOrIgnore("basic operation") {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
ssc = new StreamingContext(conf, Milliseconds(1000))
val credentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
credentials.getAWSAccessKeyId, credentials.getAWSSecretKey)

val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
}
ssc.start()

val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop()
}
}

object KinesisStreamSuite {
def shouldRunTests(): Boolean = {
val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty
def isCredentialsAvailable: Boolean = Try {
new DefaultAWSCredentialsProviderChain().getCredentials
}.isSuccess
isSystemVariableSet && isCredentialsAvailable
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.{Failure, Random, Success, Try}

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient
import com.amazonaws.services.dynamodbv2.document.DynamoDB
Expand All @@ -49,11 +49,8 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
private var streamCreated = false
private var _streamName: String = _


private val credentialsProvider = new DefaultAWSCredentialsProviderChain()

private lazy val kinesisClient = {
val client = new AmazonKinesisClient(credentialsProvider)
val client = new AmazonKinesisClient(KinesisTestUtils.getAWSCredentials())
client.setEndpoint(endpointUrl)
client
}
Expand All @@ -70,7 +67,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
}

def createStream(): Unit = {
println("Creating stream")
logInfo("Creating stream")
require(!streamCreated, "Stream already created")
_streamName = findNonExistentStreamName()

Expand All @@ -83,7 +80,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
// The stream is now being created. Wait for it to become active.
waitForStreamToBeActive(_streamName)
streamCreated = true
println("Created stream")
logInfo("Created stream")
}

/**
Expand All @@ -108,7 +105,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
sentSeqNumbers += ((num, seqNumber))

}
println(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
shardIdToSeqNumbers.toMap
}

Expand Down Expand Up @@ -171,3 +168,25 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
require(false, s"Stream $streamName never became active")
}
}

object KinesisTestUtils {


val envVarName = "RUN_KINESIS_TESTS"

val shouldRunTests = sys.env.get(envVarName).nonEmpty

def isAWSCredentialsPresent: Boolean = {
Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
}

def getAWSCredentials(): AWSCredentials = {
assert(shouldRunTests,
"Kinesis test not enabled, should not attempt to get AWS credentials")
Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
case Success(cred) => cred
case Failure(e) =>
throw new Exception("Kinesis tests enabled, but could get not AWS credentials")
}
}
}

0 comments on commit 465b55d

Please sign in to comment.