Changes to support executor recovery behavior during static allocation. (apache#244)

* Changes to support executor recovery behavior during static allocation.

* addressed review comments

* Style changes and removed incorrectly merged code

* addressed latest review comments

* changed import order

* Minor changes to avoid exceptions when exit code is missing

* fixed style check

* Addressed review comments from Yinan Li.

* Addressed comments and got rid of an explicit lock object.

* Fixed imports order.

* Addressed review comments from Matt

* Couple of style fixes
varunkatta authored and foxish committed Jul 21, 2017
1 parent b1c48f9 commit 4dfb184
Showing 1 changed file with 173 additions and 33 deletions.
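For orientation, below is a minimal, self-contained sketch of the recovery pattern this diff implements. All names here (RecoveryTracker, ExitReason, onPodFailed, onExecutorDisconnected, removeFailedExecutors' signature) are hypothetical simplifications, not the actual Spark classes: the pod watcher records why a pod exited, the driver endpoint flags executors whose RPC connections dropped, and the periodic allocator thread joins the two views to report a loss reason and, for failures the application did not cause, release the pod so a replacement can be requested.

import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.{concurrent, mutable}

// Simplified stand-in for org.apache.spark.scheduler.ExecutorExited.
final case class ExitReason(exitCode: Int, exitCausedByApp: Boolean, message: String)

class RecoveryTracker {
  // Pod name -> reason the pod exited; written by the watch callback, read by the allocator.
  private val failedPods: concurrent.Map[String, ExitReason] =
    new ConcurrentHashMap[String, ExitReason]().asScala

  // Executor ids whose RPC endpoint disconnected; written by the driver endpoint.
  private val executorsToRemove = Collections.newSetFromMap[String](
    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

  // Executor id -> pod name; guarded by `this` (stands in for RUNNING_EXECUTOR_PODS_LOCK).
  private val runningExecutorsToPods = new mutable.HashMap[String, String]

  def registerExecutor(executorId: String, podName: String): Unit =
    synchronized { runningExecutorsToPods.put(executorId, podName) }

  // Watcher side: record why an errored or deleted pod went away.
  def onPodFailed(podName: String, reason: ExitReason): Unit =
    failedPods.put(podName, reason)

  // Driver endpoint side: flag a disconnected executor for inspection.
  def onExecutorDisconnected(executorId: String): Unit =
    executorsToRemove += executorId

  // Allocator side: join the two views, report each loss reason, and return the
  // executors whose pods should be deleted and replaced (failures not caused by the app).
  def removeFailedExecutors(reportLoss: (String, ExitReason) => Unit): Set[String] = {
    val snapshot = synchronized { runningExecutorsToPods.toMap }
    val toRecover = mutable.HashSet.empty[String]
    executorsToRemove.foreach { executorId =>
      for {
        podName <- snapshot.get(executorId)
        reason <- failedPods.get(podName)
      } {
        reportLoss(executorId, reason)
        if (!reason.exitCausedByApp) {
          toRecover += executorId
        }
      }
    }
    toRecover.foreach { executorId =>
      executorsToRemove -= executorId
      synchronized { runningExecutorsToPods.remove(executorId) }
    }
    toRecover.toSet
  }
}

In the actual change below, the same three roles are played by ExecutorPodsWatcher (recording into failedPods), KubernetesDriverEndpoint.onDisconnected (populating executorsToRemove), and the allocator runnable's removeFailedExecutors (resolving loss reasons and deleting pods), with fabric8 Pod objects in place of plain pod names.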
@@ -18,18 +18,20 @@ package org.apache.spark.scheduler.cluster.kubernetes

import java.io.Closeable
import java.net.InetAddress
import java.util.concurrent.TimeUnit
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}

import scala.collection.{concurrent, mutable}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.FilenameUtils
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
@@ -38,8 +40,8 @@ import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -55,10 +57,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
import KubernetesClusterSchedulerBackend._

private val RUNNING_EXECUTOR_PODS_LOCK = new Object
private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.

// Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
// Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
private val runningPodsToExecutors = new mutable.HashMap[String, String]
// TODO(varun): Get rid of this lock object by making the underlying map a concurrent hash map.
private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
// Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK
private val executorPodsByIPs = new mutable.HashMap[String, Pod]
private val failedPods: concurrent.Map[String, ExecutorExited] = new
ConcurrentHashMap[String, ExecutorExited]().asScala
private val executorsToRemove = Collections.newSetFromMap[String](
new ConcurrentHashMap[String, java.lang.Boolean]()).asScala

private val executorExtraClasspath = conf.get(
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
@@ -135,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs(
conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key,
"shuffle-labels")
if (parsedShuffleLabels.size == 0) {
if (parsedShuffleLabels.isEmpty) {
throw new SparkException(s"Dynamic allocation enabled " +
s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
}
@@ -170,12 +180,13 @@
private val executorWatchResource = new AtomicReference[Closeable]
protected var totalExpectedExecutors = new AtomicInteger(0)


private val driverUrl = RpcEndpointAddress(
sc.getConf.get("spark.driver.host"),
sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

private val initialExecutors = getInitialTargetExecutorNumber(1)
private val initialExecutors = getInitialTargetExecutorNumber()

private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -192,23 +203,74 @@

private val allocatorRunnable: Runnable = new Runnable {

// Number of times we are allowed to check for the loss reason for an executor before we give up
// and assume the executor failed for good, and attribute it to a framework fault.
private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
private val executorsToRecover = new mutable.HashSet[String]
// Maintains a map of executor id to count of checks performed to learn the loss reason
// for an executor.
private val executorReasonChecks = new mutable.HashMap[String, Int]

override def run(): Unit = {
if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
logDebug("Waiting for pending executors before scaling")
} else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
removeFailedExecutors()
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
logDebug("Waiting for pending executors before scaling")
} else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
logDebug("Maximum allowed executor limit reached. Not scaling up further.")
} else {
val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
for (i <- 0 until math.min(
totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
runningExecutorsToPods.put(executorId, pod)
runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
logInfo(
s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
}
}
}
}

def removeFailedExecutors(): Unit = {
val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.toMap
}
executorsToRemove.foreach { case (executorId) =>
localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
removeExecutor(executorId, executorExited)
if (!executorExited.exitCausedByApp) {
executorsToRecover.add(executorId)
}
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
}.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))

executorsToRecover.foreach(executorId => {
executorsToRemove -= executorId
executorReasonChecks -= executorId
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorsToPods.remove(executorId).map { pod: Pod =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
}.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
}
})
executorsToRecover.clear()
}
}

def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
executorsToRecover.add(executorId)
executorReasonChecks -= executorId
} else {
executorReasonChecks.put(executorId, reasonCheckCount + 1)
}
}
}

private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
@@ -280,8 +342,9 @@
// indication as to why.
try {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
runningExecutorPods.clear()
runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
runningExecutorsToPods.clear()
runningPodsToExecutors.clear()
}
EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
executorPodsByIPs.clear()
@@ -534,11 +597,6 @@
}
}

override def createDriverEndpoint(
properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
totalExpectedExecutors.set(requestedTotal)
true
@@ -547,8 +605,10 @@
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
for (executor <- executorIds) {
runningExecutorPods.remove(executor) match {
case Some(pod) => kubernetesClient.pods().delete(pod)
runningExecutorsToPods.remove(executor) match {
case Some(pod) =>
kubernetesClient.pods().delete(pod)
runningPodsToExecutors.remove(pod.getMetadata.getName)
case None => logWarning(s"Unable to remove pod for unknown executor $executor")
}
}
@@ -564,6 +624,8 @@

private class ExecutorPodsWatcher extends Watcher[Pod] {

private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1

override def eventReceived(action: Action, pod: Pod): Unit = {
if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
&& pod.getMetadata.getDeletionTimestamp == null) {
@@ -583,12 +645,75 @@
executorPodsByIPs -= podIP
}
}
if (action == Action.ERROR) {
logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
handleErroredPod(pod)
} else if (action == Action.DELETED) {
logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
handleDeletedPod(pod)
}
}
}

override def onClose(cause: KubernetesClientException): Unit = {
logDebug("Executor pod watch closed.", cause)
}

def getExecutorExitStatus(pod: Pod): Int = {
val containerStatuses = pod.getStatus.getContainerStatuses
if (!containerStatuses.isEmpty) {
// we assume the first container represents the pod status. This assumption may not hold
// true in the future. Revisit this if side-car containers start running inside executor
// pods.
getExecutorExitStatus(containerStatuses.get(0))
} else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
}

def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
Option(containerStatus.getState).map(containerState =>
Option(containerState.getTerminated).map(containerStateTerminated =>
containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
).getOrElse(UNKNOWN_EXIT_CODE)
}

def isPodAlreadyReleased(pod: Pod): Boolean = {
RUNNING_EXECUTOR_PODS_LOCK.synchronized {
!runningPodsToExecutors.contains(pod.getMetadata.getName)
}
}

def handleErroredPod(pod: Pod): Unit = {
val alreadyReleased = isPodAlreadyReleased(pod)
val containerExitStatus = getExecutorExitStatus(pod)
// container was probably actively killed by the driver.
val exitReason = if (alreadyReleased) {
ExecutorExited(containerExitStatus, exitCausedByApp = false,
s"Container in pod " + pod.getMetadata.getName +
" exited from explicit termination request.")
} else {
val containerExitReason = containerExitStatus match {
case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
memLimitExceededLogMessage(pod.getStatus.getReason)
case _ =>
// Here we can't be sure that the exit was caused by the application but this seems
// to be the right default since we know the pod was not explicitly deleted by
// the user.
"Pod exited with following container exit status code " + containerExitStatus
}
ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
}
failedPods.put(pod.getMetadata.getName, exitReason)
}

def handleDeletedPod(pod: Pod): Unit = {
val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
"Pod " + pod.getMetadata.getName + " deleted or lost.")
failedPods.put(pod.getMetadata.getName, exitReason)
}
}

override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(rpcEnv, properties)
}

private class KubernetesDriverEndpoint(
@@ -597,6 +722,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
extends DriverEndpoint(rpcEnv, sparkProperties) {
private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)

override def onDisconnected(rpcAddress: RpcAddress): Unit = {
addressToExecutorId.get(rpcAddress).foreach { executorId =>
if (disableExecutor(executorId)) {
executorsToRemove.add(executorId)
}
}
}

override def receiveAndReply(
context: RpcCallContext): PartialFunction[Any, Unit] = {
new PartialFunction[Any, Unit]() {
@@ -615,7 +748,7 @@
var resolvedProperties = sparkProperties
val runningExecutorPod = kubernetesClient
.pods()
.withName(runningExecutorPods(executorId).getMetadata.getName)
.withName(runningExecutorsToPods(executorId).getMetadata.getName)
.get()
val nodeName = runningExecutorPod.getSpec.getNodeName
val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
@@ -637,7 +770,6 @@
}.orElse(super.receiveAndReply(context))
}
}

}
case class ShuffleServiceConfig(
shuffleNamespace: String,
@@ -647,6 +779,14 @@
private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val VMEM_EXCEEDED_EXIT_CODE = -103
private val PMEM_EXCEEDED_EXIT_CODE = -104
private val UNKNOWN_EXIT_CODE = -111

def memLimitExceededLogMessage(diagnostics: String): String = {
s"Pod/Container killed for exceeding memory limits. $diagnostics" +
" Consider boosting spark executor memory overhead."
}
}

/**