Merge remote-tracking branch 'upstream/master'
FavioVazquez committed Jun 9, 2015
2 parents f27a20b + e6fb6ce commit ad882a3
Showing 228 changed files with 7,366 additions and 5,391 deletions.
5 changes: 1 addition & 4 deletions R/create-docs.sh
@@ -30,10 +30,7 @@ set -e
export FWDIR="$(cd "`dirname "$0"`"; pwd)"
pushd $FWDIR

# Generate Rd file
Rscript -e 'library(devtools); devtools::document(pkg="./pkg", roclets=c("rd"))'

# Install the package
# Install the package (this will also generate the Rd files)
./install-dev.sh

# Now create HTML files
9 changes: 8 additions & 1 deletion R/install-dev.sh
@@ -34,5 +34,12 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

# Install R
pushd $FWDIR

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
14 changes: 10 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -452,7 +452,7 @@ dropTempTable <- function(sqlContext, tableName) {
#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
#' }

read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
@@ -462,15 +462,21 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
"org.apache.spark.sql.parquet")
}
sdf <- callJMethod(sqlContext, "load", source, options)
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source,
schema$jobj, options)
} else {
sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, options)
}
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

loadDF <- function(sqlContext, path = NULL, source = NULL, ...) {
read.df(sqlContext, path, source, ...)
loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
read.df(sqlContext, path, source, schema, ...)
}

#' Create an external table
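For context, a minimal SparkR sketch of how the new schema argument to read.df can be used (assumes an initialized sqlContext; "people.json" is a hypothetical path, and this snippet is not part of the diff above):

# Sketch only: pass a user-defined structType instead of relying on schema inference.
schema <- structType(structField("name", "string"),
                     structField("age", "double"))
df <- read.df(sqlContext, "people.json", source = "json", schema = schema)
printSchema(df)
# loadDF forwards the same arguments to read.df:
df2 <- loadDF(sqlContext, "people.json", "json", schema)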
8 changes: 8 additions & 0 deletions R/pkg/R/serialize.R
@@ -37,6 +37,14 @@ writeObject <- function(con, object, writeType = TRUE) {
# passing in vectors as arrays and instead require arrays to be passed
# as lists.
type <- class(object)[[1]] # class of POSIXlt is c("POSIXlt", "POSIXt")
# Checking types is needed here, since 'is.na' only handles atomic vectors,
# lists and pairlists
if (type %in% c("integer", "character", "logical", "double", "numeric")) {
if (is.na(object)) {
object <- NULL
type <- "NULL"
}
}
if (writeType) {
writeType(con, type)
}
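The effect of the NA handling added to writeObject() can be sketched in isolation (naToNull is a hypothetical helper name, not part of the patch; it mirrors the check above for scalar values):

# Hypothetical helper: scalar NAs of atomic types are converted to NULL
# before being written to the JVM, instead of being sent as a typed value.
naToNull <- function(object) {
  type <- class(object)[[1]]
  if (type %in% c("integer", "character", "logical", "double", "numeric") &&
      is.na(object)) {
    return(NULL)
  }
  object
}

naToNull(NA_integer_)   # NULL
naToNull(4L)            # 4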
50 changes: 50 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -101,6 +101,43 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df), list(c("a", "int"), c("b", "string")))
})

test_that("convert NAs to null type in DataFrames", {
rdd <- parallelize(sc, list(list(1L, 2L), list(NA, 4L)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4L)

l <- data.frame(x = 1L, y = c(1L, NA_integer_, 3L))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1L)
expect_true(is.na(collect(df)[2, "y"]))

rdd <- parallelize(sc, list(list(1, 2), list(NA, 4)))
df <- createDataFrame(sqlContext, rdd, list("a", "b"))
expect_true(is.na(collect(df)[2, "a"]))
expect_equal(collect(df)[2, "b"], 4)

l <- data.frame(x = 1, y = c(1, NA_real_, 3))
df <- createDataFrame(sqlContext, l)
expect_equal(collect(df)[2, "x"], 1)
expect_true(is.na(collect(df)[2, "y"]))

l <- list("a", "b", NA, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")

l <- list("a", "b", NA_character_, "d")
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], "d")

l <- list(TRUE, FALSE, NA, TRUE)
df <- createDataFrame(sqlContext, l)
expect_true(is.na(collect(df)[3, "_1"]))
expect_equal(collect(df)[4, "_1"], TRUE)
})

test_that("toDF", {
rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) })
df <- toDF(rdd, list("a", "b"))
@@ -504,6 +541,19 @@ test_that("read.df() from json file", {
df <- read.df(sqlContext, jsonPath, "json")
expect_true(inherits(df, "DataFrame"))
expect_true(count(df) == 3)

# Check if we can apply a user defined schema
schema <- structType(structField("name", type = "string"),
structField("age", type = "double"))

df1 <- read.df(sqlContext, jsonPath, "json", schema)
expect_true(inherits(df1, "DataFrame"))
expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))

# Run the same with loadDF
df2 <- loadDF(sqlContext, jsonPath, "json", schema)
expect_true(inherits(df2, "DataFrame"))
expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
})

test_that("write.df() as parquet file", {
16 changes: 1 addition & 15 deletions bin/pyspark
@@ -17,24 +17,10 @@
# limitations under the License.
#

# Figure out where Spark is installed
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

source "$SPARK_HOME"/bin/load-spark-env.sh

function usage() {
if [ -n "$1" ]; then
echo $1
fi
echo "Usage: ./bin/pyspark [options]" 1>&2
"$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit $2
}
export -f usage

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
fi
export _SPARK_CMD_USAGE="Usage: ./bin/pyspark [options]"

# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -21,6 +21,7 @@ rem Figure out where the Spark framework is installed
set SPARK_HOME=%~dp0..

call %SPARK_HOME%\bin\load-spark-env.cmd
set _SPARK_CMD_USAGE=Usage: bin\pyspark.cmd [options]

rem Figure out which Python to use.
if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
31 changes: 1 addition & 30 deletions bin/spark-class
@@ -16,18 +16,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -e

# Figure out where Spark is installed
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

. "$SPARK_HOME"/bin/load-spark-env.sh

if [ -z "$1" ]; then
echo "Usage: spark-class <class> [<args>]" 1>&2
exit 1
fi

# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
@@ -64,24 +58,6 @@ fi

SPARK_ASSEMBLY_JAR="${ASSEMBLY_DIR}/${ASSEMBLY_JARS}"

# Verify that versions of java used to build the jars and run Spark are compatible
if [ -n "$JAVA_HOME" ]; then
JAR_CMD="$JAVA_HOME/bin/jar"
else
JAR_CMD="jar"
fi

if [ $(command -v "$JAR_CMD") ] ; then
jar_error_check=$("$JAR_CMD" -tf "$SPARK_ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
echo "or build Spark with Java 6." 1>&2
exit 1
fi
fi

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
@@ -98,9 +74,4 @@ CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

if [ "${CMD[0]}" = "usage" ]; then
"${CMD[@]}"
else
exec "${CMD[@]}"
fi
exec "${CMD[@]}"
15 changes: 1 addition & 14 deletions bin/spark-shell
@@ -29,20 +29,7 @@ esac
set -o posix

export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

usage() {
if [ -n "$1" ]; then
echo "$1"
fi
echo "Usage: ./bin/spark-shell [options]"
"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit "$2"
}
export -f usage

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage "" 0
fi
export _SPARK_CMD_USAGE="Usage: ./bin/spark-shell [options]"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
21 changes: 2 additions & 19 deletions bin/spark-shell2.cmd
@@ -18,12 +18,7 @@ rem limitations under the License.
rem

set SPARK_HOME=%~dp0..

echo "%*" | findstr " \<--help\> \<-h\>" >nul
if %ERRORLEVEL% equ 0 (
call :usage
exit /b 0
)
set _SPARK_CMD_USAGE=Usage: .\bin\spark-shell.cmd [options]

rem SPARK-4161: scala does not assume use of the java classpath,
rem so we need to add the "-Dscala.usejavacp=true" flag manually. We
@@ -37,16 +32,4 @@ if "x%SPARK_SUBMIT_OPTS%"=="x" (
set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"

:run_shell
call %SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
set SPARK_ERROR_LEVEL=%ERRORLEVEL%
if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
call :usage
exit /b 1
)
exit /b %SPARK_ERROR_LEVEL%

:usage
echo %SPARK_LAUNCHER_USAGE_ERROR%
echo "Usage: .\bin\spark-shell.cmd [options]" >&2
call %SPARK_HOME%\bin\spark-submit2.cmd --help 2>&1 | findstr /V "Usage" 1>&2
goto :eof
%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
39 changes: 2 additions & 37 deletions bin/spark-sql
@@ -17,41 +17,6 @@
# limitations under the License.
#

#
# Shell script for starting the Spark SQL CLI

# Enter posix mode for bash
set -o posix

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
export CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"

# Figure out where Spark is installed
export FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

function usage {
if [ -n "$1" ]; then
echo "$1"
fi
echo "Usage: ./bin/spark-sql [options] [cli option]"
pattern="usage"
pattern+="\|Spark assembly has been built with Hive"
pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
pattern+="\|Spark Command: "
pattern+="\|--help"
pattern+="\|======="

"$FWDIR"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
echo
echo "CLI options:"
"$FWDIR"/bin/spark-class "$CLASS" --help 2>&1 | grep -v "$pattern" 1>&2
exit "$2"
}
export -f usage

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage "" 0
fi

exec "$FWDIR"/bin/spark-submit --class "$CLASS" "$@"
export _SPARK_CMD_USAGE="Usage: ./bin/spark-sql [options] [cli option]"
exec "$FWDIR"/bin/spark-submit --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver "$@"
12 changes: 0 additions & 12 deletions bin/spark-submit
@@ -22,16 +22,4 @@ SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
usage() {
if [ -n "$1" ]; then
echo "$1"
fi
"$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
exit "$2"
}
export -f usage
fi

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
13 changes: 1 addition & 12 deletions bin/spark-submit2.cmd
@@ -24,15 +24,4 @@ rem disable randomized hash for string in Python 3.3+
set PYTHONHASHSEED=0

set CLASS=org.apache.spark.deploy.SparkSubmit
call %~dp0spark-class2.cmd %CLASS% %*
set SPARK_ERROR_LEVEL=%ERRORLEVEL%
if not "x%SPARK_LAUNCHER_USAGE_ERROR%"=="x" (
call :usage
exit /b 1
)
exit /b %SPARK_ERROR_LEVEL%

:usage
echo %SPARK_LAUNCHER_USAGE_ERROR%
call %SPARK_HOME%\bin\spark-class2.cmd %CLASS% --help
goto :eof
%~dp0spark-class2.cmd %CLASS% %*
18 changes: 1 addition & 17 deletions bin/sparkR
@@ -17,23 +17,7 @@
# limitations under the License.
#

# Figure out where Spark is installed
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"

source "$SPARK_HOME"/bin/load-spark-env.sh

function usage() {
if [ -n "$1" ]; then
echo $1
fi
echo "Usage: ./bin/sparkR [options]" 1>&2
"$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit $2
}
export -f usage

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
usage
fi

export _SPARK_CMD_USAGE="Usage: ./bin/sparkR [options]"
exec "$SPARK_HOME"/bin/spark-submit sparkr-shell-main "$@"