[SPARK-26194][k8s] Auto generate auth secret for k8s apps. #23174

Closed
wants to merge 3 commits
21 changes: 16 additions & 5 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -348,25 +348,36 @@ private[spark] class SecurityManager(
*/
def initializeAuth(): Unit = {
import SparkMasterRegex._
val k8sRegex = "k8s.*".r

if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
return
}

// TODO: this really should be abstracted somewhere else.
val master = sparkConf.get(SparkLauncher.SPARK_MASTER, "")
master match {
val storeInUgi = master match {
case "yarn" | "local" | LOCAL_N_REGEX(_) | LOCAL_N_FAILURES_REGEX(_, _) =>
// Secret generation allowed here
true

case k8sRegex() =>
// Don't propagate the secret through the user's credentials in kubernetes. That conflicts
// with the way k8s handles propagation of delegation tokens.
false

case _ =>
require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
return
}

secretKey = Utils.createSecret(sparkConf)
val creds = new Credentials()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
UserGroupInformation.getCurrentUser().addCredentials(creds)

if (storeInUgi) {
val creds = new Credentials()
creds.addSecretKey(SECRET_LOOKUP_KEY, secretKey.getBytes(UTF_8))
UserGroupInformation.getCurrentUser().addCredentials(creds)
}
}

// Default SecurityManager only has a single secret key, so ignore appId.
57 changes: 36 additions & 21 deletions core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -395,15 +395,23 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(keyFromEnv === new SecurityManager(conf2).getSecretKey())
}

test("secret key generation") {
Seq(
("yarn", true),
("local", true),
("local[*]", true),
("local[1, 2]", true),
("local-cluster[2, 1, 1024]", false),
("invalid", false)
).foreach { case (master, shouldGenerateSecret) =>
// How is the secret expected to be generated and stored.
object SecretTestType extends Enumeration {
val MANUAL, AUTO, UGI = Value
}

import SecretTestType._

Seq(
("yarn", UGI),
("local", UGI),
("local[*]", UGI),
("local[1, 2]", UGI),
("k8s://127.0.0.1", AUTO),
("local-cluster[2, 1, 1024]", MANUAL),
("invalid", MANUAL)
).foreach { case (master, secretType) =>
test(s"secret key generation: master '$master'") {
val conf = new SparkConf()
.set(NETWORK_AUTH_ENABLED, true)
.set(SparkLauncher.SPARK_MASTER, master)
@@ -412,19 +420,26 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
if (shouldGenerateSecret) {
mgr.initializeAuth()
val creds = UserGroupInformation.getCurrentUser().getCredentials()
val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
assert(secret != null)
assert(new String(secret, UTF_8) === mgr.getSecretKey())
} else {
intercept[IllegalArgumentException] {
secretType match {
case UGI =>
mgr.initializeAuth()
val creds = UserGroupInformation.getCurrentUser().getCredentials()
val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
assert(secret != null)
assert(new String(secret, UTF_8) === mgr.getSecretKey())

case AUTO =>
mgr.initializeAuth()
}
intercept[IllegalArgumentException] {
mgr.getSecretKey()
}
val creds = UserGroupInformation.getCurrentUser().getCredentials()
assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) === null)

case MANUAL =>
intercept[IllegalArgumentException] {
mgr.initializeAuth()
}
intercept[IllegalArgumentException] {
mgr.getSecretKey()
}
}
}
}
34 changes: 21 additions & 13 deletions docs/security.md
@@ -26,21 +26,29 @@ not documented, Spark does not support.
Spark currently supports authentication for RPC channels using a shared secret. Authentication can
be turned on by setting the `spark.authenticate` configuration parameter.

The exact mechanism used to generate and distribute the shared secret is deployment-specific.
The exact mechanism used to generate and distribute the shared secret is deployment-specific. Unless
specified below, the secret must be defined by setting the `spark.authenticate.secret` config
option. The same secret is shared by all Spark applications and daemons in that case, which limits
the security of these deployments, especially on multi-tenant clusters.
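
A minimal sketch of that manual configuration, assuming a deployment where the secret is not auto-generated (the secret value is a placeholder):

```scala
import org.apache.spark.SparkConf

// Sketch: manually supplying the shared secret for a deployment that does not
// auto-generate one. Every daemon and application must be given the same value.
val conf = new SparkConf()
  .set("spark.authenticate", "true")
  .set("spark.authenticate.secret", "change-me-shared-secret") // placeholder value
```

Where the secret is auto-generated (YARN, Kubernetes, and local deployments, per the sections below), only `spark.authenticate` needs to be set.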

For Spark on [YARN](running-on-yarn.html) and local deployments, Spark will automatically handle
generating and distributing the shared secret. Each application will use a unique shared secret. In
The REST Submission Server and the MesosClusterDispatcher do not support authentication. You should
ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
respectively by default) are restricted to hosts that are trusted to submit jobs.

### YARN

For Spark on [YARN](running-on-yarn.html), Spark will automatically handle generating and
distributing the shared secret. Each application will use a unique shared secret. In
the case of YARN, this feature relies on YARN RPC encryption being enabled for the distribution of
secrets to be secure.

For other resource managers, `spark.authenticate.secret` must be configured on each of the nodes.
This secret will be shared by all the daemons and applications, so this deployment configuration is
not as secure as the above, especially when considering multi-tenant clusters. In this
configuration, a user with the secret can effectively impersonate any other user.
### Kubernetes

The Rest Submission Server and the MesosClusterDispatcher do not support authentication. You should
ensure that all network access to the REST API & MesosClusterDispatcher (port 6066 and 7077
respectively by default) are restricted to hosts that are trusted to submit jobs.
On Kubernetes, Spark will also automatically generate an authentication secret unique to each
application. The secret is propagated to executor pods using environment variables. This means
that any user that can list pods in the namespace where the Spark application is running can
also see their authentication secret. Access control rules should be properly set up by the
Kubernetes admin to ensure that Spark authentication is secure.
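
A rough sketch of this propagation model, assuming the executor-side variable name is `_SPARK_AUTH_SECRET` (matching `SecurityManager.ENV_AUTH_SECRET`); it is illustrative, not the exact bootstrap code:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch: the driver injects a per-application secret into each
// executor pod as an environment variable, and the executor reads it back before
// authenticating RPC connections. The env var name here is assumed.
def loadAuthSecret(conf: SparkConf): SparkConf =
  sys.env.get("_SPARK_AUTH_SECRET") match {
    case Some(secret) => conf.set("spark.authenticate.secret", secret)
    case None         => conf
  }
```

Anyone who can read the pod spec can also read this variable, which is why the access-control caveat above matters.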

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -738,10 +746,10 @@ tokens for supported will be created.
## Secure Interaction with Kubernetes

When talking to Hadoop-based services behind Kerberos, it was noted that Spark needs to obtain delegation tokens
so that non-local processes can authenticate. These delegation tokens in Kubernetes are stored in Secrets that are
shared by the Driver and its Executors. As such, there are three ways of submitting a Kerberos job:

In all cases you must define the environment variable: `HADOOP_CONF_DIR` or
`spark.kubernetes.hadoop.configMapName.`

It is also important to note that the KDC needs to be visible from inside the containers.
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -20,7 +20,7 @@ import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkException
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
@@ -29,11 +29,12 @@ import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils

private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutorConf)
private[spark] class BasicExecutorFeatureStep(
kubernetesConf: KubernetesExecutorConf,
secMgr: SecurityManager)
extends KubernetesFeatureConfigStep {

// Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH)
private val executorContainerImage = kubernetesConf
.get(EXECUTOR_CONTAINER_IMAGE)
.getOrElse(throw new SparkException("Must specify the executor container image"))
@@ -87,44 +88,61 @@ private[spark] class BasicExecutorFeatureStep(kubernetesConf: KubernetesExecutor
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCoresRequest)
.build()
val executorExtraClasspathEnv = executorExtraClasspath.map { cp =>
new EnvVarBuilder()
.withName(ENV_CLASSPATH)
.withValue(cp)
.build()
}
val executorExtraJavaOptionsEnv = kubernetesConf
.get(EXECUTOR_JAVA_OPTIONS)
.map { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
val delimitedOpts = Utils.splitCommandString(subsOpts)
delimitedOpts.zipWithIndex.map {
case (opt, index) =>
new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()

val executorEnv: Seq[EnvVar] = {
(Seq(
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, executorCores.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, kubernetesConf.appId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
(ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
(ENV_EXECUTOR_ID, kubernetesConf.executorId)
) ++ kubernetesConf.environment).map { case (k, v) =>
new EnvVarBuilder()
.withName(k)
.withValue(v)
.build()
}
}.getOrElse(Seq.empty[EnvVar])
val executorEnv = (Seq(
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, executorCores.toString),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, kubernetesConf.appId),
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
(ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL),
(ENV_EXECUTOR_ID, kubernetesConf.executorId)) ++
kubernetesConf.environment)
.map(env => new EnvVarBuilder()
.withName(env._1)
.withValue(env._2)
.build()
) ++ Seq(
new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
} ++ {
Seq(new EnvVarBuilder()
.withName(ENV_EXECUTOR_POD_IP)
.withValueFrom(new EnvVarSourceBuilder()
.withNewFieldRef("v1", "status.podIP")
.build())
.build())
.build()
) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
} ++ {
Option(secMgr.getSecretKey()).map { authSecret =>
new EnvVarBuilder()
.withName(SecurityManager.ENV_AUTH_SECRET)
.withValue(authSecret)
@mccheah (Contributor) commented on Dec 7, 2018:

Ah I thought about this a bit more and realized that this is more insecure than I originally read it to be.

If the secret is put directly in the environment variable field itself, then anyone who has permission to get the pod metadata from the Kubernetes API server can now read the secret generated by this application. In practice permissioning on pod specs is often far looser than permissioning on Kubernetes secret objects. (Edit: For example a system administrator who performs maintenance and debugs issues should be able to read the pod spec object but often can't access sensitive information in Kubernetes secrets in the same namespace.) In this solution the administrator has to restrict access to pod specs to only the user.

I think at the very least we want this to be configured via creating a Kubernetes secret object, then loading the environment variable to point to the secret object.

In the meantime I'm going to push the PR that allows secrets to be specified as file paths directly. I will also file a Spark ticket to avoid putting the environment variable directly in the pod spec object itself.
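
A rough sketch of the suggested alternative, using fabric8's builders to point the env var at a Kubernetes Secret instead of embedding the raw value (the secret and key names are hypothetical):

```scala
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, EnvVarSourceBuilder, SecretKeySelectorBuilder}

// Sketch: keep the auth secret in a Kubernetes Secret object and have the
// executor env var reference it via secretKeyRef, so reading the pod spec does
// not reveal the value. The secret name and key below are hypothetical.
def authSecretEnv(secretName: String, secretKey: String): EnvVar =
  new EnvVarBuilder()
    .withName("_SPARK_AUTH_SECRET") // assumed to match SecurityManager.ENV_AUTH_SECRET
    .withValueFrom(new EnvVarSourceBuilder()
      .withSecretKeyRef(new SecretKeySelectorBuilder()
        .withName(secretName)
        .withKey(secretKey)
        .build())
      .build())
    .build()
```

The trade-off being argued in this thread is that Secret objects are typically permissioned more tightly than pod specs, so a reference is safer than an inline value.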

The same contributor followed up:

I filed https://issues.apache.org/jira/browse/SPARK-26301 to suggest the alternative scheme. Unlike SPARK-26139 this would change the functionality that was merged here.

The PR author (Contributor) replied:

> If the secret is put directly in the environment variable field itself, then anyone who has permission to get the pod metadata from the Kubernetes API server can now read the secret generated by this application.

Yes, and it's extremely annoying that k8s allows anybody with access to the pods to read env variables, instead of just the pod owner. In fact, it doesn't even seem to have the concept of who owns the pod.

Anyway, this isn't different from someone else being able to read secrets in the same namespace as the pod.

As I said before, it all depends on how you configure your cluster for security, and in k8s there seems to be a lot of different options.

@mccheah (Contributor) replied on Dec 7, 2018:

> Anyway, this isn't different from someone else being able to read secrets in the same namespace as the pod.

It isn't in theory, but in practice my understanding is that secrets are often permissioned more strictly than pod objects in the cluster. We should be optimizing for the more common use case, which will work out of the box for more users and also is more secure in the context of more common configurations.

The PR author replied:

> the more common use case,

Which is?

There's a lot to think about when you give permissions like "users can view, create and delete pods". If you do that, for example, you can delete other people's pods. That is also considered a security issue, since you can DoS other users.

Anyway, my point is that we should give people the choice of how they deploy things, and set up security according to their own constraints. This was just one way of doing it, and was not meant to be the only way.

.build()
}
} ++ {
kubernetesConf.get(EXECUTOR_CLASS_PATH).map { cp =>
new EnvVarBuilder()
.withName(ENV_CLASSPATH)
.withValue(cp)
.build()
}
} ++ {
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
Utils.splitCommandString(subsOpts)
}

val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)

(userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
new EnvVarBuilder()
.withName(s"$ENV_JAVA_OPT_PREFIX$index")
.withValue(opt)
.build()
}
}

val requiredPorts = Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
@@ -31,6 +31,7 @@ import org.apache.spark.util.{Clock, Utils}

private[spark] class ExecutorPodsAllocator(
conf: SparkConf,
secMgr: SecurityManager,
executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -133,7 +134,7 @@ private[spark] class ExecutorPodsAllocator(
newExecutorId.toString,
applicationId,
driverPod)
val executorPod = executorBuilder.buildFromFeatures(executorConf)
val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -94,6 +94,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

val executorPodsAllocator = new ExecutorPodsAllocator(
sc.conf,
sc.env.securityManager,
KubernetesExecutorBuilder(kubernetesClient, sc.conf),
kubernetesClient,
snapshotsStore,
@@ -110,7 +111,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit

new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc.env.rpcEnv,
sc,
kubernetesClient,
requestExecutorsService,
snapshotsStore,
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
import io.fabric8.kubernetes.client.KubernetesClient
import scala.concurrent.{ExecutionContext, Future}

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
@@ -30,15 +31,15 @@ import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class KubernetesClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
rpcEnv: RpcEnv,
sc: SparkContext,
kubernetesClient: KubernetesClient,
requestExecutorsService: ExecutorService,
snapshotsStore: ExecutorPodsSnapshotsStore,
podAllocator: ExecutorPodsAllocator,
lifecycleEventHandler: ExecutorPodsLifecycleManager,
watchEvents: ExecutorPodsWatchSnapshotSource,
pollEvents: ExecutorPodsPollingSnapshotSource)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
requestExecutorsService)
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -20,14 +20,14 @@ import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._

private[spark] class KubernetesExecutorBuilder(
provideBasicStep: (KubernetesExecutorConf => BasicExecutorFeatureStep) =
new BasicExecutorFeatureStep(_),
provideBasicStep: (KubernetesExecutorConf, SecurityManager) => BasicExecutorFeatureStep =
new BasicExecutorFeatureStep(_, _),
provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
new MountSecretsFeatureStep(_),
provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
@@ -44,13 +44,16 @@ private[spark] class KubernetesExecutorBuilder(
new HadoopSparkUserExecutorFeatureStep(_),
provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {

def buildFromFeatures(kubernetesConf: KubernetesExecutorConf): SparkPod = {
def buildFromFeatures(
kubernetesConf: KubernetesExecutorConf,
secMgr: SecurityManager): SparkPod = {
val sparkConf = kubernetesConf.sparkConf
val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_NAME)
val maybeDTSecretName = sparkConf.getOption(KERBEROS_DT_SECRET_NAME)
val maybeDTDataItem = sparkConf.getOption(KERBEROS_DT_SECRET_KEY)

val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf))
val baseFeatures = Seq(provideBasicStep(kubernetesConf, secMgr),
provideLocalDirsStep(kubernetesConf))
val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
Seq(provideSecretsStep(kubernetesConf))
} else Nil