[SPARK-24232][k8s] Add support for secret env vars #21317

Closed
wants to merge 1 commit
22 changes: 22 additions & 0 deletions docs/running-on-kubernetes.md
@@ -140,6 +140,12 @@ namespace as that of the driver and executor pods. For example, to mount a secret
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```

To use a secret through an environment variable, pass the following options to the `spark-submit` command:
```
--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```
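As an end-to-end sketch (the secret name `spark-secret`, the key `oauth-token`, the env var name, and the master URL below are illustrative, not taken from this PR), the Secret can first be created with `kubectl` and then referenced at submit time:

```
# Create a Secret named "spark-secret" with one key (illustrative values)
kubectl create secret generic spark-secret --from-literal=oauth-token=abc123

# Expose the key as the OAUTH_TOKEN env var in both driver and executor containers
spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.secretKeyRef.OAUTH_TOKEN=spark-secret:oauth-token \
  --conf spark.kubernetes.executor.secretKeyRef.OAUTH_TOKEN=spark-secret:oauth-token \
  ...
```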

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
@@ -602,4 +608,20 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
Review discussion:
- Contributor Author: sure.
- @skonto (Contributor Author, May 31, 2018): @foxish I added the link in the properties below, like in the case of the other type of secrets; is it ok now?
- Contributor: LGTM, thanks @skonto; will merge once tests pass.

<tr>

<td><code>spark.kubernetes.driver.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add as an environment variable to the driver container with name <code>EnvName</code> (case sensitive) the value referenced by key <code>key</code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
<code>spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add as an environment variable to the executor container with name <code>EnvName</code> (case sensitive) the value referenced by key <code>key</code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
<code>spark.kubernetes.executor.secretKeyRef.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
</table>
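For reference, the env entry this feature adds to the container is the standard Kubernetes `secretKeyRef` form. A sketch of the resulting pod spec fragment (names are illustrative):

```
# Result of spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key (illustrative)
env:
  - name: ENV_VAR
    valueFrom:
      secretKeyRef:
        name: spark-secret   # Kubernetes Secret name
        key: key             # key within the Secret's data
```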
@@ -162,10 +162,12 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
@@ -54,6 +54,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
@@ -129,6 +130,8 @@ private[spark] object KubernetesConf {
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

@@ -140,6 +143,7 @@
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs)
}

@@ -167,8 +171,10 @@
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap

KubernetesConf(
@@ -178,7 +184,8 @@
appId,
executorLabels,
executorAnnotations,
executorSecrets,
executorMountSecrets,
executorEnvSecrets,
executorEnv)
}
}
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}

private[spark] class EnvSecretsFeatureStep(
kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val addedEnvSecrets = kubernetesConf
.roleSecretEnvNamesToKeyRefs
.map { case (envName, keyRef) =>
// Each value is expected in the form "name:key": the Secret name, then a key in its data
val keyRefParts = keyRef.split(":")
require(keyRefParts.size == 2, "SecretKeyRef must be in the form name:key.")
val name = keyRefParts(0)
val key = keyRefParts(1)
new EnvVarBuilder()
.withName(envName)
.withNewValueFrom()
.withNewSecretKeyRef()
.withKey(key)
.withName(name)
.endSecretKeyRef()
.endValueFrom()
.build()
}

val containerWithEnvVars = new ContainerBuilder(pod.container)
.addAllToEnv(addedEnvSecrets.toSeq.asJava)
.build()
SparkPod(pod.pod, containerWithEnvVars)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
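The `name:key` contract enforced by the `require` in `EnvSecretsFeatureStep` can be illustrated with a small mirror of the parsing logic (written in Python for brevity; `parse_secret_key_ref` and the `OAUTH_TOKEN`/`spark-secret` values are hypothetical, not part of Spark):

```python
def parse_secret_key_ref(key_ref: str):
    """Split a "name:key" value into (Secret name, data key), rejecting other shapes."""
    parts = key_ref.split(":")
    if len(parts) != 2:
        raise ValueError("SecretKeyRef must be in the form name:key.")
    name, key = parts
    return name, key

# A map of env var name -> "secretName:key", as parsed from
# spark.kubernetes.*.secretKeyRef.* configuration entries (illustrative values).
refs = {"OAUTH_TOKEN": "spark-secret:oauth-token"}
env_vars = {env: parse_secret_key_ref(ref) for env, ref in refs.items()}
print(env_vars)  # {'OAUTH_TOKEN': ('spark-secret', 'oauth-token')}
```

As in the Scala step, a value with zero or more than one `:` separator fails fast with an error rather than silently producing a bad `secretKeyRef`.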
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features._

private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
@@ -30,6 +30,9 @@ private[spark] class KubernetesDriverBuilder(
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {
@@ -41,10 +44,14 @@
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
@@ -17,24 +17,32 @@
package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_),
provideSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {

def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
@@ -40,6 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite {
private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
"secret1" -> "/mnt/secrets/secret1",
"secret2" -> "/mnt/secrets/secret2")
private val SECRET_ENV_VARS = Map(
"envName1" -> "name1:key1",
"envName2" -> "name2:key2")
private val CUSTOM_ENVS = Map(
"customEnvKey1" -> "customEnvValue1",
"customEnvKey2" -> "customEnvValue2")
@@ -103,6 +106,9 @@
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX$key", value)
}
CUSTOM_ENVS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
}
@@ -121,6 +127,7 @@
CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
assert(conf.roleEnvs === CUSTOM_ENVS)
}

@@ -155,6 +162,9 @@
CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX$key", value)
}
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
}
@@ -170,6 +180,6 @@
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
}

}
@@ -69,6 +69,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
@@ -138,6 +139,7 @@
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
@@ -87,6 +87,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
val executor = step.configurePod(SparkPod.initialPod())

@@ -124,6 +125,7 @@
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
@@ -142,6 +144,7 @@
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map("qux" -> "quux")))
val executor = step.configurePod(SparkPod.initialPod())

@@ -59,6 +59,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
@@ -88,6 +89,7 @@
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)

val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
@@ -124,6 +126,7 @@
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
@@ -65,6 +65,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
@@ -94,6 +95,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
@@ -113,6 +115,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
@@ -141,6 +144,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
val driverService = configurationStep
@@ -166,6 +170,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver bind address should not be allowed.")
@@ -189,6 +194,7 @@
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver host address should not be allowed.")