[SPARK-9193] Avoid assigning tasks to "lost" executor(s)
Currently, when dynamic allocation kills executors, tasks are sometimes assigned to executors that are already lost. This mis-assignment causes task failures, or even job failure if the error repeats 4 times.

The root cause is that ***killExecutors*** does not immediately remove the executors that are being killed. Instead, it relies on the ***OnDisassociated*** event to refresh the list of active executors later. The delay depends on the state of the cluster (from several milliseconds to under a minute). New tasks scheduled during that window can be assigned to executors that are still listed as "active" but are being killed, and those tasks then fail with "executor lost". A better approach is to exclude the executors being killed in makeOffers(), so that no tasks are allocated to executors that are about to be lost.
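
The filtering idea described above can be sketched in isolation as follows. This is a minimal illustration, not the Spark implementation: `ExecutorData` and `WorkerOffer` here are simplified stand-in case classes, and `OfferFilterSketch`, `buildOffers`, and `buildOffer` are hypothetical names introduced only for this example. It uses `filter` on the map rather than `filterKeys`, because `filterKeys` is deprecated in Scala 2.13.

```scala
// Simplified stand-ins for Spark's internal types (assumption: only the
// fields needed for this illustration are modeled).
final case class ExecutorData(executorHost: String, freeCores: Int)
final case class WorkerOffer(executorId: String, host: String, cores: Int)

object OfferFilterSketch {
  // Offers for all executors, skipping those already marked for removal.
  def buildOffers(
      executorDataMap: Map[String, ExecutorData],
      executorsPendingToRemove: Set[String]): Seq[WorkerOffer] = {
    executorDataMap
      .filter { case (id, _) => !executorsPendingToRemove.contains(id) }
      .map { case (id, data) => WorkerOffer(id, data.executorHost, data.freeCores) }
      .toSeq
  }

  // Offer for a single executor; empty if that executor is being killed
  // or is not registered at all.
  def buildOffer(
      executorId: String,
      executorDataMap: Map[String, ExecutorData],
      executorsPendingToRemove: Set[String]): Seq[WorkerOffer] = {
    if (executorsPendingToRemove.contains(executorId)) {
      Seq.empty
    } else {
      executorDataMap.get(executorId)
        .map(data => WorkerOffer(executorId, data.executorHost, data.freeCores))
        .toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val executors = Map(
      "exec-1" -> ExecutorData("host-a", 4),
      "exec-2" -> ExecutorData("host-b", 2))
    // exec-2 has been asked to shut down but has not disconnected yet.
    val pending = Set("exec-2")
    println(buildOffers(executors, pending))
    println(buildOffer("exec-2", executors, pending))
  }
}
```

With this shape, an executor that is still present in the map but already in the pending-removal set never receives an offer, which closes the window between the kill request and the disconnect event.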

Author: Grace <[email protected]>

Closes #7528 from GraceH/AssignToLostExecutor and squashes the following commits:

ecc1da6 [Grace] scala style fix
6e2ed96 [Grace] Re-word makeOffers by more readable lines
b5546ce [Grace] Add comments about the fix
30a9ad0 [Grace] Avoid assigning tasks to lost executors
GraceH authored and squito committed Jul 21, 2015
1 parent df4ddb3 commit 6592a60
Showing 1 changed file with 12 additions and 5 deletions.
@@ -169,9 +169,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on all executors
     private def makeOffers() {
-      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
+      // Filter out executors under killing
+      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+      val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toSeq))
+      }.toSeq
+      launchTasks(scheduler.resourceOffers(workOffers))
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -181,9 +184,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
-      val executorData = executorDataMap(executorId)
-      launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
+      // Filter out executors under killing
+      if (!executorsPendingToRemove.contains(executorId)) {
+        val executorData = executorDataMap(executorId)
+        val workOffers = Seq(
+          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+        launchTasks(scheduler.resourceOffers(workOffers))
+      }
     }
 
     // Launch tasks returned by a set of resource offers
