Skip to content

Commit

Permalink
[SPARK-10564] ThreadingSuite: assertion failures in threads don't fai…
Browse files Browse the repository at this point in the history
…l the test (round 2)

This is a follow-up patch to #8723. I missed one case there.

Author: Andrew Or <[email protected]>

Closes #8727 from andrewor14/fix-threading-suite.

(cherry picked from commit 7b6c856)
Signed-off-by: Andrew Or <[email protected]>
  • Loading branch information
Andrew Or committed Sep 14, 2015
1 parent eb0cb25 commit 5db51f9
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions core/src/test/scala/org/apache/spark/ThreadingSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
}.start()
}
sem.acquire(2)
throwable.foreach { t => throw t }
if (ThreadingSuiteState.failed.get()) {
logError("Waited 1 second without seeing runningThreads = 4 (it was " +
ThreadingSuiteState.runningThreads.get() + "); failing test")
fail("One or more threads didn't see runningThreads = 4")
}
throwable.foreach { t => throw t }
}

test("set local properties in different thread") {
Expand All @@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
assert(sc.getLocalProperty("test") === null)
throwable.foreach { t => throw t }
assert(sc.getLocalProperty("test") === null)
}

test("set and get local properties in parent-children thread") {
Expand Down Expand Up @@ -207,15 +207,16 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
threads.foreach(_.start())

sem.acquire(5)
throwable.foreach { t => throw t }
assert(sc.getLocalProperty("test") === "parent")
assert(sc.getLocalProperty("Foo") === null)
throwable.foreach { t => throw t }
}

test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
val jobStarted = new Semaphore(0)
val jobEnded = new Semaphore(0)
@volatile var jobResult: JobResult = null
var throwable: Option[Throwable] = None

sc = new SparkContext("local", "test")
sc.setJobGroup("originalJobGroupId", "description")
Expand All @@ -232,14 +233,19 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
// Create a new thread which will inherit the current thread's properties
val thread = new Thread() {
override def run(): Unit = {
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
try {
sc.parallelize(1 to 100).foreach { x =>
Thread.sleep(100)
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
try {
sc.parallelize(1 to 100).foreach { x =>
Thread.sleep(100)
}
} catch {
case s: SparkException => // ignored so that we don't print noise in test logs
}
} catch {
case s: SparkException => // ignored so that we don't print noise in test logs
case t: Throwable =>
throwable = Some(t)
}
}
}
Expand All @@ -252,6 +258,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging {
// modification of the properties object should not affect the properties of running jobs
sc.cancelJobGroup("originalJobGroupId")
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
throwable.foreach { t => throw t }
assert(jobResult.isInstanceOf[JobFailed])
}
}

0 comments on commit 5db51f9

Please sign in to comment.