-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717
Changes from 13 commits
dcaac45
27c67ff
6d597d0
5b9fa39
5ccadb5
12f2797
c35fe48
faa2849
347ed69
0e8ca01
3a0b8e3
83d0b9c
44c40b1
67bc847
7d2b303
caf2206
2e7810b
cbcd30e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -380,6 +380,10 @@ class SparkContext(config: SparkConf) extends Logging { | |
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") | ||
} | ||
|
||
if (master.startsWith("k8s") && deployMode == "client") { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you do this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Moved the check into |
||
throw new SparkException("Client mode is currently not supported for Kubernetes.") | ||
} | ||
|
||
if (_conf.getBoolean("spark.logConf", false)) { | ||
logInfo("Spark configuration:\n" + _conf.toDebugString) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,7 +76,8 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
private val STANDALONE = 2 | ||
private val MESOS = 4 | ||
private val LOCAL = 8 | ||
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | ||
private val KUBERNETES = 16 | ||
private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES | ||
|
||
// Deploy modes | ||
private val CLIENT = 1 | ||
|
@@ -97,6 +98,8 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
"org.apache.spark.deploy.yarn.YarnClusterApplication" | ||
private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() | ||
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() | ||
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS = | ||
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication" | ||
|
||
// scalastyle:off println | ||
private[spark] def printVersionAndExit(): Unit = { | ||
|
@@ -257,9 +260,10 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
YARN | ||
case m if m.startsWith("spark") => STANDALONE | ||
case m if m.startsWith("mesos") => MESOS | ||
case m if m.startsWith("k8s") => KUBERNETES | ||
case m if m.startsWith("local") => LOCAL | ||
case _ => | ||
printErrorAndExit("Master must either be yarn or start with spark, mesos, local") | ||
printErrorAndExit("Master must either be yarn or start with spark, mesos, k8s, or local") | ||
-1 | ||
} | ||
|
||
|
@@ -294,6 +298,16 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
} | ||
} | ||
|
||
if (clusterManager == KUBERNETES) { | ||
args.master = Utils.checkAndGetK8sMasterUrl(args.master) | ||
// Make sure YARN is included in our build if we're trying to use it | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should say KUBERNETES There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { | ||
printErrorAndExit( | ||
"Could not load KUBERNETES classes. " + | ||
"This copy of Spark may not have been compiled with KUBERNETES support.") | ||
} | ||
} | ||
|
||
// Fail fast, the following modes are not supported or applicable | ||
(clusterManager, deployMode) match { | ||
case (STANDALONE, CLUSTER) if args.isPython => | ||
|
@@ -302,6 +316,12 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
case (STANDALONE, CLUSTER) if args.isR => | ||
printErrorAndExit("Cluster deploy mode is currently not supported for R " + | ||
"applications on standalone clusters.") | ||
case (KUBERNETES, _) if args.isPython => | ||
printErrorAndExit("Python applications are currently not supported for Kubernetes.") | ||
case (KUBERNETES, _) if args.isR => | ||
printErrorAndExit("R applications are currently not supported for Kubernetes.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Not affect the result, but logically I think it is better: case (KUBERNETES, _) if args.isPython =>
printErrorAndExit("Python applications are currently not supported for Kubernetes.")
case (KUBERNETES, _) if args.isR =>
printErrorAndExit("R applications are currently not supported for Kubernetes.")
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.") There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
case (KUBERNETES, CLIENT) => | ||
printErrorAndExit("Client mode is currently not supported for Kubernetes.") | ||
case (LOCAL, CLUSTER) => | ||
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"") | ||
case (_, CLUSTER) if isShell(args.primaryResource) => | ||
|
@@ -322,6 +342,7 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER | ||
val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER | ||
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER | ||
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER | ||
|
||
if (!isMesosCluster && !isStandAloneCluster) { | ||
// Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files | ||
|
@@ -557,19 +578,19 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"), | ||
|
||
// Other options | ||
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.executor.cores"), | ||
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.executor.memory"), | ||
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, | ||
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, | ||
confKey = "spark.cores.max"), | ||
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, | ||
confKey = "spark.files"), | ||
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"), | ||
OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, confKey = "spark.jars"), | ||
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, | ||
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, | ||
confKey = "spark.driver.memory"), | ||
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, | ||
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER, | ||
confKey = "spark.driver.cores"), | ||
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, | ||
confKey = "spark.driver.supervise"), | ||
|
@@ -703,6 +724,19 @@ object SparkSubmit extends CommandLineUtils with Logging { | |
} | ||
} | ||
|
||
if (isKubernetesCluster) { | ||
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS | ||
if (args.primaryResource != SparkLauncher.NO_RESOURCE) { | ||
childArgs ++= Array("--primary-java-resource", args.primaryResource) | ||
} | ||
childArgs ++= Array("--main-class", args.mainClass) | ||
if (args.childArgs != null) { | ||
args.childArgs.foreach { arg => | ||
childArgs += ("--arg", arg) | ||
} | ||
} | ||
} | ||
|
||
// Load any properties specified through --conf and the default properties file | ||
for ((k, v) <- args.sparkProperties) { | ||
sparkConf.setIfMissing(k, v) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,10 @@ package object config { | |
.bytesConf(ByteUnit.MiB) | ||
.createWithDefaultString("1g") | ||
|
||
private[spark] val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should also add doc for this config here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's added under |
||
.bytesConf(ByteUnit.MiB) | ||
.createOptional | ||
|
||
private[spark] val EVENT_LOG_COMPRESS = | ||
ConfigBuilder("spark.eventLog.compress") | ||
.booleanConf | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2744,6 +2744,42 @@ private[spark] object Utils extends Logging { | |
} | ||
} | ||
|
||
/** | ||
* Check the validity of the given Kubernetes master URL and return the resolved URL. Prefix | ||
* "k8s:" is appended to the resolved URL as the prefix is used by KubernetesClusterManager | ||
* in canCreate to determine if the KubernetesClusterManager should be used. | ||
*/ | ||
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { | ||
require(rawMasterURL.startsWith("k8s://"), | ||
"Kubernetes master URL must start with k8s://.") | ||
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we change this String representation to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the changes, URLs like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All of the following should be supported and resolved as follows:
Think we just need to use whatever code that is necessary to get to this state. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are those URIs set in stone? This seems more readable to me:
It also allows parsing using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This decision is to match how Mesos works with Zookeeper, as these Strings are valid IIRC: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, did some changes to make all supported formats work. See 51844cc. |
||
|
||
// To handle master URLs, e.g., k8s://host:port. | ||
if (!masterWithoutK8sPrefix.contains("://")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. startsWith instead of contains? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is used to differentiate |
||
val resolvedURL = s"https://$masterWithoutK8sPrefix" | ||
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + | ||
s"URL is $resolvedURL.") | ||
return s"k8s:$resolvedURL" | ||
} | ||
|
||
val masterScheme = new URI(masterWithoutK8sPrefix).getScheme | ||
val resolvedURL = masterScheme.toLowerCase match { | ||
case "https" => | ||
masterWithoutK8sPrefix | ||
case "http" => | ||
logWarning("Kubernetes master URL uses HTTP instead of HTTPS.") | ||
masterWithoutK8sPrefix | ||
case null => | ||
val resolvedURL = s"https://$masterWithoutK8sPrefix" | ||
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + | ||
s"URL is $resolvedURL.") | ||
resolvedURL | ||
case _ => | ||
throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme) | ||
} | ||
|
||
return s"k8s:$resolvedURL" | ||
} | ||
} | ||
|
||
private[util] object CallerContext extends Logging { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -388,6 +388,33 @@ class SparkSubmitSuite | |
conf.get("spark.ui.enabled") should be ("false") | ||
} | ||
|
||
test("handles k8s cluster mode") { | ||
val clArgs = Seq( | ||
"--deploy-mode", "cluster", | ||
"--master", "k8s://host:port", | ||
"--executor-memory", "5g", | ||
"--class", "org.SomeClass", | ||
"--driver-memory", "4g", | ||
"--conf", "spark.kubernetes.namespace=spark", | ||
"--conf", "spark.kubernetes.driver.docker.image=bar", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also test the arg "--kubernetes-namespace"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
"/home/thejar.jar", | ||
"arg1") | ||
val appArgs = new SparkSubmitArguments(clArgs) | ||
val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) | ||
|
||
val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap | ||
childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar")) | ||
childArgsMap.get("--main-class") should be (Some("org.SomeClass")) | ||
childArgsMap.get("--arg") should be (Some("arg1")) | ||
mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS) | ||
classpath should have length (0) | ||
conf.get("spark.master") should be ("k8s:https://host:port") | ||
conf.get("spark.executor.memory") should be ("5g") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
conf.get("spark.driver.memory") should be ("4g") | ||
conf.get("spark.kubernetes.namespace") should be ("spark") | ||
conf.get("spark.kubernetes.driver.docker.image") should be ("bar") | ||
} | ||
|
||
test("handles confs with flag equivalents") { | ||
val clArgs = Seq( | ||
"--deploy-mode", "cluster", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1146,6 +1146,27 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { | |
} | ||
} | ||
|
||
test("check Kubernetes master URL") { | ||
val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port") | ||
assert(k8sMasterURLHttps == "k8s:https://host:port") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
val k8sMasterURLHttp = Utils.checkAndGetK8sMasterUrl("k8s://http://host:port") | ||
assert(k8sMasterURLHttp == "k8s:http://host:port") | ||
|
||
val k8sMasterURLWithoutScheme = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443") | ||
assert(k8sMasterURLWithoutScheme == "k8s:https://127.0.0.1:8443") | ||
|
||
val k8sMasterURLWithoutScheme2 = Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1") | ||
assert(k8sMasterURLWithoutScheme2 == "k8s:https://127.0.0.1") | ||
|
||
intercept[IllegalArgumentException] { | ||
Utils.checkAndGetK8sMasterUrl("k8s:https://host:port") | ||
} | ||
|
||
intercept[IllegalArgumentException] { | ||
Utils.checkAndGetK8sMasterUrl("k8s://foo://host:port") | ||
} | ||
} | ||
} | ||
|
||
private class SimpleExtension | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -157,6 +157,15 @@ of the most common options to set are: | |
or in your default properties file. | ||
</td> | ||
</tr> | ||
<tr> | ||
<td><code>spark.driver.memoryOverhead</code></td> | ||
<td>driverMemory * 0.10, with minimum of 384 </td> | ||
<td> | ||
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is | ||
memory that accounts for things like VM overheads, interned strings, other native overheads, etc. | ||
This tends to grow with the container size (typically 6-10%). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should mention that not all cluster managers support this option, since this is now in the common configuration doc. Same below. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
</td> | ||
</tr> | ||
<tr> | ||
<td><code>spark.executor.memory</code></td> | ||
<td>1g</td> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -234,18 +234,11 @@ To use a custom metrics.properties for the application master and executors, upd | |
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). | ||
</td> | ||
</tr> | ||
<tr> | ||
<td><code>spark.yarn.driver.memoryOverhead</code></td> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should make this configuration backward compatible, user should still be able to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there an example PR for deprecating a config property that I can follow? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Look for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, done. |
||
<td>driverMemory * 0.10, with minimum of 384 </td> | ||
<td> | ||
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). | ||
</td> | ||
</tr> | ||
<tr> | ||
<td><code>spark.yarn.am.memoryOverhead</code></td> | ||
<td>AM memory * 0.10, with minimum of 384 </td> | ||
<td> | ||
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode. | ||
Same as <code>spark.driver.memoryOverhead</code>, but for the YARN Application Master in client mode. | ||
</td> | ||
</tr> | ||
<tr> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,7 +82,7 @@ class SparkSubmitOptionParser { | |
* name of the option, passed to {@link #handle(String, String)}. | ||
* <p> | ||
* Options not listed here nor in the "switch" list below will result in a call to | ||
* {@link $#handleUnknown(String)}. | ||
* {@link #handleUnknown(String)}. | ||
* <p> | ||
* These two arrays are visible for tests. | ||
*/ | ||
|
@@ -114,7 +114,7 @@ class SparkSubmitOptionParser { | |
{ QUEUE }, | ||
{ REPOSITORIES }, | ||
{ STATUS }, | ||
{ TOTAL_EXECUTOR_CORES }, | ||
{ TOTAL_EXECUTOR_CORES } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't change this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted. |
||
}; | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`DRIVER_MEMORY_OVERHEAD.key`. Don't you also need one for the executor in k8s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we do need. Combined `spark.yarn.executor.memoryOverhead` and `spark.kubernetes.executor.memoryOverhead` into `spark.executor.memoryOverhead`.