diff --git a/.travis.yml b/.travis.yml
index d7e9f8c0290e8..05b94adeeb93b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+ - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
# 6. Run lint-java.
script:
diff --git a/NOTICE b/NOTICE
index f4b64b5c3f470..6ec240efbf12e 100644
--- a/NOTICE
+++ b/NOTICE
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation
+This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
+Copyright (C) 2015 Red Hat, Inc.
+
+This product includes/uses OkHttp (https://github.com/square/okhttp)
+Copyright (C) 2012 The Android Open Source Project
+
=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the DataNucleus distribution. ==
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
new file mode 100644
index 0000000000000..c166d030f2c89
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
+import org.apache.spark.util.Utils
+
+private[spark] object SchedulerBackendUtils {
+ val DEFAULT_NUMBER_EXECUTORS = 2
+
+ /**
+ * Getting the initial target number of executors depends on whether dynamic allocation is
+ * enabled.
+ * If dynamic allocation is not used, it returns the number of executors requested by the user.
+ */
+ def getInitialTargetExecutorNumber(
+ conf: SparkConf,
+ numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
+ if (Utils.isDynamicAllocationEnabled(conf)) {
+ val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
+ val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
+ val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
+ require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
+ s"initial executor number $initialNumExecutors must between min executor number " +
+ s"$minNumExecutors and max executor number $maxNumExecutors")
+
+ initialNumExecutors
+ } else {
+ conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
+ }
+ }
+}
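A minimal usage sketch for the helper above, not part of the patch: the object is private[spark], so the sketch sits in the same package, and the SparkConf values are made-up examples.

package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf

object InitialExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Static allocation: spark.executor.instances wins; with nothing set, the default of 2 is used.
    val staticConf = new SparkConf(false).set("spark.executor.instances", "8")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(staticConf)) // 8

    // Dynamic allocation: the initial value is derived from the dynamic-allocation settings
    // and must fall between the configured min and max.
    val dynamicConf = new SparkConf(false)
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.initialExecutors", "3")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(dynamicConf)) // 3
  }
}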
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 91d5667ed1f07..46a16f83073f7 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -531,6 +531,14 @@ def __hash__(self):
sbt_test_goals=["mesos/test"]
)
+kubernetes = Module(
+ name="kubernetes",
+ dependencies=[],
+ source_file_regexes=["resource-managers/kubernetes/core"],
+ build_profile_flags=["-Pkubernetes"],
+ sbt_test_goals=["kubernetes/test"]
+)
+
# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a777d3c6fa3d..7129b904698a2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1397,10 +1397,10 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
+  <td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
-    (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained
+    (resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarse-grained
mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
Regardless of whether the minimum ratio of resources has been reached,
diff --git a/pom.xml b/pom.xml
index 9fac8b1e53788..fa4888d00f2ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2648,6 +2648,13 @@
+    <profile>
+      <id>kubernetes</id>
+      <modules>
+        <module>resource-managers/kubernetes/core</module>
+      </modules>
+    </profile>
+
     <profile>
       <id>hive-thriftserver</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9501eed1e906b..14e6c586b18f6 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -53,11 +53,11 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
- val optionallyEnabledProjects@Seq(mesos, yarn,
+ val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
- Seq("mesos", "yarn",
+ Seq("kubernetes", "mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
new file mode 100644
index 0000000000000..7d35aea8a4142
--- /dev/null
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes_2.11</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes</name>
+  <properties>
+    <sbt.project.name>kubernetes</sbt.project.name>
+    <kubernetes.client.version>3.0.0</kubernetes.client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes.client.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.dataformat</groupId>
+          <artifactId>jackson-dataformat-yaml</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${fasterxml.jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.squareup.okhttp3</groupId>
+      <artifactId>okhttp</artifactId>
+      <version>3.8.1</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+
+</project>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
new file mode 100644
index 0000000000000..f0742b91987b6
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+ val KUBERNETES_NAMESPACE =
+ ConfigBuilder("spark.kubernetes.namespace")
+ .doc("The namespace that will be used for running the driver and executor pods. When using " +
+ "spark-submit in cluster mode, this can also be passed to spark-submit via the " +
+ "--kubernetes-namespace command line argument.")
+ .stringConf
+ .createWithDefault("default")
+
+ val EXECUTOR_DOCKER_IMAGE =
+ ConfigBuilder("spark.kubernetes.executor.docker.image")
+ .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
+ "format.")
+ .stringConf
+ .createOptional
+
+ val DOCKER_IMAGE_PULL_POLICY =
+ ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+ .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
+ .stringConf
+ .checkValues(Set("Always", "Never", "IfNotPresent"))
+ .createWithDefault("IfNotPresent")
+
+ val APISERVER_AUTH_DRIVER_CONF_PREFIX =
+ "spark.kubernetes.authenticate.driver"
+ val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+ "spark.kubernetes.authenticate.driver.mounted"
+ val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+ val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+ val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+ val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+ val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+ val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+ .doc("Service account that is used when running the driver pod. The driver pod uses " +
+ "this service account when requesting executor pods from the API server. If specific " +
+ "credentials are given for the driver pod to use, the driver will favor " +
+ "using those credentials instead.")
+ .stringConf
+ .createOptional
+
+ // Note that while we set a default for this when we start up the
+ // scheduler, the specific default value is dynamically determined
+ // based on the executor memory.
+ val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
+ ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
+ .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
+ "is memory that accounts for things like VM overheads, interned strings, other native " +
+ "overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
+ .bytesConf(ByteUnit.MiB)
+ .createOptional
+
+ val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+ val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
+
+ val KUBERNETES_DRIVER_POD_NAME =
+ ConfigBuilder("spark.kubernetes.driver.pod.name")
+ .doc("Name of the driver pod.")
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+ ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
+ .doc("Prefix to use in front of the executor pod names.")
+ .internal()
+ .stringConf
+ .createWithDefault("spark")
+
+ val KUBERNETES_ALLOCATION_BATCH_SIZE =
+ ConfigBuilder("spark.kubernetes.allocation.batch.size")
+ .doc("Number of pods to launch at once in each round of executor allocation.")
+ .intConf
+ .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
+ .createWithDefault(5)
+
+ val KUBERNETES_ALLOCATION_BATCH_DELAY =
+ ConfigBuilder("spark.kubernetes.allocation.batch.delay")
+ .doc("Number of seconds to wait between each round of executor allocation.")
+ .longConf
+ .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
+ .createWithDefault(1)
+
+ val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ ConfigBuilder("spark.kubernetes.executor.limit.cores")
+ .doc("Specify the hard cpu limit for a single executor pod")
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
+ ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
+ .doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
+ "before it is assumed that the executor failed.")
+ .intConf
+ .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
+ "must be a positive integer")
+ .createWithDefault(10)
+
+ val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+}
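For reference, a small sketch of how these typed entries behave when read back from a SparkConf. It is not part of the patch, it sits in the same package because the object is private[spark], and the values are made up.

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

object ConfigReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.kubernetes.executor.docker.image", "spark-executor:2.3.0")
      .set("spark.kubernetes.allocation.batch.size", "10")

    // createOptional entries come back as an Option; entries with defaults fall back silently.
    assert(conf.get(EXECUTOR_DOCKER_IMAGE) == Some("spark-executor:2.3.0"))
    assert(conf.get(KUBERNETES_NAMESPACE) == "default")
    // intConf parses the string form; checkValue/checkValues constraints are applied when the
    // entry is read, so an out-of-range or unsupported value surfaces as an IllegalArgumentException.
    assert(conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) == 10)
  }
}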
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
new file mode 100644
index 0000000000000..01717479fddd9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+
+private[spark] object ConfigurationUtils {
+
+ /**
+ * Extract and parse Spark configuration properties with a given name prefix and
+ * return the result as a Map. Keys must not have more than one value.
+ *
+ * @param sparkConf Spark configuration
+ * @param prefix the given property name prefix
+ * @return a Map storing the configuration property keys and values
+ */
+ def parsePrefixedKeyValuePairs(
+ sparkConf: SparkConf,
+ prefix: String): Map[String, String] = {
+ sparkConf.getAllWithPrefix(prefix).toMap
+ }
+
+ def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
+ opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
+ }
+}
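A short sketch of parsePrefixedKeyValuePairs, not part of the patch: the label names and values are invented, and the prefix is the executor-label prefix defined in Config.scala.

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf

object PrefixedConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .set("spark.kubernetes.executor.label.team", "data-eng")
      .set("spark.kubernetes.executor.label.tier", "batch")

    // The prefix is stripped, leaving just the label names and their values.
    val labels = ConfigurationUtils.parsePrefixedKeyValuePairs(
      conf, "spark.kubernetes.executor.label.")
    assert(labels == Map("team" -> "data-eng", "tier" -> "batch"))
  }
}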
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
new file mode 100644
index 0000000000000..4ddeefb15a89d
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+private[spark] object Constants {
+
+ // Labels
+ val SPARK_APP_ID_LABEL = "spark-app-selector"
+ val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
+ val SPARK_ROLE_LABEL = "spark-role"
+ val SPARK_POD_DRIVER_ROLE = "driver"
+ val SPARK_POD_EXECUTOR_ROLE = "executor"
+
+ // Default and fixed ports
+ val DEFAULT_DRIVER_PORT = 7078
+ val DEFAULT_BLOCKMANAGER_PORT = 7079
+ val BLOCK_MANAGER_PORT_NAME = "blockmanager"
+ val EXECUTOR_PORT_NAME = "executor"
+
+ // Environment Variables
+ val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
+ val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
+ val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES"
+ val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
+ val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
+ val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+ val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
+ val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
+ val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
+ val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
+
+ // Miscellaneous
+ val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+ val MEMORY_OVERHEAD_FACTOR = 0.10
+ val MEMORY_OVERHEAD_MIN_MIB = 384L
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
new file mode 100644
index 0000000000000..1e3f055e05766
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object SparkKubernetesClientFactory {
+
+ def createKubernetesClient(
+ master: String,
+ namespace: Option[String],
+ kubernetesAuthConfPrefix: String,
+ sparkConf: SparkConf,
+ defaultServiceAccountToken: Option[File],
+ defaultServiceAccountCaCert: Option[File]): KubernetesClient = {
+ val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
+ val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
+ val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
+ .map(new File(_))
+ .orElse(defaultServiceAccountToken)
+ val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
+ ConfigurationUtils.requireNandDefined(
+ oauthTokenFile,
+ oauthTokenValue,
+ s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a " +
+ s"value $oauthTokenConf.")
+
+ val caCertFile = sparkConf
+ .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
+ .orElse(defaultServiceAccountCaCert.map(_.getAbsolutePath))
+ val clientKeyFile = sparkConf
+ .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
+ val clientCertFile = sparkConf
+ .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
+ val dispatcher = new Dispatcher(
+ ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
+ val config = new ConfigBuilder()
+ .withApiVersion("v1")
+ .withMasterUrl(master)
+ .withWebsocketPingInterval(0)
+ .withOption(oauthTokenValue) {
+ (token, configBuilder) => configBuilder.withOauthToken(token)
+ }.withOption(oauthTokenFile) {
+ (file, configBuilder) =>
+ configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
+ }.withOption(caCertFile) {
+ (file, configBuilder) => configBuilder.withCaCertFile(file)
+ }.withOption(clientKeyFile) {
+ (file, configBuilder) => configBuilder.withClientKeyFile(file)
+ }.withOption(clientCertFile) {
+ (file, configBuilder) => configBuilder.withClientCertFile(file)
+ }.withOption(namespace) {
+ (ns, configBuilder) => configBuilder.withNamespace(ns)
+ }.build()
+ val baseHttpClient = HttpClientUtils.createHttpClient(config)
+ val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
+ .dispatcher(dispatcher)
+ .build()
+ new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
+ }
+
+ private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
+ extends AnyVal {
+
+ def withOption[T]
+ (option: Option[T])
+ (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
+ option.map { opt =>
+ configurator(opt, configBuilder)
+ }.getOrElse(configBuilder)
+ }
+ }
+}
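The private OptionConfigurableConfigBuilder above is an apply-if-defined combinator over the fabric8 ConfigBuilder. Below is a standalone sketch of the same pattern against a hypothetical Builder, so the shape of withOption is visible without the Kubernetes client on the classpath.

object OptionBuilderSketch {
  final case class Builder(settings: Map[String, String] = Map.empty) {
    def set(key: String, value: String): Builder = copy(settings + (key -> value))
  }

  implicit class OptionConfigurable(val b: Builder) extends AnyVal {
    // Apply the configurator only when the option is defined; otherwise pass the builder through.
    def withOption[T](opt: Option[T])(configurator: (T, Builder) => Builder): Builder =
      opt.map(configurator(_, b)).getOrElse(b)
  }

  def main(args: Array[String]): Unit = {
    val namespace: Option[String] = Some("spark-jobs")
    val oauthToken: Option[String] = None
    val built = Builder()
      .withOption(namespace)((ns, b) => b.set("namespace", ns))  // applied
      .withOption(oauthToken)((t, b) => b.set("oauthToken", t))  // skipped: None
    assert(built.settings == Map("namespace" -> "spark-jobs"))
  }
}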
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
new file mode 100644
index 0000000000000..f79155b117b67
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.Utils
+
+/**
+ * A factory class for configuring and creating executor pods.
+ */
+private[spark] trait ExecutorPodFactory {
+
+ /**
+ * Configure and construct an executor pod with the given parameters.
+ */
+ def createExecutorPod(
+ executorId: String,
+ applicationId: String,
+ driverUrl: String,
+ executorEnvs: Seq[(String, String)],
+ driverPod: Pod,
+ nodeToLocalTaskCount: Map[String, Int]): Pod
+}
+
+private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf)
+ extends ExecutorPodFactory {
+
+ private val executorExtraClasspath =
+ sparkConf.get(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH)
+
+ private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+ sparkConf,
+ KUBERNETES_EXECUTOR_LABEL_PREFIX)
+ require(
+ !executorLabels.contains(SPARK_APP_ID_LABEL),
+ s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+ require(
+ !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
+ s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
+ " Spark.")
+ require(
+ !executorLabels.contains(SPARK_ROLE_LABEL),
+ s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
+
+ private val executorAnnotations =
+ ConfigurationUtils.parsePrefixedKeyValuePairs(
+ sparkConf,
+ KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+ private val nodeSelector =
+ ConfigurationUtils.parsePrefixedKeyValuePairs(
+ sparkConf,
+ KUBERNETES_NODE_SELECTOR_PREFIX)
+
+ private val executorDockerImage = sparkConf
+ .get(EXECUTOR_DOCKER_IMAGE)
+ .getOrElse(throw new SparkException("Must specify the executor Docker image"))
+ private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+ private val blockManagerPort = sparkConf
+ .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
+
+ private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
+
+ private val executorMemoryMiB = sparkConf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+ private val executorMemoryString = sparkConf.get(
+ org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+ org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)
+
+ private val memoryOverheadMiB = sparkConf
+ .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
+ .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMiB).toInt,
+ MEMORY_OVERHEAD_MIN_MIB))
+ private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
+
+ private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
+ private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)
+
+ override def createExecutorPod(
+ executorId: String,
+ applicationId: String,
+ driverUrl: String,
+ executorEnvs: Seq[(String, String)],
+ driverPod: Pod,
+ nodeToLocalTaskCount: Map[String, Int]): Pod = {
+ val name = s"$executorPodNamePrefix-exec-$executorId"
+
+ // The hostname must be no longer than 63 characters, so take the last 63 characters of the
+ // pod name as the hostname. This preserves uniqueness since the end of the name contains the
+ // executorId.
+ val hostname = name.substring(Math.max(0, name.length - 63))
+ val resolvedExecutorLabels = Map(
+ SPARK_EXECUTOR_ID_LABEL -> executorId,
+ SPARK_APP_ID_LABEL -> applicationId,
+ SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
+ executorLabels
+ val executorMemoryQuantity = new QuantityBuilder(false)
+ .withAmount(s"${executorMemoryMiB}Mi")
+ .build()
+ val executorMemoryLimitQuantity = new QuantityBuilder(false)
+ .withAmount(s"${executorMemoryWithOverhead}Mi")
+ .build()
+ val executorCpuQuantity = new QuantityBuilder(false)
+ .withAmount(executorCores.toString)
+ .build()
+ val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
+ new EnvVarBuilder()
+ .withName(ENV_EXECUTOR_EXTRA_CLASSPATH)
+ .withValue(cp)
+ .build()
+ }
+ val executorExtraJavaOptionsEnv = sparkConf
+ .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+ .map { opts =>
+ val delimitedOpts = Utils.splitCommandString(opts)
+ delimitedOpts.zipWithIndex.map {
+ case (opt, index) =>
+ new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+ }
+ }.getOrElse(Seq.empty[EnvVar])
+ val executorEnv = (Seq(
+ (ENV_DRIVER_URL, driverUrl),
+ // Executor backend expects integral value for executor cores, so round it up to an int.
+ (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString),
+ (ENV_EXECUTOR_MEMORY, executorMemoryString),
+ (ENV_APPLICATION_ID, applicationId),
+ (ENV_EXECUTOR_ID, executorId)) ++ executorEnvs)
+ .map(env => new EnvVarBuilder()
+ .withName(env._1)
+ .withValue(env._2)
+ .build()
+ ) ++ Seq(
+ new EnvVarBuilder()
+ .withName(ENV_EXECUTOR_POD_IP)
+ .withValueFrom(new EnvVarSourceBuilder()
+ .withNewFieldRef("v1", "status.podIP")
+ .build())
+ .build()
+ ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
+ val requiredPorts = Seq(
+ (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
+ .map { case (name, port) =>
+ new ContainerPortBuilder()
+ .withName(name)
+ .withContainerPort(port)
+ .build()
+ }
+
+ val executorContainer = new ContainerBuilder()
+ .withName("executor")
+ .withImage(executorDockerImage)
+ .withImagePullPolicy(dockerImagePullPolicy)
+ .withNewResources()
+ .addToRequests("memory", executorMemoryQuantity)
+ .addToLimits("memory", executorMemoryLimitQuantity)
+ .addToRequests("cpu", executorCpuQuantity)
+ .endResources()
+ .addAllToEnv(executorEnv.asJava)
+ .withPorts(requiredPorts.asJava)
+ .build()
+
+ val executorPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(name)
+ .withLabels(resolvedExecutorLabels.asJava)
+ .withAnnotations(executorAnnotations.asJava)
+ .withOwnerReferences()
+ .addNewOwnerReference()
+ .withController(true)
+ .withApiVersion(driverPod.getApiVersion)
+ .withKind(driverPod.getKind)
+ .withName(driverPod.getMetadata.getName)
+ .withUid(driverPod.getMetadata.getUid)
+ .endOwnerReference()
+ .endMetadata()
+ .withNewSpec()
+ .withHostname(hostname)
+ .withRestartPolicy("Never")
+ .withNodeSelector(nodeSelector.asJava)
+ .endSpec()
+ .build()
+
+ val containerWithExecutorLimitCores = executorLimitCores.map { limitCores =>
+ val executorCpuLimitQuantity = new QuantityBuilder(false)
+ .withAmount(limitCores)
+ .build()
+ new ContainerBuilder(executorContainer)
+ .editResources()
+ .addToLimits("cpu", executorCpuLimitQuantity)
+ .endResources()
+ .build()
+ }.getOrElse(executorContainer)
+
+ new PodBuilder(executorPod)
+ .editSpec()
+ .addToContainers(containerWithExecutorLimitCores)
+ .endSpec()
+ .build()
+ }
+}
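A quick check of the memory sizing applied above, not part of the patch: the overhead is MEMORY_OVERHEAD_FACTOR (0.10) times the executor memory, floored at MEMORY_OVERHEAD_MIN_MIB (384), and the container's memory request is the executor memory while its limit adds the overhead. The sample sizes are arbitrary.

object MemorySizingSketch {
  private val MemoryOverheadFactor = 0.10
  private val MemoryOverheadMinMiB = 384L

  def overheadMiB(executorMemoryMiB: Long): Long =
    math.max((MemoryOverheadFactor * executorMemoryMiB).toInt, MemoryOverheadMinMiB)

  def main(args: Array[String]): Unit = {
    // 1 GiB executor: 10% would be 102 MiB, so the 384 MiB floor wins -> request 1024Mi, limit 1408Mi.
    assert(overheadMiB(1024) == 384)
    // 8 GiB executor: 10% (819 MiB) exceeds the floor -> request 8192Mi, limit 9011Mi.
    assert(overheadMiB(8192) == 819)
  }
}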
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
new file mode 100644
index 0000000000000..68ca6a7622171
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.File
+
+import io.fabric8.kubernetes.client.Config
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
+
+ override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+
+ override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
+ new TaskSchedulerImpl(sc)
+ }
+
+ override def createSchedulerBackend(
+ sc: SparkContext,
+ masterURL: String,
+ scheduler: TaskScheduler): SchedulerBackend = {
+ val sparkConf = sc.getConf
+
+ val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
+ KUBERNETES_MASTER_INTERNAL_URL,
+ Some(sparkConf.get(KUBERNETES_NAMESPACE)),
+ APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+ sparkConf,
+ Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+ Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+
+ val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf)
+ val allocatorExecutor = ThreadUtils
+ .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
+ val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
+ "kubernetes-executor-requests")
+ new KubernetesClusterSchedulerBackend(
+ scheduler.asInstanceOf[TaskSchedulerImpl],
+ sc.env.rpcEnv,
+ executorPodFactory,
+ kubernetesClient,
+ allocatorExecutor,
+ requestExecutorsService)
+ }
+
+ override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
+ scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
+ }
+}
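How this class gets picked up: SparkContext resolves external cluster managers through java.util.ServiceLoader and filters them with canCreate against the master URL. The sketch below mirrors that lookup; it assumes the module's META-INF/services registration for ExternalClusterManager (not visible in this excerpt) is on the classpath, and the k8s:// URL is only an example since canCreate merely checks the prefix.

package org.apache.spark.scheduler

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager

object ClusterManagerLookupSketch {
  def main(args: Array[String]): Unit = {
    // Load every registered ExternalClusterManager and keep the one that accepts the master URL.
    val managers = ServiceLoader.load(classOf[ExternalClusterManager]).asScala
      .filter(_.canCreate("k8s://https://kubernetes.default.svc"))
    assert(managers.size == 1)
    assert(managers.head.isInstanceOf[KubernetesClusterManager])
  }
}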
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
new file mode 100644
index 0000000000000..e79c987852db2
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+import java.net.InetAddress
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
+import javax.annotation.concurrent.GuardedBy
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkException
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEnv}
+import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.util.Utils
+
+private[spark] class KubernetesClusterSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ rpcEnv: RpcEnv,
+ executorPodFactory: ExecutorPodFactory,
+ kubernetesClient: KubernetesClient,
+ allocatorExecutor: ScheduledExecutorService,
+ requestExecutorsService: ExecutorService)
+ extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
+
+ import KubernetesClusterSchedulerBackend._
+
+ private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+ private val RUNNING_EXECUTOR_PODS_LOCK = new Object
+ @GuardedBy("RUNNING_EXECUTOR_PODS_LOCK")
+ private val runningExecutorsToPods = new mutable.HashMap[String, Pod]
+ private val executorPodsByIPs = new ConcurrentHashMap[String, Pod]()
+ private val podsWithKnownExitReasons = new ConcurrentHashMap[String, ExecutorExited]()
+ private val disconnectedPodsByExecutorIdPendingRemoval = new ConcurrentHashMap[String, Pod]()
+
+ private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE)
+
+ private val kubernetesDriverPodName = conf
+ .get(KUBERNETES_DRIVER_POD_NAME)
+ .getOrElse(throw new SparkException("Must specify the driver pod name"))
+ private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
+ requestExecutorsService)
+
+ private val driverPod = kubernetesClient.pods()
+ .inNamespace(kubernetesNamespace)
+ .withName(kubernetesDriverPodName)
+ .get()
+
+ protected override val minRegisteredRatio =
+ if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+ 0.8
+ } else {
+ super.minRegisteredRatio
+ }
+
+ private val executorWatchResource = new AtomicReference[Closeable]
+ private val totalExpectedExecutors = new AtomicInteger(0)
+
+ private val driverUrl = RpcEndpointAddress(
+ conf.get("spark.driver.host"),
+ conf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+ CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+
+ private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
+
+ private val podAllocationInterval = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+ private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+ private val executorLostReasonCheckMaxAttempts = conf.get(
+ KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
+
+ private val allocatorRunnable = new Runnable {
+
+ // Maintains a map of executor id to count of checks performed to learn the loss reason
+ // for an executor.
+ private val executorReasonCheckAttemptCounts = new mutable.HashMap[String, Int]
+
+ override def run(): Unit = {
+ handleDisconnectedExecutors()
+
+ val executorsToAllocate = mutable.Map[String, Pod]()
+ val currentTotalRegisteredExecutors = totalRegisteredExecutors.get
+ val currentTotalExpectedExecutors = totalExpectedExecutors.get
+ val currentNodeToLocalTaskCount = getNodesWithLocalTaskCounts()
+ RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ if (currentTotalRegisteredExecutors < runningExecutorsToPods.size) {
+ logDebug("Waiting for pending executors before scaling")
+ } else if (currentTotalExpectedExecutors <= runningExecutorsToPods.size) {
+ logDebug("Maximum allowed executor limit reached. Not scaling up further.")
+ } else {
+ for (_ <- 0 until math.min(
+ currentTotalExpectedExecutors - runningExecutorsToPods.size, podAllocationSize)) {
+ val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
+ val executorPod = executorPodFactory.createExecutorPod(
+ executorId,
+ applicationId(),
+ driverUrl,
+ conf.getExecutorEnv,
+ driverPod,
+ currentNodeToLocalTaskCount)
+ executorsToAllocate(executorId) = executorPod
+ logInfo(
+ s"Requesting a new executor, total executors is now ${runningExecutorsToPods.size}")
+ }
+ }
+ }
+
+ val allocatedExecutors = executorsToAllocate.mapValues { pod =>
+ Utils.tryLog {
+ kubernetesClient.pods().create(pod)
+ }
+ }
+
+ RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ allocatedExecutors.map {
+ case (executorId, attemptedAllocatedExecutor) =>
+ attemptedAllocatedExecutor.map { successfullyAllocatedExecutor =>
+ runningExecutorsToPods.put(executorId, successfullyAllocatedExecutor)
+ }
+ }
+ }
+ }
+
+ def handleDisconnectedExecutors(): Unit = {
+ // For each disconnected executor, synchronize with the loss reasons that may have been found
+ // by the executor pod watcher. If the loss reason was discovered by the watcher,
+ // inform the parent class with removeExecutor.
+ disconnectedPodsByExecutorIdPendingRemoval.asScala.foreach {
+ case (executorId, executorPod) =>
+ val knownExitReason = Option(podsWithKnownExitReasons.remove(
+ executorPod.getMetadata.getName))
+ knownExitReason.fold {
+ removeExecutorOrIncrementLossReasonCheckCount(executorId)
+ } { executorExited =>
+ logWarning(s"Removing executor $executorId with loss reason " + executorExited.message)
+ removeExecutor(executorId, executorExited)
+ // We don't delete the pod running the executor that has an exit condition caused by
+ // the application from the Kubernetes API server. This allows users to debug later on
+ // through commands such as "kubectl logs <pod name>" and
+ // "kubectl describe pod <pod name>". Note that exited containers have terminated and
+ // therefore won't take CPU and memory resources.
+ // Otherwise, the executor pod is marked to be deleted from the API server.
+ if (executorExited.exitCausedByApp) {
+ logInfo(s"Executor $executorId exited because of the application.")
+ deleteExecutorFromDataStructures(executorId)
+ } else {
+ logInfo(s"Executor $executorId failed because of a framework error.")
+ deleteExecutorFromClusterAndDataStructures(executorId)
+ }
+ }
+ }
+ }
+
+ def removeExecutorOrIncrementLossReasonCheckCount(executorId: String): Unit = {
+ val reasonCheckCount = executorReasonCheckAttemptCounts.getOrElse(executorId, 0)
+ if (reasonCheckCount >= executorLostReasonCheckMaxAttempts) {
+ removeExecutor(executorId, SlaveLost("Executor lost for unknown reasons."))
+ deleteExecutorFromClusterAndDataStructures(executorId)
+ } else {
+ executorReasonCheckAttemptCounts.put(executorId, reasonCheckCount + 1)
+ }
+ }
+
+ def deleteExecutorFromClusterAndDataStructures(executorId: String): Unit = {
+ deleteExecutorFromDataStructures(executorId).foreach { pod =>
+ kubernetesClient.pods().delete(pod)
+ }
+ }
+
+ def deleteExecutorFromDataStructures(executorId: String): Option[Pod] = {
+ disconnectedPodsByExecutorIdPendingRemoval.remove(executorId)
+ executorReasonCheckAttemptCounts -= executorId
+ podsWithKnownExitReasons.remove(executorId)
+ RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ runningExecutorsToPods.remove(executorId).orElse {
+ logWarning(s"Unable to remove pod for unknown executor $executorId")
+ None
+ }
+ }
+ }
+ }
+
+ override def sufficientResourcesRegistered(): Boolean = {
+ totalRegisteredExecutors.get() >= initialExecutors * minRegisteredRatio
+ }
+
+ override def start(): Unit = {
+ super.start()
+ executorWatchResource.set(
+ kubernetesClient
+ .pods()
+ .withLabel(SPARK_APP_ID_LABEL, applicationId())
+ .watch(new ExecutorPodsWatcher()))
+
+ allocatorExecutor.scheduleWithFixedDelay(
+ allocatorRunnable, 0L, podAllocationInterval, TimeUnit.SECONDS)
+
+ if (!Utils.isDynamicAllocationEnabled(conf)) {
+ doRequestTotalExecutors(initialExecutors)
+ }
+ }
+
+ override def stop(): Unit = {
+ // stop allocation of new resources and caches.
+ allocatorExecutor.shutdown()
+ allocatorExecutor.awaitTermination(30, TimeUnit.SECONDS)
+
+ // send stop message to executors so they shut down cleanly
+ super.stop()
+
+ try {
+ val resource = executorWatchResource.getAndSet(null)
+ if (resource != null) {
+ resource.close()
+ }
+ } catch {
+ case e: Throwable => logWarning("Failed to close the executor pod watcher", e)
+ }
+
+ // then delete the executor pods
+ Utils.tryLogNonFatalError {
+ deleteExecutorPodsOnStop()
+ executorPodsByIPs.clear()
+ }
+ Utils.tryLogNonFatalError {
+ logInfo("Closing kubernetes client")
+ kubernetesClient.close()
+ }
+ }
+
+ /**
+ * @return A map of K8s cluster nodes to the number of tasks that could benefit from data
+ * locality if an executor launches on the cluster node.
+ */
+ private def getNodesWithLocalTaskCounts() : Map[String, Int] = {
+ val nodeToLocalTaskCount = synchronized {
+ mutable.Map[String, Int]() ++ hostToLocalTaskCount
+ }
+
+ for (pod <- executorPodsByIPs.values().asScala) {
+ // Remove cluster nodes that are running our executors already.
+ // TODO: This prefers spreading out executors across nodes. In case users want to
+ // consolidate executors on fewer nodes, introduce a flag. See the spark.deploy.spreadOut
+ // flag that Spark standalone has: https://spark.apache.org/docs/latest/spark-standalone.html
+ nodeToLocalTaskCount.remove(pod.getSpec.getNodeName).nonEmpty ||
+ nodeToLocalTaskCount.remove(pod.getStatus.getHostIP).nonEmpty ||
+ nodeToLocalTaskCount.remove(
+ InetAddress.getByName(pod.getStatus.getHostIP).getCanonicalHostName).nonEmpty
+ }
+ nodeToLocalTaskCount.toMap[String, Int]
+ }
+
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future[Boolean] {
+ totalExpectedExecutors.set(requestedTotal)
+ true
+ }
+
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
+ val podsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ executorIds.flatMap { executorId =>
+ runningExecutorsToPods.remove(executorId) match {
+ case Some(pod) =>
+ disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+ Some(pod)
+
+ case None =>
+ logWarning(s"Unable to remove pod for unknown executor $executorId")
+ None
+ }
+ }
+ }
+
+ kubernetesClient.pods().delete(podsToDelete: _*)
+ true
+ }
+
+ private def deleteExecutorPodsOnStop(): Unit = {
+ val executorPodsToDelete = RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ val runningExecutorPodsCopy = Seq(runningExecutorsToPods.values.toSeq: _*)
+ runningExecutorsToPods.clear()
+ runningExecutorPodsCopy
+ }
+ kubernetesClient.pods().delete(executorPodsToDelete: _*)
+ }
+
+ private class ExecutorPodsWatcher extends Watcher[Pod] {
+
+ private val DEFAULT_CONTAINER_FAILURE_EXIT_STATUS = -1
+
+ override def eventReceived(action: Action, pod: Pod): Unit = {
+ val podName = pod.getMetadata.getName
+ val podIP = pod.getStatus.getPodIP
+
+ action match {
+ case Action.MODIFIED if (pod.getStatus.getPhase == "Running"
+ && pod.getMetadata.getDeletionTimestamp == null) =>
+ val clusterNodeName = pod.getSpec.getNodeName
+ logInfo(s"Executor pod $podName ready, launched at $clusterNodeName as IP $podIP.")
+ executorPodsByIPs.put(podIP, pod)
+
+ case Action.DELETED | Action.ERROR =>
+ val executorId = getExecutorId(pod)
+ logDebug(s"Executor pod $podName at IP $podIP was at $action.")
+ if (podIP != null) {
+ executorPodsByIPs.remove(podIP)
+ }
+
+ val executorExitReason = if (action == Action.ERROR) {
+ logWarning(s"Received error event of executor pod $podName. Reason: " +
+ pod.getStatus.getReason)
+ executorExitReasonOnError(pod)
+ } else if (action == Action.DELETED) {
+ logWarning(s"Received delete event of executor pod $podName. Reason: " +
+ pod.getStatus.getReason)
+ executorExitReasonOnDelete(pod)
+ } else {
+ throw new IllegalStateException(
+ s"Unknown action that should only be DELETED or ERROR: $action")
+ }
+ podsWithKnownExitReasons.put(pod.getMetadata.getName, executorExitReason)
+
+ if (!disconnectedPodsByExecutorIdPendingRemoval.containsKey(executorId)) {
+ log.warn(s"Executor with id $executorId was not marked as disconnected, but the " +
+ s"watch received an event of type $action for this executor. The executor may " +
+ "have failed to start in the first place and never registered with the driver.")
+ }
+ disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+
+ case _ => logDebug(s"Received event of executor pod $podName: " + action)
+ }
+ }
+
+ override def onClose(cause: KubernetesClientException): Unit = {
+ logDebug("Executor pod watch closed.", cause)
+ }
+
+ private def getExecutorExitStatus(pod: Pod): Int = {
+ val containerStatuses = pod.getStatus.getContainerStatuses
+ if (!containerStatuses.isEmpty) {
+ // we assume the first container represents the pod status. This assumption may not hold
+ // true in the future. Revisit this if side-car containers start running inside executor
+ // pods.
+ getExecutorExitStatus(containerStatuses.get(0))
+ } else DEFAULT_CONTAINER_FAILURE_EXIT_STATUS
+ }
+
+ private def getExecutorExitStatus(containerStatus: ContainerStatus): Int = {
+ Option(containerStatus.getState).map { containerState =>
+ Option(containerState.getTerminated).map { containerStateTerminated =>
+ containerStateTerminated.getExitCode.intValue()
+ }.getOrElse(UNKNOWN_EXIT_CODE)
+ }.getOrElse(UNKNOWN_EXIT_CODE)
+ }
+
+ private def isPodAlreadyReleased(pod: Pod): Boolean = {
+ val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+ RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ !runningExecutorsToPods.contains(executorId)
+ }
+ }
+
+ private def executorExitReasonOnError(pod: Pod): ExecutorExited = {
+ val containerExitStatus = getExecutorExitStatus(pod)
+ // container was probably actively killed by the driver.
+ if (isPodAlreadyReleased(pod)) {
+ ExecutorExited(containerExitStatus, exitCausedByApp = false,
+ s"Container in pod ${pod.getMetadata.getName} exited from explicit termination " +
+ "request.")
+ } else {
+ val containerExitReason = s"Pod ${pod.getMetadata.getName}'s executor container " +
+ s"exited with exit status code $containerExitStatus."
+ ExecutorExited(containerExitStatus, exitCausedByApp = true, containerExitReason)
+ }
+ }
+
+ private def executorExitReasonOnDelete(pod: Pod): ExecutorExited = {
+ val exitMessage = if (isPodAlreadyReleased(pod)) {
+ s"Container in pod ${pod.getMetadata.getName} exited from explicit termination request."
+ } else {
+ s"Pod ${pod.getMetadata.getName} deleted or lost."
+ }
+ ExecutorExited(getExecutorExitStatus(pod), exitCausedByApp = false, exitMessage)
+ }
+
+ private def getExecutorId(pod: Pod): String = {
+ val executorId = pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL)
+ require(executorId != null, "Unexpected pod metadata; expected all executor pods " +
+ s"to have label $SPARK_EXECUTOR_ID_LABEL.")
+ executorId
+ }
+ }
+
+ override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+ new KubernetesDriverEndpoint(rpcEnv, properties)
+ }
+
+ private class KubernetesDriverEndpoint(
+ rpcEnv: RpcEnv,
+ sparkProperties: Seq[(String, String)])
+ extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+ override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+ addressToExecutorId.get(rpcAddress).foreach { executorId =>
+ if (disableExecutor(executorId)) {
+ RUNNING_EXECUTOR_PODS_LOCK.synchronized {
+ runningExecutorsToPods.get(executorId).foreach { pod =>
+ disconnectedPodsByExecutorIdPendingRemoval.put(executorId, pod)
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+private object KubernetesClusterSchedulerBackend {
+ private val UNKNOWN_EXIT_CODE = -1
+}
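The allocator thread above tops up executors in bounded batches. A self-contained restatement of just that sizing rule, with invented numbers, for reference:

object AllocationBatchSketch {
  def podsToRequest(registered: Int, running: Int, expected: Int, batchSize: Int): Int =
    if (registered < running) 0                   // pending executors still registering: wait
    else if (expected <= running) 0               // already at the requested total: nothing to do
    else math.min(expected - running, batchSize)  // otherwise top up, at most batchSize per round

  def main(args: Array[String]): Unit = {
    assert(podsToRequest(registered = 0, running = 0, expected = 12, batchSize = 5) == 5)
    assert(podsToRequest(registered = 5, running = 5, expected = 12, batchSize = 5) == 5)
    assert(podsToRequest(registered = 8, running = 10, expected = 12, batchSize = 5) == 0)
  }
}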
diff --git a/resource-managers/kubernetes/core/src/test/resources/log4j.properties b/resource-managers/kubernetes/core/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..ad95fadb7c0c0
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/unit-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
new file mode 100644
index 0000000000000..1c7717c238096
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, _}
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
+class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
+ private val driverPodName: String = "driver-pod"
+ private val driverPodUid: String = "driver-uid"
+ private val executorPrefix: String = "base"
+ private val executorImage: String = "executor-image"
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(driverPodName)
+ .withUid(driverPodUid)
+ .endMetadata()
+ .withNewSpec()
+ .withNodeName("some-node")
+ .endSpec()
+ .withNewStatus()
+ .withHostIP("192.168.99.100")
+ .endStatus()
+ .build()
+ private var baseConf: SparkConf = _
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ baseConf = new SparkConf()
+ .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+ .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
+ .set(EXECUTOR_DOCKER_IMAGE, executorImage)
+ }
+
+ test("basic executor pod has reasonable defaults") {
+ val factory = new ExecutorPodFactoryImpl(baseConf)
+ val executor = factory.createExecutorPod(
+ "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+ // The executor pod name and default labels.
+ assert(executor.getMetadata.getName === s"$executorPrefix-exec-1")
+ assert(executor.getMetadata.getLabels.size() === 3)
+ assert(executor.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) === "1")
+
+ // There is exactly 1 container with no volume mounts and default memory limits.
+ // Default memory limit is 1024M + 384M (minimum overhead constant).
+ assert(executor.getSpec.getContainers.size() === 1)
+ assert(executor.getSpec.getContainers.get(0).getImage === executorImage)
+ assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty)
+ assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
+ assert(executor.getSpec.getContainers.get(0).getResources
+ .getLimits.get("memory").getAmount === "1408Mi")
+
+ // The pod has no node selector, volumes.
+ assert(executor.getSpec.getNodeSelector.isEmpty)
+ assert(executor.getSpec.getVolumes.isEmpty)
+
+ checkEnv(executor, Map())
+ checkOwnerReferences(executor, driverPodUid)
+ }
+
+ test("executor pod hostnames get truncated to 63 characters") {
+ val conf = baseConf.clone()
+ conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
+ "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")
+
+ val factory = new ExecutorPodFactoryImpl(conf)
+ val executor = factory.createExecutorPod(
+ "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
+
+ assert(executor.getSpec.getHostname.length === 63)
+ }
+
+ test("classpath and extra java options get translated into environment variables") {
+ val conf = baseConf.clone()
+ conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+ conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")
+
+ val factory = new ExecutorPodFactoryImpl(conf)
+ val executor = factory.createExecutorPod(
+ "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())
+
+ checkEnv(executor,
+ Map("SPARK_JAVA_OPT_0" -> "foo=bar",
+ "SPARK_EXECUTOR_EXTRA_CLASSPATH" -> "bar=baz",
+ "qux" -> "quux"))
+ checkOwnerReferences(executor, driverPodUid)
+ }
+
+ // There is always exactly one controller reference, and it points to the driver pod.
+ private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
+ assert(executor.getMetadata.getOwnerReferences.size() === 1)
+ assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid)
+ assert(executor.getMetadata.getOwnerReferences.get(0).getController === true)
+ }
+
+ // Check that the expected environment variables are present.
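+  // ENV_EXECUTOR_POD_IP has no literal value here; the factory is expected to populate it
+  // via the Kubernetes downward API rather than as a plain string.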
+ private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = {
+ val defaultEnvs = Map(
+ ENV_EXECUTOR_ID -> "1",
+ ENV_DRIVER_URL -> "dummy",
+ ENV_EXECUTOR_CORES -> "1",
+ ENV_EXECUTOR_MEMORY -> "1g",
+ ENV_APPLICATION_ID -> "dummy",
+ ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars
+
+ assert(executor.getSpec.getContainers.size() === 1)
+ assert(executor.getSpec.getContainers.get(0).getEnv.size() === defaultEnvs.size)
+ val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map {
+ x => (x.getName, x.getValue)
+ }.toMap
+ assert(defaultEnvs === mapEnvs)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000..3febb2f47cfd4
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodBuilder, PodList}
+import io.fabric8.kubernetes.client.{KubernetesClient, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NonNamespaceOperation, PodResource}
+import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.Matchers.{any, eq => mockitoEq}
+import org.mockito.Mockito.{doNothing, never, times, verify, when}
+import org.scalatest.BeforeAndAfter
+import org.scalatest.mockito.MockitoSugar._
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{ExecutorExited, LiveListenerBus, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.ThreadUtils
+
+class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val APP_ID = "test-spark-app"
+ private val DRIVER_POD_NAME = "spark-driver-pod"
+ private val NAMESPACE = "test-namespace"
+ private val SPARK_DRIVER_HOST = "localhost"
+ private val SPARK_DRIVER_PORT = 7077
+ private val POD_ALLOCATION_INTERVAL = 60L
+ private val DRIVER_URL = RpcEndpointAddress(
+ SPARK_DRIVER_HOST, SPARK_DRIVER_PORT, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
+ private val FIRST_EXECUTOR_POD = new PodBuilder()
+ .withNewMetadata()
+ .withName("pod1")
+ .endMetadata()
+ .withNewSpec()
+ .withNodeName("node1")
+ .endSpec()
+ .withNewStatus()
+ .withHostIP("192.168.99.100")
+ .endStatus()
+ .build()
+ private val SECOND_EXECUTOR_POD = new PodBuilder()
+ .withNewMetadata()
+ .withName("pod2")
+ .endMetadata()
+ .withNewSpec()
+ .withNodeName("node2")
+ .endSpec()
+ .withNewStatus()
+ .withHostIP("192.168.99.101")
+ .endStatus()
+ .build()
+
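+  // Type aliases that shorten the verbose fabric8 Kubernetes client DSL types mocked below.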
+ private type PODS = MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+ private type LABELED_PODS = FilterWatchListDeletable[
+ Pod, PodList, java.lang.Boolean, Watch, Watcher[Pod]]
+ private type IN_NAMESPACE_PODS = NonNamespaceOperation[
+ Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]
+
+ @Mock
+ private var sparkContext: SparkContext = _
+
+ @Mock
+ private var listenerBus: LiveListenerBus = _
+
+ @Mock
+ private var taskSchedulerImpl: TaskSchedulerImpl = _
+
+ @Mock
+ private var allocatorExecutor: ScheduledExecutorService = _
+
+ @Mock
+ private var requestExecutorsService: ExecutorService = _
+
+ @Mock
+ private var executorPodFactory: ExecutorPodFactory = _
+
+ @Mock
+ private var kubernetesClient: KubernetesClient = _
+
+ @Mock
+ private var podOperations: PODS = _
+
+ @Mock
+ private var podsWithLabelOperations: LABELED_PODS = _
+
+ @Mock
+ private var podsInNamespace: IN_NAMESPACE_PODS = _
+
+ @Mock
+ private var podsWithDriverName: PodResource[Pod, DoneablePod] = _
+
+ @Mock
+ private var rpcEnv: RpcEnv = _
+
+ @Mock
+ private var driverEndpointRef: RpcEndpointRef = _
+
+ @Mock
+ private var executorPodsWatch: Watch = _
+
+ @Mock
+ private var successFuture: Future[Boolean] = _
+
+ private var sparkConf: SparkConf = _
+ private var executorPodsWatcherArgument: ArgumentCaptor[Watcher[Pod]] = _
+ private var allocatorRunnable: ArgumentCaptor[Runnable] = _
+ private var requestExecutorRunnable: ArgumentCaptor[Runnable] = _
+ private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
+
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(DRIVER_POD_NAME)
+ .addToLabels(SPARK_APP_ID_LABEL, APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .endMetadata()
+ .build()
+
+ before {
+ MockitoAnnotations.initMocks(this)
+ sparkConf = new SparkConf()
+ .set(KUBERNETES_DRIVER_POD_NAME, DRIVER_POD_NAME)
+ .set(KUBERNETES_NAMESPACE, NAMESPACE)
+ .set("spark.driver.host", SPARK_DRIVER_HOST)
+ .set("spark.driver.port", SPARK_DRIVER_PORT.toString)
+ .set(KUBERNETES_ALLOCATION_BATCH_DELAY, POD_ALLOCATION_INTERVAL)
+ executorPodsWatcherArgument = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
+ allocatorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+ requestExecutorRunnable = ArgumentCaptor.forClass(classOf[Runnable])
+ driverEndpoint = ArgumentCaptor.forClass(classOf[RpcEndpoint])
+ when(sparkContext.conf).thenReturn(sparkConf)
+ when(sparkContext.listenerBus).thenReturn(listenerBus)
+ when(taskSchedulerImpl.sc).thenReturn(sparkContext)
+ when(kubernetesClient.pods()).thenReturn(podOperations)
+ when(podOperations.withLabel(SPARK_APP_ID_LABEL, APP_ID)).thenReturn(podsWithLabelOperations)
+ when(podsWithLabelOperations.watch(executorPodsWatcherArgument.capture()))
+ .thenReturn(executorPodsWatch)
+ when(podOperations.inNamespace(NAMESPACE)).thenReturn(podsInNamespace)
+ when(podsInNamespace.withName(DRIVER_POD_NAME)).thenReturn(podsWithDriverName)
+ when(podsWithDriverName.get()).thenReturn(driverPod)
+ when(allocatorExecutor.scheduleWithFixedDelay(
+ allocatorRunnable.capture(),
+ mockitoEq(0L),
+ mockitoEq(POD_ALLOCATION_INTERVAL),
+ mockitoEq(TimeUnit.SECONDS))).thenReturn(null)
+    // Creating a Scala Future backed by a Java executor service ends up calling
+    // ExecutorService#execute (as opposed to submit).
+ doNothing().when(requestExecutorsService).execute(requestExecutorRunnable.capture())
+ when(rpcEnv.setupEndpoint(
+ mockitoEq(CoarseGrainedSchedulerBackend.ENDPOINT_NAME), driverEndpoint.capture()))
+ .thenReturn(driverEndpointRef)
+
+ // Used by the CoarseGrainedSchedulerBackend when making RPC calls.
+ when(driverEndpointRef.ask[Boolean]
+ (any(classOf[Any]))
+ (any())).thenReturn(successFuture)
+ when(successFuture.failed).thenReturn(Future[Throwable] {
+      // Emulate Future.failed, which fails with NoSuchElementException when the future succeeded.
+ throw new NoSuchElementException()
+ }(ThreadUtils.sameThread))
+ }
+
+ test("Basic lifecycle expectations when starting and stopping the scheduler.") {
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ assert(executorPodsWatcherArgument.getValue != null)
+ assert(allocatorRunnable.getValue != null)
+ scheduler.stop()
+ verify(executorPodsWatch).close()
+ }
+
+ test("Static allocation should request executors upon first allocator run.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(firstResolvedPod)
+ verify(podOperations).create(secondResolvedPod)
+ }
+
+ test("Killing executors deletes the executor pods") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 2)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ when(podOperations.create(any(classOf[Pod])))
+ .thenAnswer(AdditionalAnswers.returnsFirstArg())
+ allocatorRunnable.getValue.run()
+ scheduler.doKillExecutors(Seq("2"))
+ requestExecutorRunnable.getAllValues.asScala.last.run()
+ verify(podOperations).delete(secondResolvedPod)
+ verify(podOperations, never()).delete(firstResolvedPod)
+ }
+
+ test("Executors should be requested in batches.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 2)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ requestExecutorRunnable.getValue.run()
+ when(podOperations.create(any(classOf[Pod])))
+ .thenAnswer(AdditionalAnswers.returnsFirstArg())
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ val secondResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(firstResolvedPod)
+ verify(podOperations, never()).create(secondResolvedPod)
+ val registerFirstExecutorMessage = RegisterExecutor(
+ "1", mock[RpcEndpointRef], "localhost", 1, Map.empty[String, String])
+ when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+ driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+ .apply(registerFirstExecutorMessage)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(secondResolvedPod)
+ }
+
+ test("Scaled down executors should be cleaned up") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+
+ // The scheduler backend spins up one executor pod.
+ requestExecutorRunnable.getValue.run()
+ when(podOperations.create(any(classOf[Pod])))
+ .thenAnswer(AdditionalAnswers.returnsFirstArg())
+ val resolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ val executorEndpointRef = mock[RpcEndpointRef]
+ when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
+ val registerFirstExecutorMessage = RegisterExecutor(
+ "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+ when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+ driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+ .apply(registerFirstExecutorMessage)
+
+    // Request zero executors and trigger deletion of the pod from the driver.
+ scheduler.doRequestTotalExecutors(0)
+ requestExecutorRunnable.getAllValues.asScala.last.run()
+ scheduler.doKillExecutors(Seq("1"))
+ requestExecutorRunnable.getAllValues.asScala.last.run()
+ verify(podOperations, times(1)).delete(resolvedPod)
+ driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
+
+ val exitedPod = exitPod(resolvedPod, 0)
+ executorPodsWatcherArgument.getValue.eventReceived(Action.DELETED, exitedPod)
+ allocatorRunnable.getValue.run()
+
+    // No further deletion attempts should be made for the executor pod.
+    // This is a graceful termination and should not be treated as a failure.
+ verify(podOperations, times(1)).delete(resolvedPod)
+ verify(driverEndpointRef, times(1)).ask[Boolean](
+ RemoveExecutor("1", ExecutorExited(
+ 0,
+ exitCausedByApp = false,
+ s"Container in pod ${exitedPod.getMetadata.getName} exited from" +
+ s" explicit termination request.")))
+ }
+
+ test("Executors that fail should not be deleted.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+ requestExecutorRunnable.getValue.run()
+ allocatorRunnable.getValue.run()
+ val executorEndpointRef = mock[RpcEndpointRef]
+ when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
+ val registerFirstExecutorMessage = RegisterExecutor(
+ "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+ when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+ driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+ .apply(registerFirstExecutorMessage)
+ driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
+ executorPodsWatcherArgument.getValue.eventReceived(
+ Action.ERROR, exitPod(firstResolvedPod, 1))
+
+ // A replacement executor should be created but the error pod should persist.
+ val replacementPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ scheduler.doRequestTotalExecutors(1)
+ requestExecutorRunnable.getValue.run()
+ allocatorRunnable.getAllValues.asScala.last.run()
+ verify(podOperations, never()).delete(firstResolvedPod)
+ verify(driverEndpointRef).ask[Boolean](
+ RemoveExecutor("1", ExecutorExited(
+ 1,
+ exitCausedByApp = true,
+ s"Pod ${FIRST_EXECUTOR_POD.getMetadata.getName}'s executor container exited with" +
+ " exit status code 1.")))
+ }
+
+ test("Executors disconnected due to unknown reasons are deleted and replaced.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+ val executorLostReasonCheckMaxAttempts = sparkConf.get(
+ KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS)
+
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ when(podOperations.create(any(classOf[Pod]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
+ requestExecutorRunnable.getValue.run()
+ allocatorRunnable.getValue.run()
+ val executorEndpointRef = mock[RpcEndpointRef]
+ when(executorEndpointRef.address).thenReturn(RpcAddress("pod.example.com", 9000))
+ val registerFirstExecutorMessage = RegisterExecutor(
+ "1", executorEndpointRef, "localhost:9000", 1, Map.empty[String, String])
+ when(taskSchedulerImpl.resourceOffers(any())).thenReturn(Seq.empty)
+ driverEndpoint.getValue.receiveAndReply(mock[RpcCallContext])
+ .apply(registerFirstExecutorMessage)
+
+ driverEndpoint.getValue.onDisconnected(executorEndpointRef.address)
+ 1 to executorLostReasonCheckMaxAttempts foreach { _ =>
+ allocatorRunnable.getValue.run()
+ verify(podOperations, never()).delete(FIRST_EXECUTOR_POD)
+ }
+
+ val recreatedResolvedPod = expectPodCreationWithId(2, SECOND_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).delete(firstResolvedPod)
+ verify(driverEndpointRef).ask[Boolean](
+ RemoveExecutor("1", SlaveLost("Executor lost for unknown reasons.")))
+ }
+
+ test("Executors that fail to start on the Kubernetes API call rebuild in the next batch.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ when(podOperations.create(firstResolvedPod))
+ .thenThrow(new RuntimeException("test"))
+ requestExecutorRunnable.getValue.run()
+ allocatorRunnable.getValue.run()
+ verify(podOperations, times(1)).create(firstResolvedPod)
+ val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(recreatedResolvedPod)
+ }
+
+ test("Executors that are initially created but the watch notices them fail are rebuilt" +
+ " in the next batch.") {
+ sparkConf
+ .set(KUBERNETES_ALLOCATION_BATCH_SIZE, 1)
+ .set(org.apache.spark.internal.config.EXECUTOR_INSTANCES, 1)
+ val scheduler = newSchedulerBackend()
+ scheduler.start()
+ val firstResolvedPod = expectPodCreationWithId(1, FIRST_EXECUTOR_POD)
+ when(podOperations.create(FIRST_EXECUTOR_POD)).thenAnswer(AdditionalAnswers.returnsFirstArg())
+ requestExecutorRunnable.getValue.run()
+ allocatorRunnable.getValue.run()
+ verify(podOperations, times(1)).create(firstResolvedPod)
+ executorPodsWatcherArgument.getValue.eventReceived(Action.ERROR, firstResolvedPod)
+ val recreatedResolvedPod = expectPodCreationWithId(2, FIRST_EXECUTOR_POD)
+ allocatorRunnable.getValue.run()
+ verify(podOperations).create(recreatedResolvedPod)
+ }
+
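+  // Builds the scheduler backend under test against the mocked collaborators, with a fixed app ID.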
+ private def newSchedulerBackend(): KubernetesClusterSchedulerBackend = {
+ new KubernetesClusterSchedulerBackend(
+ taskSchedulerImpl,
+ rpcEnv,
+ executorPodFactory,
+ kubernetesClient,
+ allocatorExecutor,
+ requestExecutorsService) {
+
+ override def applicationId(): String = APP_ID
+ }
+ }
+
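+  // Copies the given pod, adding a terminated container status with the given exit code.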
+ private def exitPod(basePod: Pod, exitCode: Int): Pod = {
+ new PodBuilder(basePod)
+ .editStatus()
+ .addNewContainerStatus()
+ .withNewState()
+ .withNewTerminated()
+ .withExitCode(exitCode)
+ .endTerminated()
+ .endState()
+ .endContainerStatus()
+ .endStatus()
+ .build()
+ }
+
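+  // Stubs the pod factory to return the expected pod labeled with the executor ID, and returns it
+  // so tests can verify creation and deletion against it.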
+ private def expectPodCreationWithId(executorId: Int, expectedPod: Pod): Pod = {
+ val resolvedPod = new PodBuilder(expectedPod)
+ .editMetadata()
+ .addToLabels(SPARK_EXECUTOR_ID_LABEL, executorId.toString)
+ .endMetadata()
+ .build()
+ when(executorPodFactory.createExecutorPod(
+ executorId.toString,
+ APP_ID,
+ DRIVER_URL,
+ sparkConf.getExecutorEnv,
+ driverPod,
+ Map.empty)).thenReturn(resolvedPod)
+ resolvedPod
+ }
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7052fb347106b..506adb363aa90 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -41,6 +41,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
+import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
/**
@@ -109,7 +110,7 @@ private[yarn] class YarnAllocator(
sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)
@volatile private var targetNumExecutors =
- YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
+ SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)
private var currentNodeBlacklist = Set.empty[String]
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 3d9f99f57bed7..9c1472cb50e3a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -133,8 +133,6 @@ object YarnSparkHadoopUtil {
val ANY_HOST = "*"
- val DEFAULT_NUMBER_EXECUTORS = 2
-
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val RM_REQUEST_PRIORITY = Priority.newInstance(1)
@@ -279,27 +277,5 @@ object YarnSparkHadoopUtil {
securityMgr.getModifyAclsGroups)
)
}
-
- /**
- * Getting the initial target number of executors depends on whether dynamic allocation is
- * enabled.
- * If not using dynamic allocation it gets the number of executors requested by the user.
- */
- def getInitialTargetExecutorNumber(
- conf: SparkConf,
- numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
- if (Utils.isDynamicAllocationEnabled(conf)) {
- val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
- val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
- val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
- require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
- s"initial executor number $initialNumExecutors must between min executor number " +
- s"$minNumExecutors and max executor number $maxNumExecutors")
-
- initialNumExecutors
- } else {
- conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
- }
- }
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d482376d14dd7..b722cc401bb73 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -52,7 +52,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
val args = new ClientArguments(argsArrayBuf.toArray)
- totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(conf)
+ totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
client = new Client(args, conf)
bindToYarn(client.submitApplication(), None)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 4f3d5ebf403e0..e2d477be329c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class YarnClusterSchedulerBackend(
val attemptId = ApplicationMaster.getAttemptId
bindToYarn(attemptId.getApplicationId(), Some(attemptId))
super.start()
- totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
+ totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(sc.conf)
}
override def getDriverLogUrls: Option[Map[String, String]] = {
|