Skip to content

Commit

Permalink
DRY code review
Browse files Browse the repository at this point in the history
  • Loading branch information
markhamstra committed Nov 16, 2015
1 parent 091e19a commit 0a4ef24
Showing 1 changed file with 13 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1330,11 +1330,7 @@ class DAGSchedulerSuite
assert(taskSet.properties.getProperty("testProperty") === expected)
}

/**
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active
*/
test("stage used by two jobs, the first no longer active (SPARK-6880)") {
def launchJobsThatShareStageAndCancelFirst(): ShuffleDependency[Int, Int, Nothing] = {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, null)
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
Expand Down Expand Up @@ -1363,15 +1359,24 @@ class DAGSchedulerSuite
assert(scheduler.activeJobs.nonEmpty)
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
assert(testProperty1 != testProperty2)
assert(taskSets(0).properties != null)
// NB: This next assert isn't necessarily the "desired" behavior; it's just to document
// the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
// even though we have cancelled that job and are now running it because of job2, we haven't
// updated the TaskSet's properties. Changing the properties to "job2" is likely the more
// correct behavior.
assert(taskSets(0).properties.getProperty("testProperty") === "job1")
checkJobProperties(taskSets(0), "job1")
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

shuffleDep1
}

/**
* Makes sure that tasks for a stage used by multiple jobs are submitted with the properties of a
* later, active job if they were previously run under a job that is no longer active
*/
test("stage used by two jobs, the first no longer active (SPARK-6880)") {
launchJobsThatShareStageAndCancelFirst()

// The next check is the key for SPARK-6880. For the stage which was shared by both job1 and
// job2 but never had any tasks submitted for job1, the properties of job2 are now used to run
// the stage.
Expand All @@ -1393,49 +1398,7 @@ class DAGSchedulerSuite
*/
test("stage used by two jobs, some fetch failures, and the first job no longer active " +
"(SPARK-6880)") {
val baseRdd = new MyRDD(sc, 1, Nil)
val shuffleDep1 = new ShuffleDependency(baseRdd, null)
val intermediateRdd = new MyRDD(sc, 1, List(shuffleDep1))
val shuffleDep2 = new ShuffleDependency(intermediateRdd, null)
val finalRdd1 = new MyRDD(sc, 1, List(shuffleDep2))
val finalRdd2 = new MyRDD(sc, 1, List(shuffleDep2))
val job1Properties = new Properties()
val job2Properties = new Properties()
job1Properties.setProperty("testProperty", "job1")
job2Properties.setProperty("testProperty", "job2")

def checkJobProperties(taskSet: TaskSet, expected: String): Unit = {
assert(taskSet.properties != null)
assert(taskSet.properties.getProperty("testProperty") === expected)
}

// Run jobs 1 & 2, both referencing the same stage, then cancel job1.
// Note that we have to submit job2 before we cancel job1 to have them actually share
// *Stages*, and not just shuffle dependencies, due to skipped stages (at least until
// we address SPARK-10193.)
val jobId1 = submit(finalRdd1, Array(0), properties = job1Properties)
val jobId2 = submit(finalRdd2, Array(0), properties = job2Properties)
assert(scheduler.activeJobs.nonEmpty)
val testProperty1 = scheduler.jobIdToActiveJob(jobId1).properties.getProperty("testProperty")

// job 1 finishes stage 0
assert(taskSets(0).properties.getProperty("testProperty") === "job1")
complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))

// remove job1 as an ActiveJob
cancel(jobId1)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

// job2 should still be running, starts from stage 1
assert(scheduler.activeJobs.nonEmpty)
val testProperty2 = scheduler.jobIdToActiveJob(jobId2).properties.getProperty("testProperty")
assert(testProperty1 != testProperty2)
// NB: This next assert isn't necessarily the "desired" behavior; it's just to document
// the current behavior. We've already submitted the TaskSet for stage 0 based on job1, but
// even though we have cancelled that job and are now running it because of job2, we haven't
// updated the TaskSet's properties. Changing the properties to "job2" is likely the more
// correct behavior.
checkJobProperties(taskSets(1), "job1")
val shuffleDep1 = launchJobsThatShareStageAndCancelFirst()

// lets say there is a fetch failure in this task set, which makes us go back and
// run stage 0, attempt 1
Expand Down

0 comments on commit 0a4ef24

Please sign in to comment.