Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark
Browse files Browse the repository at this point in the history
Initilize local master branch.
  • Loading branch information
zhangjiajin committed Jul 15, 2015
2 parents 4dd1c8a + bb870e7 commit a8fde87
Show file tree
Hide file tree
Showing 571 changed files with 15,236 additions and 7,381 deletions.
2 changes: 2 additions & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ help/*
html/*
INDEX
.lintr
gen-java.*
.*avpr
2 changes: 1 addition & 1 deletion R/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ SparkR is an R package that provides a light-weight frontend to use Spark from R

#### Build Spark

Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-PsparkR` profile to build the R package. For example to use the default Hadoop versions you can run
Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
```
build/mvn -DskipTests -Psparkr package
```
Expand Down
5 changes: 5 additions & 0 deletions R/install-dev.bat
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

pushd $FWDIR
pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
# Zip the SparkR package so that it can be distributed to worker nodes on YARN
cd $LIB_DIR
jar cfM "$LIB_DIR/sparkr.zip" SparkR

popd > /dev/null
1 change: 0 additions & 1 deletion R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,3 @@ Collate:
'serialize.R'
'sparkR.R'
'utils.R'
'zzz.R'
2 changes: 0 additions & 2 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
Expand All @@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
Expand Down
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ infer_type <- function(x) {
createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) {
if (is.data.frame(data)) {
# get the names of columns, they will be put into RDD
schema <- names(data)
if (is.null(schema)) {
schema <- names(data)
}
n <- nrow(data)
m <- ncol(data)
# get rid of factor type
Expand Down
3 changes: 2 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
# @rdname aggregateRDD
# @seealso reduce
# @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
setGeneric("aggregateRDD",
function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

# @rdname cache-methods
# @export
Expand Down
13 changes: 6 additions & 7 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ setMethod("partitionBy",
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

Expand Down Expand Up @@ -560,8 +559,8 @@ setMethod("join",
# Left outer join two RDDs
#
# @description
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
Expand Down Expand Up @@ -597,8 +596,8 @@ setMethod("leftOuterJoin",
# Right outer join two RDDs
#
# @description
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
Expand Down Expand Up @@ -634,8 +633,8 @@ setMethod("rightOuterJoin",
# Full outer join two RDDs
#
# @description
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
Expand Down
19 changes: 6 additions & 13 deletions R/pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

.sparkREnv <- new.env()

sparkR.onLoad <- function(libname, pkgname) {
.sparkREnv$libname <- libname
}

# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
Expand Down Expand Up @@ -80,7 +76,6 @@ sparkR.stop <- function() {
#' @param sparkEnvir Named list of environment variables to set on worker nodes.
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
#' @param sparkRLibDir The path where R is installed on the worker nodes.
#' @param sparkPackages Character string vector of packages from spark-packages.org
#' @export
#' @examples
Expand All @@ -101,11 +96,11 @@ sparkR.init <- function(
sparkEnvir = list(),
sparkExecutorEnv = list(),
sparkJars = "",
sparkRLibDir = "",
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
cat(paste("Re-using existing Spark Context.",
"Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
return(get(".sparkRjsc", envir = .sparkREnv))
}

Expand Down Expand Up @@ -169,25 +164,23 @@ sparkR.init <- function(
sparkHome <- normalizePath(sparkHome)
}

if (nchar(sparkRLibDir) != 0) {
.sparkREnv$libname <- sparkRLibDir
}

sparkEnvirMap <- new.env()
for (varname in names(sparkEnvir)) {
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
}

sparkExecutorEnvMap <- new.env()
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
for (varname in names(sparkExecutorEnv)) {
sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
}

nonEmptyJars <- Filter(function(x) { x != "" }, jars)
localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
localJarPaths <- sapply(nonEmptyJars,
function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })

# Set the start time to identify jobjs
# Seconds resolution is good enough for this purpose, so use ints
Expand Down
31 changes: 18 additions & 13 deletions R/pkg/R/utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,21 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"MEMORY_ONLY_SER_2",
"OFF_HEAP")) {
match.arg(newLevel)
storageLevelClass <- "org.apache.spark.storage.StorageLevel"
storageLevel <- switch(newLevel,
"DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
"DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}

# Utility function for functions where an argument needs to be integer but we want to allow
Expand Down Expand Up @@ -545,9 +548,11 @@ mergePartitions <- function(rdd, zip) {
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
# For zip operation, check if corresponding partitions
# of both RDDs have the same number of elements.
if (zip && lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
stop(paste("Can only zip RDDs with same number of elements",
"in each pair of corresponding partitions."))
}

if (lengthOfKeys > 1) {
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/inst/profile/general.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#

.First <- function() {
home <- Sys.getenv("SPARK_HOME")
.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
.libPaths(c(packageDir, .libPaths()))
Sys.setenv(NOAWT=1)
}
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_includeJAR.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ context("include an external JAR in SparkContext")

runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
jarPath <- paste("--jars",
shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
Expand Down
12 changes: 8 additions & 4 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -669,27 +669,31 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
Expand Down
11 changes: 9 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})

test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
Expand Down Expand Up @@ -756,7 +756,14 @@ test_that("toJSON() returns an RDD of the correct values", {
test_that("showDF()", {
df <- jsonFile(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
expected <- paste("+----+-------+\n",
"| age| name|\n",
"+----+-------+\n",
"|null|Michael|\n",
"| 30| Andy|\n",
"| 19| Justin|\n",
"+----+-------+\n", sep="")
expect_output(s , expected)
})

test_that("isLocal()", {
Expand Down
11 changes: 10 additions & 1 deletion build/mvn
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,17 @@ install_scala() {
# the environment
ZINC_PORT=${ZINC_PORT:-"3030"}

# Check for the `--force` flag dictating that `mvn` should be downloaded
# regardless of whether the system already has a `mvn` install
if [ "$1" == "--force" ]; then
FORCE_MVN=1
shift
fi

# Install Maven if necessary
MVN_BIN="$(command -v mvn)"

if [ ! "$MVN_BIN" ]; then
if [ ! "$MVN_BIN" -o -n "$FORCE_MVN" ]; then
install_mvn
fi

Expand All @@ -139,5 +146,7 @@ fi
# Set any `mvn` options if not already present
export MAVEN_OPTS=${MAVEN_OPTS:-"$_COMPILE_JVM_OPTS"}

echo "Using \`mvn\` from path: $MVN_BIN"

# Last, call the `mvn` command as usual
${MVN_BIN} "$@"
20 changes: 10 additions & 10 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -343,28 +343,28 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Loading

0 comments on commit a8fde87

Please sign in to comment.