Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
markhamstra committed Nov 16, 2015
1 parent 0a4ef24 commit 31ba5de
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1325,16 +1325,17 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
/**
 * Asserts that a TaskSet carries the expected job property value and priority.
 *
 * @param taskSet  the TaskSet under inspection; its `properties` must be non-null
 * @param expected the value that the "testProperty" entry of `taskSet.properties` must equal
 * @param priority the value that `taskSet.priority` must equal
 */
def checkJobPropertiesAndPriority(taskSet: TaskSet, expected: String, priority: Int): Unit = {
// Checked first so that missing properties fail as an assertion instead of a
// NullPointerException on the getProperty call below.
assert(taskSet.properties != null)
assert(taskSet.properties.getProperty("testProperty") === expected)
assert(taskSet.priority === priority)
}

def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, null)
val shuffleDep1 = new ShuffleDependency(baseRdd, new HashPartitioner(1))
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
val shuffleDep2 = new ShuffleDependency(intermediateRdd, new HashPartitioner(1))
val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
val job1Properties = new Properties()
Expand All @@ -1353,7 +1354,6 @@ class DAGSchedulerSuite

// remove job1 as an ActiveJob
cancel(jobId1)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// job2 should still be running
assert(scheduler.activeJobs.nonEmpty)
Expand All @@ -1364,7 +1364,8 @@ class DAGSchedulerSuite
// even though we have cancelled that job and are now running it because of job2, we haven't
// updated the TaskSet's properties. Changing the properties to "job2" is likely the more
// correct behavior.
checkJobProperties(taskSets(0), "job1")
val job1Id = 0 // TaskSet priority for Stages run with "job1" as the ActiveJob
checkJobPropertiesAndPriority(taskSets(0), "job1", job1Id)
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

shuffleDep1
Expand All @@ -1380,7 +1381,7 @@ class DAGSchedulerSuite
// The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
// job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
// the stage.
checkJobProperties(taskSets(1), "job2")
checkJobPropertiesAndPriority(taskSets(1), "job2", 1)

complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
assert(taskSets(2).properties != null)
Expand All @@ -1399,6 +1400,7 @@ class DAGSchedulerSuite
test("stage used by two jobs, some fetch failures, and the first job no longer active " +
"(SPARK-6880)") {
val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()
val job2Id = 1 // TaskSet priority for Stages run with "job2" as the ActiveJob

// let's say there is a fetch failure in this task set, which makes us go back and
// run stage 0, attempt 1
Expand All @@ -1409,13 +1411,13 @@ class DAGSchedulerSuite
// stage 0, attempt 1 should have the properties of job2
assert(taskSets(2).stageId === 0)
assert(taskSets(2).stageAttemptId === 1)
checkJobProperties(taskSets(2), "job2")
checkJobPropertiesAndPriority(taskSets(2), "job2", job2Id)

// run the rest of the stages normally, checking that they have the correct properties
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
checkJobProperties(taskSets(3), "job2")
checkJobPropertiesAndPriority(taskSets(3), "job2", job2Id)
complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
checkJobProperties(taskSets(4), "job2")
checkJobPropertiesAndPriority(taskSets(4), "job2", job2Id)
complete(taskSets(4), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assert(scheduler.activeJobs.isEmpty)
Expand Down

0 comments on commit 31ba5de

Please sign in to comment.