Skip to content

Commit

Permalink
[SPARK-22864][CORE] Disable allocation schedule in ExecutorAllocation…
Browse files Browse the repository at this point in the history
…ManagerSuite.

The scheduled task was racing with the test code and could influence
the values returned to the test, triggering assertions. The change adds
a new config that is only used during testing, and overrides it
on the affected test suite.

The issue in the bug can be reliably reproduced by reducing the interval
in the test (e.g. to 10ms).

While there, fixed an exception that shows up in the logs while these
tests run, and simplified some code (which was also causing misleading
log messages in the log output of the test).

Author: Marcelo Vanzin <[email protected]>

Closes #20050 from vanzin/SPARK-22864.
  • Loading branch information
Marcelo Vanzin authored and squito committed Dec 29, 2017
1 parent 8b49704 commit 4e9e6ae
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ private[spark] class ExecutorAllocationManager(
private val removeTimes = new mutable.HashMap[String, Long]

// Polling loop interval (ms)
private val intervalMillis: Long = 100
private val intervalMillis: Long = if (Utils.isTesting) {
conf.getLong(TESTING_SCHEDULE_INTERVAL_KEY, 100)
} else {
100
}

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new SystemClock()
Expand Down Expand Up @@ -856,4 +860,5 @@ private[spark] class ExecutorAllocationManager(

private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
val TESTING_SCHEDULE_INTERVAL_KEY = "spark.testing.dynamicAllocation.scheduleInterval"
}
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1575,10 +1575,10 @@ class SparkContext(config: SparkConf) extends Logging {

private[spark] def getExecutorIds(): Seq[String] = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
case b: ExecutorAllocationClient =>
b.getExecutorIds()
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
logWarning("Requesting executors is not supported by current scheduler.")
Nil
}
}
Expand All @@ -1604,10 +1604,10 @@ class SparkContext(config: SparkConf) extends Logging {
hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
case b: ExecutorAllocationClient =>
b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
logWarning("Requesting executors is not supported by current scheduler.")
false
}
}
Expand All @@ -1620,10 +1620,10 @@ class SparkContext(config: SparkConf) extends Logging {
@DeveloperApi
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
case b: ExecutorAllocationClient =>
b.requestExecutors(numAdditionalExecutors)
case _ =>
logWarning("Requesting executors is only supported in coarse-grained mode")
logWarning("Requesting executors is not supported by current scheduler.")
false
}
}
Expand All @@ -1642,10 +1642,10 @@ class SparkContext(config: SparkConf) extends Logging {
@DeveloperApi
def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
case b: ExecutorAllocationClient =>
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
logWarning("Killing executors is not supported by current scheduler.")
false
}
}
Expand Down Expand Up @@ -1680,10 +1680,10 @@ class SparkContext(config: SparkConf) extends Logging {
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
case b: ExecutorAllocationClient =>
b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
logWarning("Killing executors is not supported by current scheduler.")
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveLi
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
eventQueue.put(POISON_PILL)
eventCount.incrementAndGet()
eventQueue.put(POISON_PILL)
}
dispatchThread.join()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,11 @@ class ExecutorAllocationManagerSuite

val task2Info = createTaskInfo(1, 0, "executor-1")
post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info))

task1Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))

task2Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))

assert(adjustRequestedExecutors(manager) === -1)
Expand Down Expand Up @@ -1063,6 +1067,9 @@ class ExecutorAllocationManagerSuite
s"${sustainedSchedulerBacklogTimeout.toString}s")
.set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s")
.set("spark.dynamicAllocation.testing", "true")
// SPARK-22864: effectively disable the allocation schedule by setting the period to a
// really long value.
.set(TESTING_SCHEDULE_INTERVAL_KEY, "10000")
val sc = new SparkContext(conf)
contexts += sc
sc
Expand Down Expand Up @@ -1250,28 +1257,19 @@ private class DummyLocalExternalClusterManager extends ExternalClusterManager {
private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend)
extends SchedulerBackend with ExecutorAllocationClient {

override private[spark] def getExecutorIds(): Seq[String] = sc.getExecutorIds()
override private[spark] def getExecutorIds(): Seq[String] = Nil

override private[spark] def requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean =
sc.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
hostToLocalTaskCount: Map[String, Int]): Boolean = true

override def requestExecutors(numAdditionalExecutors: Int): Boolean =
sc.requestExecutors(numAdditionalExecutors)
override def requestExecutors(numAdditionalExecutors: Int): Boolean = true

override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Seq[String] = {
val response = sc.killExecutors(executorIds)
if (response) {
executorIds
} else {
Seq.empty[String]
}
}
force: Boolean): Seq[String] = executorIds

override def start(): Unit = sb.start()

Expand Down

0 comments on commit 4e9e6ae

Please sign in to comment.