From 15e13f4887a338e1e0a0982287011944adec5eb3 Mon Sep 17 00:00:00 2001
From: Varun
Date: Fri, 21 Jul 2017 16:36:08 -0700
Subject: [PATCH] Changes to support executor recovery behavior during static allocation. (#244)

* Changes to support executor recovery behavior during static allocation.
* addressed review comments
* Style changes and removed incorrectly merged code
* addressed latest review comments
* changed import order
* Minor changes to avoid exceptions when exit code is missing
* fixed style check
* Addressed review comments from Yinan Li.
* Addressed comments and got rid of an explicit lock object.
* Fixed imports order.
* Addressed review comments from Matt
* Couple of style fixes
---
 .../KubernetesClusterSchedulerBackend.scala   | 206 +++++++++++++++---
 1 file changed, 173 insertions(+), 33 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index a0753728f8cfd..c993bff8df962 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -18,18 +18,20 @@ package org.apache.spark.scheduler.cluster.kubernetes
 
 import java.io.Closeable
 import java.net.InetAddress
-import java.util.concurrent.TimeUnit
+import java.util.Collections
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
 
+import scala.collection.{concurrent, mutable}
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}
+import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.io.FilenameUtils
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
@@ -38,8 +40,8 @@ import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.deploy.kubernetes.submit.InitContainerUtil
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
-import org.apache.spark.rpc.{RpcCallContext, RpcEndpointAddress, RpcEnv}
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -55,10 +57,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
   import KubernetesClusterSchedulerBackend._
 
   private val RUNNING_EXECUTOR_PODS_LOCK = new Object
-  private val runningExecutorPods = new mutable.HashMap[String, Pod] // Indexed by executor IDs.
-
+  // Indexed by executor IDs and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+  // Indexed by executor pod names and guarded by RUNNING_EXECUTOR_PODS_LOCK.
+  private val runningPodsToExecutors = new mutable.HashMap[String, String]
+  // TODO(varun): Get rid of this lock object by making the underlying map a concurrent hash map.
   private val EXECUTOR_PODS_BY_IPS_LOCK = new Object
