Merge remote-tracking branch 'upstream/master' into serializer2
yhuai committed Apr 17, 2015
2 parents 6d07678 + c84d916 commit 9f1ed92
Showing 142 changed files with 2,504 additions and 1,996 deletions.
6 changes: 3 additions & 3 deletions R/pkg/DESCRIPTION
@@ -17,19 +17,19 @@ License: Apache License (== 2.0)
Collate:
'generics.R'
'jobj.R'
'SQLTypes.R'
'RDD.R'
'pairRDD.R'
'SQLTypes.R'
'column.R'
'group.R'
'DataFrame.R'
'SQLContext.R'
'backend.R'
'broadcast.R'
'client.R'
'context.R'
'deserialize.R'
'serialize.R'
'sparkR.R'
'backend.R'
'client.R'
'utils.R'
'zzz.R'
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
@@ -17,7 +17,7 @@

# DataFrame.R - DataFrame class and methods implemented in S4 OO classes

#' @include jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
#' @include generics.R jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
NULL

setOldClass("jobj")
2 changes: 1 addition & 1 deletion R/pkg/R/column.R
@@ -17,7 +17,7 @@

# Column Class

#' @include generics.R jobj.R
#' @include generics.R jobj.R SQLTypes.R
NULL

setOldClass("jobj")
3 changes: 3 additions & 0 deletions R/pkg/R/group.R
@@ -17,6 +17,9 @@

# group.R - GroupedData class and methods implemented in S4 OO classes

#' @include generics.R jobj.R SQLTypes.R column.R
NULL

setOldClass("jobj")

#' @title S4 class that represents a GroupedData
3 changes: 3 additions & 0 deletions R/pkg/R/jobj.R
@@ -18,6 +18,9 @@
# References to objects that exist on the JVM backend
# are maintained using the jobj.

#' @include generics.R
NULL

# Maintain a reference count of Java object references
# This allows us to GC the java object when it is safe
.validJobjs <- new.env(parent = emptyenv())
2 changes: 2 additions & 0 deletions R/pkg/R/pairRDD.R
@@ -16,6 +16,8 @@
#

# Operations supported on RDDs containing pairs (i.e. key, value)
#' @include generics.R jobj.R RDD.R
NULL

############ Actions and Transformations ############

1 change: 1 addition & 0 deletions bin/pyspark
@@ -89,6 +89,7 @@ export PYTHONSTARTUP="$SPARK_HOME/python/pyspark/shell.py"
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
export PYTHONHASHSEED=0
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
3 changes: 3 additions & 0 deletions bin/spark-submit
@@ -19,6 +19,9 @@

SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
usage() {
3 changes: 3 additions & 0 deletions bin/spark-submit2.cmd
@@ -20,6 +20,9 @@ rem
rem This is the entry point for running Spark submit. To avoid polluting the
rem environment, it just launches a new cmd to do the real work.

rem disable randomized hash for string in Python 3.3+
set PYTHONHASHSEED=0

set CLASS=org.apache.spark.deploy.SparkSubmit
call %~dp0spark-class2.cmd %CLASS% %*
set SPARK_ERROR_LEVEL=%ERRORLEVEL%
6 changes: 1 addition & 5 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -85,17 +85,13 @@ table.sortable td {
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
}

span.kill-link {
a.kill-link {
margin-right: 2px;
margin-left: 20px;
color: gray;
float: right;
}

span.kill-link a {
color: gray;
}

span.expand-details {
font-size: 10pt;
cursor: pointer;
174 changes: 95 additions & 79 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -68,6 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
settings.put(key, value)
this
}
@@ -134,13 +135,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

/** Set multiple parameters together */
def setAll(settings: Traversable[(String, String)]): SparkConf = {
this.settings.putAll(settings.toMap.asJava)
settings.foreach { case (k, v) => set(k, v) }
this
}

/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
settings.putIfAbsent(key, value)
if (settings.putIfAbsent(key, value) == null) {
logDeprecationWarning(key)
}
this
}

@@ -174,45 +177,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}

/**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
/**
* Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then seconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsSeconds(key: String): Long = {
Utils.timeStringAsSeconds(get(key))
}

/**
* Get a time parameter as seconds, falling back to a default if not set. If no
/**
* Get a time parameter as seconds, falling back to a default if not set. If no
* suffix is provided then seconds are assumed.
*
*/
def getTimeAsSeconds(key: String, defaultValue: String): Long = {
Utils.timeStringAsSeconds(get(key, defaultValue))
}

/**
* Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then milliseconds are assumed.
/**
* Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no
* suffix is provided then milliseconds are assumed.
* @throws NoSuchElementException
*/
def getTimeAsMs(key: String): Long = {
Utils.timeStringAsMs(get(key))
}

/**
* Get a time parameter as milliseconds, falling back to a default if not set. If no
* suffix is provided then milliseconds are assumed.
/**
* Get a time parameter as milliseconds, falling back to a default if not set. If no
* suffix is provided then milliseconds are assumed.
*/
def getTimeAsMs(key: String, defaultValue: String): Long = {
Utils.timeStringAsMs(get(key, defaultValue))
}
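
For illustration, a minimal sketch of how these getters interpret suffixes (the keys below are just example names; the behavior assumed is that of the Utils.timeStringAsSeconds/timeStringAsMs helpers referenced above):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.network.timeout", "2min")
    conf.getTimeAsSeconds("spark.network.timeout")   // 120: explicit suffix parsed as minutes
    conf.set("spark.worker.timeout", "30")           // bare number, no suffix
    conf.getTimeAsSeconds("spark.worker.timeout")    // 30: seconds are assumed
    conf.getTimeAsMs("spark.worker.timeout")         // 30: the same bare number read as milliseconds
    conf.getTimeAsMs("spark.rpc.askTimeout", "120s") // 120000: default string used when the key is unset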


/** Get a parameter as an Option */
def getOption(key: String): Option[String] = {
Option(settings.get(key))
Option(settings.get(key)).orElse(getDeprecatedConfig(key, this))
}

/** Get all parameters as a list of pairs */
@@ -379,13 +381,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
}
}

// Warn against the use of deprecated configs
deprecatedConfigs.values.foreach { dc =>
if (contains(dc.oldName)) {
dc.warn()
}
}
}

/**
@@ -400,19 +395,44 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

private[spark] object SparkConf extends Logging {

/**
* Maps deprecated config keys to information about the deprecation.
*
* The extra information is logged as a warning when the config is present in the user's
* configuration.
*/
private val deprecatedConfigs: Map[String, DeprecatedConfig] = {
val configs = Seq(
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
"Use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.history.fs.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"),
DeprecatedConfig("spark.history.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"))
configs.map { x => (x.oldName, x) }.toMap
DeprecatedConfig("spark.yarn.user.classpath.first", "1.3",
"Please use spark.{driver,executor}.userClassPathFirst instead."))
Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}

/**
* Maps a current config key to alternate keys that were used in previous versions of Spark.
*
* The alternates are used in the order defined in this map. If deprecated configs are
* present in the user's configuration, a warning is logged.
*/
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
"spark.history.fs.update.interval" -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
AlternateConfig("spark.history.updateInterval", "1.3"))
)

/**
* A view of `configsWithAlternatives` that makes it more efficient to look up deprecated
* config keys.
*
* Maps the deprecated config name to a 2-tuple (new config name, alternate config info).
*/
private val allAlternatives: Map[String, (String, AlternateConfig)] = {
configsWithAlternatives.keys.flatMap { key =>
configsWithAlternatives(key).map { cfg => (cfg.key -> (key -> cfg)) }
}.toMap
}
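
In other words, allAlternatives inverts configsWithAlternatives; a quick sketch of a lookup against the entries defined above:

    allAlternatives("spark.history.updateInterval")
    // -> ("spark.history.fs.update.interval",
    //     AlternateConfig("spark.history.updateInterval", "1.3"))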

/**
@@ -443,61 +463,57 @@ private[spark] object SparkConf extends Logging {
}

/**
* Translate the configuration key if it is deprecated and has a replacement, otherwise just
* returns the provided key.
*
* @param userKey Configuration key from the user / caller.
* @param warn Whether to print a warning if the key is deprecated. Warnings will be printed
* only once for each key.
* Looks for available deprecated keys for the given config option, and returns the first
* value available.
*/
private def translateConfKey(userKey: String, warn: Boolean = false): String = {
deprecatedConfigs.get(userKey)
.map { deprecatedKey =>
if (warn) {
deprecatedKey.warn()
}
deprecatedKey.newName.getOrElse(userKey)
}.getOrElse(userKey)
def getDeprecatedConfig(key: String, conf: SparkConf): Option[String] = {
configsWithAlternatives.get(key).flatMap { alts =>
alts.collectFirst { case alt if conf.contains(alt.key) =>
val value = conf.get(alt.key)
alt.translation.map(_(value)).getOrElse(value)
}
}
}
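
A minimal sketch of the fallback this provides, using entries from configsWithAlternatives above; getOption tries the primary key first and consults the deprecated alternates only when it is absent:

    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.history.fs.updateInterval", "100") // only the 1.3-era key is set
    conf.get("spark.history.fs.update.interval")       // "100", resolved through getDeprecatedConfig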

/**
* Holds information about keys that have been deprecated or renamed.
* Logs a warning message if the given config key is deprecated.
*/
def logDeprecationWarning(key: String): Unit = {
deprecatedConfigs.get(key).foreach { cfg =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
}

allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
}
}
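
Both branches are reachable from a plain set(), since set() now calls logDeprecationWarning; a sketch using the entries defined above:

    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.yarn.user.classpath.first", "true")
    // warns: deprecated as of Spark 1.3, use spark.{driver,executor}.userClassPathFirst
    conf.set("spark.files.userClassPathFirst", "true")
    // warns: deprecated as of Spark 1.3, use the new key 'spark.executor.userClassPathFirst'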

/**
* Holds information about keys that have been deprecated and do not have a replacement.
*
* @param oldName Old configuration key.
* @param newName New configuration key, or `null` if key has no replacement, in which case the
* deprecated key will be used (but the warning message will still be printed).
* @param key The deprecated key.
* @param version Version of Spark where key was deprecated.
* @param deprecationMessage Message to include in the deprecation warning; mandatory when
* `newName` is not provided.
* @param deprecationMessage Message to include in the deprecation warning.
*/
private case class DeprecatedConfig(
oldName: String,
_newName: String,
key: String,
version: String,
deprecationMessage: String = null) {

private val warned = new AtomicBoolean(false)
val newName = Option(_newName)
deprecationMessage: String)

if (newName == null && (deprecationMessage == null || deprecationMessage.isEmpty())) {
throw new IllegalArgumentException("Need new config name or deprecation message.")
}

def warn(): Unit = {
if (warned.compareAndSet(false, true)) {
if (newName != null) {
val message = Option(deprecationMessage).getOrElse(
s"Please use the alternative '$newName' instead.")
logWarning(
s"The configuration option '$oldName' has been replaced as of Spark $version and " +
s"may be removed in the future. $message")
} else {
logWarning(
s"The configuration option '$oldName' has been deprecated as of Spark $version and " +
s"may be removed in the future. $deprecationMessage")
}
}
}
/**
* Information about an alternate configuration key that has been deprecated.
*
* @param key The deprecated config key.
* @param version The Spark version in which the key was deprecated.
* @param translation A translation function for converting old config values into new ones.
*/
private case class AlternateConfig(
key: String,
version: String,
translation: Option[String => String] = None)
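
The translation hook is not exercised by the configsWithAlternatives map above; a hypothetical entry sketches its intent, adapting an old value format before it is returned for the new key:

    // Hypothetical alternate whose old values were bare second counts,
    // translated into the suffixed time format the new key expects.
    AlternateConfig("spark.example.timeout.seconds", "1.4", Some((v: String) => v + "s"))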

}
}