Skip to content

Commit

Permalink
Minor updates
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Jul 16, 2015
1 parent cc36510 commit 129d436
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class KinesisStreamSuite extends FunSuite
testUtils.deleteDynamoDBTable(kinesisAppName)
}

if (shouldRunTests) {
if (KinesisStreamSuite.shouldRunTests) {

test("basic operation") {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name
.setAppName(kinesisAppName) // Setting Spark app name to Kinesis app name
ssc = new StreamingContext(conf, Milliseconds(1000))
//val stream = KinesisUtils.createStream(ssc, testUtils.streamName, testUtils.endpointUrl,
// Seconds(10), InitialPositionInStream.LATEST, StorageLevel.MEMORY_ONLY)
Expand All @@ -79,8 +79,10 @@ class KinesisStreamSuite extends FunSuite
}
}
}
}

def shouldRunTests: Boolean = {
object KinesisStreamSuite {
def shouldRunTests(): Boolean = {
val isSystemVariableSet = true // sys.env.get("RUN_KINESIS_STREAM_TESTS").nonEmpty
def isCredentialsAvailable: Boolean = Try {
new DefaultAWSCredentialsProviderChain().getCredentials
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend

/**
* Push data to Kinesis stream and return a map of
* shardId -> seq of (data, seq number) pushed to corresponding shard
* shardId -> seq of (data, seq number) pushed to corresponding shard
*/
def pushData(testData: Seq[Int]): Map[String, Seq[(Int, String)]] = {
require(streamCreated, "Stream not yet created, call createStream() to create one")
Expand Down Expand Up @@ -119,7 +119,6 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
}

def deleteDynamoDBTable(tableName: String): Unit = {

try {
val table = dynamoDB.getTable(tableName)
table.delete()
Expand All @@ -146,7 +145,7 @@ class KinesisTestUtils(val endpointUrl: String, _regionName: String = "") extend
Thread.sleep(TimeUnit.SECONDS.toMillis(describeStreamPollTimeSeconds))
describeStream(streamNameToWaitFor).foreach { description =>
val streamStatus = description.getStreamStatus()
println(s"\t- current state: $streamStatus\n")
logDebug(s"\t- current state: $streamStatus\n")
if ("ACTIVE".equals(streamStatus)) {
return
}
Expand Down

0 comments on commit 129d436

Please sign in to comment.