-  private val executorPodsByIPs = new mutable.HashMap[String, Pod] // Indexed by executor IP addrs.
+  // Indexed by executor IP addrs and guarded by EXECUTOR_PODS_BY_IPS_LOCK.
+  private val executorPodsByIPs = new mutable.HashMap[String, Pod]
+  private val failedPods: concurrent.Map[String, ExecutorExited] = new
+    ConcurrentHashMap[String, ExecutorExited]().asScala
+  private val executorsToRemove = Collections.newSetFromMap[String](
+    new ConcurrentHashMap[String, java.lang.Boolean]()).asScala
 
   private val executorExtraClasspath = conf.get(
     org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
@@ -135,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       val parsedShuffleLabels = ConfigurationUtils.parseKeyValuePairs(
         conf.get(KUBERNETES_SHUFFLE_LABELS), KUBERNETES_SHUFFLE_LABELS.key,
         "shuffle-labels")
-      if (parsedShuffleLabels.size == 0) {
+      if (parsedShuffleLabels.isEmpty) {
        throw new SparkException(s"Dynamic allocation enabled " +
          s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
       }
@@ -170,12 +180,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val executorWatchResource = new AtomicReference[Closeable]
   protected var totalExpectedExecutors = new AtomicInteger(0)
+
   private val driverUrl = RpcEndpointAddress(
     sc.getConf.get("spark.driver.host"),
     sc.getConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
-  private val initialExecutors = getInitialTargetExecutorNumber(1)
+  private val initialExecutors = getInitialTargetExecutorNumber()
 
   private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
   require(podAllocationInterval > 0, s"Allocation batch delay " +
@@ -192,23 +203,74 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val allocatorRunnable: Runnable = new Runnable {
 
+    // Number of times we are allowed to check for the loss reason for an executor before we
+    // give up and assume the executor failed for good, and attribute it to a framework fault.
+    private val MAX_EXECUTOR_LOST_REASON_CHECKS = 10
+    private val executorsToRecover = new mutable.HashSet[String]
+    // Maintains a map of executor id to count of checks performed to learn the loss reason
+    // for an executor.
+    private val executorReasonChecks = new mutable.HashMap[String, Int]
+
     override def run(): Unit = {
-      if (totalRegisteredExecutors.get() < runningExecutorPods.size) {
-        logDebug("Waiting for pending executors before scaling")
-      } else if (totalExpectedExecutors.get() <= runningExecutorPods.size) {
-        logDebug("Maximum allowed executor limit reached. Not scaling up further.")
-      } else {
-        val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
-        RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+      removeFailedExecutors()
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        if (totalRegisteredExecutors.get() < runningExecutorsToPods.size) {
+          logDebug("Waiting for pending executors before scaling")
+        } else if (totalExpectedExecutors.get() <= runningExecutorsToPods.size) {
+          logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+        } else {
+          val nodeToLocalTaskCount = getNodesWithLocalTaskCounts
           for (i <- 0 until math.min(
-            totalExpectedExecutors.get - runningExecutorPods.size, podAllocationSize)) {
-            runningExecutorPods += allocateNewExecutorPod(nodeToLocalTaskCount)
+            totalExpectedExecutors.get - runningExecutorsToPods.size, podAllocationSize)) {
+            val (executorId, pod) = allocateNewExecutorPod(nodeToLocalTaskCount)
+            runningExecutorsToPods.put(executorId, pod)
+            runningPodsToExecutors.put(pod.getMetadata.getName, executorId)
             logInfo(
-              s"Requesting a new executor, total executors is now ${runningExecutorPods.size}")
+              s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
           }
         }
       }
     }
+
+    def removeFailedExecutors(): Unit = {
+      val localRunningExecutorsToPods = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        runningExecutorsToPods.toMap
+      }
+      executorsToRemove.foreach { case (executorId) =>
+        localRunningExecutorsToPods.get(executorId).map { pod: Pod =>
+          failedPods.get(pod.getMetadata.getName).map { executorExited: ExecutorExited =>
+            logDebug(s"Removing executor $executorId with loss reason " + executorExited.message)
+            removeExecutor(executorId, executorExited)
+            if (!executorExited.exitCausedByApp) {
+              executorsToRecover.add(executorId)
+            }
+          }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+        }.getOrElse(removeExecutorOrIncrementLossReasonCheckCount(executorId))
+
+        executorsToRecover.foreach(executorId => {
+          executorsToRemove -= executorId
+          executorReasonChecks -= executorId
+          RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+            runningExecutorsToPods.remove(executorId).map { pod: Pod =>
+              kubernetesClient.pods().delete(pod)
+              runningPodsToExecutors.remove(pod.getMetadata.getName)
+            }.getOrElse(logWarning(s"Unable to remove pod for unknown executor $executorId"))
+          }
+        })
+        executorsToRecover.clear()
+      }
+    }
+
+    def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+      val reasonCheckCount = executorReasonChecks.getOrElse(executorId, 0)
+      if (reasonCheckCount > MAX_EXECUTOR_LOST_REASON_CHECKS) {
+        removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons"))
+        executorsToRecover.add(executorId)
+        executorReasonChecks -= executorId
+      } else {
+        executorReasonChecks.put(executorId, reasonCheckCount + 1)
+      }
+    }
   }
 
   private val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
@@ -280,8 +342,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // indication as to why.
     try {
       RUNNING_EXECUTOR_PODS_LOCK.synchronized {
-        runningExecutorPods.values.foreach(kubernetesClient.pods().delete(_))
-        runningExecutorPods.clear()
+        runningExecutorsToPods.values.foreach(kubernetesClient.pods().delete(_))
+        runningExecutorsToPods.clear()
+        runningPodsToExecutors.clear()
       }
       EXECUTOR_PODS_BY_IPS_LOCK.synchronized {
         executorPodsByIPs.clear()
@@ -534,11 +597,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
   }
 
-  override def createDriverEndpoint(
-    properties: Seq[(String, String)]): DriverEndpoint = {
-    new KubernetesDriverEndpoint(rpcEnv, properties)
-  }
-
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
     totalExpectedExecutors.set(requestedTotal)
     true
@@ -547,8 +605,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
     RUNNING_EXECUTOR_PODS_LOCK.synchronized {
       for (executor <- executorIds) {
-        runningExecutorPods.remove(executor) match {
-          case Some(pod) => kubernetesClient.pods().delete(pod)
+        runningExecutorsToPods.remove(executor) match {
+          case Some(pod) =>
+            kubernetesClient.pods().delete(pod)
+            runningPodsToExecutors.remove(pod.getMetadata.getName)
           case None => logWarning(s"Unable to remove pod for unknown executor $executor")
         }
       }
@@ -564,6 +624,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private class ExecutorPodsWatcher extends Watcher[Pod] {
 
+    private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
     override def eventReceived(action: Action, pod: Pod): Unit = {
       if (action == Action.MODIFIED && pod.getStatus.getPhase == "Running"
         && pod.getMetadata.getDeletionTimestamp == null) {
@@ -583,12 +645,75 @@ private[spark] class KubernetesClusterSchedulerBackend(
             executorPodsByIPs -= podIP
           }
         }
+        if (action == Action.ERROR) {
+          logInfo(s"Received pod $podName exited event. Reason: " + pod.getStatus.getReason)
+          handleErroredPod(pod)
+        } else if (action == Action.DELETED) {
+          logInfo(s"Received delete pod $podName event. Reason: " + pod.getStatus.getReason)
+          handleDeletedPod(pod)
+        }
       }
     }
 
     override def onClose(cause: KubernetesClientException): Unit = {
       logDebug("Executor pod watch closed.", cause)
     }
+
+    def getExecutorExitStatus(pod: Pod): Int = {
+      val containerStatuses = pod.getStatus.getContainerStatuses
+      if (!containerStatuses.isEmpty) {
+        // we assume the first container represents the pod status. This assumption may not hold
+        // true in the future. Revisit this if side-car containers start running inside executor
+        // pods.
+        getExecutorExitStatus(containerStatuses.get(0))
+      } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+    }
+
+    def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
+      Option(containerStatus.getState).map(containerState =>
+        Option(containerState.getTerminated).map(containerStateTerminated =>
+          containerStateTerminated.getExitCode.intValue()).getOrElse(UNKNOWN_EXIT_CODE)
+      ).getOrElse(UNKNOWN_EXIT_CODE)
+    }
+
+    def isPodAlreadyReleased(pod: Pod): Boolean = {
+      RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+        !runningPodsToExecutors.contains(pod.getMetadata.getName)
+      }
+    }
+
+    def handleErroredPod(pod: Pod): Unit = {
+      val alreadyReleased = isPodAlreadyReleased(pod)
+      val containerExitStatus = getExecutorExitStatus(pod)
+      // container was probably actively killed by the driver.
+      val exitReason = if (alreadyReleased) {
+        ExecutorExited(containerExitStatus, exitCausedByApp = false,
+          s"Container in pod " + pod.getMetadata.getName +
+            " exited from explicit termination request.")
+      } else {
+        val containerExitReason = containerExitStatus match {
+          case VMEM_EXCEEDED_EXIT_CODE | PMEM_EXCEEDED_EXIT_CODE =>
+            memLimitExceededLogMessage(pod.getStatus.getReason)
+          case _ =>
+            // Here we can't be sure that the exit was caused by the application but this seems
+            // to be the right default since we know the pod was not explicitly deleted by
+            // the user.
+            "Pod exited with the following container exit status code " + containerExitStatus
+        }
+        ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+      }
+      failedPods.put(pod.getMetadata.getName, exitReason)
+    }
+
+    def handleDeletedPod(pod: Pod): Unit = {
+      val exitReason = ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false,
+        "Pod " + pod.getMetadata.getName + " deleted or lost.")
+      failedPods.put(pod.getMetadata.getName, exitReason)
+    }
+  }
+
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new KubernetesDriverEndpoint(rpcEnv, properties)
   }
 
   private class KubernetesDriverEndpoint(
@@ -597,6 +722,14 @@ private[spark] class KubernetesClusterSchedulerBackend(
     extends DriverEndpoint(rpcEnv, sparkProperties) {
     private val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
 
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        if (disableExecutor(executorId)) {
+          executorsToRemove.add(executorId)
+        }
+      }
+    }
+
     override def receiveAndReply(
         context: RpcCallContext): PartialFunction[Any, Unit] = {
       new PartialFunction[Any, Unit]() {
@@ -615,7 +748,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
             var resolvedProperties = sparkProperties
             val runningExecutorPod = kubernetesClient
              .pods()
-              .withName(runningExecutorPods(executorId).getMetadata.getName)
+              .withName(runningExecutorsToPods(executorId).getMetadata.getName)
              .get()
             val nodeName = runningExecutorPod.getSpec.getNodeName
             val shufflePodIp = shufflePodCache.get.getShufflePodForExecutor(nodeName)
@@ -637,7 +770,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
         }.orElse(super.receiveAndReply(context))
     }
   }
-
 }
 case class ShuffleServiceConfig(
     shuffleNamespace: String,
@@ -647,6 +779,14 @@ case class ShuffleServiceConfig(
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+  private val VMEM_EXCEEDED_EXIT_CODE = -103
+  private val PMEM_EXCEEDED_EXIT_CODE = -104
+  private val UNKNOWN_EXIT_CODE = -111
+
+  def memLimitExceededLogMessage(diagnostics: String): String = {
+    s"Pod/Container killed for exceeding memory limits. $diagnostics" +
+      " Consider boosting Spark executor memory overhead."
+  }
 }
 
 /**