Skip to content

Commit

Permalink
add support for secret env vars
Browse files Browse the repository at this point in the history
  • Loading branch information
Stavros Kontopoulos committed May 31, 2018
1 parent 5403268 commit b55d3f0
Show file tree
Hide file tree
Showing 18 changed files with 222 additions and 12 deletions.
22 changes: 22 additions & 0 deletions docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ namespace as that of the driver and executor pods. For example, to mount a secre
--conf spark.kubernetes.executor.secrets.spark-secret=/etc/secrets
```

To use a secret through an environment variable use the following options to the `spark-submit` command:
```
--conf spark.kubernetes.driver.secretKeyRef.ENV_NAME=name:key
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```

## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
Expand Down Expand Up @@ -602,4 +608,20 @@ specific to Spark on Kubernetes.
<code>spark.kubernetes.executor.secrets.spark-secret=/etc/secrets</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key <code> key </code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
<code>spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.secretKeyRef.[EnvName]</code></td>
<td>(none)</td>
<td>
Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key <code> key </code> in the data of the referenced <a href="https://kubernetes.io/docs/concepts/configuration/secret/#using-secrets-as-environment-variables">Kubernetes Secret</a>. For example,
<code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
</td>
</tr>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,12 @@ private[spark] object Config extends Logging {
val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef."

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."
val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets."
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
Expand Down Expand Up @@ -129,6 +130,8 @@ private[spark] object KubernetesConf {
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)

Expand All @@ -140,6 +143,7 @@ private[spark] object KubernetesConf {
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs)
}

Expand Down Expand Up @@ -167,8 +171,10 @@ private[spark] object KubernetesConf {
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap

KubernetesConf(
Expand All @@ -178,7 +184,8 @@ private[spark] object KubernetesConf {
appId,
executorLabels,
executorAnnotations,
executorSecrets,
executorMountSecrets,
executorEnvSecrets,
executorEnv)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}

private[spark] class EnvSecretsFeatureStep(
kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
extends KubernetesFeatureConfigStep {
override def configurePod(pod: SparkPod): SparkPod = {
val addedEnvSecrets = kubernetesConf
.roleSecretEnvNamesToKeyRefs
.map{ case (envName, keyRef) =>
// Keyref parts
val keyRefParts = keyRef.split(":")
require(keyRefParts.size == 2, "SecretKeyRef must be in the form name:key.")
val name = keyRefParts(0)
val key = keyRefParts(1)
new EnvVarBuilder()
.withName(envName)
.withNewValueFrom()
.withNewSecretKeyRef()
.withKey(key)
.withName(name)
.endSecretKeyRef()
.endValueFrom()
.build()
}

val containerWithEnvVars = new ContainerBuilder(pod.container)
.addAllToEnv(addedEnvSecrets.toSeq.asJava)
.build()
SparkPod(pod.pod, containerWithEnvVars)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features._

private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =
Expand All @@ -30,6 +30,9 @@ private[spark] class KubernetesDriverBuilder(
provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {
Expand All @@ -41,10 +44,14 @@ private[spark] class KubernetesDriverBuilder(
provideCredentialsStep(kubernetesConf),
provideServiceStep(kubernetesConf),
provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,32 @@
package org.apache.spark.scheduler.cluster.k8s

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_),
provideSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep:
(KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) =
new EnvSecretsFeatureStep(_),
provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
=> LocalDirsFeatureStep =
new LocalDirsFeatureStep(_)) {

def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) {
baseFeatures ++ Seq(provideSecretsStep(kubernetesConf))
} else baseFeatures

allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) {
allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf))
} else allFeatures

var executorPod = SparkPod.initialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class KubernetesConfSuite extends SparkFunSuite {
private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
"secret1" -> "/mnt/secrets/secret1",
"secret2" -> "/mnt/secrets/secret2")
private val SECRET_ENV_VARS = Map(
"envName1" -> "name1:key1",
"envName2" -> "name2:key2")
private val CUSTOM_ENVS = Map(
"customEnvKey1" -> "customEnvValue1",
"customEnvKey2" -> "customEnvValue2")
Expand Down Expand Up @@ -103,6 +106,9 @@ class KubernetesConfSuite extends SparkFunSuite {
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX$key", value)
}
CUSTOM_ENVS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_DRIVER_ENV_PREFIX$key", value)
}
Expand All @@ -121,6 +127,7 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
assert(conf.roleEnvs === CUSTOM_ENVS)
}

Expand Down Expand Up @@ -155,6 +162,9 @@ class KubernetesConfSuite extends SparkFunSuite {
CUSTOM_ANNOTATIONS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_ANNOTATION_PREFIX$key", value)
}
SECRET_ENV_VARS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX$key", value)
}
SECRET_NAMES_TO_MOUNT_PATHS.foreach { case (key, value) =>
sparkConf.set(s"$KUBERNETES_EXECUTOR_SECRETS_PREFIX$key", value)
}
Expand All @@ -170,6 +180,6 @@ class KubernetesConfSuite extends SparkFunSuite {
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ CUSTOM_LABELS)
assert(conf.roleAnnotations === CUSTOM_ANNOTATIONS)
assert(conf.roleSecretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
assert(conf.roleSecretEnvNamesToKeyRefs === SECRET_ENV_VARS)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
DRIVER_ENVS)

val featureStep = new BasicDriverFeatureStep(kubernetesConf)
Expand Down Expand Up @@ -138,6 +139,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
DRIVER_LABELS,
DRIVER_ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
val executor = step.configurePod(SparkPod.initialPod())

Expand Down Expand Up @@ -124,6 +125,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty))
assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63)
}
Expand All @@ -142,6 +144,7 @@ class BasicExecutorFeatureStepSuite
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map("qux" -> "quux")))
val executor = step.configurePod(SparkPod.initialPod())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD)
Expand Down Expand Up @@ -88,6 +89,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)

val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
Expand Down Expand Up @@ -124,6 +126,7 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef
Map.empty,
Map.empty,
Map.empty,
Map.empty,
Map.empty)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf)
val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod())
assert(configurationStep.getAdditionalKubernetesResources().size === 1)
Expand Down Expand Up @@ -94,6 +95,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
DriverServiceFeatureStep.DRIVER_SVC_POSTFIX
Expand All @@ -113,6 +115,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty))
val resolvedService = configurationStep
.getAdditionalKubernetesResources()
Expand Down Expand Up @@ -141,6 +144,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
val driverService = configurationStep
Expand All @@ -166,6 +170,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver bind address should not be allowed.")
Expand All @@ -189,6 +194,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
DRIVER_LABELS,
Map.empty,
Map.empty,
Map.empty,
Map.empty),
clock)
fail("The driver host address should not be allowed.")
Expand Down
Loading

0 comments on commit b55d3f0

Please sign in to comment.