[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717

Closed
10 changes: 10 additions & 0 deletions assembly/pom.xml
@@ -148,6 +148,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>kubernetes</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -668,7 +668,11 @@ private[spark] object SparkConf extends Logging {
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3"))
AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")),
DRIVER_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
EXECUTOR_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))
)

/**
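Since the old YARN-specific keys are now registered as alternates of the new keys, setting only the old key should still be picked up when the new key is read. A minimal sketch, assuming SparkConf's usual deprecated-key fallback applies to these entries (values are arbitrary):

import org.apache.spark.SparkConf

// Sketch only: "spark.yarn.executor.memoryOverhead" is registered above as an
// alternate of "spark.executor.memoryOverhead", so reading the new key falls
// back to the old one when only the old one is set.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "512")
assert(conf.get("spark.executor.memoryOverhead") == "512")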
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,6 @@
package org.apache.spark

import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
48 changes: 41 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
private val STANDALONE = 2
private val MESOS = 4
private val LOCAL = 8
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
private val KUBERNETES = 16
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

// Deploy modes
private val CLIENT = 1
@@ -97,6 +98,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
"org.apache.spark.deploy.yarn.YarnClusterApplication"
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

// scalastyle:off println
private[spark] def printVersionAndExit(): Unit = {
@@ -257,9 +260,10 @@
YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}

@@ -294,6 +298,16 @@
}
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
printErrorAndExit(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
}

// Fail fast, the following modes are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
@@ -302,6 +316,12 @@
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
@@ -322,6 +342,7 @@
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER

if (!isMesosCluster && !isStandAloneCluster) {
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
@@ -557,19 +578,19 @@
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
confKey = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
confKey = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
confKey = "spark.driver.supervise"),
@@ -703,6 +724,19 @@
}
}

if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
}
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += ("--arg", arg)
}
}
}

// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
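For illustration, a rough sketch of the child arguments the Kubernetes branch above assembles for a hypothetical submission (primary resource file:/home/thejar.jar, main class org.SomeClass, one application argument); the inputs here are made up for the example:

import scala.collection.mutable.ArrayBuffer

// Mirrors the isKubernetesCluster branch above with hypothetical inputs.
val childArgs = ArrayBuffer.empty[String]
childArgs ++= Array("--primary-java-resource", "file:/home/thejar.jar")
childArgs ++= Array("--main-class", "org.SomeClass")
Seq("arg1").foreach { arg => childArgs += ("--arg", arg) }
// Resulting arguments handed to KubernetesClientApplication:
// --primary-java-resource file:/home/thejar.jar --main-class org.SomeClass --arg arg1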
@@ -515,8 +515,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
outStream.println(
s"""
|Options:
| --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local
| (Default: local[*]).
| --master MASTER_URL spark://host:port, mesos://host:port, yarn,
| k8s://https://host:port, or local (Default: local[*]).
| --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
| on one of the worker machines inside the cluster ("cluster")
| (Default: client).
@@ -41,6 +41,10 @@ package object config {
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
Review comment (Contributor): nit: should also add doc for this config here.

.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val EVENT_LOG_COMPRESS =
ConfigBuilder("spark.eventLog.compress")
.booleanConf
@@ -80,6 +84,10 @@
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("1g")

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
.doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
"If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
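A sketch of what the reviewer's "add doc" nit could look like for these builders; the doc text is adapted from the configuration.md entries added elsewhere in this PR, not from the PR's Scala code, and the snippet assumes it sits in the same package object config as the definitions above:

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
  .doc("The amount of off-heap memory to be allocated per executor, in MiB unless otherwise " +
    "specified. This accounts for things like VM overheads, interned strings, and other " +
    "native overheads.")
  .bytesConf(ByteUnit.MiB)
  .createOptional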
36 changes: 36 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2744,6 +2744,42 @@ private[spark] object Utils extends Logging {
}
}

/**
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
require(rawMasterURL.startsWith("k8s://"),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
Review thread:

Contributor: Can we change this String representation to a URI to make the check below more robust?

Contributor Author: Done.

Contributor Author (@liyinan926, Dec 5, 2017): With the changes, URLs like k8s://host:port are no longer valid because the host:port part is parsed as a URI with host as the scheme. Instead, k8s:///host:port should be used. I'm not super familiar with URIs, so I'm not sure if this is desirable, or whether we should stop automatically appending https:// completely. /cc @mccheah @foxish

Contributor: All of the following should be supported and resolved as follows:

  • k8s://host:port -> https://host:port
  • k8s://https://host:port -> https://host:port
  • k8s://http://host:port -> http://host:port

Think we just need to use whatever code is necessary to get to this state.

Contributor: Are those URIs set in stone? This seems more readable to me:

scala> new URI("k8s:http://foo:1234")
res0: java.net.URI = k8s:http://foo:1234

scala> res0.getScheme
res1: String = k8s

scala> res0.getSchemeSpecificPart
res2: String = http://foo:1234

It also allows parsing using the URI APIs instead of doing string manipulation.

Contributor: This decision is to match how Mesos works with ZooKeeper, as strings like mesos://zk://host1:2181,host2:2181,host3:2181/mesos are valid IIRC. That's just the precedent we are using, but we can use a different convention.

Contributor Author: OK, did some changes to make all supported formats work. See 51844cc.

// To handle master URLs, e.g., k8s://host:port.
if (!masterWithoutK8sPrefix.contains("://")) {
Review thread:

Member: startsWith instead of contains?

Contributor Author: This is used to differentiate https://host:port vs. host:port, so contains is the right method here.

val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
return s"k8s:$resolvedURL"
}

val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
val resolvedURL = masterScheme.toLowerCase match {
case "https" =>
masterWithoutK8sPrefix
case "http" =>
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
masterWithoutK8sPrefix
case null =>
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " +
s"URL is $resolvedURL.")
resolvedURL
case _ =>
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme)
}

return s"k8s:$resolvedURL"
}
}

private[util] object CallerContext extends Logging {
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -550,6 +550,12 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}

test("client mode with a k8s master url") {
intercept[SparkException] {
sc = new SparkContext("k8s://https://host:port", "test", new SparkConf())
}
}

testCancellingTasks("that raise interrupted exception on cancel") {
Thread.sleep(9999999)
}
27 changes: 27 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -388,6 +388,33 @@
conf.get("spark.ui.enabled") should be ("false")
}

test("handles k8s cluster mode") {
val clArgs = Seq(
"--deploy-mode", "cluster",
"--master", "k8s://host:port",
"--executor-memory", "5g",
"--class", "org.SomeClass",
"--driver-memory", "4g",
"--conf", "spark.kubernetes.namespace=spark",
"--conf", "spark.kubernetes.driver.docker.image=bar",
Review thread:

Contributor: Should we also test the arg "--kubernetes-namespace"?

Contributor Author: --kubernetes-namespace has been removed in this PR.

Contributor Author: Done.

"/home/thejar.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs)

val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar"))
childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
childArgsMap.get("--arg") should be (Some("arg1"))
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
classpath should have length (0)
conf.get("spark.master") should be ("k8s:https://host:port")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.kubernetes.namespace") should be ("spark")
conf.get("spark.kubernetes.driver.docker.image") should be ("bar")
}

test("handles confs with flag equivalents") {
val clArgs = Seq(
"--deploy-mode", "cluster",
21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1146,6 +1146,27 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
}
}

test("check Kubernetes master URL") {
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
assert(k8sMasterURLHttps === "k8s:https://host:port")

val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port")
assert(k8sMasterURLHttp === "k8s:http://host:port")

val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443")

val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1")

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port")
}

intercept[IllegalArgumentException] {
Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port")
}
}
}

private class SimpleExtension
20 changes: 20 additions & 0 deletions docs/configuration.md
@@ -157,13 +157,33 @@
or in your default properties file.
</td>
</tr>
<tr>
<td><code>spark.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
This tends to grow with the container size (typically 6-10%). This option is currently supported
on YARN and Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.executor.memory</code></td>
<td>1g</td>
<td>
Amount of memory to use per executor process (e.g. <code>2g</code>, <code>8g</code>).
</td>
</tr>
<tr>
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that
accounts for things like VM overheads, interned strings, other native overheads, etc. This tends
to grow with the executor size (typically 6-10%). This option is currently supported on YARN and
Kubernetes.
</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>
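For illustration, a minimal sketch of setting the new cluster-manager-agnostic overhead options programmatically; the values are arbitrary, and bare numbers are read as MiB while unit suffixes such as "1g" also work:

import org.apache.spark.SparkConf

// Illustrative values only.
val conf = new SparkConf()
  .set("spark.driver.memoryOverhead", "512")
  .set("spark.executor.memoryOverhead", "1g")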
16 changes: 1 addition & 15 deletions docs/running-on-yarn.md
@@ -227,25 +227,11 @@
The number of executors for static allocation. With <code>spark.dynamicAllocation.enabled</code>, the initial set of executors will be at least this large.
</td>
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>driverMemory * 0.10, with minimum of 384 </td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.10, with minimum of 384 </td>
<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
</td>
</tr>
<tr>
@@ -82,7 +82,7 @@ class SparkSubmitOptionParser {
* name of the option, passed to {@link #handle(String, String)}.
* <p>
* Options not listed here nor in the "switch" list below will result in a call to
* {@link $#handleUnknown(String)}.
* {@link #handleUnknown(String)}.
* <p>
* These two arrays are visible for tests.
*/