[SPARK-22646] [Submission] Spark on Kubernetes - basic submission client #19717
Conversation
Force-pushed from 37c7ad6 to 6d50369.
Test build #83692 has finished for PR 19717 at commit
Test build #83694 has finished for PR 19717 at commit
@vanzin @jiangxb1987 @mridulm PTAL. Thanks!
Force-pushed from 6d50369 to f38144b.
Test build #84120 has finished for PR 19717 at commit
Force-pushed from f38144b to 60234a2.
Test build #84121 has finished for PR 19717 at commit
+CC @srowen
It has been a while since I looked at spark-submit and related infra, so I added a few comments. I will defer to others more involved with the code for the review, though.
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
    }
  }

  if (isKubernetesCluster) {
    childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
    childArgs ++= Array("--primary-java-resource", args.primaryResource)
Handle primary resource == SparkLauncher.NO_RESOURCE?
Done.
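For context, a minimal sketch of the resulting logic (variable names assumed to mirror the surrounding SparkSubmit code): the primary resource is forwarded only when one was actually supplied, since built-in applications pass SparkLauncher.NO_RESOURCE as a placeholder.

```scala
// Sketch only; assumes SparkSubmit's surrounding definitions of
// isKubernetesCluster, childMainClass, childArgs, and args.
if (isKubernetesCluster) {
  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
  // Built-in apps (examples, Thrift server) use NO_RESOURCE as a placeholder,
  // so only forward a real primary resource to the k8s submission client.
  if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
    childArgs ++= Array("--primary-java-resource", args.primaryResource)
  }
  childArgs ++= Array("--main-class", args.mainClass)
}
```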
 ~ See the License for the specific language governing permissions and
 ~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
We will rebase this PR once #19468 gets merged.
    .withName(s"$ENV_JAVA_OPT_PREFIX$index")
    .withValue(option)
    .build()
}
As with #19468, I am not sure how this is being used in the driver.
It's used in the driver Docker file included in this PR. See https://github.com/apache-spark-on-k8s/spark/blob/60234a29846955b8a6e8cb6fbb1dc35da3c3b4f2/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile.
Thanks! Not sure how I missed this one when I searched... something messed up in IntelliJ.
require(mainAppResource.isDefined,
  "Main app resource must be defined by either --primary-py-file or --primary-java-resource.")
require(mainClass.isDefined, "Main class must be specified via --main-class")
mainAppResource need not be valid here
Wondering why?
For the case of Spark examples, the Spark Thrift Server, etc., there is no main app resource; the bits are bundled as part of Spark itself.
You can take a look at how YARN or Mesos handles this case; I would assume something similar should suffice (look for usages of SparkLauncher.NO_RESOURCE).
Got it, removed the check.
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
  s" URL is $resolvedURL")
resolvedURL
Is it required that this be an https binding?
Communication with the Kubernetes API server (aka master) from outside the cluster is secured by default, so we expect https is always used.
I see an http-related check above. If the default is expected to be https, this should be fine.
val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
  submissionSparkConf,
  KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
nit: SPARK_ROLE_LABEL also?
Yes, done.
        closeWatch()

      case Action.ERROR =>
        closeWatch()
No need for hasCompleted? Are all errors non-recoverable?
hasCompleted is checked below. Refactored the cases.
It is checked only for the case of action != DELETED and action != ERROR, right?
It's because in both DELETED and ERROR cases, the application is considered terminated. In other cases, we need to check the phase of the driver pod to determine if the application terminated.
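A sketch of the refactored cases being described, assuming the fabric8 Watcher API and the watcher's own closeWatch/hasCompleted helpers:

```scala
// Sketch only; Action and Pod come from the fabric8 Kubernetes client.
override def eventReceived(action: Action, pod: Pod): Unit = {
  action match {
    case Action.DELETED | Action.ERROR =>
      // Deletion or a watch error means the application is gone: stop watching.
      closeWatch()
    case _ =>
      // For other actions, the driver pod's phase decides whether we are done.
      if (hasCompleted()) {
        closeWatch()
      }
  }
}
```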
  .endEnv()
.addNewEnv()
  .withName(ENV_DRIVER_ARGS)
  .withValue(appArgs.mkString(" "))
How does this handle escaping, etc.? If the user specified arguments that have whitespace within them, this will break? Example: appArgs = Array("a 1", "a\t2") and so on.
Good catch. Added \"\" around each argument.
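A self-contained illustration of the quoting fix: wrapping each argument preserves whitespace when the arguments are joined into a single environment-variable value.

```scala
val appArgs = Array("a 1", "a\t2")
// Quote each argument before joining, so internal whitespace survives.
val quoted = appArgs.map(arg => "\"" + arg + "\"").mkString(" ")
// quoted == "\"a 1\" \"a\t2\"" — each argument stays intact.
```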
"Driver client key file provided at %s does not exist or is not a file.") | ||
val clientCertDataBase64 = safeFileConfToBase64( | ||
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", | ||
"Driver client cert file provided at %s does not exist or is not a file.") |
I will need to read up on how k8s does security before I can comment more here... it would be great if @vanzin and @tgravescs took a look.
childArgs ++= Array("--main-class", args.mainClass)
if (args.childArgs != null) {
  args.childArgs.foreach { arg =>
    childArgs += "--arg"
Why not childArgs += ("--arg", arg)?
Done.
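For reference, a self-contained sketch of the suggested one-liner; in Scala 2.12 (which Spark used at the time), Growable.+= accepts several elements at once:

```scala
import scala.collection.mutable.ArrayBuffer

val childArgs = ArrayBuffer.empty[String]
// Append the flag and its value together for each driver argument.
Seq("100", "foo bar").foreach { arg => childArgs += ("--arg", arg) }
// childArgs: ArrayBuffer("--arg", "100", "--arg", "foo bar")
```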
val KUBERNETES_NAMESPACE =
  ConfigBuilder("spark.kubernetes.namespace")
    .doc("The namespace that will be used for running the driver and executor pods. When using" +
nit: for all the concatenated strings, always place the space at the end of one line instead of at the beginning of the next line.
Done.
Maybe we should do it in #19468 as well.
Sure, will update #19468.
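Purely for illustration (the continuation text here is invented), the convention being requested puts the separating space at the end of the first fragment:

```scala
// Space at the end of the first fragment, never at the start of the next.
val doc = "The namespace that will be used for running the driver and executor pods. " +
  "Further text continues here without a leading space."
```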
    .stringConf
    .createWithDefault("spark")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
This config is not used in the current PR, so please remove it.
Done.
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

def getK8sMasterUrl(rawMasterString: String): String = {
  if (!rawMasterString.startsWith("k8s://")) {
Why not use require here?
Done.
  sparkConf.getAllWithPrefix(prefix).toMap
}

def requireSecondIfFirstIsDefined(
This function is never used.
Removed.
  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}

val resolvedDriverJavaOpts = currentDriverSpec
Is this resolvedDriverJavaOpts or resolvedDriverOpts?
resolvedDriverJavaOpts.
 * number.
 */
private[k8s] class LoggingPodStatusWatcherImpl(
    appId: String, maybeLoggingInterval: Option[Long])
nit: each param on its own line.
Done.
" Spark bookkeeping operations.") | ||
|
||
val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq | ||
.map(env => new EnvVarBuilder() |
nit: new EnvVarBuilder() on the next line.
Done.
private class OptionSettableSparkConf(sparkConf: SparkConf) {
  def setOption(configEntry: String, option: Option[String]): SparkConf = {
    option.map( opt => {
nit:
option.map { opt =>
  sparkConf.set(configEntry, opt)
}.getOrElse(sparkConf)
Done.
val randomServiceId = clock.getTimeMillis()
val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
  s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
nit: must be <= $MAX_SERVICE_NAME_LENGTH characters
Done.
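A self-contained sketch of the fallback under discussion (names are illustrative): Kubernetes service names are capped at 63 characters, so an over-long preferred name falls back to a timestamp-based one.

```scala
val MAX_SERVICE_NAME_LENGTH = 63
val DRIVER_SVC_POSTFIX = "-driver-svc"
val kubernetesAppId = "a-hypothetical-extremely-long-spark-application-identifier"
val preferredServiceName = s"$kubernetesAppId$DRIVER_SVC_POSTFIX"

// Fall back to a short, time-based name when the preferred one is too long.
val serviceName =
  if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
    preferredServiceName
  } else {
    s"spark-${System.currentTimeMillis()}$DRIVER_SVC_POSTFIX"
  }
```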
Also cc @cloud-fan @ueshin
I think we'd better honor the newly added …
Test build #84235 has finished for PR 19717 at commit
@jerryshao Made …
    .createWithDefault(s"spark-executor:$sparkVersion")

val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
This configuration seems like a set of String options; we should check all the legal options, like this SQL conf:
val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
  .internal()
  .stringConf
  .checkValues(Set("hive", "in-memory"))
  .createWithDefault("in-memory")
Besides, we should add checkValue for the ConfigEntry if possible.
Done.
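A sketch of the resulting config entry (close to, though not guaranteed identical to, what was merged), restricting the pull policy to the values Kubernetes accepts:

```scala
val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy when pulling any docker image in Kubernetes mode.")
    .stringConf
    // Reject anything other than the pull policies Kubernetes understands.
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")
```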
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
    }
  }

  if (isKubernetesCluster) {
    childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
Here the style should be unified after #19631 is merged.
Noted.
Let's wait for that and rebase this PR.
Done.
val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."

def getK8sMasterUrl(rawMasterString: String): String = {
  require(rawMasterString.startsWith("k8s://"),
Shall we add some checks in SparkSubmitArguments for the k8s master to fail fast?
What's the convention for Mesos and YARN? By the time this method is called, it's already verified that the URL starts with k8s. So this check here just makes sure that the double slashes are also present. I don't see this check in Mesos mode or YARN mode.
I see; I was just wondering if we can fail fast on an illegal master URL.
Could we also check the slashes and the "k8s://http" sequence up front? We might want to explore re-using this check/logic by exposing a helper func.
Added a util method in Utils and used it in SparkSubmit to check the k8s master URL and set args.master to the resolved URL. This method is no longer needed and has been removed, as the check is redundant.
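A simplified sketch of the helper described above (the merged method may differ in details): validate the k8s:// prefix up front and normalize the URL, defaulting to https when no scheme is given.

```scala
import java.net.URI

def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
  require(rawMasterURL.startsWith("k8s://"),
    "Kubernetes master URL must start with k8s://.")
  val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)

  // Bare host:port (no scheme at all): default to https.
  if (!masterWithoutK8sPrefix.contains("://")) {
    s"k8s:https://$masterWithoutK8sPrefix"
  } else {
    new URI(masterWithoutK8sPrefix).getScheme.toLowerCase match {
      case "https" | "http" => s"k8s:$masterWithoutK8sPrefix"
      case scheme => throw new IllegalArgumentException(
        s"Invalid Kubernetes master scheme: $scheme")
    }
  }
}

// checkAndGetK8sMasterUrl("k8s://https://host:port") == "k8s:https://host:port"
// checkAndGetK8sMasterUrl("k8s://host:port")         == "k8s:https://host:port"
```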
| Kubernetes only:
|  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
|                              application must be launched. The namespace must already
|                              exist in the cluster. (Default: default).
We should also add checks for validateKillArguments and validateStatusRequestArguments.
Done.
Now that the docker image is without a default value (spark.kubernetes.*.docker.image) (yes, that discussion is ongoing), I wonder if it makes sense to bubble that up as a --param for visibility/convenience. TBH I'm generally against adding --params to submit because of the potential confusion they can cause, but since we are here and there's already a --kubernetes-namespace...
I think if we eventually decide not to have default docker images, we should make the options --param ones. I'm not sure we want to make that call in this PR though. Can we defer this to a later time, when we are clearer on how we publish and maintain the images?
  .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
  .withValue(classPath)
  .build()
}
Did you add support for the configurations "spark.driver.userClassPathFirst" and "spark.executor.userClassPathFirst"? Sorry, I cannot find it.
No, I don't think so. @mccheah to confirm.
We don't. It can be supported in the future. However, we also expect many of the use cases that normally would be handled by these properties to be manageable by building custom Docker images and letting those define the artifacts that are on the classpath.
Custom docker images FTW!
I left some comments from my first round.
def getK8sMasterUrl(rawMasterString: String): String = {
  require(rawMasterString.startsWith("k8s://"),
    "Master URL should start with k8s:// in Kubernetes mode.")
  val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
nit: rawMasterString.substring("k8s://".length)?
Done.
var mainClass: Option[String] = None
val driverArgs = mutable.Buffer.empty[String]

args.sliding(2, 2).toList.collect {
nit: foreach instead of collect?
Done.
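A self-contained sketch of the pairwise parsing under discussion (types are illustrative stand-ins): sliding(2, 2) walks the arguments as (flag, value) pairs.

```scala
import scala.collection.mutable.ArrayBuffer

sealed trait MainAppResource
case class JavaMainAppResource(primaryResource: String) extends MainAppResource

def parse(args: Array[String]): (Option[MainAppResource], Option[String], Seq[String]) = {
  var mainAppResource: Option[MainAppResource] = None
  var mainClass: Option[String] = None
  val driverArgs = ArrayBuffer.empty[String]
  // sliding(2, 2) yields Array(flag, value) windows over the arguments.
  args.sliding(2, 2).toList.foreach {
    case Array("--primary-java-resource", resource) =>
      mainAppResource = Some(JavaMainAppResource(resource))
    case Array("--main-class", clazz) =>
      mainClass = Some(clazz)
    case Array("--arg", arg) =>
      driverArgs += arg
    case other =>
      throw new IllegalArgumentException(s"Unknown arguments: ${other.mkString(" ")}")
  }
  (mainAppResource, mainClass, driverArgs.toList)
}
```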
def fromCommandLineArgs(args: Array[String]): ClientArguments = {
  var mainAppResource: Option[MainAppResource] = None
  var mainClass: Option[String] = None
  val driverArgs = mutable.Buffer.empty[String]
nit: can we use ArrayBuffer explicitly?
Done.
// client arguments that are passed in, created by orchestrator
for (nextStep <- submissionSteps) {
  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
}
Maybe we can use foldLeft here? I'm not sure which is preferable for other reviewers, though.
val currentDriverSpec =
  submissionSteps.foldLeft(KubernetesDriverSpec.initialSpec(submissionSparkConf)) {
    (spec, step) => step.configureDriver(spec)
  }
I feel that the current syntax is more readable and intuitive.
Other reviewers have stated that foldLeft is to be avoided.
Oh, I see, thanks.
  case (confKey, confValue) => s"-D$confKey=$confValue"
} ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
  case (option, index) => new EnvVarBuilder()
nit: new EnvVarBuilder() on the next line.
Done.
    sparkConf,
    None,
    None)) {
  kubernetesClient =>
nit: fix the indent, or we can put kubernetesClient => at the end of the previous line.
Done.
" address is managed and set to the driver pod's IP address.") | ||
require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, | ||
s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" + | ||
" managed via a Kubernetes service.") |
nit: ditto about concat strings.
Done.
readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
nit: could you add a line break at the end of the file?
Done.
WORKDIR /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]
nit: ditto.
Done.
fi

# Execute the container CMD under tini for better hygiene
/sbin/tini -s -- "$@"
nit: ditto.
Done.
Test build #84243 has finished for PR 19717 at commit
Test build #84268 has finished for PR 19717 at commit
@liyinan926, please change PR title to match sub-task: https://issues.apache.org/jira/browse/SPARK-22646
Force-pushed from 350932b to 2d02e39.
Updated the title and rebased onto the latest master.
Test build #84306 has finished for PR 19717 at commit
 * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
 * options for different components.
 */
private[spark] object KubernetesClientFactory {
What's the difference between this and SparkKubernetesClientFactory?
Ah, it's duplicated. Removed this class.
require(mainClass.isDefined, "Main class must be specified via --main-class")

ClientArguments(
  mainAppResource.get,
Don't we need to check whether mainAppResource is defined before this point? Otherwise an exception will be thrown if it is not defined.
As @mridulm pointed out above, mainAppResource may be empty in the cases of Spark examples, the Thrift server, etc. --primary-java-resource is set only if mainAppResource != SparkLauncher.NO_RESOURCE in SparkSubmit, so it could be empty here. I changed ClientArguments to take an Option[MainAppResource] instead.
Also added the missing step DependencyResolutionStep that is essential for supporting local:// dependencies (jars and files).
    mountedUserSpecified: Option[String],
    valueMountedFromSubmitter: Option[String],
    mountedCanonicalLocation: String): Option[String] = {
  mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
nit: we can remove the { and the corresponding }.
Done.
    mountedUserSpecified: Option[String],
    valueMountedFromSubmitter: Option[String],
    secretName: String): Map[String, String] = {
  mountedUserSpecified.map { _ => Map.empty[String, String]}
nit: add a space before }.
Done.
Force-pushed from 78e5940 to 563e9f4.
@vanzin Created https://issues.apache.org/jira/browse/SPARK-22743 to track the work on consolidating the common logic for handling driver and executor memory overhead. Addressed other comments in caf2206. Thanks!
Test build #84672 has finished for PR 19717 at commit
Sounds like a go from @vanzin and a couple of others too.
minor comment, LGTM
@@ -294,6 +298,16 @@ object SparkSubmit extends CommandLineUtils with Logging {
    }
  }

  if (clusterManager == KUBERNETES) {
    args.master = Utils.checkAndGetK8sMasterUrl(args.master)
    // Make sure YARN is included in our build if we're trying to use it
this should say KUBERNETES
Done.
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)

// To handle master URLs, e.g., k8s://host:port.
if (!masterWithoutK8sPrefix.contains("://")) {
startsWith instead of contains?
This is used to differentiate https://host:port vs. host:port, so contains is the right method here.
  }
} catch {
  case NonFatal(e) =>
    kubernetesClient.pods().delete(createdDriverPod)
Not sure if we have discussed this: should it delete the pod even when the exception is fatal?
@mccheah commented above that we probably don't want to take action in the case of OutOfMemoryError, for example. See #19717 (comment).
Test build #84701 has finished for PR 19717 at commit
Couple of nits left.
docs/configuration.md (outdated)
<td>
  The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
  memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
  This tends to grow with the container size (typically 6-10%). This option is supported in Yarn
YARN. I'd also simplify: "This option is currently supported on YARN and Kubernetes." Same below.
Done.
@@ -1146,6 +1146,27 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
  }
}

test("check Kubernetes master URL") {
  val k8sMasterURLHttps = Utils.checkAndGetK8sMasterUrl("k8s://https://host:port")
  assert(k8sMasterURLHttps == "k8s:https://host:port")
=== (also in other places)
Done.
Test build #84726 has finished for PR 19717 at commit
The SparkR tests seem flaky all of a sudden; other PRs are also failing. So I'm just going to ignore that failure. Merging to master.
What changes were proposed in this pull request? This PR contains documentation on the usage of the Kubernetes scheduler in Spark 2.3, and a shell script to make it easier to build the docker images required to use the integration. The changes detailed here are covered by #19717 and #19468, which have merged already.

How was this patch tested? The script has been in use for releases on our fork. The rest is documentation.

cc rxin mateiz (shepherd)
k8s-big-data SIG members & contributors: foxish ash211 mccheah liyinan926 erikerlandson ssuchter varunkatta kimoonkim tnachen ifilonenko
reviewers: vanzin felixcheung jiangxb1987 mridulm

TODO:
- [x] Add dockerfiles directory to built distribution. (#20007)
- [x] Change references to docker to instead say "container" (#19995)
- [x] Update configuration table.
- [x] Modify spark.kubernetes.allocation.batch.delay to take time instead of int (#20032)

Author: foxish <[email protected]>
Closes #19946 from foxish/update-k8s-docs.
…res` when deploying on K8s

### What changes were proposed in this pull request?
Remove Kubernetes from the support list of `--total-executor-cores` in SparkSubmit.

### Why are the changes needed?
`--total-executor-cores` does not take effect in Spark on K8s; [the comments from the original PR](#19717 (comment)) also prove that.

### Does this PR introduce _any_ user-facing change?
The output of `spark-submit --help` changed:
```patch
...
- Spark standalone and Kubernetes only:
+ Spark standalone only:
    --total-executor-cores NUM  Total cores for all executors.
...
```

### How was this patch tested?
Pass GA and review.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43536 from pan3793/tec.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
The same change was cherry-picked to branch-3.3 (Closes #43548), branch-3.4 (Closes #43549), and branch-3.5 (Closes #43550), each authored by Cheng Pan and signed off by Hyukjin Kwon.
What changes were proposed in this pull request?
This PR contains implementation of the basic submission client for the cluster mode of Spark on Kubernetes. It's step 2 from the step-wise plan documented here.
This addition is covered by the SPIP vote which passed on Aug 31.
This PR and #19468 together form an MVP of Spark on Kubernetes that allows users to run Spark applications that use resources locally within the driver and executor containers on Kubernetes 1.6 and up. Some changes on pom and build/test setup are copied over from #19468 to make this PR self-contained and testable.
The submission client is mainly responsible for creating the Kubernetes pod that runs the Spark driver. It follows a step-based approach to construct the driver pod, as the code under the submit.steps package shows. The steps are orchestrated by DriverConfigurationStepsOrchestrator. Client creates the driver pod and waits for the application to complete if it's configured to do so, which is the case by default.

This PR also contains Dockerfiles of the driver and executor images. They are included because some of the environment variables set in the code would not make sense without referring to the Dockerfiles.
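To make the step-based approach concrete, here is a self-contained sketch; the type and step names are illustrative stand-ins, not the PR's exact code.

```scala
// Each configuration step transforms an immutable driver spec; the
// orchestrator picks the applicable steps, and the client threads the
// spec through them in order.
case class DriverSpec(podLabels: Map[String, String], env: Map[String, String])

trait DriverConfigurationStep {
  def configureDriver(spec: DriverSpec): DriverSpec
}

class BaseLabelsStep(appId: String) extends DriverConfigurationStep {
  override def configureDriver(spec: DriverSpec): DriverSpec =
    spec.copy(podLabels = spec.podLabels + ("spark-app-id" -> appId))
}

object StepDemo extends App {
  val steps: Seq[DriverConfigurationStep] = Seq(new BaseLabelsStep("spark-1234"))
  var currentSpec = DriverSpec(Map.empty, Map.empty)
  for (step <- steps) {
    currentSpec = step.configureDriver(currentSpec)
  }
  println(currentSpec) // DriverSpec(Map(spark-app-id -> spark-1234),Map())
}
```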
How was this patch tested?
cc @rxin @felixcheung @mateiz (shepherd)
k8s-big-data SIG members & contributors: @mccheah @foxish @ash211 @ssuchter @varunkatta @kimoonkim @erikerlandson @tnachen @ifilonenko @liyinan926