Commit

Update test added in master
tgravescs committed Feb 3, 2020
1 parent 89dfb19 commit 5435640
Showing 1 changed file with 33 additions and 30 deletions.
@@ -402,44 +402,47 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
val clock = new ManualClock()
val stage = createStageInfo(0, 40)
val manager = createManager(createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4), clock = clock)
+ val updatesNeeded =
+   new mutable.HashMap[ResourceProfile, ExecutorAllocationManager.TargetNumUpdates]

post(SparkListenerStageSubmitted(stage))
- assert(addExecutors(manager) === 1)
- assert(addExecutors(manager) === 2)
- assert(addExecutors(manager) === 4)
- assert(addExecutors(manager) === 3)
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 2)
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 4)
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 3)

- (0 to 9).foreach(execId => onExecutorAdded(manager, execId.toString))
+ (0 to 9).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
(0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskStart(0, 0, info))
}
- assert(numExecutorsTarget(manager) === 10)
- assert(maxNumExecutorsNeeded(manager) == 10)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 10)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 10)

// 30 tasks (0 - 29) finished
(0 to 29).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 3)
- assert(maxNumExecutorsNeeded(manager) == 3)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 3)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 3)
(0 to 6).foreach { i => assert(removeExecutor(manager, i.toString))}
(0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}

// 10 speculative tasks (30 - 39) launch for the remaining tasks
(30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
- assert(addExecutors(manager) === 1)
- assert(addExecutors(manager) === 1)
- assert(numExecutorsTarget(manager) == 5)
- assert(maxNumExecutorsNeeded(manager) == 5)
- (10 to 12).foreach(execId => onExecutorAdded(manager, execId.toString))
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ assert(addExecutorsToTargetForDefaultProfile(manager, updatesNeeded) === 1)
+ assert(numExecutorsTarget(manager, defaultProfile.id) == 5)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)
+ (10 to 12).foreach(execId => onExecutorAddedDefaultProfile(manager, execId.toString))
(40 to 49).map { i =>
createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
.foreach { info => post(SparkListenerTaskStart(0, 0, info))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) == 5) // At this point, we still have 6 executors running
- assert(maxNumExecutorsNeeded(manager) == 5)
+ // At this point, we still have 6 executors running
+ assert(numExecutorsTarget(manager, defaultProfile.id) == 5)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 5)

// 6 speculative tasks (40 - 45) finish before the original tasks, with 4 speculative remaining
(40 to 45).map { i =>
@@ -448,8 +451,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 4)
- assert(maxNumExecutorsNeeded(manager) == 4)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 4)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 4)
assert(removeExecutor(manager, "10"))
onExecutorRemoved(manager, "10")
// At this point, we still have 5 executors running: ["7", "8", "9", "11", "12"]
@@ -461,8 +464,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
SparkListenerTaskEnd(0, 0, null, TaskKilled("test"), info, new ExecutorMetrics, null))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 2)
- assert(maxNumExecutorsNeeded(manager) == 2)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
(7 to 8).foreach { i => assert(removeExecutor(manager, i.toString))}
(7 to 8).foreach { i => onExecutorRemoved(manager, i.toString)}
// At this point, we still have 3 executors running: ["9", "11", "12"]
@@ -477,8 +480,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
// tasks running. Target lowers to 2, but still hold 3 executors ["9", "11", "12"]
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 2)
- assert(maxNumExecutorsNeeded(manager) == 2)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
// At this point, we still have 3 executors running: ["9", "11", "12"]

// Task 37 and 47 succeed at the same time
@@ -491,8 +494,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
// tasks running
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 1)
- assert(maxNumExecutorsNeeded(manager) == 1)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)
assert(removeExecutor(manager, "11"))
onExecutorRemoved(manager, "11")
// At this point, we still have 2 executors running: ["9", "12"]
@@ -506,14 +509,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
// maxNeeded = 1, allocate one more to satisfy speculation locality requirement
- assert(numExecutorsTarget(manager) === 2)
- assert(maxNumExecutorsNeeded(manager) == 2)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 2)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 2)
post(SparkListenerTaskStart(0, 0,
createTaskInfo(50, 39, executorId = "12", speculative = true)))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 1)
- assert(maxNumExecutorsNeeded(manager) == 1)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 1)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 1)

// Task 39 and 48 succeed, task 50 killed
post(SparkListenerTaskEnd(0, 0, null, Success,
@@ -525,8 +528,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
post(SparkListenerStageCompleted(stage))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
- assert(numExecutorsTarget(manager) === 0)
- assert(maxNumExecutorsNeeded(manager) == 0)
+ assert(numExecutorsTarget(manager, defaultProfile.id) === 0)
+ assert(maxNumExecutorsNeededPerResourceProfile(manager, defaultProfile) == 0)
assert(removeExecutor(manager, "9"))
onExecutorRemoved(manager, "9")
assert(removeExecutor(manager, "12"))
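
Note: the diff moves the test from the old single-pool helpers (addExecutors, onExecutorAdded, numExecutorsTarget(manager), maxNumExecutorsNeeded) to per-ResourceProfile variants (addExecutorsToTargetForDefaultProfile, onExecutorAddedDefaultProfile, numExecutorsTarget(manager, defaultProfile.id), maxNumExecutorsNeededPerResourceProfile), with pending target changes collected in an updatesNeeded map. The following is a minimal, self-contained Scala sketch of that calling pattern only; the ResourceProfile and TargetNumUpdates case classes and the helper bodies below are simplified stand-ins for illustration, not the actual Spark or test-suite APIs.

import scala.collection.mutable

// Hypothetical stand-ins mirroring the shape of the suite's new helpers;
// the real types live in org.apache.spark.resource and the test suite itself.
final case class ResourceProfile(id: Int)
final case class TargetNumUpdates(delta: Int, oldNumExecutorsTarget: Int)

object AllocationSketch {
  val defaultProfile: ResourceProfile = ResourceProfile(id = 0)

  // One executor target per resource-profile id instead of a single global target.
  private val targets = mutable.HashMap[Int, Int](defaultProfile.id -> 0)

  // Mirrors the calling shape of addExecutorsToTargetForDefaultProfile: bump the
  // target for the default profile and record the pending change in updatesNeeded.
  def addExecutorsToTarget(
      toAdd: Int,
      updatesNeeded: mutable.HashMap[ResourceProfile, TargetNumUpdates]): Int = {
    val old = targets(defaultProfile.id)
    targets(defaultProfile.id) = old + toAdd
    updatesNeeded(defaultProfile) = TargetNumUpdates(toAdd, old)
    toAdd
  }

  // Mirrors numExecutorsTarget(manager, defaultProfile.id): look the target up by id.
  def numExecutorsTarget(rpId: Int): Int = targets(rpId)

  def main(args: Array[String]): Unit = {
    val updatesNeeded = new mutable.HashMap[ResourceProfile, TargetNumUpdates]
    assert(addExecutorsToTarget(1, updatesNeeded) == 1)
    assert(addExecutorsToTarget(2, updatesNeeded) == 2)
    assert(numExecutorsTarget(defaultProfile.id) == 3)
  }
}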