Change allocation manager removeExecutors to take the resource profile id
with the set of executors, to avoid a race. Fix up minor nits from
reviews.
tgravescs committed Feb 6, 2020
1 parent 87aab30 commit bef3a67
Showing 5 changed files with 198 additions and 93 deletions.
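The heart of the change: ExecutorMonitor.timedOutExecutors() now returns (executorId, resourceProfileId) pairs captured in a single pass over the monitor's state, instead of bare ids that the allocation manager had to resolve to a ResourceProfile afterwards — a second lookup that could race with the executor disappearing. A minimal sketch of the two shapes, using hypothetical simplified types rather than the real ExecutorMonitor internals:

```scala
import scala.collection.concurrent.TrieMap

object TimedOutSketch {
  // Hypothetical stand-in for the monitor's per-executor tracking state.
  final case class Tracker(resourceProfileId: Int, timedOut: Boolean)
  private val executors = TrieMap.empty[String, Tracker]

  // Before: only ids are returned, so the caller must look up the profile id
  // later -- by then the executor may already be gone from `executors`.
  def timedOutIdsOnly(): Seq[String] =
    executors.collect { case (id, t) if t.timedOut => id }.toSeq

  // After: the (id, resourceProfileId) pair is captured in the same pass,
  // so no second, racy lookup is needed.
  def timedOutWithRpId(): Seq[(String, Int)] =
    executors.collect { case (id, t) if t.timedOut => (id, t.resourceProfileId) }.toSeq
}
```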
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -59,8 +59,9 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* quickly over time in case the maximum number of executors is very high. Otherwise, it will take
* a long time to ramp up under heavy workloads.
*
- * The remove policy is simpler: If an executor has been idle for K seconds and the number of
- * executors is more then what is needed, meaning there are not enough tasks that could use
+ * The remove policy is simpler and is applied to each ResourceProfile separately. If an executor
+ * for that ResourceProfile has been idle for K seconds and the number of executors is more
+ * than what is needed for that ResourceProfile, meaning there are not enough tasks that could use
* the executor, then it is removed. Note that an executor caching any data
* blocks will be removed if it has been idle for more than L seconds.
*
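Read as a predicate, the per-ResourceProfile policy above boils down to something like the following — a hypothetical condensation for illustration, not code from this patch:

```scala
// Hypothetical condensation of the remove policy described above: an executor
// is a removal candidate only if it has idled past the timeout ("K seconds")
// AND its ResourceProfile currently has more executors than its pending work needs.
def isRemovalCandidate(
    idleSeconds: Long,
    idleTimeoutSeconds: Long,
    executorsForProfile: Int,
    executorsNeededForProfile: Int): Boolean = {
  idleSeconds >= idleTimeoutSeconds &&
    executorsForProfile > executorsNeededForProfile
}
```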
@@ -240,10 +241,14 @@ private[spark] class ExecutorAllocationManager(
}
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

-    client.requestTotalExecutors(
-      numExecutorsTargetPerResourceProfileId.toMap,
-      numLocalityAwareTasksPerResourceProfileId.toMap,
-      rpIdToHostToLocalTaskCount)
+    // Copy the maps inside synchronized to ensure they are not modified concurrently
+    val (numExecutorsTarget, numLocalityAware) = synchronized {
+      val numTarget = numExecutorsTargetPerResourceProfileId.toMap
+      val numLocality = numLocalityAwareTasksPerResourceProfileId.toMap
+      (numTarget, numLocality)
+    }
+
+    client.requestTotalExecutors(numExecutorsTarget, numLocalityAware, rpIdToHostToLocalTaskCount)
}
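The new code snapshots the mutable maps while holding the lock and only then calls out to the cluster manager. A small sketch of that pattern with hypothetical field names, to make the locking discipline explicit:

```scala
import scala.collection.mutable

// Sketch of the snapshot-under-lock pattern used above (hypothetical names):
// .toMap copies to an immutable map while synchronized, so the slow RPC to
// the cluster manager can run outside the lock on a stable snapshot.
class SnapshotSketch {
  private val targetPerProfile = mutable.Map.empty[Int, Int] // rpId -> target

  def snapshotTargets(): Map[Int, Int] = synchronized {
    targetPerProfile.toMap // later mutations cannot affect this copy
  }
}
```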

/**
@@ -500,23 +505,21 @@ private[spark] class ExecutorAllocationManager(
delta
}

-  private def getResourceProfileIdOfExecutor(executorId: String): Int = {
-    executorMonitor.getResourceProfileId(executorId)
-  }
-
/**
* Request the cluster manager to remove the given executors.
* Returns the list of executors which are removed.
*/
-  private def removeExecutors(executors: Seq[String]): Seq[String] = synchronized {
+  private def removeExecutors(executors: Seq[(String, Int)]): Seq[String] = synchronized {
val executorIdsToBeRemoved = new ArrayBuffer[String]
logDebug(s"Request to remove executorIds: ${executors.mkString(", ")}")
val numExecutorsTotalPerRpId = mutable.Map[Int, Int]()
-    executors.foreach { executorIdToBeRemoved =>
-      val rpId = getResourceProfileIdOfExecutor(executorIdToBeRemoved)
+    executors.foreach { case (executorIdToBeRemoved, rpId) =>
if (rpId == UNKNOWN_RESOURCE_PROFILE_ID) {
logWarning(s"Not removing executor $executorIdsToBeRemoved because couldn't find " +
"ResourceProfile for it!")
if (testing) {
throw new SparkException("ResourceProfile Id was UNKNOWN, this is not expected")
}
logWarning(s"Not removing executor $executorIdsToBeRemoved because the " +
"ResourceProfile was UNKNOWN!")
} else {
// get the running total as we remove or initialize it to the count - pendingRemoval
val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
@@ -590,7 +593,7 @@
private def onSchedulerQueueEmpty(): Unit = synchronized {
logDebug("Clearing timer to add executors because there are no more pending tasks")
addTime = NOT_SET
-    numExecutorsToAddPerResourceProfileId.keys.foreach(numExecutorsToAddPerResourceProfileId(_) = 1)
+    numExecutorsToAddPerResourceProfileId.transform { case (_, _) => 1 }
}
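`transform` rewrites every value of the mutable map in place, so each ResourceProfile's add counter drops back to 1 without rebuilding the key set. A throwaway demo of the same call:

```scala
// Throwaway demo (hypothetical values) of the in-place transform used above:
// every ResourceProfile's "executors to add next round" counter resets to 1.
import scala.collection.mutable
val toAdd = mutable.Map(0 -> 4, 1 -> 8)
toAdd.transform { case (_, _) => 1 } // toAdd is now Map(0 -> 1, 1 -> 1)
```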

private case class StageAttempt(stageId: Int, stageAttemptId: Int) {
@@ -784,43 +787,39 @@
*/
def pendingTasksPerResourceProfile(rpId: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rpId, Set.empty).toSeq
-      getPendingTaskSum(attempts)
+      attempts.map(attempt => getPendingTaskSum(attempt)).sum
}

def hasPendingRegularTasks: Boolean = {
-      val attempts = resourceProfileIdToStageAttempt.values.flatten.toSeq
-      val pending = getPendingTaskSum(attempts)
-      (pending > 0)
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists(attempts => attempts.exists(getPendingTaskSum(_) > 0))
}

-    private def getPendingTaskSum(attempts: Seq[StageAttempt]): Int = {
-      attempts.map { attempt =>
-        val numTotalTasks = stageAttemptToNumTasks.getOrElse(attempt, 0)
-        val numRunning = stageAttemptToTaskIndices.get(attempt).map(_.size).getOrElse(0)
-        numTotalTasks - numRunning
-      }.sum
+    private def getPendingTaskSum(attempt: StageAttempt): Int = {
+      val numTotalTasks = stageAttemptToNumTasks.getOrElse(attempt, 0)
+      val numRunning = stageAttemptToTaskIndices.get(attempt).map(_.size).getOrElse(0)
+      numTotalTasks - numRunning
}

def pendingSpeculativeTasksPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
-      getPendingSpeculativeTaskSum(attempts)
+      attempts.map(attempt => getPendingSpeculativeTaskSum(attempt)).sum
}

def hasPendingSpeculativeTasks: Boolean = {
-      val attempts = resourceProfileIdToStageAttempt.values.flatten.toSeq
-      val pending = getPendingSpeculativeTaskSum(attempts)
-      (pending > 0)
+      val attemptSets = resourceProfileIdToStageAttempt.values
+      attemptSets.exists { attempts =>
+        attempts.exists(getPendingSpeculativeTaskSum(_) > 0)
+      }
}

-    private def getPendingSpeculativeTaskSum(attempts: Seq[StageAttempt]): Int = {
-      attempts.map { attempt =>
-        val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
-        val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
-        numTotalTasks - numRunning
-      }.sum
+    private def getPendingSpeculativeTaskSum(attempt: StageAttempt): Int = {
+      val numTotalTasks = stageAttemptToNumSpeculativeTasks.getOrElse(attempt, 0)
+      val numRunning = stageAttemptToSpeculativeTaskIndices.get(attempt).map(_.size).getOrElse(0)
+      numTotalTasks - numRunning
}

-    def hasPendingTasks(): Boolean = {
+    def hasPendingTasks: Boolean = {
hasPendingSpeculativeTasks || hasPendingRegularTasks
}
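The refactor above changes "sum pending tasks across all attempts, then compare to zero" into a short-circuiting exists over per-attempt counts, so the boolean checks stop at the first attempt with work left. A toy illustration of the equivalence, with hypothetical numbers:

```scala
// Toy illustration (hypothetical numbers): pending work per attempt is total
// tasks minus tasks already started, and "is anything pending?" can
// short-circuit with exists instead of summing every attempt first.
val totalTasks = Map("stage0.0" -> 10, "stage1.0" -> 4)
val startedTasks = Map("stage0.0" -> 10, "stage1.0" -> 1)

def pending(attempt: String): Int =
  totalTasks.getOrElse(attempt, 0) - startedTasks.getOrElse(attempt, 0)

val hasPending = totalTasks.keys.exists(pending(_) > 0) // true: stage1.0 has 3 left
```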

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -79,7 +79,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Number of executors for each ResourceProfile requested by the cluster
// manager, [[ExecutorAllocationManager]]
@GuardedBy("CoarseGrainedSchedulerBackend.this")
-  private var requestedTotalExecutorsPerResourceProfile = new HashMap[ResourceProfile, Int]
+  private val requestedTotalExecutorsPerResourceProfile = new HashMap[ResourceProfile, Int]

private val listenerBus = scheduler.sc.listenerBus

@@ -621,8 +621,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
(scheduler.sc.resourceProfileManager.resourceProfileFromId(rpid), num)
}
val response = synchronized {
-      this.requestedTotalExecutorsPerResourceProfile =
-        new HashMap[ResourceProfile, Int] ++= resourceProfileToNumExecutors
+      this.requestedTotalExecutorsPerResourceProfile.clear()
+      this.requestedTotalExecutorsPerResourceProfile ++= resourceProfileToNumExecutors
this.numLocalityAwareTasksPerResourceProfileId = numLocalityAwareTasksPerResourceProfileId
this.rpHostToLocalTaskCount = hostToLocalTaskCount
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
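Turning the field into a val and mutating it in place (clear() then ++=) keeps the @GuardedBy("CoarseGrainedSchedulerBackend.this") annotation honest: there is only ever one map object, touched only under the lock, whereas reassigning a var swaps the object out from under concurrent readers. A compact sketch of the pattern, with a hypothetical class:

```scala
import scala.collection.mutable.HashMap

// Hypothetical sketch of the val-plus-clear pattern adopted above: one
// long-lived map, mutated only while holding the lock, instead of a var
// that gets rebound to a brand-new map on every update.
class RequestState {
  private val requested = new HashMap[String, Int] // guarded by `this`

  def replaceAll(next: Map[String, Int]): Unit = synchronized {
    requested.clear()
    requested ++= next
  }
}
```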
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -70,7 +70,7 @@ private[spark] class ExecutorMonitor(
// this listener. There are safeguards in other parts of the code that would prevent that executor
// from being removed.
private val nextTimeout = new AtomicLong(Long.MaxValue)
-  private var timedOutExecs = Seq.empty[String]
+  private var timedOutExecs = Seq.empty[(String, Int)]

// Active job tracking.
//
@@ -100,10 +100,10 @@
}

/**
-   * Returns the list of executors that are currently considered to be timed out.
-   * Should only be called from the EAM thread.
+   * Returns the list of executors and their ResourceProfile id that are currently considered to
+   * be timed out. Should only be called from the EAM thread.
*/
-  def timedOutExecutors(): Seq[String] = {
+  def timedOutExecutors(): Seq[(String, Int)] = {
val now = clock.nanoTime()
if (now >= nextTimeout.get()) {
// Temporarily set the next timeout at Long.MaxValue. This ensures that after
@@ -126,7 +126,7 @@
true
}
}
-        .keys
+        .map { case (name, exec) => (name, exec.resourceProfileId) }
.toSeq
updateNextTimeout(newNextTimeout)
}
@@ -155,6 +155,7 @@ private[spark] class ExecutorMonitor(
execResourceProfileCount.getOrDefault(id, 0)
}

+  // for testing
def getResourceProfileId(executorId: String): Int = {
val execTrackingInfo = executors.get(executorId)
if (execTrackingInfo != null) {