Merge remote-tracking branch 'upstream/master'
FavioVazquez committed Apr 30, 2015
2 parents baed3f2 + 6c65da6 commit b2e2f8c
Showing 145 changed files with 5,448 additions and 1,360 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -814,6 +814,7 @@ BSD-style licenses
The following components are provided under a BSD-style license. See project link for details.

(BSD 3 Clause) core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
+ (BSD 3 Clause) JPMML-Model (org.jpmml:pmml-model:1.1.15 - https://github.com/jpmml/jpmml-model)
(BSD 3-clause style license) jblas (org.jblas:jblas:1.2.3 - http://jblas.org/)
(BSD License) AntLR Parser Generator (antlr:antlr:2.7.7 - http://www.antlr.org/)
(BSD License) Javolution (javolution:javolution:5.5.1 - http://javolution.org)
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A zero-argument function that returns an R.
*/
public interface Function0<R> extends Serializable {
public R call() throws Exception;
}
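The new interface mirrors the existing Function/Function2 family in org.apache.spark.api.java.function. A minimal sketch of implementing it from Scala (the string payload is purely illustrative):

import org.apache.spark.api.java.function.Function0

// A zero-argument factory; call() is allowed to throw, matching the interface contract.
val makeGreeting = new Function0[String] {
  override def call(): String = "hello from Function0"
}
println(makeGreeting.call()) // prints: hello from Function0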
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -106,7 +106,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
*/
protected def askTracker[T: ClassTag](message: Any): T = {
try {
- trackerEndpoint.askWithReply[T](message)
+ trackerEndpoint.askWithRetry[T](message)
} catch {
case e: Exception =>
logError("Error communicating with MapOutputTracker", e)
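The rename from askWithReply to askWithRetry (here and in SparkContext below) makes the helper's behavior explicit: it blocks for a reply and re-sends the message a bounded number of times before giving up. A simplified sketch of that pattern, with illustrative names, attempt limit, and timeout rather than Spark's actual implementation:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Block for a reply, retrying the ask up to maxAttempts times and
// rethrowing the last failure if every attempt fails.
def askWithRetrySketch[T](ask: () => Future[T], maxAttempts: Int = 3): T = {
  var lastError: Exception = null
  for (_ <- 1 to maxAttempts) {
    try {
      return Await.result(ask(), 30.seconds)
    } catch {
      case e: Exception => lastError = e // remember and retry
    }
  }
  throw new RuntimeException(s"Ask failed after $maxAttempts attempts", lastError)
}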
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -555,7 +555,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
SparkEnv.executorActorSystemName,
RpcAddress(host, port),
ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
- Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
+ Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump))
}
} catch {
case e: Exception =>
@@ -713,7 +713,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
RDD[(String, String)] = {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
+ // comma separated files as input. (see SPARK-7155)
+ NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
@@ -759,7 +761,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
RDD[(String, PortableDataStream)] = {
assertNotStopped()
val job = new NewHadoopJob(hadoopConfiguration)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking
+ // comma separated files as input. (see SPARK-7155)
+ NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new BinaryFileRDD(
this,
@@ -935,7 +939,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
- NewFileInputFormat.addInputPath(job, new Path(path))
+ // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking
+ // comma separated files as input. (see SPARK-7155)
+ NewFileInputFormat.setInputPaths(job, path)
val updatedConf = job.getConfiguration
new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path)
}
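Because setInputPaths splits its argument on commas, wholeTextFiles, binaryFiles, and newAPIHadoopFile now all accept multiple files or directories in one string, matching hadoopFile/textFile. A usage sketch (the paths and the sc handle are illustrative):

// Assuming sc: org.apache.spark.SparkContext
val pages = sc.wholeTextFiles("/data/batch1,/data/batch2")
val blobs = sc.binaryFiles("hdfs:///logs/2015-04-29,hdfs:///logs/2015-04-30")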
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -105,23 +105,18 @@ private[spark] object TestUtils {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}

- private class JavaSourceFromString(val name: String, val code: String)
+ private[spark] class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean): String = code
}

- /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+ /** Creates a compiled class with the source file. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
- toStringValue: String = "",
- baseClass: String = null,
- classpathUrls: Seq[URL] = Seq()): File = {
+ sourceFile: JavaSourceFromString,
+ classpathUrls: Seq[URL]): File = {
val compiler = ToolProvider.getSystemJavaCompiler
- val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
- val sourceFile = new JavaSourceFromString(className,
- "public class " + className + extendsText + " implements java.io.Serializable {" +
- " @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
@@ -144,4 +139,18 @@ private[spark] object TestUtils {
assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
out
}

+ /** Creates a compiled class with the given name. Class file will be placed in destDir. */
+ def createCompiledClass(
+ className: String,
+ destDir: File,
+ toStringValue: String = "",
+ baseClass: String = null,
+ classpathUrls: Seq[URL] = Seq()): File = {
+ val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
+ val sourceFile = new JavaSourceFromString(className,
+ "public class " + className + extendsText + " implements java.io.Serializable {" +
+ " @Override public String toString() { return \"" + toStringValue + "\"; }}")
+ createCompiledClass(className, destDir, sourceFile, classpathUrls)
+ }
}
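Splitting createCompiledClass into two overloads lets a test compile arbitrary Java source rather than only the generated toString stub. A sketch of the new overload in use (the class body and destination directory are illustrative, and both members are private[spark], so this only compiles inside Spark's own source tree):

import java.io.File
import org.apache.spark.TestUtils
import org.apache.spark.TestUtils.JavaSourceFromString

// Compile a hand-written class and drop the .class file into the destination directory,
// which is assumed to already exist.
val source = new JavaSourceFromString("Hello",
  "public class Hello { public String greet() { return \"hi\"; } }")
val classFile: File = TestUtils.createCompiledClass(
  "Hello", new File("/tmp/classes"), source, classpathUrls = Seq())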
105 changes: 105 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{ThreadUtils, Utils}

import scala.util.control.NonFatal

private[spark] class ExecutorDelegationTokenUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration) extends Logging {

@volatile private var lastCredentialsFileSuffix = 0

private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")

private val delegationTokenRenewer =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))

// On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
private val executorUpdaterRunnable =
new Runnable {
override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
}

def updateCredentialsIfRequired(): Unit = {
try {
val credentialsFilePath = new Path(credentialsFile)
val remoteFs = FileSystem.get(hadoopConf)
SparkHadoopUtil.get.listFilesSorted(
remoteFs, credentialsFilePath.getParent,
credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
.lastOption.foreach { credentialsStatus =>
val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
if (suffix > lastCredentialsFileSuffix) {
logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
lastCredentialsFileSuffix = suffix
UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
logInfo("Tokens updated from credentials file.")
} else {
// Check every hour to see if new credentials arrived.
logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
"tokens yet, will check again in an hour.")
delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
return
}
}
val timeFromNowToRenewal =
SparkHadoopUtil.get.getTimeFromNowToRenewal(
sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
if (timeFromNowToRenewal <= 0) {
executorUpdaterRunnable.run()
} else {
logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
delegationTokenRenewer.schedule(
executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
}
} catch {
// Since the file may get deleted while we are reading it, catch the Exception and come
// back in an hour to try again
case NonFatal(e) =>
logWarning("Error while trying to update credentials, will try again in 1 hour", e)
delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
}
}

private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
val stream = remoteFs.open(tokenPath)
try {
val newCredentials = new Credentials()
newCredentials.readTokenStorageStream(stream)
newCredentials
} finally {
stream.close()
}
}

def stop(): Unit = {
delegationTokenRenewer.shutdown()
}

}
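The updater is self-scheduling: each run either loads a newer credentials file or re-queues itself, so a caller only constructs it, triggers the first check, and stops it on shutdown. A lifecycle sketch (the class is private[spark], and spark.yarn.credentials.file must already be set in the conf, otherwise construction fails):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.deploy.ExecutorDelegationTokenUpdater

val sparkConf = new SparkConf() // assumed to carry spark.yarn.credentials.file
val updater = new ExecutorDelegationTokenUpdater(sparkConf, new Configuration())
updater.updateCredentialsIfRequired() // first read; later runs schedule themselves
// ... executor does its work ...
updater.stop() // shut down the refresh thread on exit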
69 changes: 67 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@

package org.apache.spark.deploy

+ import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
+ import java.util.{Arrays, Comparator}

+ import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
+ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,16 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
+ import scala.concurrent.duration._

/**
* :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- val conf: Configuration = newConfiguration(new SparkConf())
+ private val sparkConf = new SparkConf()
+ val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

/**
Expand Down Expand Up @@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}

+ /**
+ * Lists all files in the given directory whose names start with the specified prefix and do
+ * not end with the given suffix. The returned FileStatus instances are sorted by the
+ * modification times of the respective files.
+ */
+ def listFilesSorted(
+ remoteFs: FileSystem,
+ dir: Path,
+ prefix: String,
+ exclusionSuffix: String): Array[FileStatus] = {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
+ }
+ })
+ fileStatuses
+ }

+ /**
+ * How much time is remaining (in millis) from now until (fraction * renewal time) for the
+ * token that is valid the latest?
+ * This will return a negative (or zero) value if that fraction of the validity period has
+ * already elapsed.
+ */
+ def getTimeFromNowToRenewal(
+ sparkConf: SparkConf,
+ fraction: Double,
+ credentials: Credentials): Long = {
+ val now = System.currentTimeMillis()
+
+ val renewalInterval =
+ sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+ credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+ .map { t =>
+ val identifier = new DelegationTokenIdentifier()
+ identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+ (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+ }.foldLeft(0L)(math.max)
+ }


+ private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+ val fileName = credentialsPath.getName
+ fileName.substring(
+ fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+ }


private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

/**
@@ -251,6 +312,10 @@ object SparkHadoopUtil {
}
}

+ val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

+ val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
}
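To make the renewal arithmetic concrete: with the default 24-hour spark.yarn.token.renewal.interval and fraction = 0.8, a token issued 5 hours ago is rechecked roughly 14 hours from now. A worked sketch of the same formula (the issue date is illustrative):

import java.util.concurrent.TimeUnit

val renewalIntervalMs = TimeUnit.HOURS.toMillis(24) // default renewal interval
val fraction = 0.8                                  // refresh at 80% of validity
val issueDate = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5)

// Same formula as getTimeFromNowToRenewal: issueDate + fraction * interval - now
val delayMs = (issueDate + fraction * renewalIntervalMs).toLong - System.currentTimeMillis()
println(s"Next refresh in ${TimeUnit.MILLISECONDS.toHours(delayMs)} hours") // ~14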