[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468

Closed · wants to merge 29 commits (changes shown from 7 commits)

Commits:
f6fdd6a
Spark on Kubernetes - basic scheduler backend
foxish Sep 15, 2017
75e31a9
Adding to modules.py and SparkBuild.scala
foxish Oct 17, 2017
cf82b21
Exclude from unidoc, update travis
foxish Oct 17, 2017
488c535
Address a bunch of style and other comments
foxish Oct 17, 2017
82b79a7
Fix some style concerns
foxish Oct 18, 2017
c052212
Clean up YARN constants, unit test updates
foxish Oct 20, 2017
c565c9f
Couple of more style comments
foxish Oct 20, 2017
2fb596d
Address CR comments.
mccheah Oct 25, 2017
992acbe
Extract initial executor count to utils class
mccheah Oct 25, 2017
b0a5839
Fix scalastyle
mccheah Oct 25, 2017
a4f9797
Fix more scalastyle
mccheah Oct 25, 2017
2b5dcac
Pin down app ID in tests. Fix test style.
mccheah Oct 26, 2017
018f4d8
Address comments.
mccheah Nov 1, 2017
4b32134
Various fixes to the scheduler
mccheah Nov 1, 2017
6cf4ed7
Address comments
mccheah Nov 4, 2017
1f271be
Update fabric8 client version to 3.0.0
foxish Nov 13, 2017
71a971f
Addressed more comments
liyinan926 Nov 13, 2017
0ab9ca7
One more round of comments
liyinan926 Nov 14, 2017
7f14b71
Added a comment regarding how failed executor pods are handled
liyinan926 Nov 15, 2017
7afce3f
Addressed more comments
liyinan926 Nov 21, 2017
b75b413
Fixed Scala style error
liyinan926 Nov 21, 2017
3b587b4
Removed unused parameter in parsePrefixedKeyValuePairs
liyinan926 Nov 22, 2017
cb12fec
Another round of comments
liyinan926 Nov 22, 2017
ae396cf
Addressed latest comments
liyinan926 Nov 27, 2017
f8e3249
Addressed comments around licensing on new dependencies
liyinan926 Nov 27, 2017
a44c29e
Fixed unit tests and made maximum executor lost reason checks configu…
liyinan926 Nov 27, 2017
4bed817
Removed default value for executor Docker image
liyinan926 Nov 27, 2017
c386186
Close the executor pod watcher before deleting the executor pods
liyinan926 Nov 27, 2017
b85cfc4
Addressed more comments
liyinan926 Nov 28, 2017
2 changes: 1 addition & 1 deletion .travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+ - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
8 changes: 8 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -531,6 +531,14 @@ def __hash__(self):
sbt_test_goals=["mesos/test"]
)

kubernetes = Module(
name="kubernetes",
dependencies=[],
source_file_regexes=["resource-managers/kubernetes/core"],
Contributor: remove 'core'

build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
7 changes: 7 additions & 0 deletions pom.xml
@@ -2648,6 +2648,13 @@
</modules>
</profile>

<profile>
<id>kubernetes</id>
Member: This new profile is not used in the build script; shall we add it so that the new contributions in this PR will be built and tested?

Contributor: We should also change the sbt file to make it work using sbt.

Contributor (author): Updated. PTAL.

<modules>
<module>resource-managers/kubernetes/core</module>
Contributor: Is this going to be a multi-module project later? If so, you might want to create a parent pom in resource-managers/kubernetes. Otherwise we should probably remove core.

Contributor (author): We expected it to be multi-module (https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes), the other modules being configuration files for the Docker images and the integration tests. The Dockerfiles are pretty much static configuration files, so moving them to a directory like conf/kubernetes/ might also make sense. I'm not sure where the integration tests will eventually live.

cc @mccheah @ssuchter

Contributor (author): Based on a discussion in last week's meeting with Shane Knapp from RISELab, we want to keep the integration tests as a sub-module here, in the interest of keeping test code together. We should have the additional parent pom to facilitate that.
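
For concreteness, a rough sketch of the parent pom under discussion; the integration-tests module and the parent artifactId are hypothetical at this point (only core exists in this PR):

  <!-- Hypothetical resource-managers/kubernetes/pom.xml -->
  <project>
    <modelVersion>4.0.0</modelVersion>
    <parent>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-parent_2.11</artifactId>
      <version>2.3.0-SNAPSHOT</version>
      <relativePath>../../pom.xml</relativePath>
    </parent>
    <artifactId>spark-kubernetes-parent_2.11</artifactId> <!-- invented id -->
    <packaging>pom</packaging>
    <modules>
      <module>core</module>
      <module>integration-tests</module> <!-- hypothetical future module -->
    </modules>
  </project>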

Contributor: @vanzin this isn't a multi-module project in the sense that the Kubernetes cluster manager and spark-submit implementation are split across multiple projects; rather, there is a module for said cluster manager + spark-submit implementation, and then there are modules for integration-testing that code.

@foxish The Dockerfiles feel more like application code than static configuration, but that might just be a matter of implementation. The structure of the CMD in the Dockerfiles is particular to what spark-submit will expect, for example.

Contributor: Why are integration tests in a separate module? Maven has an integration-test phase which is separate from the usual test phase used for unit tests. And that's assuming you really don't want to run them during unit tests; then all the code could potentially live in the same module.

> in the interest of keeping test code together

That would mean keeping the test code in the same module as the core code, not in a separate module.

Contributor: It's not absolutely necessary to have integration tests in a specific separate module, but there are some nice organizational benefits. For example, integration tests in the same module as the core code would need a specific package namespace that is omitted from the test phase and only executed in the integration-test phase. Having a separate module means the integration-test pom can just make the test phase a no-op, and the integration-test phase runs all tests in the test folder. (I don't know if Maven has a concept of a difference between src/test/scala and src/integrationTest/scala, which would help a lot.)

It's also IMO easier to read the pom.xml of the integration tests separately from the pom.xml of the Kubernetes core implementation. FWIW this is what we have in the integration test POM at the moment: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/integration-tests/pom.xml. (The Minikube-related things are going away with apache-spark-on-k8s#521.)

> And that's assuming that you really don't want to run them during unit tests.

We definitely don't want to run these during unit tests: they are relatively expensive, require building Docker images, and require Minikube to be pre-installed on the given machine. Having them in at least the separate integration-test phase makes these differences clear.
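
To make those mechanics concrete, a minimal sketch of how a dedicated integration-test module can no-op the unit-test phase and bind its suites to the integration-test phase instead (the plugin wiring here is illustrative, not copied from the fork's POM):

  <build>
    <plugins>
      <!-- Make the regular unit-test phase a no-op in this module. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <!-- Run everything under src/test during the integration-test phase instead. -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>integration-test</id>
            <phase>integration-test</phase>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>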

Contributor: That (keeping them separate) is actually pretty useful for SBT.

Contributor: Both yarn and mesos don't have a sub-directory called core; are we going to add more modules?

</modules>
</profile>

<profile>
<id>hive-thriftserver</id>
<modules>
8 changes: 4 additions & 4 deletions project/SparkBuild.scala
@@ -53,11 +53,11 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

- val optionallyEnabledProjects@Seq(mesos, yarn,
+ val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn",
Seq("kubernetes", "mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
publish := {},

unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-     inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+     inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
-     inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+     inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
102 changes: 102 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -0,0 +1,102 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>2.2.13</kubernetes.client.version>
Contributor: Any reason why the client version is from the previous major version and a much older minor version? I was looking at https://mvnrepository.com/artifact/io.fabric8/kubernetes-client

Contributor (author): No reason, except that newer versions have not been tested as much.

We recently updated the client on our fork as well: https://github.com/apache-spark-on-k8s/spark/pull/528/files. We're waiting to see that there aren't any regressions and will then update this PR as well.

Member: @foxish, could you update this PR to include this?

Contributor (author): @dongjoon-hyun, done. Updated to 3.0.0, which we've been running for a while. cc @liyinan926

</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
Member: These dependencies need to be listed; please see http://www.apache.org/dev/licensing-howto.html

Contributor: The dependency needs to be added to NOTICE, right?

liyinan926 (Contributor) Nov 27, 2017: Listed in f8e3249.

<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Contributor: I think you can use * here instead of multiple exclusions.

Contributor: Done.

</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Required by kubernetes-client but we exclude it -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
58 changes: 58 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils {
def parsePrefixedKeyValuePairs(
Contributor: We should add a comment to explain what this function does; it not only returns the configs but also ensures no duplicate configs are set.

Contributor: Done.

sparkConf: SparkConf,
prefix: String,
configType: String): Map[String, String] = {
Contributor: Where do we use this parameter?

Contributor: We are not really using it in the context of this PR. Removed this parameter.

val fromPrefix = sparkConf.getAllWithPrefix(prefix)
fromPrefix.groupBy(_._1).foreach {
case (key, values) =>
require(values.size == 1,
Contributor: Can this ever trigger given that sparkConf is basically a map?

Contributor: Yeah, I don't think this will trigger. Removed the check. @foxish.
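
A quick illustration of the point (the key names below are invented): SparkConf stores one value per key, so getAllWithPrefix can never return duplicates.

  import org.apache.spark.SparkConf

  val conf = new SparkConf(loadDefaults = false)
    .set("spark.executor.label.team", "infra")
    .set("spark.executor.label.team", "data") // overwrites: one value per key

  // Only one entry comes back, so the duplicate check above can never fire.
  conf.getAllWithPrefix("spark.executor.label.") // Array(("team", "data"))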

s"Cannot have multiple values for a given $configType key, got key $key with" +
s" values $values")
}
fromPrefix.toMap
}

def requireBothOrNeitherDefined(
Contributor: Why do we need this function for this PR? It seems it's only used by ResourceStagingServerSslOptionsProvider, which is out of scope here.

opt1: Option[_],
opt2: Option[_],
errMessageWhenFirstIsMissing: String,
errMessageWhenSecondIsMissing: String): Unit = {
requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

def requireSecondIfFirstIsDefined(
Contributor: Seems we don't need this function here.

Contributor: Removed.

opt1: Option[_],
opt2: Option[_],
errMessageWhenSecondIsMissing: String): Unit = {
opt1.foreach { _ =>
require(opt2.isDefined, errMessageWhenSecondIsMissing)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
}
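
A usage sketch for parsePrefixedKeyValuePairs as its signature stands in this diff (the prefix and keys below are illustrative):

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.k8s.ConfigurationUtils

  val conf = new SparkConf(loadDefaults = false)
    .set("spark.kubernetes.driver.label.team", "infra")
    .set("spark.kubernetes.driver.label.env", "staging")

  // The prefix is stripped from the returned keys:
  // Map("team" -> "infra", "env" -> "staging")
  val labels = ConfigurationUtils.parsePrefixedKeyValuePairs(
    conf, "spark.kubernetes.driver.label.", "driver label")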
103 changes: 103 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
Contributor: nit: line length > 100? You can split it if yes.

Contributor: I don't think Scalastyle checks for line lengths in imports. But these imports for the Fabric8 client have typically been quite long. I've been wondering if it's worth just importing the whole package with io.fabric8.kubernetes.client._.

Contributor: It could also be split into multiple lines, sorted alphabetically.

Contributor: It could be, but I don't quite see that elsewhere in the project. Do we mean something like this:

import io.fabric8.kubernetes.client.{
  Config,
  ConfigBuilder,
  ...
}

or do we mean something more like this:

import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.ConfigBuilder
...

In either case, I don't think the style is necessarily standard in the project, but I have seen a few places where the entire package is imported via the _ token.

Contributor: The following is recommended:

import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient,
  KubernetesClient}

Contributor: @jiangxb1987 Is this the convention across the codebase? I don't think most IDEs will preserve that formatting.

import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher
Member: This seems new; should it be listed in the pom file?

Member: And the license as well.

liyinan926 (Contributor) Nov 27, 2017: Fixed in f8e3249.


import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.util.ThreadUtils

/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
* parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
* options for different components.
*/
private[spark] object SparkKubernetesClientFactory {

def createKubernetesClient(
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
sparkConf: SparkConf,
maybeServiceAccountToken: Option[File],
Contributor: Seems like defaultBlah would be a better name than maybeBlah, given its use.

Contributor: Done.

maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
Contributor: Why not create constants like for the other config options?

Contributor: This lacks context from the spark-submit implementation that is not in this PR.

We intend to have two different sets of authentication options for the Kubernetes API. The first is the credentials for creating a driver pod and all the Kubernetes resources that the application requires outside of executor pods. The second is a set of credentials that the driver can use to create executor pods. These options will have shared suffixes in the configuration keys but different prefixes.

The reasoning for two sets of credentials is twofold:

  • The driver needs strictly fewer privileges than spark-submit, because the driver only creates and deletes pods while spark-submit needs to create pods and other Kubernetes resources. Two sets of credentials allow the driver to have an appropriately limited scope of API access.
  • Part of the credentials includes TLS certificates for accessing the Kubernetes API over HTTPS. A common setup has the Kubernetes API server reachable from outside the cluster through a proxy, while the driver accesses the API server from inside the cluster. The front door for the API server typically asks for a different certificate than the one presented when accessing the API server internally.
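
As an illustration of the two prefix families described above (the exact property names are assumptions built from the shared suffixes, not confirmed by this diff):

  import org.apache.spark.SparkConf

  // Hypothetical keys: the submission.* credentials would be used by
  // spark-submit, the driver.* ones by the driver inside the cluster.
  val conf = new SparkConf()
    .set("spark.kubernetes.authenticate.submission.clientCertFile", "/path/to/external.crt")
    .set("spark.kubernetes.authenticate.driver.oauthTokenFile",
      "/var/run/secrets/kubernetes.io/serviceaccount/token")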

.map(new File(_))
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
ConfigurationUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")
Contributor: nit: we'd better place the space at the end of the previous line instead of at the beginning of a new line.

Contributor: Done.


val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
.orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
Contributor: If I understand the response to @vanzin correctly, this is used both in spark-submit and in the driver? I am assuming clientKeyFile is the private key of the submitting user?

Assuming both are true: how is the private key transmitted and secured? Are there any security concerns? Is it available only at the driver, or available/required at executors too? If yes, any concerns there?

Contributor: The submission client will have a bootstrap that sends these over via a Kubernetes secret volume. This secret material isn't necessarily used by spark-submit itself, but spark-submit provides it through said volume. It should only be read by the driver; we don't mount secrets for Kubernetes credentials into executor pods in ExecutorPodFactory.

Contributor: Thanks for clarifying. I am assuming the key is used for all requests transparently, that is, without any explicit use of it?

Contributor: The client is configured to always use this value in HTTP requests once the client object is constructed by the builder.
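
To make the secret-volume mechanism described a few comments up concrete, a sketch using the fabric8 model builders (the secret name and mount path are invented, not taken from this PR):

  import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}

  // Hypothetical: spark-submit stores the driver's Kubernetes credentials in a
  // secret, and the driver pod mounts it read-only; executors never mount it.
  val credentialsVolume = new VolumeBuilder()
    .withName("kubernetes-credentials")
    .withNewSecret()
      .withSecretName("spark-driver-credentials") // invented name
      .endSecret()
    .build()

  val credentialsMount = new VolumeMountBuilder()
    .withName("kubernetes-credentials")
    .withMountPath("/mnt/secrets/spark-kubernetes-credentials") // invented path
    .withReadOnly(true)
    .build()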

val clientCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
val config = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
val baseHttpClient = HttpClientUtils.createHttpClient(config)
val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
.dispatcher(dispatcher)
.build()
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}

private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {
Contributor: Maybe I'm missing something but why is this needed?

Contributor: This is needed for the use of withOption above.

Member: Maybe the following would be better for this purpose:

  private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
    extends AnyVal {

    def withOption[T]
        (option: Option[T])
        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): ConfigBuilder = {
      option.map { opt =>
        configurator(opt, configBuilder)
      }.getOrElse(configBuilder)
    }
  }

Contributor: Done.


def withOption[T]
(option: Option[T])
(configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
new OptionConfigurableConfigBuilder(option.map { opt =>
configurator(opt, configBuilder)
}.getOrElse(configBuilder))
}

def build(): Config = configBuilder.build()
}
}
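
Finally, a hypothetical call site for the factory as its signature stands in this diff, e.g. from a driver-side scheduler backend falling back to the pod's mounted service-account credentials (the conf prefix is an assumption):

  import java.io.File

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory

  // Authenticate with the token and CA cert that Kubernetes mounts into every
  // pod, unless the prefixed conf keys override them.
  val client = SparkKubernetesClientFactory.createKubernetesClient(
    master = "https://kubernetes.default.svc",
    namespace = Some("default"),
    kubernetesAuthConfPrefix = "spark.kubernetes.authenticate.driver", // assumed prefix
    sparkConf = new SparkConf(),
    maybeServiceAccountToken =
      Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token")),
    maybeServiceAccountCaCert =
      Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")))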