diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 0000000000000..9ffeb4d500296 --- /dev/null +++ b/core/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1,3 @@ +org.apache.spark.deploy.security.HadoopFSCredentialProvider +org.apache.spark.deploy.security.HBaseCredentialProvider +org.apache.spark.deploy.security.HiveCredentialProvider diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 2a2ce0504dbbf..039421c873ea4 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -639,7 +639,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.akka.frameSize", "1.6")), "spark.yarn.jars" -> Seq( AlternateConfig("spark.yarn.jar", "2.0")), - "spark.yarn.access.hadoopFileSystems" -> Seq( + "spark.security.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")) ) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0e36a30c933d0..ea75daebcaadf 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -27,23 +27,25 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReferenc import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.generic.Growable -import scala.collection.mutable.HashMap +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.language.implicitConversions -import scala.reflect.{classTag, ClassTag} +import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal import com.google.common.collect.MapMaker import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} -import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat} +import org.apache.hadoop.io._ +import org.apache.hadoop.mapred.{Utils => _, _} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialRenewer} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -215,6 +217,8 @@ class SparkContext(config: SparkConf) extends Logging { private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _credentialRenewer: Option[CredentialRenewer] = None + /* 
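The three providers registered above are discovered through Java's ServiceLoader, so a deployment can plug in its own provider without modifying Spark. As a rough sketch (the class, package, and service name below are hypothetical, not part of this patch), a custom provider would implement the relocated trait and be listed in its own META-INF/services file:

    package com.example.security

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.security.ServiceCredentialProvider

    // Registered via META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider;
    // can be turned off with spark.security.credentials.myservice.enabled=false.
    class MyServiceCredentialProvider extends ServiceCredentialProvider {
      override def serviceName: String = "myservice"

      override def obtainCredentials(
          hadoopConf: Configuration,
          sparkConf: SparkConf,
          creds: Credentials): Option[Long] = {
        // Obtain a delegation token for the external service, add it to `creds`,
        // and return the time at which the token should be renewed (or None).
        None
      }
    }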
------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -497,6 +501,13 @@ class SparkContext(config: SparkConf) extends Logging { _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) + // we need to prepare credentials for executors both in conf and hdfs before they launch. + // TODO: Separate credentials files by appID, and clean all files at the end of the application + if (_conf.contains(PRINCIPAL.key)) { + _credentialRenewer = createCredentialRenewer() + _credentialRenewer.foreach(_ => SparkHadoopUtil.get.startCredentialUpdater(_conf)) + } + // Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched @@ -593,6 +604,37 @@ class SparkContext(config: SparkConf) extends Logging { } } + private def createCredentialRenewer(): Option[CredentialRenewer] = { + master match { + case "yarn" => None + case _ => + val appStagingBaseDir = _conf.get(STAGING_DIR).map { new Path(_) } + .getOrElse(FileSystem.get(_hadoopConfiguration).getHomeDirectory()) + val stagingDirPath = + appStagingBaseDir + + Path.SEPARATOR + ".sparkStaging" + Path.SEPARATOR + UUID.randomUUID().toString + val credentialsFile = "credentials-" + UUID.randomUUID().toString + _conf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) + logInfo(s"Credentials file set to: $credentialsFile") + val credentials = new Credentials + val ccm = new ConfigurableCredentialManager(_conf, _hadoopConfiguration) + ccm.obtainCredentials(_hadoopConfiguration, credentials) + + UserGroupInformation.getCurrentUser.addCredentials(credentials) + + val tokenList = ArrayBuffer[String]() + credentials.getAllTokens.asScala.foreach { token => + tokenList += (token.encodeToUrlString()) + } + if (tokenList.nonEmpty) { + _conf.set(CREDENTIALS_ENTITY, tokenList) + } + val credentialRenewer = ccm.credentialRenewer() + credentialRenewer.scheduleLoginFromKeytab() + Some(credentialRenewer) + } + } + /** * Called by the web UI to obtain executor thread dumps. This method may be expensive. * Logs an error and returns None if we failed to obtain a thread dump, which could occur due @@ -1938,6 +1980,14 @@ class SparkContext(config: SparkConf) extends Logging { } SparkEnv.set(null) } + + Utils.tryLogNonFatalError { + _credentialRenewer.foreach { cr => + cr.stop() + SparkHadoopUtil.get.stopCredentialUpdater() + } + } + // Unset YARN mode system env variable, to allow switching between cluster types. System.clearProperty("SPARK_YARN_MODE") SparkContext.clearActiveContext() @@ -2664,6 +2714,7 @@ object SparkContext extends Logging { } } + /** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. 
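The driver-side wiring above leans on Hadoop's URL-safe token serialization to pass delegation tokens to executors through SparkConf (CREDENTIALS_ENTITY). A standalone sketch of that round trip, assuming the tokens were already obtained by ConfigurableCredentialManager:

    import scala.collection.JavaConverters._

    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.security.Credentials
    import org.apache.hadoop.security.token.Token

    // Driver side: serialize every token to a URL-safe string that can live in SparkConf.
    def encodeTokens(credentials: Credentials): Seq[String] =
      credentials.getAllTokens.asScala.map(_.encodeToUrlString()).toSeq

    // Executor side: rebuild a Credentials object from those strings and merge it
    // into the current UGI before any Hadoop filesystem access happens.
    def decodeTokens(encoded: Seq[String]): Credentials = {
      val creds = new Credentials()
      encoded.foreach { s =>
        val token = new Token[DelegationTokenIdentifier]()
        token.decodeFromUrlString(s)
        creds.addToken(new Text(java.util.UUID.randomUUID().toString), token)
      }
      creds
    }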
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index f475ce87540aa..a959b381e3670 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -18,6 +18,8 @@ package org.apache.spark.deploy import java.io.IOException +import java.lang.reflect.UndeclaredThrowableException +import java.nio.charset.StandardCharsets.UTF_8 import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -28,6 +30,7 @@ import scala.util.control.NonFatal import com.google.common.primitives.Longs import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.{Token, TokenIdentifier} @@ -35,6 +38,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater} import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -48,6 +52,8 @@ class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration(sparkConf) UserGroupInformation.setConfiguration(conf) + private var credentialUpdater: CredentialUpdater = _ + /** * Runs the given function with a Hadoop UserGroupInformation as a thread local variable * (distributed to child threads), used for authenticating HDFS and YARN calls. @@ -61,9 +67,63 @@ class SparkHadoopUtil extends Logging { logDebug("running as user: " + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) - ugi.doAs(new PrivilegedExceptionAction[Unit] { - def run: Unit = func() - }) + tryDoAsUserOrExit(ugi, func) + } + + /** + * Create a proxy user ugi with username of the effective user and the ugi of the real user. + * Transfer the all credentials to the proxy user ugi. Runs the given function with the proxy + * user ugi. + * @param user + * @param func + */ + def runAsProxyUser(user: String)(func: () => Unit) { + require(Some(user).nonEmpty, "running as proxy user `null` is not allowed") + val realUser = UserGroupInformation.getCurrentUser + logInfo(s"User being impersonated is: ${realUser.getShortUserName}") + val proxyUser = UserGroupInformation.createProxyUser(user, realUser) + proxyUser.addCredentials(realUser.getCredentials) + tryDoAsUserOrExit(proxyUser, func) + } + + /** + * Run some code as the real logged in user (which may differ from the current user, for + * example, when using proxying). + */ + private[spark] def doAsRealUser[T](fn: => T): T = { + val currentUser = UserGroupInformation.getCurrentUser() + val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) + + // For some reason the Scala-generated anonymous class ends up causing an + // UndeclaredThrowableException, even if you annotate the method with @throws. 
+ try { + realUser.doAs(new PrivilegedExceptionAction[T]() { + override def run(): T = fn + }) + } catch { + case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) + } + } + + private[this] def tryDoAsUserOrExit(user: UserGroupInformation, func: () => Unit) { + try { + user.doAs(new PrivilegedExceptionAction[Unit] { + def run: Unit = func() + }) + } catch { + case e: Exception => + // Hadoop's AuthorizationException suppresses the exception's stack trace, which + // makes the message printed to the output by the JVM not very helpful. Instead, + // detect exceptions with empty stack traces here, and treat them differently. + if (e.getStackTrace().length == 0) { + // scalastyle:off println + System.err.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") + // scalastyle:on println + System.exit(1) + } else { + throw e + } + } } def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { @@ -72,7 +132,6 @@ class SparkHadoopUtil extends Logging { } } - /** * Appends S3-specific, spark.hadoop.*, and spark.buffer.size configurations to a Hadoop * configuration. @@ -122,17 +181,30 @@ class SparkHadoopUtil extends Logging { * Add any user credentials to the job conf which are necessary for running on a secure Hadoop * cluster. */ - def addCredentials(conf: JobConf) {} + def addCredentials(conf: JobConf): Unit = { + val jobCreds = conf.getCredentials() + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + } def isYarnMode(): Boolean = { false } - def getCurrentUserCredentials(): Credentials = { null } + def getCurrentUserCredentials(): Credentials = { + UserGroupInformation.getCurrentUser().getCredentials() + } - def addCurrentUserCredentials(creds: Credentials) {} + def addCurrentUserCredentials(creds: Credentials): Unit = { + UserGroupInformation.getCurrentUser().addCredentials(creds) + } - def addSecretKeyToUserCredentials(key: String, secret: String) {} + def addSecretKeyToUserCredentials(key: String, secret: String): Unit = { + val creds = new Credentials() + creds.addSecretKey(new Text(key), secret.getBytes(UTF_8)) + addCurrentUserCredentials(creds) + } - def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null } + def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { + val credentials = getCurrentUserCredentials() + if (credentials != null) credentials.getSecretKey(new Text(key)) else null } def loginUserFromKeytab(principalName: String, keytabFilename: String) { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) @@ -290,12 +362,21 @@ class SparkHadoopUtil extends Logging { * Start a thread to periodically update the current user's credentials with new credentials so * that access to secured service does not fail. */ - private[spark] def startCredentialUpdater(conf: SparkConf) {} + private[spark] def startCredentialUpdater(sparkConf: SparkConf): Unit = { + credentialUpdater = + new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() + credentialUpdater.start() + } /** * Stop the thread that does the credential updates. */ - private[spark] def stopCredentialUpdater() {} + private[spark] def stopCredentialUpdater(): Unit = { + if (credentialUpdater != null) { + credentialUpdater.stop() + credentialUpdater = null + } + } /** * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism. 
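runAsProxyUser and tryDoAsUserOrExit above centralize the impersonation logic that spark-submit previously carried inline. Stripped of the error handling, the underlying Hadoop pattern is a sketch like the following (the helper name is made up):

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    def runAs(proxyName: String)(body: => Unit): Unit = {
      val realUser = UserGroupInformation.getCurrentUser
      val proxyUser = UserGroupInformation.createProxyUser(proxyName, realUser)
      // Without this the proxy UGI starts with an empty credential set and
      // cannot present the real user's delegation tokens.
      proxyUser.addCredentials(realUser.getCredentials)
      proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = body
      })
    }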
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 1e50eb6635651..25c4328ad6aa8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -20,7 +20,6 @@ package org.apache.spark.deploy import java.io.{File, IOException} import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} import java.net.URL -import java.security.PrivilegedExceptionAction import java.text.ParseException import scala.annotation.tailrec @@ -45,8 +44,9 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher -import org.apache.spark.util._ +import org.apache.spark.util.{ChildFirstURLClassLoader, CommandLineUtils, MutableURLClassLoader, Utils} /** * Whether to submit, kill, or request the status of an application. @@ -152,27 +152,8 @@ object SparkSubmit extends CommandLineUtils { def doRunMain(): Unit = { if (args.proxyUser != null) { - val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, - UserGroupInformation.getCurrentUser()) - try { - proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { - override def run(): Unit = { - runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) - } - }) - } catch { - case e: Exception => - // Hadoop's AuthorizationException suppresses the exception's stack trace, which - // makes the message printed to the output by the JVM not very helpful. Instead, - // detect exceptions with empty stack traces here, and treat them differently. - if (e.getStackTrace().length == 0) { - // scalastyle:off println - printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}") - // scalastyle:on println - exitFn(1) - } else { - throw e - } + SparkHadoopUtil.get.runAsProxyUser(args.proxyUser) { + () => runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) } } else { runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose) @@ -547,21 +528,19 @@ object SparkSubmit extends CommandLineUtils { } // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL) { - if (args.principal != null) { - require(args.keytab != null, "Keytab must be specified when principal is specified") - if (!new File(args.keytab).exists()) { - throw new SparkException(s"Keytab file: ${args.keytab} does not exist") - } else { - // Add keytab and principal configurations in sysProps to make them available - // for later use; e.g. in spark sql, the isolated class loader used to talk - // to HiveMetastore will use these settings. They will be set as Java system - // properties and then loaded by SparkConf - sysProps.put("spark.yarn.keytab", args.keytab) - sysProps.put("spark.yarn.principal", args.principal) - - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - } + if (args.principal != null) { + require(args.keytab != null, "Keytab must be specified when principal is specified") + if (!new File(args.keytab).exists()) { + throw new SparkException(s"Keytab file: ${args.keytab} does not exist") + } else { + // Add keytab and principal configurations in sysProps to make them available + // for later use; e.g. 
in spark sql, the isolated class loader used to talk + // to HiveMetastore will use these settings. They will be set as Java system + // properties and then loaded by SparkConf + sysProps.put(KEYTAB.key, args.keytab) + sysProps.put(PRINCIPAL.key, args.principal) + + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala similarity index 86% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala index 4f4be52a0d691..7402f0d2f0206 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.ServiceLoader @@ -37,21 +37,21 @@ import org.apache.spark.util.Utils * interface and put into resources/META-INF/services to be loaded by ServiceLoader. * * Also each credential provider is controlled by - * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false. + * spark.security.credentials.{service}.enabled, it will not be loaded in if set to false. * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by - * the configuration spark.yarn.security.credentials.hive.enabled. + * the configuration spark.security.credentials.hive.enabled. */ -private[yarn] final class ConfigurableCredentialManager( +private[spark] final class ConfigurableCredentialManager( sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled" - private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled" + private val providerEnabledConfig = "spark.security.credentials.%s.enabled" // Maintain all the registered credential providers private val credentialProviders = { val providers = ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader).asScala - // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false. + // Filter out credentials in which spark.security.credentials.{service}.enabled is false. providers.filter { p => sparkConf.getOption(providerEnabledConfig.format(p.serviceName)) .orElse { @@ -89,11 +89,11 @@ private[yarn] final class ConfigurableCredentialManager( } /** - * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this + * Create an [[CredentialRenewer]] instance, caller should be responsible to stop this * instance when it is not used. AM will use it to renew credentials periodically. 
*/ - def credentialRenewer(): AMCredentialRenewer = { - new AMCredentialRenewer(sparkConf, hadoopConf, this) + def credentialRenewer(): CredentialRenewer = { + new CredentialRenewer(sparkConf, hadoopConf, this) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialRenewer.scala similarity index 95% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala rename to core/src/main/scala/org/apache/spark/deploy/security/CredentialRenewer.scala index 7e76f402db249..5db87175c0d1f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialRenewer.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.security.PrivilegedExceptionAction import java.util.concurrent.{Executors, TimeUnit} @@ -25,8 +25,6 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.util.ThreadUtils @@ -34,12 +32,13 @@ import org.apache.spark.util.ThreadUtils /** * The following methods are primarily meant to make sure long-running apps like Spark * Streaming apps can run without interruption while accessing secured services. The - * scheduleLoginFromKeytab method is called on the AM to get the new credentials. - * This method wakes up a thread that logs into the KDC + * scheduleLoginFromKeytab method is called on the AM(YARN mode)/Driver(Others) to get + * the new credentials. This method wakes up a thread that logs into the KDC * once 75% of the renewal interval of the original credentials used for the container * has elapsed. It then obtains new credentials and writes them to HDFS in a * pre-specified location - the prefix of which is specified in the sparkConf by - * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc. + * spark.security.credentials.file (so the file(s) would be named c-timestamp1-1, + * c-timestamp2-2 etc. * - each update goes to a new file, with a monotonically increasing suffix), also the * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater. * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed. @@ -51,7 +50,7 @@ import org.apache.spark.util.ThreadUtils * appeared, it will read the credentials and update the currently running UGI with it. This * process happens again once 80% of the validity of this has expired. 
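The renewal policy spelled out in the comment above (re-login once 75% of the token lifetime has elapsed) amounts to scheduling a one-shot task on a single-threaded executor that re-schedules itself. A simplified sketch, with obtainAndWriteTokens standing in for the real login-and-write logic:

    import java.util.concurrent.{Executors, TimeUnit}

    import org.apache.spark.util.ThreadUtils

    val renewalExecutor =
      Executors.newSingleThreadScheduledExecutor(
        ThreadUtils.namedThreadFactory("Credential Refresh Thread"))

    def scheduleRenewal(expirationTime: Long, obtainAndWriteTokens: () => Unit): Unit = {
      // Fire when 75% of the remaining token lifetime has elapsed.
      val delay = math.max(((expirationTime - System.currentTimeMillis()) * 0.75).toLong, 0L)
      renewalExecutor.schedule(new Runnable {
        override def run(): Unit = {
          obtainAndWriteTokens()
          // The real renewer computes a new expiration from the fresh tokens and
          // schedules itself again, so renewals continue for the life of the app.
        }
      }, delay, TimeUnit.MILLISECONDS)
    }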
*/ -private[yarn] class AMCredentialRenewer( +private[spark] class CredentialRenewer( sparkConf: SparkConf, hadoopConf: Configuration, credentialManager: ConfigurableCredentialManager) extends Logging { @@ -62,7 +61,7 @@ private[yarn] class AMCredentialRenewer( Executors.newSingleThreadScheduledExecutor( ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - private val hadoopUtil = YarnSparkHadoopUtil.get + private val hadoopUtil = SparkHadoopUtil.get private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) @@ -76,7 +75,7 @@ private[yarn] class AMCredentialRenewer( * Schedule a login from the keytab and principal set using the --principal and --keytab * arguments to spark-submit. This login happens only when the credentials of the current user * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from - * SparkConf to do the login. This method is a no-op in non-YARN mode. + * SparkConf to do the login. * */ private[spark] def scheduleLoginFromKeytab(): Unit = { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala rename to core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala index 41b7b5d60b038..62728e6818ca7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialUpdater.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import java.util.concurrent.{Executors, TimeUnit} @@ -27,7 +27,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.internal.config._ import org.apache.spark.internal.Logging import org.apache.spark.util.{ThreadUtils, Utils} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala index 5571df09a2ec9..b56336f9c0bf0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import scala.reflect.runtime.universe import scala.util.control.NonFatal diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala similarity index 98% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala index f65c886db944e..ea7b3d983b5d2 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import scala.collection.JavaConverters._ import scala.util.Try @@ -27,7 +27,6 @@ import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala similarity index 83% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala index 16d8fc32bb42d..bc0009124e22d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala @@ -15,10 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security - -import java.lang.reflect.UndeclaredThrowableException -import java.security.PrivilegedExceptionAction +package org.apache.spark.deploy.security import scala.reflect.runtime.universe import scala.util.control.NonFatal @@ -30,6 +27,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.Token import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -87,7 +85,7 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider classOf[String], classOf[String]) val getHive = hiveClass.getMethod("get", hiveConfClass) - doAsRealUser { + SparkHadoopUtil.get.doAsRealUser { val hive = getHive.invoke(null, conf) val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal) .asInstanceOf[String] @@ -107,23 +105,4 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider None } - - /** - * Run some code as the real logged in user (which may differ from the current user, for - * example, when using proxying). 
- */ - private def doAsRealUser[T](fn: => T): T = { - val currentUser = UserGroupInformation.getCurrentUser() - val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser) - - // For some reason the Scala-generated anonymous class ends up causing an - // UndeclaredThrowableException, even if you annotate the method with @throws. - try { - realUser.doAs(new PrivilegedExceptionAction[T]() { - override def run(): T = fn - }) - } catch { - case e: UndeclaredThrowableException => throw Option(e.getCause()).getOrElse(e) - } - } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala similarity index 97% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala rename to core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala index 4e3fcce8dbb1d..667960c0f43b5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.{Credentials, UserGroupInformation} diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b376ecd301eab..90f32028c0b83 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,22 +19,29 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer +import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} import scala.util.control.NonFatal +import scala.util.{Success, Failure} + +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.Token +import org.apache.hadoop.security.{Credentials, UserGroupInformation} -import org.apache.spark._ import org.apache.spark.TaskState.TaskState +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.rpc._ -import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{Utils, ThreadUtils} private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, @@ -184,38 +191,52 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { Utils.initDaemon(log) - SparkHadoopUtil.get.runAsSparkUser { () => - // Debug code - Utils.checkHost(hostname) - - // Bootstrap to fetch the driver's Spark properties. 
- val executorConf = new SparkConf - val port = executorConf.getInt("spark.executor.port", 0) - val fetcher = RpcEnv.create( - "driverPropsFetcher", - hostname, - port, - executorConf, - new SecurityManager(executorConf), - clientMode = true) - val driver = fetcher.setupEndpointRefByURI(driverUrl) - val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) - val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId)) - fetcher.shutdown() - - // Create SparkEnv using properties we fetched from the driver. - val driverConf = new SparkConf() - for ((key, value) <- props) { - // this is required for SSL in standalone mode - if (SparkConf.isExecutorStartupConf(key)) { - driverConf.setIfMissing(key, value) - } else { - driverConf.set(key, value) - } + // Debug code + Utils.checkHost(hostname) + + // Bootstrap to fetch the driver's Spark properties. + val executorConf = new SparkConf + val port = executorConf.getInt("spark.executor.port", 0) + val fetcher = RpcEnv.create( + "driverPropsFetcher", + hostname, + port, + executorConf, + new SecurityManager(executorConf), + clientMode = true) + val driver = fetcher.setupEndpointRefByURI(driverUrl) + val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) + val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId)) + fetcher.shutdown() + + // Create SparkEnv using properties we fetched from the driver. + val driverConf = new SparkConf() + for ((key, value) <- props) { + // this is required for SSL in standalone mode + if (SparkConf.isExecutorStartupConf(key)) { + driverConf.setIfMissing(key, value) + } else { + driverConf.set(key, value) } - if (driverConf.contains("spark.yarn.credentials.file")) { + } + + val creds = new Credentials() + + if (driverConf.contains(CREDENTIALS_ENTITY)) { + val tokenList = driverConf.get(CREDENTIALS_ENTITY) + tokenList.foreach { tokenStr => + val token = new Token[DelegationTokenIdentifier] + token.decodeFromUrlString(tokenStr) + creds.addToken(new Text(UUID.randomUUID().toString), token) + } + } + + SparkHadoopUtil.get.runAsSparkUser { () => + UserGroupInformation.getCurrentUser.addCredentials(creds) + + if (driverConf.contains(CREDENTIALS_FILE_PATH)) { logInfo("Will periodically update credentials from: " + - driverConf.get("spark.yarn.credentials.file")) + driverConf.get(CREDENTIALS_FILE_PATH)) SparkHadoopUtil.get.startCredentialUpdater(driverConf) } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 223c921810378..c77c03e36ca21 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -75,13 +75,23 @@ package object config { private[spark] val SHUFFLE_SERVICE_ENABLED = ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false) + private[spark] val SPARK_KEYTAB = ConfigBuilder("spark.keytab") + .doc("Location of user's keytab.") + .fallbackConf(KEYTAB) + private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab") .doc("Location of user's keytab.") - .stringConf.createOptional + .stringConf + .createOptional + + private[spark] val SPARK_PRINCIPAL = ConfigBuilder("spark.principal") + .doc("Name of the Kerberos principal.") + .fallbackConf(PRINCIPAL) private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal") .doc("Name of the Kerberos principal.") - .stringConf.createOptional + .stringConf + .createOptional private[spark] val 
EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf @@ -264,4 +274,55 @@ package object config { .booleanConf .createWithDefault(false) + + /* Security configuration. */ + + private[spark] val CREDENTIALS_FILE_PATH = ConfigBuilder("spark.security.credentials.file") + .internal() + .stringConf + .createWithDefault(null) + + private[spark] val CREDENTIALS_ENTITY = ConfigBuilder("spark.security.credentials.entities") + .internal() + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val CREDENTIAL_FILE_MAX_COUNT = + ConfigBuilder("spark.security.credentials.file.retention.count") + .intConf + .createWithDefault(5) + + private[spark] val CREDENTIALS_FILE_MAX_RETENTION = + ConfigBuilder("spark.security.credentials.file.retention.days") + .intConf + .createWithDefault(5) + + private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") + .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + + "fs.defaultFS does not need to be listed here.") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val FILESYSTEMS_TO_ACCESS = + ConfigBuilder("spark.security.access.hadoopFileSystems") + .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + + "that hosts fs.defaultFS does not need to be listed here.") + .fallbackConf(NAMENODES_TO_ACCESS) + + private[spark] val CREDENTIALS_RENEWAL_TIME = ConfigBuilder("spark.credentials.renewalTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val CREDENTIALS_UPDATE_TIME = ConfigBuilder("spark.credentials.updateTime") + .internal() + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(Long.MaxValue) + + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .createOptional } diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 0000000000000..2676a0ad589fa --- /dev/null +++ b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1 @@ +org.apache.spark.deploy.security.TestCredentialProvider diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala similarity index 97% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala index b0067aa4517c7..4c912b1d5ad7c 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. 
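Because FILESYSTEMS_TO_ACCESS falls back to NAMENODES_TO_ACCESS, and SparkConf registers the old key as a deprecated alternate, applications can keep their existing configuration while migrating. A small usage sketch with example URIs:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // New, cluster-manager-agnostic key introduced by this change:
      .set("spark.security.access.hadoopFileSystems",
        "hdfs://nn1.example.com:8020,webhdfs://nn3.example.com:50070")

    // The deprecated spelling still resolves to the same entry through the fallback:
    // conf.set("spark.yarn.access.namenodes", "hdfs://nn1.example.com:8020")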
*/ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.Text @@ -24,7 +24,6 @@ import org.apache.hadoop.security.token.Token import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { private var credentialManager: ConfigurableCredentialManager = null @@ -54,7 +53,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit } test("disable hive credential provider") { - sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") + sparkConf.set("spark.security.credentials.hive.enabled", "false") credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala similarity index 98% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala rename to core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala index f50ee193c258f..88dc184c7064d 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSCredentialProviderSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.deploy.yarn.security +package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration import org.scalatest.{Matchers, PrivateMethodTester} diff --git a/dev/.rat-excludes b/dev/.rat-excludes index 2355d40d1e6fe..4ae5b0b083241 100644 --- a/dev/.rat-excludes +++ b/dev/.rat-excludes @@ -102,7 +102,7 @@ spark-deps-.* org.apache.spark.scheduler.ExternalClusterManager .*\.sql .Rbuildignore -org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +org.apache.spark.deploy.security.ServiceCredentialProvider spark-warehouse structured-streaming/* kafka-source-initial-offset-version-2.1.0.bin diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index e9ddaa76a797f..3e7925d480e45 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -276,11 +276,11 @@ To use a custom metrics.properties for the application master and executors, upd
-  <td><code>spark.yarn.access.hadoopFileSystems</code></td>
+  <td><code>spark.security.access.hadoopFileSystems</code></td>
   ...
-  example, <code>spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
+  example, <code>spark.security.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
   webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
   and Kerberos must be properly configured to be able to access them (either in the same realm
   or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
@@ -426,7 +426,7 @@ To use a custom metrics.properties for the application master and executors, upd
-  <td><code>spark.yarn.security.credentials.${service}.enabled</code></td>
+  <td><code>spark.security.credentials.${service}.enabled</code></td>
   <td><code>true</code></td>
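For example, a job that does not need Hive or HBase delegation tokens could opt those providers out with the renamed keys; a minimal SparkConf sketch (example URIs only):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.security.credentials.hive.enabled", "false")
      .set("spark.security.credentials.hbase.enabled", "false")
      // Delegation tokens for extra filesystems are still requested:
      .set("spark.security.access.hadoopFileSystems", "hdfs://nn2.example.com:8020")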