diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b4d7462f79368..9efaaa765b5d1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab import java.net.{URI, URL} import java.security.PrivilegedExceptionAction import java.text.ParseException -import java.util.UUID +import java.util.{ServiceLoader, UUID} import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import scala.util.{Properties, Try} @@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging { } /** - * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only. + * Kill an existing submission. */ private def kill(args: SparkSubmitArguments): Unit = { - new RestSubmissionClient(args.master) - .killSubmission(args.submissionToKill) + if (RestSubmissionClient.supportsRestClient(args.master)) { + new RestSubmissionClient(args.master) + .killSubmission(args.submissionToKill) + } else { + val sparkConf = args.toSparkConf() + sparkConf.set("spark.master", args.master) + SparkSubmitUtils + .getSubmitOperations(args.master) + .kill(args.submissionToKill, sparkConf) + } } /** - * Request the status of an existing submission using the REST protocol. - * Standalone and Mesos cluster mode only. + * Request the status of an existing submission. */ private def requestStatus(args: SparkSubmitArguments): Unit = { - new RestSubmissionClient(args.master) - .requestSubmissionStatus(args.submissionToRequestStatusFor) + if (RestSubmissionClient.supportsRestClient(args.master)) { + new RestSubmissionClient(args.master) + .requestSubmissionStatus(args.submissionToRequestStatusFor) + } else { + val sparkConf = args.toSparkConf() + sparkConf.set("spark.master", args.master) + SparkSubmitUtils + .getSubmitOperations(args.master) + .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf) + } } /** Print version information to the log. 
*/ @@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging { } } - args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) } + // update spark config from args + args.toSparkConf(Option(sparkConf)) val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() @@ -1336,6 +1353,23 @@ private[spark] object SparkSubmitUtils { } } + private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoaders = + ServiceLoader.load(classOf[SparkSubmitOperation], loader) + .asScala + .filter(_.supports(master)) + + serviceLoaders.size match { + case x if x > 1 => + throw new SparkException(s"Multiple($x) external SparkSubmitOperations " + + s"clients registered for master url ${master}.") + case 1 => serviceLoaders.headOption.get + case _ => + throw new IllegalArgumentException(s"No external SparkSubmitOperations " + + s"clients found for master url: '$master'") + } + } } /** @@ -1348,3 +1382,12 @@ private case class OptionAssigner( deployMode: Int, clOption: String = null, confKey: String = null) + +private[spark] trait SparkSubmitOperation { + + def kill(submissionId: String, conf: SparkConf): Unit + + def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit + + def supports(master: String): Boolean +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index e7954d1ee1651..ed1324baed0f1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.io.Source import scala.util.Try -import org.apache.spark.{SparkException, SparkUserAppException} +import org.apache.spark.{SparkConf, SparkException, SparkUserAppException} import org.apache.spark.deploy.SparkSubmitAction._ import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED @@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } private def validateKillArguments(): Unit = { - if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { - error("Killing submissions is only supported in standalone or Mesos mode!") - } if (submissionToKill == null) { error("Please specify a submission to kill.") } } private def validateStatusRequestArguments(): Unit = { - if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { - error( - "Requesting submission statuses is only supported in standalone or Mesos mode!") - } if (submissionToRequestStatusFor == null) { error("Please specify a submission to request status for.") } @@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | | Spark standalone or Mesos with cluster deploy mode only: | --supervise If given, restarts the driver on failure. + | + | Spark standalone, Mesos or K8s with cluster deploy mode only: | --kill SUBMISSION_ID If given, kills the driver specified. | --status SUBMISSION_ID If given, requests the status of the driver specified. 
| @@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S private def error(msg: String): Unit = throw new SparkException(msg) + private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = { + // either use an existing config or create a new empty one + sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) { + case (conf, (k, v)) => conf.set(k, v) + } + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index afa413fe165df..1648ba516d9b6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -61,8 +61,6 @@ import org.apache.spark.util.Utils private[spark] class RestSubmissionClient(master: String) extends Logging { import RestSubmissionClient._ - private val supportedMasterPrefixes = Seq("spark://", "mesos://") - private val masters: Array[String] = if (master.startsWith("spark://")) { Utils.parseStandaloneMasterUrls(master) } else { @@ -409,6 +407,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging { private[spark] object RestSubmissionClient { + val supportedMasterPrefixes = Seq("spark://", "mesos://") + // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong // on the remote machine (SPARK-12345) (SPARK-25934) private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR") @@ -424,6 +424,10 @@ private[spark] object RestSubmissionClient { (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_") } } + + private[spark] def supportsRestClient(master: String): Boolean = { + supportedMasterPrefixes.exists(master.startsWith) + } } private[spark] class RestSubmissionClientApp extends SparkApplication { @@ -456,5 +460,4 @@ private[spark] class RestSubmissionClientApp extends SparkApplication { val env = RestSubmissionClient.filterSystemEnvironment(sys.env) run(appResource, mainClass, appArgs, conf, env) } - } diff --git a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala index 4b6602b50aa1c..add1146c90840 100644 --- a/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala @@ -25,8 +25,12 @@ import org.apache.spark.SparkException * Contains basic command line parsing functionality and methods to parse some common Spark CLI * options. 
*/ -private[spark] trait CommandLineUtils { +private[spark] trait CommandLineUtils extends CommandLineLoggingUtils { + def main(args: Array[String]): Unit +} + +private[spark] trait CommandLineLoggingUtils { // Exposed for testing private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode) @@ -41,6 +45,4 @@ private[spark] trait CommandLineUtils { printMessage("Run with --help for usage help or --verbose for debug output") exitFn(1) } - - def main(args: Array[String]): Unit } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 3051d650750fe..641751ff36e58 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1239,6 +1239,23 @@ class SparkSubmitSuite conf.get(nonDelimSpaceFromFile._1) should be ("blah") } + + test("get a Spark configuration from arguments") { + val testConf = "spark.test.hello" -> "world" + val masterConf = "spark.master" -> "yarn" + val clArgs = Seq( + "--conf", s"${testConf._1}=${testConf._2}", + "--conf", s"${masterConf._1}=${masterConf._2}", + "--class", "Foo", + "app.jar") + val conf = new SparkSubmitArguments(clArgs).toSparkConf() + Seq( + testConf, + masterConf + ).foreach { case (k, v) => + conf.get(k) should be (v) + } + } } object SparkSubmitSuite extends SparkFunSuite with TimeLimits { diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 40ff62cf43dcc..7e26d77b055fe 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -62,7 +62,7 @@ logs and remains in "completed" state in the Kubernetes API until it's eventuall Note that in the completed state, the driver pod does *not* use any computational or memory resources. -The driver and executor pod scheduling is handled by Kubernetes. Communication to the Kubernetes API is done via fabric8, and we are +The driver and executor pod scheduling is handled by Kubernetes. Communication to the Kubernetes API is done via fabric8, and we are currently running kubernetes-client version 4.1.0. Make sure that when you are making infrastructure additions that you are aware of said version. It is possible to schedule the driver and executor pods on a subset of available nodes through a [node selector](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector) using the configuration property for it. It will be possible to use more advanced @@ -89,7 +89,7 @@ $ ./bin/docker-image-tool.sh -r -t my-tag push ``` This will build using the projects provided default `Dockerfiles`. To see more options available for customising the behaviour of this tool, including providing custom `Dockerfiles`, please run with the `-h` flag. -By default `bin/docker-image-tool.sh` builds docker image for running JVM jobs. You need to opt-in to build additional +By default `bin/docker-image-tool.sh` builds docker image for running JVM jobs. You need to opt-in to build additional language binding docker images. 
Example usage is @@ -250,7 +250,7 @@ To mount a volume of any of the types above into the driver pod, use the followi --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path= --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly= --conf spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.subPath= -``` +``` Specifically, `VolumeType` can be one of the following values: `hostPath`, `emptyDir`, and `persistentVolumeClaim`. `VolumeName` is the name you want to use for the volume under the `volumes` field in the pod specification. @@ -258,7 +258,7 @@ Each supported type of volumes may have some specific configuration options, whi ``` spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]= -``` +``` For example, the claim name of a `persistentVolumeClaim` with volume name `checkpointpvc` can be specified using the following property: @@ -266,7 +266,7 @@ For example, the claim name of a `persistentVolumeClaim` with volume name `check spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=check-point-pvc-claim ``` -The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. +The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below. ## Local Storage @@ -403,6 +403,36 @@ RBAC authorization and how to configure Kubernetes service accounts for pods, pl [Using RBAC Authorization](https://kubernetes.io/docs/admin/authorization/rbac/) and [Configure Service Accounts for Pods](https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/). +## Spark Application Management + +Kubernetes provides simple application management via the spark-submit CLI tool in cluster mode. +Users can kill a job by providing the submission ID that is printed when the job is submitted. +The submission ID follows the format ``namespace:driver-pod-name``. +If the user omits the namespace, the namespace set in the current k8s context is used. +For example, if the user has set a specific namespace as follows: `kubectl config set-context minikube --namespace=spark`, +then the `spark` namespace will be used by default. On the other hand, if no namespace is added to the specific context, +then all namespaces will be considered by default, which means operations will affect all Spark applications matching the given submission ID, regardless of namespace. +Moreover, application management via spark-submit uses the same backend code as driver submission, so the same properties, +such as `spark.kubernetes.context`, can be re-used. + +For example: +```bash +$ spark-submit --kill spark:spark-pi-1547948636094-driver --master k8s://https://192.168.2.8:8443 +``` +Users can also list the application status by using the `--status` flag: + +```bash +$ spark-submit --status spark:spark-pi-1547948636094-driver --master k8s://https://192.168.2.8:8443 +``` +Both operations support glob patterns.
For example, a user can run: ```bash $ spark-submit --kill spark:spark-pi* --master k8s://https://192.168.2.8:8443 ``` The above will kill all applications with the given prefix. + +Users can specify the grace period for pod termination via the `spark.kubernetes.appKillPodDeletionGracePeriod` property, +passing it with `--conf` (the default value for all K8s pods is 30 seconds). + ## Future Work There are several Spark on Kubernetes features that are currently being worked on or planned to be worked on. Those features are expected to eventually make it into future versions of the spark-kubernetes integration. @@ -411,7 +441,6 @@ Some of these include: * Dynamic Resource Allocation and External Shuffle Service * Local File Dependency Management -* Spark Application Management * Job Queues and Resource Management # Configuration @@ -426,10 +455,10 @@ See the [configuration page](configuration.html) for information on Spark config spark.kubernetes.context (none) - The context from the user Kubernetes configuration file used for the initial + The context from the user Kubernetes configuration file used for the initial auto-configuration of the Kubernetes client library. When not specified then - the users current context is used. NB: Many of the - auto-configured settings can be overridden by the use of other Spark + the users current context is used. NB: Many of the + auto-configured settings can be overridden by the use of other Spark configuration properties e.g. spark.kubernetes.namespace. @@ -762,7 +791,7 @@ See the [configuration page](configuration.html) for information on Spark config Specify the cpu request for each executor pod. Values conform to the Kubernetes convention. Example values include 0.1, 500m, 1.5, 5, etc., with the definition of cpu units documented in CPU units. - This is distinct from spark.executor.cores: it is only used and takes precedence over spark.executor.cores for specifying the executor pod cpu request if set. Task + This is distinct from spark.executor.cores: it is only used and takes precedence over spark.executor.cores for specifying the executor pod cpu request if set. Task parallelism, e.g., number of tasks an executor can run concurrently is not affected by this. @@ -900,14 +929,14 @@ See the [configuration page](configuration.html) for information on Spark config 0.1 This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, and various systems processes. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM jobs. - This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This prempts this error with a higher default. + This is done as non-JVM tasks need more non-JVM heap space and such tasks commonly fail with "Memory Overhead Exceeded" errors. This prempts this error with a higher default. spark.kubernetes.pyspark.pythonVersion "3" - This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3. + This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
@@ -931,7 +960,7 @@ See the [configuration page](configuration.html) for information on Spark config spark.kubernetes.hadoop.configMapName (none) - Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver + Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver and executors for custom Hadoop configuration. @@ -940,14 +969,14 @@ See the [configuration page](configuration.html) for information on Spark config (none) Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user - to provide any kerberos credentials for launching a job. + to provide any kerberos credentials for launching a job. spark.kubernetes.kerberos.tokenSecret.itemKey (none) - Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user + Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user to provide any kerberos credentials for launching a job. @@ -1018,6 +1047,13 @@ See the [configuration page](configuration.html) for information on Spark config Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors. + + spark.kubernetes.appKillPodDeletionGracePeriod + (none) + + Specify the grace period in seconds when deleting a Spark application using spark-submit. + + #### Pod template properties @@ -1155,7 +1191,6 @@ The following affect the driver and executor containers. All other containers in The cpu limits are set by spark.kubernetes.{driver,executor}.limit.cores. The cpu is set by spark.{driver,executor}.cores. The memory request and limit are set by summing the values of spark.{driver,executor}.memory and spark.{driver,executor}.memoryOverhead. 
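To tie together the new documentation above: a hedged, illustrative invocation combining the glob form of `--kill` with the new `spark.kubernetes.appKillPodDeletionGracePeriod` property passed via `--conf`. The master URL, glob, and 60-second grace period are placeholder values, not defaults.

```bash
# Kill every driver whose submission ID starts with "spark:spark-pi",
# giving each pod 60 seconds to terminate gracefully (illustrative values).
$ spark-submit --kill "spark:spark-pi*" \
  --master k8s://https://192.168.2.8:8443 \
  --conf spark.kubernetes.appKillPodDeletionGracePeriod=60
```

The glob is quoted here only so the shell does not expand it before spark-submit sees it.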
- diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 23106cb7ec68f..85fd613228e6f 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -53,6 +53,12 @@ test-jar + + org.mockito + mockito-core + test + + io.fabric8 kubernetes-client diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation new file mode 100644 index 0000000000000..d589e6b60f847 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkSubmitOperation @@ -0,0 +1 @@ +org.apache.spark.deploy.k8s.submit.K8SSparkSubmitOperation \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 83b5a758f0f5e..b33b125f5dd2e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -325,6 +325,13 @@ private[spark] object Config extends Logging { .booleanConf .createWithDefault(true) + val KUBERNETES_SUBMIT_GRACE_PERIOD = + ConfigBuilder("spark.kubernetes.appKillPodDeletionGracePeriod") + .doc("Time to wait for graceful deletion of Spark pods when spark-submit" + + " is used for killing an application.") + .timeConf(TimeUnit.SECONDS) + .createOptional + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala new file mode 100644 index 0000000000000..d45c06772c76a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit + +import scala.collection.JavaConverters._ + +import K8SSparkSubmitOperation.getGracePeriod +import io.fabric8.kubernetes.api.model.{DoneablePod, Pod, PodList} +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkSubmitOperation +import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} +import org.apache.spark.deploy.k8s.Config.{KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, KUBERNETES_SUBMIT_GRACE_PERIOD} +import org.apache.spark.deploy.k8s.Constants.{SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL} +import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState +import org.apache.spark.util.{CommandLineLoggingUtils, Utils} + +private sealed trait K8sSubmitOp extends CommandLineLoggingUtils { + type NON_NAMESPACED_PODS = + NonNamespaceOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] + def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit + def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit + def listPodsInNameSpace(namespace: Option[String]) + (implicit client: KubernetesClient): NON_NAMESPACED_PODS = { + namespace match { + case Some(ns) => client.pods.inNamespace(ns) + case None => client.pods + } + } +} + +private class KillApplication extends K8sSubmitOp { + override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit = { + val podToDelete = listPodsInNameSpace(namespace).withName(pName) + + if (Option(podToDelete).isDefined) { + getGracePeriod(sparkConf) match { + case Some(period) => podToDelete.withGracePeriod(period).delete() + case _ => podToDelete.delete() + } + } else { + printMessage("Application not found.") + } + } + + override def executeOnGlob(pods: List[Pod], namespace: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit = { + if (pods.nonEmpty) { + pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") } + val listedPods = listPodsInNameSpace(namespace) + + getGracePeriod(sparkConf) match { + case Some(period) => + // this is not using the batch api because no option is provided + // when using the grace period. 
+ pods.foreach { pod => + listedPods + .withName(pod.getMetadata.getName) + .withGracePeriod(period) + .delete() + } + case _ => listedPods.delete(pods.asJava) + } + } else { + printMessage("No applications found.") + } + } +} + +private class ListStatus extends K8sSubmitOp { + override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit = { + val pod = listPodsInNameSpace(namespace).withName(pName).get() + if (Option(pod).isDefined) { + printMessage("Application status (driver): " + + Option(pod).map(formatPodState).getOrElse("unknown.")) + } else { + printMessage("Application not found.") + } + } + + override def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf) + (implicit client: KubernetesClient): Unit = { + if (pods.nonEmpty) { + for (pod <- pods) { + printMessage("Application status (driver): " + + Option(pod).map(formatPodState).getOrElse("unknown.")) + } + } else { + printMessage("No applications found.") + } + } +} + +private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation + with CommandLineLoggingUtils { + + private def isGlob(name: String): Boolean = { + name.last == '*' + } + + def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = { + val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master")) + submissionId.split(":", 2) match { + case Array(part1, part2@_*) => + val namespace = if (part2.isEmpty) None else Some(part1) + val pName = if (part2.isEmpty) part1 else part2.headOption.get + Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( + master, + namespace, + KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, + SparkKubernetesClientFactory.ClientType.Submission, + sparkConf, + None, + None) + ) { kubernetesClient => + implicit val client: KubernetesClient = kubernetesClient + if (isGlob(pName)) { + val ops = namespace match { + case Some(ns) => + kubernetesClient + .pods + .inNamespace(ns) + case None => + kubernetesClient + .pods + } + val pods = ops + .list() + .getItems + .asScala + .filter { pod => + val meta = pod.getMetadata + meta.getName.startsWith(pName.stripSuffix("*")) && + meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE + }.toList + op.executeOnGlob(pods, namespace, sparkConf) + } else { + op.executeOnPod(pName, namespace, sparkConf) + } + } + case _ => + printErrorAndExit(s"Submission ID: {$submissionId} is invalid.") + } + } + + override def kill(submissionId: String, conf: SparkConf): Unit = { + printMessage(s"Submitting a request to kill submission " + + s"${submissionId} in ${conf.get("spark.master")}. 
" + + s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}") + execute(submissionId, conf, new KillApplication) + } + + override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = { + printMessage(s"Submitting a request for the status of submission" + + s" ${submissionId} in ${conf.get("spark.master")}.") + execute(submissionId, conf, new ListStatus) + } + + override def supports(master: String): Boolean = { + master.startsWith("k8s://") + } +} + +private object K8SSparkSubmitOperation { + def getGracePeriod(sparkConf: SparkConf): Option[Long] = { + sparkConf.get(KUBERNETES_SUBMIT_GRACE_PERIOD) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 92d5176baa4d6..11bbad9c480a1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -141,12 +141,15 @@ private[spark] class Client( throw e } + val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" + + s"${resolvedDriverPod.getMetadata.getName}" if (waitForAppCompletion) { - logInfo(s"Waiting for application ${conf.appName} to finish...") + logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...") watcher.awaitCompletion() - logInfo(s"Application ${conf.appName} finished.") + logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.") } else { - logInfo(s"Deployed Spark application ${conf.appName} into Kubernetes.") + logInfo(s"Deployed Spark application ${conf.appName} with " + + s"submission ID ${sId} into Kubernetes.") } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala new file mode 100644 index 0000000000000..d8be13280c2e6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit + +import java.io.PrintStream + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.PodResource +import org.mockito.{ArgumentMatchers, Mock, MockitoAnnotations} +import org.mockito.Mockito.{times, verify, when} +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_SUBMIT_GRACE_PERIOD +import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL} +import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS +import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID + +class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter { + private val driverPodName1 = "driver1" + private val driverPodName2 = "driver2" + private val driverPod1 = buildDriverPod(driverPodName1, "1") + private val driverPod2 = buildDriverPod(driverPodName2, "2") + private val podList = List(driverPod1, driverPod2) + private val namespace = "test" + + @Mock + private var podOperations: PODS = _ + + @Mock + private var driverPodOperations1: PodResource[Pod, DoneablePod] = _ + + @Mock + private var driverPodOperations2: PodResource[Pod, DoneablePod] = _ + + @Mock + private var kubernetesClient: KubernetesClient = _ + + @Mock + private var err: PrintStream = _ + + before { + MockitoAnnotations.initMocks(this) + when(kubernetesClient.pods()).thenReturn(podOperations) + when(podOperations.inNamespace(namespace)).thenReturn(podOperations) + when(podOperations.delete(podList.asJava)).thenReturn(true) + when(podOperations.withName(driverPodName1)).thenReturn(driverPodOperations1) + when(podOperations.withName(driverPodName2)).thenReturn(driverPodOperations2) + when(driverPodOperations1.get).thenReturn(driverPod1) + when(driverPodOperations1.delete()).thenReturn(true) + when(driverPodOperations2.get).thenReturn(driverPod2) + when(driverPodOperations2.delete()).thenReturn(true) + } + + test("List app status") { + implicit val kubeClient: KubernetesClient = kubernetesClient + val listStatus = new ListStatus + listStatus.printStream = err + listStatus.executeOnPod(driverPodName1, Option(namespace), new SparkConf()) + // scalastyle:off + verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName1, "1"))) + // scalastyle:on + } + + test("List status for multiple apps with glob") { + implicit val kubeClient: KubernetesClient = kubernetesClient + val listStatus = new ListStatus + listStatus.printStream = err + listStatus.executeOnGlob(podList, Option(namespace), new SparkConf()) + // scalastyle:off + verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName1, "1"))) + verify(err).println(ArgumentMatchers.eq(getPodStatus(driverPodName2, "2"))) + // scalastyle:on + } + + test("Kill app") { + implicit val kubeClient: KubernetesClient = kubernetesClient + val killApp = new KillApplication + killApp.executeOnPod(driverPodName1, Option(namespace), new SparkConf()) + verify(driverPodOperations1, times(1)).delete() + } + + test("Kill app with gracePeriod") { + implicit val kubeClient: KubernetesClient = kubernetesClient + val killApp = new KillApplication + val conf = new SparkConf().set(KUBERNETES_SUBMIT_GRACE_PERIOD, 1L) + when(driverPodOperations1.withGracePeriod(1L)).thenReturn(driverPodOperations1) + killApp.executeOnPod(driverPodName1, Option(namespace), conf) + 
verify(driverPodOperations1, times(1)).withGracePeriod(1L) + verify(driverPodOperations1, times(1)).delete() + } + + test("Kill multiple apps with glob without gracePeriod") { + implicit val kubeClient: KubernetesClient = kubernetesClient + val killApp = new KillApplication + killApp.printStream = err + killApp.executeOnGlob(podList, Option(namespace), new SparkConf()) + verify(podOperations, times(1)).delete(podList.asJava) + // scalastyle:off + verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName1.")) + verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName2.")) + // scalastyle:on + } + + private def buildDriverPod(podName: String, id: String): Pod = { + new PodBuilder() + .withNewMetadata() + .withName(podName) + .withNamespace(namespace) + .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) + .withUid(s"driver-pod-$id") + .endMetadata() + .withNewSpec() + .withServiceAccountName(s"test$id") + .withVolumes() + .withNodeName(s"testNode$id") + .endSpec() + .withNewStatus() + .withPhase("Running") + .endStatus() + .build() + } + + private def getPodStatus(podName: String, id: String): String = { + "Application status (driver): " + + s"""|${"\n\t"} pod name: $podName + |${"\t"} namespace: N/A + |${"\t"} labels: spark-app-selector -> spark-app-id, spark-role -> driver + |${"\t"} pod uid: driver-pod-$id + |${"\t"} creation time: N/A + |${"\t"} service account name: test$id + |${"\t"} volumes: N/A + |${"\t"} node name: testNode$id + |${"\t"} start time: N/A + |${"\t"} phase: Running + |${"\t"} container status: N/A""".stripMargin + } +}
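For reference, a minimal, hypothetical sketch of how another cluster manager could plug into the `SparkSubmitOperation` service interface introduced in SparkSubmit.scala above. The `foo://` scheme, package, and class names are invented for illustration; a real implementation would also list its class name in a `META-INF/services/org.apache.spark.deploy.SparkSubmitOperation` file, as the kubernetes module does above, so that `SparkSubmitUtils.getSubmitOperations` can discover it via `ServiceLoader`.

```scala
// Hypothetical example, not part of this patch: a SparkSubmitOperation for a
// fictional "foo://" cluster manager. The trait is private[spark], so an
// implementation has to live under the org.apache.spark package.
package org.apache.spark.deploy.foo

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation

class FooSparkSubmitOperation extends SparkSubmitOperation {

  // Invoked by SparkSubmit.kill() when the master URL has no REST client support.
  override def kill(submissionId: String, conf: SparkConf): Unit = {
    // A real implementation would call the cluster manager's API here.
    println(s"Killing submission $submissionId on ${conf.get("spark.master")}")
  }

  // Invoked by SparkSubmit.requestStatus() for --status.
  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    println(s"Status of submission $submissionId: not implemented in this sketch")
  }

  // SparkSubmitUtils.getSubmitOperations() selects the single registered
  // implementation whose supports() returns true for the given master URL.
  override def supports(master: String): Boolean = master.startsWith("foo://")
}
```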