Skip to content

Commit

Permalink
Merge branch 'master' into spark-8982
Browse files Browse the repository at this point in the history
  • Loading branch information
bdzimmer committed Jul 30, 2015
2 parents 0cac1d7 + 81464f2 commit 71c5573
Show file tree
Hide file tree
Showing 804 changed files with 41,270 additions and 11,987 deletions.
5 changes: 5 additions & 0 deletions R/install-dev.bat
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,8 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
popd
8 changes: 6 additions & 2 deletions R/install-dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ LIB_DIR="$FWDIR/lib"

mkdir -p $LIB_DIR

pushd $FWDIR
pushd $FWDIR > /dev/null

# Generate Rd files if devtools is installed
Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtools); devtools::document(pkg="./pkg", roclets=c("rd")) }'

# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

popd
# Zip the SparkR package so that it can be distributed to worker nodes on YARN
cd $LIB_DIR
jar cfM "$LIB_DIR/sparkr.zip" SparkR

popd > /dev/null
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ Collate:
'client.R'
'context.R'
'deserialize.R'
'mllib.R'
'serialize.R'
'sparkR.R'
'utils.R'
'zzz.R'
6 changes: 6 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export("sparkR.init")
export("sparkR.stop")
export("print.jobj")

# MLlib integration
exportMethods("glm",
"predict")

