[SPARK-24793][K8S] Enhance spark-submit for app management
- supports `--kill` & `--status` flags.
- supports globs, which is useful in general; see this long-standing kubectl [issue](kubernetes/kubernetes#17144 (comment)) for context.

Manually tested against running apps. Example output:

Submission Id reported at launch time:

```
2019-01-20 23:47:56 INFO  Client:58 - Waiting for application spark-pi with submissionId spark:spark-pi-1548020873671-driver to finish...
```

Killing the app:

```
./bin/spark-submit --kill spark:spark-pi-1548020873671-driver --master  k8s://https://192.168.2.8:8443
2019-01-20 23:48:07 WARN  Utils:70 - Your hostname, universe resolves to a loopback address: 127.0.0.1; using 192.168.2.8 instead (on interface wlp2s0)
2019-01-20 23:48:07 WARN  Utils:70 - Set SPARK_LOCAL_IP if you need to bind to another address

```

App terminates with exit code 143 (128 + 15, i.e. SIGTERM; since the image uses tini as init, this should lead to a [graceful shutdown](https://cloud.google.com/solutions/best-practices-for-building-containers)):

```
2019-01-20 23:48:08 INFO  LoggingPodStatusWatcherImpl:58 - State changed, new state:
	 pod name: spark-pi-1548020873671-driver
	 namespace: spark
	 labels: spark-app-selector -> spark-e4730c80e1014b72aa77915a2203ae05, spark-role -> driver
	 pod uid: 0ba9a794-1cfd-11e9-8215-a434d9270a65
	 creation time: 2019-01-20T21:47:55Z
	 service account name: spark-sa
	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
	 node name: minikube
	 start time: 2019-01-20T21:47:55Z
	 phase: Running
	 container status:
		 container name: spark-kubernetes-driver
		 container image: skonto/spark:k8s-3.0.0
		 container state: running
		 container started at: 2019-01-20T21:48:00Z
2019-01-20 23:48:09 INFO  LoggingPodStatusWatcherImpl:58 - State changed, new state:
	 pod name: spark-pi-1548020873671-driver
	 namespace: spark
	 labels: spark-app-selector -> spark-e4730c80e1014b72aa77915a2203ae05, spark-role -> driver
	 pod uid: 0ba9a794-1cfd-11e9-8215-a434d9270a65
	 creation time: 2019-01-20T21:47:55Z
	 service account name: spark-sa
	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
	 node name: minikube
	 start time: 2019-01-20T21:47:55Z
	 phase: Failed
	 container status:
		 container name: spark-kubernetes-driver
		 container image: skonto/spark:k8s-3.0.0
		 container state: terminated
		 container started at: 2019-01-20T21:48:00Z
		 container finished at: 2019-01-20T21:48:08Z
		 exit code: 143
		 termination reason: Error
2019-01-20 23:48:09 INFO  LoggingPodStatusWatcherImpl:58 - Container final statuses:
	 container name: spark-kubernetes-driver
	 container image: skonto/spark:k8s-3.0.0
	 container state: terminated
	 container started at: 2019-01-20T21:48:00Z
	 container finished at: 2019-01-20T21:48:08Z
	 exit code: 143
	 termination reason: Error
2019-01-20 23:48:09 INFO  Client:58 - Application spark-pi with submissionId spark:spark-pi-1548020873671-driver finished.
2019-01-20 23:48:09 INFO  ShutdownHookManager:58 - Shutdown hook called
2019-01-20 23:48:09 INFO  ShutdownHookManager:58 - Deleting directory /tmp/spark-f114b2e0-5605-4083-9203-a4b1c1f6059e

```

Glob scenario:

```
./bin/spark-submit --status spark:spark-pi* --master  k8s://https://192.168.2.8:8443
2019-01-20 22:27:44 WARN  Utils:70 - Your hostname, universe resolves to a loopback address: 127.0.0.1; using 192.168.2.8 instead (on interface wlp2s0)
2019-01-20 22:27:44 WARN  Utils:70 - Set SPARK_LOCAL_IP if you need to bind to another address
Application status (driver):
	 pod name: spark-pi-1547948600328-driver
	 namespace: spark
	 labels: spark-app-selector -> spark-f13f01702f0b4503975ce98252d59b94, spark-role -> driver
	 pod uid: c576e1c6-1c54-11e9-8215-a434d9270a65
	 creation time: 2019-01-20T01:43:22Z
	 service account name: spark-sa
	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
	 node name: minikube
	 start time: 2019-01-20T01:43:22Z
	 phase: Running
	 container status:
		 container name: spark-kubernetes-driver
		 container image: skonto/spark:k8s-3.0.0
		 container state: running
		 container started at: 2019-01-20T01:43:27Z
Application status (driver):
	 pod name: spark-pi-1547948792539-driver
	 namespace: spark
	 labels: spark-app-selector -> spark-006d252db9b24f25b5069df357c30264, spark-role -> driver
	 pod uid: 38375b4b-1c55-11e9-8215-a434d9270a65
	 creation time: 2019-01-20T01:46:35Z
	 service account name: spark-sa
	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-b7wcm
	 node name: minikube
	 start time: 2019-01-20T01:46:35Z
	 phase: Succeeded
	 container status:
		 container name: spark-kubernetes-driver
		 container image: skonto/spark:k8s-3.0.0
		 container state: terminated
		 container started at: 2019-01-20T01:46:39Z
		 container finished at: 2019-01-20T01:46:56Z
		 exit code: 0
		 termination reason: Completed

```

Closes #23599 from skonto/submit_ops_extension.

Authored-by: Stavros Kontopoulos <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
Stavros Kontopoulos authored and Marcelo Vanzin committed Mar 26, 2019
1 parent 887279c commit 05168e7
Showing 12 changed files with 504 additions and 42 deletions.
61 changes: 52 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.UUID
import java.util.{ServiceLoader, UUID}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.{Properties, Try}

@@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging {
}

/**
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
* Kill an existing submission.
*/
private def kill(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.killSubmission(args.submissionToKill)
if (RestSubmissionClient.supportsRestClient(args.master)) {
new RestSubmissionClient(args.master)
.killSubmission(args.submissionToKill)
} else {
val sparkConf = args.toSparkConf()
sparkConf.set("spark.master", args.master)
SparkSubmitUtils
.getSubmitOperations(args.master)
.kill(args.submissionToKill, sparkConf)
}
}

/**
* Request the status of an existing submission using the REST protocol.
* Standalone and Mesos cluster mode only.
* Request the status of an existing submission.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
if (RestSubmissionClient.supportsRestClient(args.master)) {
new RestSubmissionClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
} else {
val sparkConf = args.toSparkConf()
sparkConf.set("spark.master", args.master)
SparkSubmitUtils
.getSubmitOperations(args.master)
.printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
}
}

/** Print version information to the log. */
@@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging {
}
}

args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
// update spark config from args
args.toSparkConf(Option(sparkConf))
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

@@ -1336,6 +1353,23 @@ private[spark] object SparkSubmitUtils {
}
}

private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[SparkSubmitOperation], loader)
.asScala
.filter(_.supports(master))

serviceLoaders.size match {
case x if x > 1 =>
throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
s"clients registered for master url ${master}.")
case 1 => serviceLoaders.headOption.get
case _ =>
throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
s"clients found for master url: '$master'")
}
}
}

/**
@@ -1348,3 +1382,12 @@ private case class OptionAssigner(
deployMode: Int,
clOption: String = null,
confKey: String = null)

private[spark] trait SparkSubmitOperation {

def kill(submissionId: String, conf: SparkConf): Unit

def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit

def supports(master: String): Boolean
}
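
Implementations of this trait are discovered at runtime through Java's `ServiceLoader` (see `getSubmitOperations` above): a cluster-manager module ships an implementation class plus a provider-configuration file named `META-INF/services/org.apache.spark.deploy.SparkSubmitOperation` that lists it. The K8s implementation added by this commit is in the Kubernetes module files not shown above; the sketch below is purely illustrative, with a made-up package, class name, and master prefix:

```
// Illustrative sketch only, not part of this commit. Because
// SparkSubmitOperation is private[spark], implementations must live under
// the org.apache.spark package.
package org.apache.spark.deploy.example

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation

class ExampleSubmitOperation extends SparkSubmitOperation {

  // Claim only the master URLs this client knows how to handle.
  override def supports(master: String): Boolean =
    master.startsWith("example://")

  override def kill(submissionId: String, conf: SparkConf): Unit = {
    // Ask the cluster manager to terminate the driver behind submissionId.
    println(s"Killed submission $submissionId")
  }

  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    // Query the cluster manager and print the driver's current state.
    println(s"Submission $submissionId is running")
  }
}
```

With the fully qualified class name listed in that services file on the classpath, `getSubmitOperations` picks the provider whose `supports` returns true for the given master URL, and errors out if zero or more than one registered provider matches.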
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.io.Source
import scala.util.Try

import org.apache.spark.{SparkException, SparkUserAppException}
import org.apache.spark.{SparkConf, SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkSubmitAction._
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
@@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
}

private def validateKillArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
error("Killing submissions is only supported in standalone or Mesos mode!")
}
if (submissionToKill == null) {
error("Please specify a submission to kill.")
}
}

private def validateStatusRequestArguments(): Unit = {
if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
error(
"Requesting submission statuses is only supported in standalone or Mesos mode!")
}
if (submissionToRequestStatusFor == null) {
error("Please specify a submission to request status for.")
}
@@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark standalone or Mesos with cluster deploy mode only:
| --supervise If given, restarts the driver on failure.
|
| Spark standalone, Mesos or K8s with cluster deploy mode only:
| --kill SUBMISSION_ID If given, kills the driver specified.
| --status SUBMISSION_ID If given, requests the status of the driver specified.
|
@@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S

private def error(msg: String): Unit = throw new SparkException(msg)

private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {
// either use an existing config or create a new empty one
sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) {
case (conf, (k, v)) => conf.set(k, v)
}
}
}
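
As an aside, a hedged sketch of how the new `toSparkConf` helper is used (argument values are illustrative; the no-argument form is exercised by the new test in `SparkSubmitSuite` further down, and the `Option(...)` form matches the updated call site in `SparkSubmit.scala` above):

```
// Sketch only; lives in org.apache.spark.deploy because SparkSubmitArguments
// and toSparkConf are package-private. Names and values are illustrative.
package org.apache.spark.deploy

import org.apache.spark.SparkConf

private[deploy] object ToSparkConfExample {
  def main(cmdLine: Array[String]): Unit = {
    val args = new SparkSubmitArguments(Seq(
      "--conf", "spark.test.hello=world",
      "--class", "Foo",
      "app.jar"))

    // Build a fresh SparkConf from the collected --conf properties...
    val fresh = args.toSparkConf()

    // ...or fold them into an existing SparkConf, as SparkSubmit.scala now does.
    val merged = args.toSparkConf(Option(new SparkConf(false)))

    println(fresh.get("spark.test.hello")) // world
    println(merged.get("spark.test.hello")) // world
  }
}
```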
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -61,8 +61,6 @@ import org.apache.spark.util.Utils
private[spark] class RestSubmissionClient(master: String) extends Logging {
import RestSubmissionClient._

private val supportedMasterPrefixes = Seq("spark://", "mesos://")

private val masters: Array[String] = if (master.startsWith("spark://")) {
Utils.parseStandaloneMasterUrls(master)
} else {
@@ -409,6 +407,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {

private[spark] object RestSubmissionClient {

val supportedMasterPrefixes = Seq("spark://", "mesos://")

// SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
// on the remote machine (SPARK-12345) (SPARK-25934)
private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
@@ -424,6 +424,10 @@
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}
}

private[spark] def supportsRestClient(master: String): Boolean = {
supportedMasterPrefixes.exists(master.startsWith)
}
}

private[spark] class RestSubmissionClientApp extends SparkApplication {
@@ -456,5 +460,4 @@ private[spark] class RestSubmissionClientApp extends SparkApplication {
val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
run(appResource, mainClass, appArgs, conf, env)
}

}
core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -25,8 +25,12 @@ import org.apache.spark.SparkException
* Contains basic command line parsing functionality and methods to parse some common Spark CLI
* options.
*/
private[spark] trait CommandLineUtils {
private[spark] trait CommandLineUtils extends CommandLineLoggingUtils {

def main(args: Array[String]): Unit
}

private[spark] trait CommandLineLoggingUtils {
// Exposed for testing
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)

@@ -41,6 +45,4 @@ private[spark] trait CommandLineUtils {
printMessage("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}

def main(args: Array[String]): Unit
}
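
The point of the split is that a class which only needs the print/exit hooks, but is not itself a command-line entry point, can now mix in `CommandLineLoggingUtils` without being forced to implement `main` (as `CommandLineUtils` requires). A minimal, purely illustrative sketch, assuming `printMessage` and `exitFn` are the hooks the trait exposes, as the hunk above suggests:

```
// Illustrative only, not part of this commit; lives under org.apache.spark
// because the trait and its members are private[spark].
package org.apache.spark.deploy.example

import org.apache.spark.util.CommandLineLoggingUtils

private[spark] class ExampleStatusReporter extends CommandLineLoggingUtils {

  def reportMissing(submissionId: String): Unit = {
    printMessage(s"No submission found with id: $submissionId")
    exitFn(1) // exitFn is a var, so tests can swap in a non-exiting stub
  }
}
```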
17 changes: 17 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1239,6 +1239,23 @@ class SparkSubmitSuite

conf.get(nonDelimSpaceFromFile._1) should be ("blah")
}

test("get a Spark configuration from arguments") {
val testConf = "spark.test.hello" -> "world"
val masterConf = "spark.master" -> "yarn"
val clArgs = Seq(
"--conf", s"${testConf._1}=${testConf._2}",
"--conf", s"${masterConf._1}=${masterConf._2}",
"--class", "Foo",
"app.jar")
val conf = new SparkSubmitArguments(clArgs).toSparkConf()
Seq(
testConf,
masterConf
).foreach { case (k, v) =>
conf.get(k) should be (v)
}
}
}

object SparkSubmitSuite extends SparkFunSuite with TimeLimits {