[SPARK-20060][Deploy][Kerberos]Support Standalone visiting secured HDFS #17387

Closed
wants to merge 11 commits
@@ -0,0 +1,3 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
org.apache.spark.deploy.security.HiveCredentialProvider
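The three provider classes listed above are registered through a Java ServiceLoader-style file so the credential manager can discover them on the classpath. A minimal sketch of how such a registration file is typically consumed; the helper name and the generic provider trait are assumptions for illustration, not part of this change:

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Hypothetical helper: discover every implementation registered under
// META-INF/services/<provider trait name> on the given class loader.
def loadCredentialProviders[T](providerTrait: Class[T], loader: ClassLoader): Seq[T] =
  ServiceLoader.load(providerTrait, loader).asScala.toSeq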
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -639,7 +639,7 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),
"spark.yarn.access.hadoopFileSystems" -> Seq(
"spark.security.access.hadoopFileSystems" -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2"))
)

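The rename above keeps the old YARN-specific key usable through SparkConf's alternate-config mechanism. A minimal sketch of the expected behaviour, assuming the usual AlternateConfig fallback semantics; the filesystem URIs are placeholders:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Setting the deprecated key still works and logs a deprecation warning.
  .set("spark.yarn.access.namenodes", "hdfs://nn1:8020,hdfs://nn2:8020")

// Reads through the new, cluster-manager-neutral key fall back to the old one.
val fileSystems = conf.get("spark.security.access.hadoopFileSystems", "")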
59 changes: 55 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -27,23 +27,25 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

import com.google.common.collect.MapMaker
import org.apache.commons.lang3.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.io._
import org.apache.hadoop.mapred.{Utils => _, _}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialRenewer}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -215,6 +217,8 @@ class SparkContext(config: SparkConf) extends Logging {
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _credentialRenewer: Option[CredentialRenewer] = None


/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -497,6 +501,13 @@
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

// We need to prepare credentials for executors, both in the conf and on HDFS, before they launch.
// TODO: separate credentials files by appID, and clean up all files at the end of the application.
if (_conf.contains(PRINCIPAL.key)) {
_credentialRenewer = createCredentialRenewer()
_credentialRenewer.foreach(_ => SparkHadoopUtil.get.startCredentialUpdater(_conf))
}

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
@@ -593,6 +604,37 @@
}
}

private def createCredentialRenewer(): Option[CredentialRenewer] = {
master match {
case "yarn" => None
case _ =>
val appStagingBaseDir = _conf.get(STAGING_DIR).map { new Path(_) }
.getOrElse(FileSystem.get(_hadoopConfiguration).getHomeDirectory())
val stagingDirPath = new Path(
  appStagingBaseDir, ".sparkStaging" + Path.SEPARATOR + UUID.randomUUID().toString)
val credentialsFile = "credentials-" + UUID.randomUUID().toString
_conf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
val credentials = new Credentials
val ccm = new ConfigurableCredentialManager(_conf, _hadoopConfiguration)
ccm.obtainCredentials(_hadoopConfiguration, credentials)

UserGroupInformation.getCurrentUser.addCredentials(credentials)

val tokenList = ArrayBuffer[String]()
credentials.getAllTokens.asScala.foreach { token =>
tokenList += (token.encodeToUrlString())
}
if (tokenList.nonEmpty) {
_conf.set(CREDENTIALS_ENTITY, tokenList)
}
val credentialRenewer = ccm.credentialRenewer()
credentialRenewer.scheduleLoginFromKeytab()
Some(credentialRenewer)
}
}
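For context, a sketch of the consuming side that this encoding implies: the URL-safe token strings published under CREDENTIALS_ENTITY can be turned back into Hadoop tokens and added to a UGI. The helper below is illustrative only and not shown in this change:

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

// Rebuild tokens from the strings produced by encodeToUrlString() above and
// install them into the current user's UGI.
def installEncodedTokens(encoded: Seq[String]): Unit = {
  val creds = new Credentials()
  encoded.foreach { s =>
    val token = new Token[TokenIdentifier]()
    token.decodeFromUrlString(s)
    creds.addToken(token.getService, token)
  }
  UserGroupInformation.getCurrentUser.addCredentials(creds)
}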

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
@@ -1938,6 +1980,14 @@
}
SparkEnv.set(null)
}

Utils.tryLogNonFatalError {
_credentialRenewer.foreach { cr =>
cr.stop()
SparkHadoopUtil.get.stopCredentialUpdater()
}
}

// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
SparkContext.clearActiveContext()
@@ -2664,6 +2714,7 @@ object SparkContext extends Logging {
}
}


/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
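Taken together, the SparkContext changes above are driven entirely by configuration. A hedged sketch of a driver-side setup for a standalone cluster against Kerberized HDFS, using the internal config entries referenced in this diff; PRINCIPAL and KEYTAB are private[spark], so a real application would set the equivalent string keys or use spark-submit's --principal/--keytab flags, and the master URL, principal, and paths below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}

val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("secure-hdfs-demo")
  .set(PRINCIPAL.key, "alice@EXAMPLE.COM")
  .set(KEYTAB.key, "/etc/security/keytabs/alice.keytab")
  .set("spark.security.access.hadoopFileSystems", "hdfs://nn1:8020")

// With PRINCIPAL set, SparkContext creates a CredentialRenewer (see
// createCredentialRenewer above) and starts the credential updater; both are
// torn down again in SparkContext.stop().
val sc = new SparkContext(conf)
sc.stop()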
103 changes: 92 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -18,6 +18,8 @@
package org.apache.spark.deploy

import java.io.IOException
import java.lang.reflect.UndeclaredThrowableException
import java.nio.charset.StandardCharsets.UTF_8
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -28,13 +30,15 @@ import scala.util.control.NonFatal
import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

@@ -48,6 +52,8 @@ class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

private var credentialUpdater: CredentialUpdater = _

/**
* Runs the given function with a Hadoop UserGroupInformation as a thread local variable
* (distributed to child threads), used for authenticating HDFS and YARN calls.
@@ -61,9 +67,63 @@ class SparkHadoopUtil extends Logging {
logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
tryDoAsUserOrExit(ugi, func)
}

/**
* Create a proxy-user UGI for the effective user on top of the real user's UGI, transfer all
* of the real user's credentials to it, and run the given function as that proxy user.
* @param user the effective (proxied) user name
* @param func the function to run as the proxy user
*/
def runAsProxyUser(user: String)(func: () => Unit) {
require(Option(user).nonEmpty, "running as proxy user `null` is not allowed")
val realUser = UserGroupInformation.getCurrentUser
logInfo(s"User being impersonated is: ${realUser.getShortUserName}")
val proxyUser = UserGroupInformation.createProxyUser(user, realUser)
proxyUser.addCredentials(realUser.getCredentials)
tryDoAsUserOrExit(proxyUser, func)
}

/**
* Run some code as the real logged in user (which may differ from the current user, for
* example, when using proxying).
*/
private[spark] def doAsRealUser[T](fn: => T): T = {
val currentUser = UserGroupInformation.getCurrentUser()
val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)

// For some reason the Scala-generated anonymous class ends up causing an
// UndeclaredThrowableException, even if you annotate the method with @throws.
try {
realUser.doAs(new PrivilegedExceptionAction[T]() {
override def run(): T = fn
})
} catch {
case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e)
}
}
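A tiny usage sketch for doAsRealUser (illustrative only): inside the block, the current Hadoop user is the real Kerberos-authenticated user, even when the caller is running under a proxy UGI.

val realUserName = SparkHadoopUtil.get.doAsRealUser {
  UserGroupInformation.getCurrentUser.getShortUserName
}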

private[this] def tryDoAsUserOrExit(user: UserGroupInformation, func: () => Unit) {
try {
user.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
System.err.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
System.exit(1)
} else {
throw e
}
}
}
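And a minimal usage sketch for runAsProxyUser, mirroring the SparkSubmit call site later in this diff; the user name is a placeholder:

SparkHadoopUtil.get.runAsProxyUser("alice") { () =>
  // Code here sees "alice" as the current user, while Hadoop calls authenticate
  // with the tokens transferred from the real logged-in user.
  UserGroupInformation.getCurrentUser.getShortUserName  // "alice"
}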

def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
@@ -72,7 +132,6 @@ class SparkHadoopUtil extends Logging {
}
}


/**
* Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop
* configuration.
@@ -122,17 +181,30 @@ class SparkHadoopUtil extends Logging {
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
* cluster.
*/
def addCredentials(conf: JobConf) {}
def addCredentials(conf: JobConf): Unit = {
val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}

def isYarnMode(): Boolean = { false }

def getCurrentUserCredentials(): Credentials = { null }
def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
}

def addCurrentUserCredentials(creds: Credentials) {}
def addCurrentUserCredentials(creds: Credentials): Unit = {
UserGroupInformation.getCurrentUser().addCredentials(creds)
}

def addSecretKeyToUserCredentials(key: String, secret: String) {}
def addSecretKeyToUserCredentials(key: String, secret: String): Unit = {
val creds = new Credentials()
creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))
addCurrentUserCredentials(creds)
}

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = {
  val credentials = getCurrentUserCredentials()
  if (credentials != null) credentials.getSecretKey(new Text(key)) else null
}
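A short round-trip sketch for the two secret-key helpers above; the key name and value are arbitrary:

import java.nio.charset.StandardCharsets.UTF_8

val util = SparkHadoopUtil.get
util.addSecretKeyToUserCredentials("my.app.secret", "s3cr3t")
val secret = util.getSecretKeyFromUserCredentials("my.app.secret")
assert(secret != null && new String(secret, UTF_8) == "s3cr3t")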

def loginUserFromKeytab(principalName: String, keytabFilename: String) {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
@@ -290,12 +362,21 @@ class SparkHadoopUtil extends Logging {
* Start a thread to periodically update the current user's credentials with new credentials so
* that access to secured service does not fail.
*/
private[spark] def startCredentialUpdater(conf: SparkConf) {}
private[spark] def startCredentialUpdater(sparkConf: SparkConf): Unit = {
credentialUpdater =
new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
credentialUpdater.start()
}

/**
* Stop the thread that does the credential updates.
*/
private[spark] def stopCredentialUpdater() {}
private[spark] def stopCredentialUpdater(): Unit = {
if (credentialUpdater != null) {
credentialUpdater.stop()
credentialUpdater = null
}
}
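A minimal lifecycle sketch for the updater hooks above, matching how SparkContext wires them up elsewhere in this change; both hooks are private[spark], and the SparkConf is assumed to carry the credential-file settings written by the driver:

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
SparkHadoopUtil.get.startCredentialUpdater(sparkConf)
try {
  // ... run work that needs periodically refreshed Hadoop credentials ...
} finally {
  SparkHadoopUtil.get.stopCredentialUpdater()
}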

/**
* Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
55 changes: 17 additions & 38 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
import java.io.{File, IOException}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.PrivilegedExceptionAction
import java.text.ParseException

import scala.annotation.tailrec
@@ -45,8 +44,9 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.config._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
import org.apache.spark.util.{ChildFirstURLClassLoader, CommandLineUtils, MutableURLClassLoader, Utils}

/**
* Whether to submit, kill, or request the status of an application.
@@ -152,27 +152,8 @@ object SparkSubmit extends CommandLineUtils {

def doRunMain(): Unit = {
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
})
} catch {
case e: Exception =>
// Hadoop's AuthorizationException suppresses the exception's stack trace, which
// makes the message printed to the output by the JVM not very helpful. Instead,
// detect exceptions with empty stack traces here, and treat them differently.
if (e.getStackTrace().length == 0) {
// scalastyle:off println
printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
// scalastyle:on println
exitFn(1)
} else {
throw e
}
SparkHadoopUtil.get.runAsProxyUser(args.proxyUser) {
() => runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
}
} else {
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
@@ -547,21 +528,19 @@
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
} else {
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put(KEYTAB.key, args.keytab)
sysProps.put(PRINCIPAL.key, args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
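The keytab and principal written into sysProps above become Java system properties and are reloaded through SparkConf later in the same JVM. A hedged sketch of that consumer side, using only the .key names referenced in this diff; KEYTAB and PRINCIPAL are private[spark], so this only compiles inside the spark package:

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}

// A fresh SparkConf picks up the spark.* system properties set by SparkSubmit.
val conf = new SparkConf()
for (principal <- conf.getOption(PRINCIPAL.key); keytab <- conf.getOption(KEYTAB.key)) {
  // Re-login from the keytab, e.g. before talking to a secured Hive metastore.
  UserGroupInformation.loginUserFromKeytab(principal, keytab)
}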