# Job group lifecycle management methods
export("setJobGroup",
"clearJobGroup",
Expand All @@ -22,6 +26,7 @@ exportMethods("arrange",
"collect",
"columns",
"count",
"crosstab",
"describe",
"distinct",
"dropna",
Expand Down Expand Up @@ -77,6 +82,7 @@ exportMethods("abs",
"atan",
"atan2",
"avg",
"between",
"cast",
"cbrt",
"ceiling",
Expand Down
38 changes: 33 additions & 5 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1314,7 +1314,7 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
Expand All @@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] = path
options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
Expand All @@ -1337,7 +1337,7 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
signature(df = "DataFrame", path = 'character'),
signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
Expand Down Expand Up @@ -1375,8 +1375,8 @@ setMethod("saveDF",
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
signature(df = "DataFrame", tableName = 'character', source = 'character',
mode = 'character'),
signature(df = "DataFrame", tableName = "character", source = "character",
mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
Expand Down Expand Up @@ -1554,3 +1554,31 @@ setMethod("fillna",
}
dataFrame(sdf)
})

#' crosstab
#'
#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
#' non-zero pair frequencies will be returned.
#'
#' @param col1 name of the first column. Distinct items will make the first item of each row.
#' @param col2 name of the second column. Distinct items will make the column names of the output.
#' @return a local R data.frame representing the contingency table. The first column of each row
#' will be the distinct values of `col1` and the column names will be the distinct values
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
#' occurrences will have zero as their counts.
#'
#' @rdname statfunctions
#' @export
#' @examples
#' \dontrun{
#' df <- jsonFile(sqlCtx, "/path/to/file.json")
#' ct = crosstab(df, "title", "gender")
#' }
setMethod("crosstab",
signature(x = "DataFrame", col1 = "character", col2 = "character"),
function(x, col1, col2) {
statFunctions <- callJMethod(x@sdf, "stat")
sct <- callJMethod(statFunctions, "crosstab", col1, col2)
collect(dataFrame(sct))
})
2 changes: 0 additions & 2 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
serializedFuncArr,
rdd@env$prev_serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
} else {
Expand All @@ -175,7 +174,6 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
rdd@env$prev_serializedMode,
serializedMode,
packageNamesArr,
as.character(.sparkREnv[["libname"]]),
broadcastArr,
callJMethod(prev_jrdd, "classTag"))
}
Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/SQLContext.R
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
options[["path"]] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
Expand Down Expand Up @@ -506,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
options[['path']] <- path
options[["path"]] <- path
}
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
Expand Down
6 changes: 3 additions & 3 deletions R/pkg/R/client.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ connectBackend <- function(hostname, port, timeout = 6000) {

determineSparkSubmitBin <- function() {
if (.Platform$OS.type == "unix") {
sparkSubmitBinName = "spark-submit"
sparkSubmitBinName <- "spark-submit"
} else {
sparkSubmitBinName = "spark-submit.cmd"
sparkSubmitBinName <- "spark-submit.cmd"
}
sparkSubmitBinName
}
Expand All @@ -48,7 +48,7 @@ generateSparkSubmitArgs <- function(args, sparkHome, jars, sparkSubmitOpts, pack
jars <- paste("--jars", jars)
}

if (packages != "") {
if (!identical(packages, "")) {
packages <- paste("--packages", packages)
}

Expand Down
17 changes: 17 additions & 0 deletions R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,23 @@ setMethod("substr", signature(x = "Column"),
column(jc)
})

#' between
#'
#' Test if the column is between the lower bound and upper bound, inclusive.
#'
#' @rdname column
#'
#' @param bounds lower and upper bounds
setMethod("between", signature(x = "Column"),
function(x, bounds) {
if (is.vector(bounds) && length(bounds) == 2) {
jc <- callJMethod(x@jc, "between", bounds[1], bounds[2])
column(jc)
} else {
stop("bounds should be a vector of lower and upper bounds")
}
})

#' Casts the column to a different data type.
#'
#' @rdname column
Expand Down
5 changes: 3 additions & 2 deletions R/pkg/R/deserialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# Int -> integer
# String -> character
# Boolean -> logical
# Float -> double
# Double -> double
# Long -> double
# Array[Byte] -> raw
Expand Down Expand Up @@ -101,11 +102,11 @@ readList <- function(con) {

readRaw <- function(con) {
dataLen <- readInt(con)
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readRawLen <- function(con, dataLen) {
data <- readBin(con, raw(), as.integer(dataLen), endian = "big")
readBin(con, raw(), as.integer(dataLen), endian = "big")
}

readDeserialize <- function(con) {
Expand Down
12 changes: 12 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
# @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

# @rdname statfunctions
# @export
setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })

# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
Expand Down Expand Up @@ -567,6 +571,10 @@ setGeneric("asc", function(x) { standardGeneric("asc") })
#' @export
setGeneric("avg", function(x, ...) { standardGeneric("avg") })

#' @rdname column
#' @export
setGeneric("between", function(x, bounds) { standardGeneric("between") })

#' @rdname column
#' @export
setGeneric("cast", function(x, dataType) { standardGeneric("cast") })
Expand Down Expand Up @@ -657,3 +665,7 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
#' @rdname column
#' @export
setGeneric("upper", function(x) { standardGeneric("upper") })

#' @rdname glm
#' @export
setGeneric("glm")
4 changes: 2 additions & 2 deletions R/pkg/R/group.R
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ setMethod("count",
setMethod("agg",
signature(x = "GroupedData"),
function(x, ...) {
cols = list(...)
cols <- list(...)
stopifnot(length(cols) > 0)
if (is.character(cols[[1]])) {
cols <- varargsToEnv(...)
Expand All @@ -97,7 +97,7 @@ setMethod("agg",
if (!is.null(ns)) {
for (n in ns) {
if (n != "") {
cols[[n]] = alias(cols[[n]], n)
cols[[n]] <- alias(cols[[n]], n)
}
}
}
Expand Down
73 changes: 73 additions & 0 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# mllib.R: Provides methods for MLlib integration

#' @title S4 class that represents a PipelineModel
#' @param model A Java object reference to the backing Scala PipelineModel
#' @export
setClass("PipelineModel", representation(model = "jobj"))

#' Fits a generalized linear model
#'
#' Fits a generalized linear model, similarly to R's glm(). Also see the glmnet package.
#'
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '+', '-', and '.'.
#' @param data DataFrame for training
#' @param family Error distribution. "gaussian" -> linear regression, "binomial" -> logistic reg.
#' @param lambda Regularization parameter
#' @param alpha Elastic-net mixing parameter (see glmnet's documentation for details)
#' @return a fitted MLlib model
#' @rdname glm
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' data(iris)
#' df <- createDataFrame(sqlContext, iris)
#' model <- glm(Sepal_Length ~ Sepal_Width, df)
#'}
setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFrame"),
function(formula, family = c("gaussian", "binomial"), data, lambda = 0, alpha = 0) {
family <- match.arg(family)
model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
"fitRModelFormula", deparse(formula), data@sdf, family, lambda,
alpha)
return(new("PipelineModel", model = model))
})

#' Make predictions from a model
#'
#' Makes predictions from a model produced by glm(), similarly to R's predict().
#'
#' @param model A fitted MLlib model
#' @param newData DataFrame for testing
#' @return DataFrame containing predicted values
#' @rdname glm
#' @export
#' @examples
#'\dontrun{
#' model <- glm(y ~ x, trainingData)
#' predicted <- predict(model, testData)
#' showDF(predicted)
#'}
setMethod("predict", signature(object = "PipelineModel"),
function(object, newData) {
return(dataFrame(callJMethod(object@model, "transform", newData@sdf)))
})
1 change: 0 additions & 1 deletion R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ setMethod("partitionBy",
serializedHashFuncBytes,
getSerializedMode(x),
packageNamesArr,
as.character(.sparkREnv$libname),
broadcastArr,
callJMethod(jrdd, "classTag"))

Expand Down
14 changes: 9 additions & 5 deletions R/pkg/R/schema.R
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ structType.structField <- function(x, ...) {
#' @param ... further arguments passed to or from other methods
print.structType <- function(x, ...) {
cat("StructType\n",
sapply(x$fields(), function(field) { paste("|-", "name = \"", field$name(),
"\", type = \"", field$dataType.toString(),
"\", nullable = ", field$nullable(), "\n",
sep = "") })
, sep = "")
sapply(x$fields(),
function(field) {
paste("|-", "name = \"", field$name(),
"\", type = \"", field$dataType.toString(),
"\", nullable = ", field$nullable(), "\n",
sep = "")
}),
sep = "")
}

#' structField
Expand Down Expand Up @@ -123,6 +126,7 @@ structField.character <- function(x, type, nullable = TRUE) {
}
options <- c("byte",
"integer",
"float",
"double",
"numeric",
"character",
Expand Down
Loading

0 comments on commit 71c5573

Please sign in to comment.