Skip to content

Commit

Permalink
Close the executor pod watcher before deleting the executor pods
Browse files Browse the repository at this point in the history
  • Loading branch information
liyinan926 committed Nov 27, 2017
1 parent 4bed817 commit c386186
Showing 1 changed file with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
} else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
for (i <- 0 until math.min(
for (_ <- 0 until math.min(
currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
val executorPod = executorPodFactory.createExecutorPod(
Expand Down Expand Up @@ -232,19 +232,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
// send stop message to executors so they shut down cleanly
super.stop()

// then delete the executor pods
Utils.tryLogNonFatalError {
val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
runningExecutorsToPods.clear()
runningExecutorPodsCopy
}
kubernetesClient.pods().delete(executorPodsToDelete: _*)
executorPodsByIPs.clear()
try {
val resource = executorWatchResource.getAndSet(null)
if (resource != null) {
resource.close()
}
} catch {
case e: Throwable => logWarning("Failed to close the executor pod watcher", e)
}

// then delete the executor pods
Utils.tryLogNonFatalError {
deleteExecutorPodsOnStop()
executorPodsByIPs.clear()
}
Utils.tryLogNonFatalError {
logInfo("Closing kubernetes client")
Expand Down Expand Up @@ -298,6 +298,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
true
}

/**
 * Removes every tracked executor pod during shutdown.
 *
 * While holding `RUNNING_EXECUTOR_PODS_LOCK`, a strict snapshot of the values in
 * `runningExecutorsToPods` is taken and the map is emptied. The snapshot is then
 * passed to the Kubernetes client's pod `delete` call after the lock is released.
 */
private def deleteExecutorPodsOnStop(): Unit = {
  val podsSnapshot = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
    // Materialize the values before clearing: the snapshot must not be a lazy
    // view backed by the map that is about to be emptied.
    val trackedPods = runningExecutorsToPods.values.toList
    runningExecutorsToPods.clear()
    trackedPods
  }
  // The delete request is issued outside the synchronized block.
  kubernetesClient.pods().delete(podsSnapshot: _*)
}

private class ExecutorPodsWatcher extends Watcher[Pod] {

private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
Expand Down

0 comments on commit c386186

Please sign in to comment.