diff --git a/README.md b/README.md index 795b0a1477305..6d6b097222ade 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,6 @@ SparkR requires Scala 2.10 and Spark version >= 0.9.0. Current build by default Apache Spark 1.1.0. You can also build SparkR against a different Spark version (>= 0.9.0) by modifying `pkg/src/build.sbt`. -SparkR also requires the R package `rJava` to be installed. To install `rJava`, -you can run the following command in R: - - install.packages("rJava") - - ### Package installation To develop SparkR, you can build the scala package and the R package using @@ -31,9 +25,9 @@ If you wish to try out the package directly from github, you can use [`install_g SparkR by default uses Apache Spark 1.1.0. You can switch to a different Spark version by setting the environment variable `SPARK_VERSION`. For example, to -use Apache Spark 1.2.0, you can run +use Apache Spark 1.3.0, you can run - SPARK_VERSION=1.2.0 ./install-dev.sh + SPARK_VERSION=1.3.0 ./install-dev.sh SparkR by default links to Hadoop 1.0.4. To use SparkR with other Hadoop versions, you will need to rebuild SparkR with the same version that [Spark is @@ -97,8 +91,9 @@ To run one of them, use `./sparkR `. For example: ./sparkR examples/pi.R local[2] -You can also run the unit-tests for SparkR by running +You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first): + R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")' ./run-tests.sh ## Running on EC2 @@ -110,7 +105,7 @@ Instructions for running SparkR on EC2 can be found in the Currently, SparkR supports running on YARN with the `yarn-client` mode. These steps show how to build SparkR with YARN support and run SparkR programs on a YARN cluster: ``` -# assumes Java, R, rJava, yarn, spark etc. are installed on the whole cluster. +# assumes Java, R, yarn, spark etc. are installed on the whole cluster. 
cd SparkR-pkg/ USE_YARN=1 SPARK_YARN_VERSION=2.4.0 SPARK_HADOOP_VERSION=2.4.0 ./install-dev.sh ``` diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index fd7eaae7278ca..3d9995e12b744 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -2,6 +2,7 @@ exportClasses("RDD") exportClasses("Broadcast") exportMethods( + "aggregateByKey", "aggregateRDD", "cache", "checkpoint", @@ -19,6 +20,7 @@ exportMethods( "flatMap", "flatMapValues", "fold", + "foldByKey", "foreach", "foreachPartition", "fullOuterJoin", @@ -41,6 +43,7 @@ exportMethods( "numPartitions", "partitionBy", "persist", + "pipeRDD", "reduce", "reduceByKey", "reduceByKeyLocally", diff --git a/pkg/R/DataFrame.R b/pkg/R/DataFrame.R index 71564064caa17..0a69825d85dff 100644 --- a/pkg/R/DataFrame.R +++ b/pkg/R/DataFrame.R @@ -128,10 +128,10 @@ setMethod("count", #' } setMethod("collect", - signature(rdd = "DataFrame"), - function(rdd) { + signature(x = "DataFrame"), + function(x) { # listCols is a list of raw vectors, one per column - listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", rdd@sdf) + listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", x@sdf) cols <- lapply(listCols, function(col) { objRaw <- rawConnection(col) numRows <- readInt(objRaw) @@ -139,7 +139,7 @@ setMethod("collect", close(objRaw) col }) - colNames <- callJMethod(rdd@sdf, "columns") + colNames <- callJMethod(x@sdf, "columns") names(cols) <- colNames dfOut <- do.call(cbind.data.frame, cols) dfOut @@ -187,9 +187,9 @@ setMethod("limit", #' } setMethod("take", - signature(rdd = "DataFrame", num = "numeric"), - function(rdd, num) { - limited <- limit(rdd, num) + signature(x = "DataFrame", num = "numeric"), + function(x, num) { + limited <- limit(x, num) collect(limited) }) @@ -264,15 +264,15 @@ setMethod("mapPartitions", }) setMethod("foreach", - signature(rdd = "DataFrame", func = "function"), - function(rdd, func) { - rddIn <- toRDD(rdd) - foreach(rddIn, func) + signature(x = "DataFrame", func = "function"), + function(x, func) { + rdd <- toRDD(x) + foreach(rdd, func) }) setMethod("foreachPartition", - signature(rdd = "DataFrame", func = "function"), - function(rdd, func) { - rddIn <- toRDD(rdd) - foreachPartition(rddIn, func) + signature(x = "DataFrame", func = "function"), + function(x, func) { + rdd <- toRDD(x) + foreachPartition(rdd, func) }) diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index e6b7f31a14106..1d1a645f2094d 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -81,7 +81,6 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline # Get the serialization mode of the parent RDD .Object@env$prev_serializedMode <- prev@env$prev_serializedMode - } .Object @@ -134,12 +133,10 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"), computeFunc <- function(split, part) { rdd@func(split, part) } - serializedFuncArr <- serialize("computeFunc", connection = NULL, - ascii = TRUE) + serializedFuncArr <- serialize("computeFunc", connection = NULL) packageNamesArr <- serialize(.sparkREnv[[".packages"]], - connection = NULL, - ascii = TRUE) + connection = NULL) broadcastArr <- lapply(ls(.broadcastNames), function(name) { get(name, .broadcastNames) }) @@ -195,7 +192,7 @@ setValidity("RDD", #' #' Persist this RDD with the default storage level (MEMORY_ONLY). 
#' -#' @param rdd The RDD to cache +#' @param x The RDD to cache #' @rdname cache-methods #' @export #' @examples @@ -204,16 +201,16 @@ setValidity("RDD", #' rdd <- parallelize(sc, 1:10, 2L) #' cache(rdd) #'} -setGeneric("cache", function(rdd) { standardGeneric("cache") }) +setGeneric("cache", function(x) { standardGeneric("cache") }) #' @rdname cache-methods #' @aliases cache,RDD-method setMethod("cache", - signature(rdd = "RDD"), - function(rdd) { - callJMethod(getJRDD(rdd), "cache") - rdd@env$isCached <- TRUE - rdd + signature(x = "RDD"), + function(x) { + callJMethod(getJRDD(x), "cache") + x@env$isCached <- TRUE + x }) #' Persist an RDD @@ -222,7 +219,7 @@ setMethod("cache", #' supported storage levels, refer to #' http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence. #' -#' @param rdd The RDD to persist +#' @param x The RDD to persist #' @param newLevel The new storage level to be assigned #' @rdname persist #' @export @@ -232,13 +229,13 @@ setMethod("cache", #' rdd <- parallelize(sc, 1:10, 2L) #' persist(rdd, "MEMORY_AND_DISK") #'} -setGeneric("persist", function(rdd, newLevel) { standardGeneric("persist") }) +setGeneric("persist", function(x, newLevel) { standardGeneric("persist") }) #' @rdname persist #' @aliases persist,RDD-method setMethod("persist", - signature(rdd = "RDD", newLevel = "character"), - function(rdd, newLevel = c("DISK_ONLY", + signature(x = "RDD", newLevel = "character"), + function(x, newLevel = c("DISK_ONLY", "DISK_ONLY_2", "MEMORY_AND_DISK", "MEMORY_AND_DISK_2", @@ -263,9 +260,9 @@ setMethod("persist", "MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"), "OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP")) - callJMethod(getJRDD(rdd), "persist", storageLevel) - rdd@env$isCached <- TRUE - rdd + callJMethod(getJRDD(x), "persist", storageLevel) + x@env$isCached <- TRUE + x }) #' Unpersist an RDD @@ -283,16 +280,16 @@ setMethod("persist", #' cache(rdd) # rdd@@env$isCached == TRUE #' unpersist(rdd) # rdd@@env$isCached == FALSE #'} -setGeneric("unpersist", function(rdd) { standardGeneric("unpersist") }) +setGeneric("unpersist", function(x) { standardGeneric("unpersist") }) #' @rdname unpersist-methods #' @aliases unpersist,RDD-method setMethod("unpersist", - signature(rdd = "RDD"), - function(rdd) { - callJMethod(getJRDD(rdd), "unpersist") - rdd@env$isCached <- FALSE - rdd + signature(x = "RDD"), + function(x) { + callJMethod(getJRDD(x), "unpersist") + x@env$isCached <- FALSE + x }) @@ -314,22 +311,22 @@ setMethod("unpersist", #' rdd <- parallelize(sc, 1:10, 2L) #' checkpoint(rdd) #'} -setGeneric("checkpoint", function(rdd) { standardGeneric("checkpoint") }) +setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") }) #' @rdname checkpoint-methods #' @aliases checkpoint,RDD-method setMethod("checkpoint", - signature(rdd = "RDD"), - function(rdd) { - jrdd <- getJRDD(rdd) + signature(x = "RDD"), + function(x) { + jrdd <- getJRDD(x) callJMethod(jrdd, "checkpoint") - rdd@env$isCheckpointed <- TRUE - rdd + x@env$isCheckpointed <- TRUE + x }) #' Gets the number of partitions of an RDD #' -#' @param rdd A RDD. +#' @param x A RDD. #' @return the number of partitions of rdd as an integer. 
#' @rdname numPartitions #' @export @@ -339,14 +336,14 @@ setMethod("checkpoint", #' rdd <- parallelize(sc, 1:10, 2L) #' numPartitions(rdd) # 2L #'} -setGeneric("numPartitions", function(rdd) { standardGeneric("numPartitions") }) +setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") }) #' @rdname numPartitions #' @aliases numPartitions,RDD-method setMethod("numPartitions", - signature(rdd = "RDD"), - function(rdd) { - jrdd <- getJRDD(rdd) + signature(x = "RDD"), + function(x) { + jrdd <- getJRDD(x) partitions <- callJMethod(jrdd, "splits") callJMethod(partitions, "size") }) @@ -356,7 +353,7 @@ setMethod("numPartitions", #' @description #' \code{collect} returns a list that contains all of the elements in this RDD. #' -#' @param rdd The RDD to collect +#' @param x The RDD to collect #' @param ... Other optional arguments to collect #' @param flatten FALSE if the list should not flattened #' @return a list containing elements in the RDD @@ -369,17 +366,17 @@ setMethod("numPartitions", #' collect(rdd) # list from 1 to 10 #' collectPartition(rdd, 0L) # list from 1 to 5 #'} -setGeneric("collect", function(rdd, ...) { standardGeneric("collect") }) +setGeneric("collect", function(x, ...) { standardGeneric("collect") }) #' @rdname collect-methods #' @aliases collect,RDD-method setMethod("collect", - signature(rdd = "RDD"), - function(rdd, flatten = TRUE) { + signature(x = "RDD"), + function(x, flatten = TRUE) { # Assumes a pairwise RDD is backed by a JavaPairRDD. - collected <- callJMethod(getJRDD(rdd), "collect") + collected <- callJMethod(getJRDD(x), "collect") convertJListToRList(collected, flatten, - serializedMode = getSerializedMode(rdd)) + serializedMode = getSerializedMode(x)) }) @@ -390,22 +387,22 @@ setMethod("collect", #' in the specified partition of the RDD. #' @param partitionId the partition to collect (starts from 0) setGeneric("collectPartition", - function(rdd, partitionId) { + function(x, partitionId) { standardGeneric("collectPartition") }) #' @rdname collect-methods #' @aliases collectPartition,integer,RDD-method setMethod("collectPartition", - signature(rdd = "RDD", partitionId = "integer"), - function(rdd, partitionId) { - jPartitionsList <- callJMethod(getJRDD(rdd), + signature(x = "RDD", partitionId = "integer"), + function(x, partitionId) { + jPartitionsList <- callJMethod(getJRDD(x), "collectPartitions", as.list(as.integer(partitionId))) jList <- jPartitionsList[[1]] convertJListToRList(jList, flatten = TRUE, - serializedMode = getSerializedMode(rdd)) + serializedMode = getSerializedMode(x)) }) #' @rdname collect-methods @@ -419,16 +416,16 @@ setMethod("collectPartition", #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L) #' collectAsMap(rdd) # list(`1` = 2, `3` = 4) #'} -setGeneric("collectAsMap", function(rdd) { standardGeneric("collectAsMap") }) +setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") }) #' @rdname collect-methods #' @aliases collectAsMap,RDD-method setMethod("collectAsMap", - signature(rdd = "RDD"), - function(rdd) { - pairList <- collect(rdd) + signature(x = "RDD"), + function(x) { + pairList <- collect(x) map <- new.env() - lapply(pairList, function(x) { assign(as.character(x[[1]]), x[[2]], envir = map) }) + lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) }) as.list(map) }) @@ -474,7 +471,7 @@ setMethod("length", #' #' Same as countByValue in Spark. 
#' -#' @param rdd The RDD to count +#' @param x The RDD to count #' @return list of (value, count) pairs, where count is number of each unique #' value in rdd. #' @rdname countByValue @@ -485,15 +482,15 @@ setMethod("length", #' rdd <- parallelize(sc, c(1,2,3,2,1)) #' countByValue(rdd) # (1,2L), (2,2L), (3,1L) #'} -setGeneric("countByValue", function(rdd) { standardGeneric("countByValue") }) +setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) #' @rdname countByValue #' @aliases countByValue,RDD-method setMethod("countByValue", - signature(rdd = "RDD"), - function(rdd) { - ones <- lapply(rdd, function(item) { list(item, 1L) }) - collect(reduceByKey(ones, `+`, numPartitions(rdd))) + signature(x = "RDD"), + function(x) { + ones <- lapply(x, function(item) { list(item, 1L) }) + collect(reduceByKey(ones, `+`, numPartitions(x))) }) #' Apply a function to all elements @@ -710,26 +707,26 @@ setMethod("Filter", #' rdd <- parallelize(sc, 1:10) #' reduce(rdd, "+") # 55 #'} -setGeneric("reduce", function(rdd, func) { standardGeneric("reduce") }) +setGeneric("reduce", function(x, func) { standardGeneric("reduce") }) #' @rdname reduce #' @aliases reduce,RDD,ANY-method setMethod("reduce", - signature(rdd = "RDD", func = "ANY"), - function(rdd, func) { + signature(x = "RDD", func = "ANY"), + function(x, func) { reducePartition <- function(part) { Reduce(func, part) } - partitionList <- collect(lapplyPartition(rdd, reducePartition), + partitionList <- collect(lapplyPartition(x, reducePartition), flatten = FALSE) Reduce(func, partitionList) }) #' Get the maximum element of an RDD. #' -#' @param rdd The RDD to get the maximum element from +#' @param x The RDD to get the maximum element from #' @export #' @rdname maximum #' @examples @@ -738,19 +735,19 @@ setMethod("reduce", #' rdd <- parallelize(sc, 1:10) #' maximum(rdd) # 10 #'} -setGeneric("maximum", function(rdd) { standardGeneric("maximum") }) +setGeneric("maximum", function(x) { standardGeneric("maximum") }) #' @rdname maximum #' @aliases maximum,RDD setMethod("maximum", - signature(rdd = "RDD"), - function(rdd) { - reduce(rdd, max) + signature(x = "RDD"), + function(x) { + reduce(x, max) }) #' Get the minimum element of an RDD. #' -#' @param rdd The RDD to get the minimum element from +#' @param x The RDD to get the minimum element from #' @export #' @rdname minimum #' @examples @@ -759,19 +756,19 @@ setMethod("maximum", #' rdd <- parallelize(sc, 1:10) #' minimum(rdd) # 1 #'} -setGeneric("minimum", function(rdd) { standardGeneric("minimum") }) +setGeneric("minimum", function(x) { standardGeneric("minimum") }) #' @rdname minimum #' @aliases minimum,RDD setMethod("minimum", - signature(rdd = "RDD"), - function(rdd) { - reduce(rdd, min) + signature(x = "RDD"), + function(x) { + reduce(x, min) }) #' Applies a function to all elements in an RDD, and force evaluation. #' -#' @param rdd The RDD to apply the function +#' @param x The RDD to apply the function #' @param func The function to be applied. #' @return invisible NULL. #' @export @@ -782,18 +779,18 @@ setMethod("minimum", #' rdd <- parallelize(sc, 1:10) #' foreach(rdd, function(x) { save(x, file=...) 
}) #'} -setGeneric("foreach", function(rdd, func) { standardGeneric("foreach") }) +setGeneric("foreach", function(x, func) { standardGeneric("foreach") }) #' @rdname foreach #' @aliases foreach,RDD,function-method setMethod("foreach", - signature(rdd = "RDD", func = "function"), - function(rdd, func) { + signature(x = "RDD", func = "function"), + function(x, func) { partition.func <- function(x) { lapply(x, func) NULL } - invisible(collect(mapPartitions(rdd, partition.func))) + invisible(collect(mapPartitions(x, partition.func))) }) #' Applies a function to each partition in an RDD, and force evaluation. @@ -807,14 +804,14 @@ setMethod("foreach", #' foreachPartition(rdd, function(part) { save(part, file=...); NULL }) #'} setGeneric("foreachPartition", - function(rdd, func) { standardGeneric("foreachPartition") }) + function(x, func) { standardGeneric("foreachPartition") }) #' @rdname foreach #' @aliases foreachPartition,RDD,function-method setMethod("foreachPartition", - signature(rdd = "RDD", func = "function"), - function(rdd, func) { - invisible(collect(mapPartitions(rdd, func))) + signature(x = "RDD", func = "function"), + function(x, func) { + invisible(collect(mapPartitions(x, func))) }) #' Take elements from an RDD. @@ -822,7 +819,7 @@ setMethod("foreachPartition", #' This function takes the first NUM elements in the RDD and #' returns them in a list. #' -#' @param rdd The RDD to take elements from +#' @param x The RDD to take elements from #' @param num Number of elements to take #' @rdname take #' @export @@ -832,17 +829,17 @@ setMethod("foreachPartition", #' rdd <- parallelize(sc, 1:10) #' take(rdd, 2L) # list(1, 2) #'} -setGeneric("take", function(rdd, num) { standardGeneric("take") }) +setGeneric("take", function(x, num) { standardGeneric("take") }) #' @rdname take #' @aliases take,RDD,numeric-method setMethod("take", - signature(rdd = "RDD", num = "numeric"), - function(rdd, num) { + signature(x = "RDD", num = "numeric"), + function(x, num) { resList <- list() index <- -1 - jrdd <- getJRDD(rdd) - numPartitions <- numPartitions(rdd) + jrdd <- getJRDD(x) + numPartitions <- numPartitions(x) # TODO(shivaram): Collect more than one partition based on size # estimates similar to the scala version of `take`. @@ -861,7 +858,7 @@ setMethod("take", elems <- convertJListToRList(partition, flatten = TRUE, logicalUpperBound = size, - serializedMode = getSerializedMode(rdd)) + serializedMode = getSerializedMode(x)) # TODO: Check if this append is O(n^2)? resList <- append(resList, elems) } @@ -873,7 +870,7 @@ setMethod("take", #' This function returns a new RDD containing the distinct elements in the #' given RDD. The same as `distinct()' in Spark. #' -#' @param rdd The RDD to remove duplicates from. +#' @param x The RDD to remove duplicates from. #' @param numPartitions Number of partitions to create. 
#' @rdname distinct #' @export @@ -884,18 +881,18 @@ setMethod("take", #' sort(unlist(collect(distinct(rdd)))) # c(1, 2, 3) #'} setGeneric("distinct", - function(rdd, numPartitions) { standardGeneric("distinct") }) + function(x, numPartitions) { standardGeneric("distinct") }) setClassUnion("missingOrInteger", c("missing", "integer")) #' @rdname distinct #' @aliases distinct,RDD,missingOrInteger-method setMethod("distinct", - signature(rdd = "RDD", numPartitions = "missingOrInteger"), - function(rdd, numPartitions) { + signature(x = "RDD", numPartitions = "missingOrInteger"), + function(x, numPartitions) { if (missing(numPartitions)) { - numPartitions <- SparkR::numPartitions(rdd) + numPartitions <- SparkR::numPartitions(x) } - identical.mapped <- lapply(rdd, function(x) { list(x, NULL) }) + identical.mapped <- lapply(x, function(x) { list(x, NULL) }) reduced <- reduceByKey(identical.mapped, function(x, y) { x }, numPartitions) @@ -908,7 +905,7 @@ setMethod("distinct", #' The same as `sample()' in Spark. (We rename it due to signature #' inconsistencies with the `sample()' function in R's base package.) #' -#' @param rdd The RDD to sample elements from +#' @param x The RDD to sample elements from #' @param withReplacement Sampling with replacement or not #' @param fraction The (rough) sample target fraction #' @param seed Randomness seed value @@ -922,16 +919,16 @@ setMethod("distinct", #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates #'} setGeneric("sampleRDD", - function(rdd, withReplacement, fraction, seed) { + function(x, withReplacement, fraction, seed) { standardGeneric("sampleRDD") }) #' @rdname sampleRDD #' @aliases sampleRDD,RDD setMethod("sampleRDD", - signature(rdd = "RDD", withReplacement = "logical", + signature(x = "RDD", withReplacement = "logical", fraction = "numeric", seed = "integer"), - function(rdd, withReplacement, fraction, seed) { + function(x, withReplacement, fraction, seed) { # The sampler: takes a partition and returns its sampled version. samplingFunc <- function(split, part) { @@ -968,13 +965,13 @@ setMethod("sampleRDD", list() } - lapplyPartitionsWithIndex(rdd, samplingFunc) + lapplyPartitionsWithIndex(x, samplingFunc) }) #' Return a list of the elements that are a sampled subset of the given RDD. #' -#' @param rdd The RDD to sample elements from +#' @param x The RDD to sample elements from #' @param withReplacement Sampling with replacement or not #' @param num Number of elements to return #' @param seed Randomness seed value @@ -990,19 +987,19 @@ setMethod("sampleRDD", #' takeSample(rdd, FALSE, 5L, 16181618L) #'} setGeneric("takeSample", - function(rdd, withReplacement, num, seed) { + function(x, withReplacement, num, seed) { standardGeneric("takeSample") }) #' @rdname takeSample #' @aliases takeSample,RDD -setMethod("takeSample", signature(rdd = "RDD", withReplacement = "logical", +setMethod("takeSample", signature(x = "RDD", withReplacement = "logical", num = "integer", seed = "integer"), - function(rdd, withReplacement, num, seed) { + function(x, withReplacement, num, seed) { # This function is ported from RDD.scala. 
fraction <- 0.0 total <- 0 multiplier <- 3.0 - initialCount <- count(rdd) + initialCount <- count(x) maxSelected <- 0 MAXINT <- .Machine$integer.max @@ -1024,7 +1021,7 @@ setMethod("takeSample", signature(rdd = "RDD", withReplacement = "logical", } set.seed(seed) - samples <- collect(sampleRDD(rdd, withReplacement, fraction, + samples <- collect(sampleRDD(x, withReplacement, fraction, as.integer(ceiling(runif(1, -MAXINT, MAXINT))))) @@ -1032,7 +1029,7 @@ setMethod("takeSample", signature(rdd = "RDD", withReplacement = "logical", # take samples; this shouldn't happen often because we use a big # multiplier for thei initial size while (length(samples) < total) - samples <- collect(sampleRDD(rdd, withReplacement, fraction, + samples <- collect(sampleRDD(x, withReplacement, fraction, as.integer(ceiling(runif(1, -MAXINT, MAXINT))))) @@ -1043,7 +1040,7 @@ setMethod("takeSample", signature(rdd = "RDD", withReplacement = "logical", #' Creates tuples of the elements in this RDD by applying a function. #' -#' @param rdd The RDD. +#' @param x The RDD. #' @param func The function to be applied. #' @rdname keyBy #' @export @@ -1053,22 +1050,22 @@ setMethod("takeSample", signature(rdd = "RDD", withReplacement = "logical", #' rdd <- parallelize(sc, list(1, 2, 3)) #' collect(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3)) #'} -setGeneric("keyBy", function(rdd, func) { standardGeneric("keyBy") }) +setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) #' @rdname keyBy #' @aliases keyBy,RDD setMethod("keyBy", - signature(rdd = "RDD", func = "function"), - function(rdd, func) { + signature(x = "RDD", func = "function"), + function(x, func) { apply.func <- function(x) { list(func(x), x) } - lapply(rdd, apply.func) + lapply(x, apply.func) }) #' Save this RDD as a SequenceFile of serialized objects. #' -#' @param rdd The RDD to save +#' @param x The RDD to save #' @param path The directory where the file is saved #' @rdname saveAsObjectFile #' @seealso objectFile @@ -1079,25 +1076,25 @@ setMethod("keyBy", #' rdd <- parallelize(sc, 1:3) #' saveAsObjectFile(rdd, "/tmp/sparkR-tmp") #'} -setGeneric("saveAsObjectFile", function(rdd, path) { standardGeneric("saveAsObjectFile") }) +setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") }) #' @rdname saveAsObjectFile #' @aliases saveAsObjectFile,RDD setMethod("saveAsObjectFile", - signature(rdd = "RDD", path = "character"), - function(rdd, path) { + signature(x = "RDD", path = "character"), + function(x, path) { # If serializedMode == "string" we need to serialize the data before saving it since # objectFile() assumes serializedMode == "byte". - if (getSerializedMode(rdd) != "byte") { - rdd <- serializeToBytes(rdd) + if (getSerializedMode(x) != "byte") { + x <- serializeToBytes(x) } # Return nothing - invisible(callJMethod(getJRDD(rdd), "saveAsObjectFile", path)) + invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path)) }) #' Save this RDD as a text file, using string representations of elements. 
#' -#' @param rdd The RDD to save +#' @param x The RDD to save #' @param path The directory where the splits of the text file are saved #' @rdname saveAsTextFile #' @export @@ -1107,17 +1104,17 @@ setMethod("saveAsObjectFile", #' rdd <- parallelize(sc, 1:3) #' saveAsTextFile(rdd, "/tmp/sparkR-tmp") #'} -setGeneric("saveAsTextFile", function(rdd, path) { standardGeneric("saveAsTextFile") }) +setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") }) #' @rdname saveAsTextFile #' @aliases saveAsTextFile,RDD setMethod("saveAsTextFile", - signature(rdd = "RDD", path = "character"), - function(rdd, path) { - func <- function(x) { - toString(x) + signature(x = "RDD", path = "character"), + function(x, path) { + func <- function(str) { + toString(str) } - stringRdd <- lapply(rdd, func) + stringRdd <- lapply(x, func) # Return nothing invisible( callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path)) @@ -1125,7 +1122,7 @@ setMethod("saveAsTextFile", #' Sort an RDD by the given key function. #' -#' @param rdd An RDD to be sorted. +#' @param x An RDD to be sorted. #' @param func A function used to compute the sort key for each element. #' @param ascending A flag to indicate whether the sorting is ascending or descending. #' @param numPartitions Number of partitions to create. @@ -1138,7 +1135,7 @@ setMethod("saveAsTextFile", #' rdd <- parallelize(sc, list(3, 2, 1)) #' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3) #'} -setGeneric("sortBy", function(rdd, +setGeneric("sortBy", function(x, func, ascending = TRUE, numPartitions = 1L) { @@ -1148,20 +1145,20 @@ setGeneric("sortBy", function(rdd, #' @rdname sortBy #' @aliases sortBy,RDD,RDD-method setMethod("sortBy", - signature(rdd = "RDD", func = "function"), - function(rdd, func, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) { - values(sortByKey(keyBy(rdd, func), ascending, numPartitions)) + signature(x = "RDD", func = "function"), + function(x, func, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) { + values(sortByKey(keyBy(x, func), ascending, numPartitions)) }) # Helper function to get first N elements from an RDD in the specified order. # Param: -# rdd An RDD. +# x An RDD. # num Number of elements to return. # ascending A flag to indicate whether the sorting is ascending or descending. # Return: # A list of the first N elements from the RDD in the specified order. # -takeOrderedElem <- function(rdd, num, ascending = TRUE) { +takeOrderedElem <- function(x, num, ascending = TRUE) { if (num <= 0L) { return(list()) } @@ -1183,13 +1180,13 @@ takeOrderedElem <- function(rdd, num, ascending = TRUE) { newElems[ord[1:num]] } - newRdd <- mapPartitions(rdd, partitionFunc) + newRdd <- mapPartitions(x, partitionFunc) reduce(newRdd, reduceFunc) } #' Returns the first N elements from an RDD in ascending order. #' -#' @param rdd An RDD. +#' @param x An RDD. #' @param num Number of elements to return. #' @return The first N elements from the RDD in ascending order. 
#' @rdname takeOrdered @@ -1200,19 +1197,19 @@ takeOrderedElem <- function(rdd, num, ascending = TRUE) { #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7)) #' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6) #'} -setGeneric("takeOrdered", function(rdd, num) { standardGeneric("takeOrdered") }) +setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") }) #' @rdname takeOrdered #' @aliases takeOrdered,RDD,RDD-method setMethod("takeOrdered", - signature(rdd = "RDD", num = "integer"), - function(rdd, num) { - takeOrderedElem(rdd, num) + signature(x = "RDD", num = "integer"), + function(x, num) { + takeOrderedElem(x, num) }) #' Returns the top N elements from an RDD. #' -#' @param rdd An RDD. +#' @param x An RDD. #' @param num Number of elements to return. #' @return The top N elements from the RDD. #' @rdname top @@ -1223,14 +1220,14 @@ setMethod("takeOrdered", #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7)) #' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4) #'} -setGeneric("top", function(rdd, num) { standardGeneric("top") }) +setGeneric("top", function(x, num) { standardGeneric("top") }) #' @rdname top #' @aliases top,RDD,RDD-method setMethod("top", - signature(rdd = "RDD", num = "integer"), - function(rdd, num) { - takeOrderedElem(rdd, num, FALSE) + signature(x = "RDD", num = "integer"), + function(x, num) { + takeOrderedElem(x, num, FALSE) }) #' Fold an RDD using a given associative function and a neutral "zero value". @@ -1238,7 +1235,7 @@ setMethod("top", #' Aggregate the elements of each partition, and then the results for all the #' partitions, using a given associative function and a neutral "zero value". #' -#' @param rdd An RDD. +#' @param x An RDD. #' @param zeroValue A neutral "zero value". #' @param op An associative function for the folding operation. #' @return The folding result. @@ -1251,14 +1248,14 @@ setMethod("top", #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5)) #' fold(rdd, 0, "+") # 15 #'} -setGeneric("fold", function(rdd, zeroValue, op) { standardGeneric("fold") }) +setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") }) #' @rdname fold #' @aliases fold,RDD,RDD-method setMethod("fold", - signature(rdd = "RDD", zeroValue = "ANY", op = "ANY"), - function(rdd, zeroValue, op) { - aggregateRDD(rdd, zeroValue, op, op) + signature(x = "RDD", zeroValue = "ANY", op = "ANY"), + function(x, zeroValue, op) { + aggregateRDD(x, zeroValue, op, op) }) #' Aggregate an RDD using the given combine functions and a neutral "zero value". @@ -1266,7 +1263,7 @@ setMethod("fold", #' Aggregate the elements of each partition, and then the results for all the #' partitions, using given combine functions and a neutral "zero value". #' -#' @param rdd An RDD. +#' @param x An RDD. #' @param zeroValue A neutral "zero value". #' @param seqOp A function to aggregate the RDD elements. It may return a different #' result type from the type of the RDD elements. 
@@ -1284,26 +1281,63 @@ setMethod("fold", #' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } #' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4) #'} -setGeneric("aggregateRDD", function(rdd, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) +setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) #' @rdname aggregateRDD #' @aliases aggregateRDD,RDD,RDD-method setMethod("aggregateRDD", - signature(rdd = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"), - function(rdd, zeroValue, seqOp, combOp) { + signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"), + function(x, zeroValue, seqOp, combOp) { partitionFunc <- function(part) { Reduce(seqOp, part, zeroValue) } - partitionList <- collect(lapplyPartition(rdd, partitionFunc), + partitionList <- collect(lapplyPartition(x, partitionFunc), flatten = FALSE) Reduce(combOp, partitionList, zeroValue) }) +#' Pipes elements to a forked external process. +#' +#' The same as 'pipe()' in Spark. +#' +#' @param x The RDD whose elements are piped to the forked external process. +#' @param command The command to fork an external process. +#' @param env A named list to set environment variables of the external process. +#' @return A new RDD created by piping all elements to a forked external process. +#' @rdname pipeRDD +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, 1:10) +#' collect(pipeRDD(rdd, "more") +#' Output: c("1", "2", ..., "10") +#'} +setGeneric("pipeRDD", function(x, command, env = list()) { + standardGeneric("pipeRDD") +}) + +#' @rdname pipeRDD +#' @aliases pipeRDD,RDD,character-method +setMethod("pipeRDD", + signature(x = "RDD", command = "character"), + function(x, command, env = list()) { + func <- function(part) { + trim.trailing.func <- function(x) { + sub("[\r\n]*$", "", toString(x)) + } + input <- unlist(lapply(part, trim.trailing.func)) + res <- system2(command, stdout = TRUE, input = input, env = env) + lapply(res, trim.trailing.func) + } + lapplyPartition(x, func) + }) + # TODO: Consider caching the name in the RDD's environment #' Return an RDD's name. #' -#' @param rdd The RDD whose name is returned. +#' @param x The RDD whose name is returned. #' @rdname name #' @export #' @examples @@ -1312,19 +1346,19 @@ setMethod("aggregateRDD", #' rdd <- parallelize(sc, list(1,2,3)) #' name(rdd) # NULL (if not set before) #'} -setGeneric("name", function(rdd) { standardGeneric("name") }) +setGeneric("name", function(x) { standardGeneric("name") }) #' @rdname name #' @aliases name,RDD setMethod("name", - signature(rdd = "RDD"), - function(rdd) { - callJMethod(getJRDD(rdd), "name") + signature(x = "RDD"), + function(x) { + callJMethod(getJRDD(x), "name") }) #' Set an RDD's name. #' -#' @param rdd The RDD whose name is to be set. +#' @param x The RDD whose name is to be set. #' @param name The RDD name to be set. #' @return a new RDD renamed. 
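
The `pipeRDD` method added above writes each element of a partition to the external command's standard input as one line, and each line of the command's standard output becomes a string element of the new RDD. A minimal sketch of that contract, assuming a SparkContext `sc` from `sparkR.init()` as in the examples above and a Unix-like `cat` on the PATH:

```
sc <- sparkR.init()
rdd <- parallelize(sc, 1:3)
# Identity pipe: elements go out as lines and come back as strings.
collect(pipeRDD(rdd, "cat"))   # list("1", "2", "3")
```
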
#' @rdname setName @@ -1336,15 +1370,15 @@ setMethod("name", #' setName(rdd, "myRDD") #' name(rdd) # "myRDD" #'} -setGeneric("setName", function(rdd, name) { standardGeneric("setName") }) +setGeneric("setName", function(x, name) { standardGeneric("setName") }) #' @rdname setName #' @aliases setName,RDD setMethod("setName", - signature(rdd = "RDD", name = "character"), - function(rdd, name) { - callJMethod(getJRDD(rdd), "setName", name) - rdd + signature(x = "RDD", name = "character"), + function(x, name) { + callJMethod(getJRDD(x), "setName", name) + x }) ############ Binary Functions ############# diff --git a/pkg/R/context.R b/pkg/R/context.R index 90f338cea47f7..90a48c0188101 100644 --- a/pkg/R/context.R +++ b/pkg/R/context.R @@ -180,7 +180,7 @@ includePackage <- function(sc, pkg) { #'} broadcast <- function(sc, object) { objName <- as.character(substitute(object)) - serializedObj <- serialize(object, connection = NULL, ascii = TRUE) + serializedObj <- serialize(object, connection = NULL) jBroadcast <- callJMethod(sc, "broadcast", serializedObj) id <- as.character(callJMethod(jBroadcast, "id")) diff --git a/pkg/R/pairRDD.R b/pkg/R/pairRDD.R index b65d4070108c8..2b3c2a0bc8052 100644 --- a/pkg/R/pairRDD.R +++ b/pkg/R/pairRDD.R @@ -7,7 +7,7 @@ #' @description #' \code{lookup} returns a list of values in this RDD for key key. #' -#' @param rdd The RDD to collect +#' @param x The RDD to collect #' @param key The key to look up for #' @return a list of values in this RDD for key key #' @rdname lookup @@ -19,18 +19,18 @@ #' rdd <- parallelize(sc, pairs) #' lookup(rdd, 1) # list(1, 3) #'} -setGeneric("lookup", function(rdd, key) { standardGeneric("lookup") }) +setGeneric("lookup", function(x, key) { standardGeneric("lookup") }) #' @rdname lookup #' @aliases lookup,RDD-method setMethod("lookup", - signature(rdd = "RDD", key = "ANY"), - function(rdd, key) { + signature(x = "RDD", key = "ANY"), + function(x, key) { partitionFunc <- function(part) { - filtered <- part[unlist(lapply(part, function(x) { identical(key, x[[1]]) }))] - lapply(filtered, function(x) { x[[2]] }) + filtered <- part[unlist(lapply(part, function(i) { identical(key, i[[1]]) }))] + lapply(filtered, function(i) { i[[2]] }) } - valsRDD <- lapplyPartition(rdd, partitionFunc) + valsRDD <- lapplyPartition(x, partitionFunc) collect(valsRDD) }) @@ -39,7 +39,7 @@ setMethod("lookup", #' #' Same as countByKey in Spark. #' -#' @param rdd The RDD to count keys. +#' @param x The RDD to count keys. #' @return list of (key, count) pairs, where count is number of each key in rdd. #' @rdname countByKey #' @export @@ -49,20 +49,20 @@ setMethod("lookup", #' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1))) #' countByKey(rdd) # ("a", 2L), ("b", 1L) #'} -setGeneric("countByKey", function(rdd) { standardGeneric("countByKey") }) +setGeneric("countByKey", function(x) { standardGeneric("countByKey") }) #' @rdname countByKey #' @aliases countByKey,RDD-method setMethod("countByKey", - signature(rdd = "RDD"), - function(rdd) { - keys <- lapply(rdd, function(item) { item[[1]] }) + signature(x = "RDD"), + function(x) { + keys <- lapply(x, function(item) { item[[1]] }) countByValue(keys) }) #' Return an RDD with the keys of each tuple. #' -#' @param rdd The RDD from which the keys of each tuple is returned. +#' @param x The RDD from which the keys of each tuple is returned. 
#' @rdname keys #' @export #' @examples @@ -71,22 +71,22 @@ setMethod("countByKey", #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) #' collect(keys(rdd)) # list(1, 3) #'} -setGeneric("keys", function(rdd) { standardGeneric("keys") }) +setGeneric("keys", function(x) { standardGeneric("keys") }) #' @rdname keys #' @aliases keys,RDD setMethod("keys", - signature(rdd = "RDD"), - function(rdd) { - func <- function(x) { - x[[1]] + signature(x = "RDD"), + function(x) { + func <- function(k) { + k[[1]] } - lapply(rdd, func) + lapply(x, func) }) #' Return an RDD with the values of each tuple. #' -#' @param rdd The RDD from which the values of each tuple is returned. +#' @param x The RDD from which the values of each tuple is returned. #' @rdname values #' @export #' @examples @@ -95,17 +95,17 @@ setMethod("keys", #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) #' collect(values(rdd)) # list(2, 4) #'} -setGeneric("values", function(rdd) { standardGeneric("values") }) +setGeneric("values", function(x) { standardGeneric("values") }) #' @rdname values #' @aliases values,RDD setMethod("values", - signature(rdd = "RDD"), - function(rdd) { - func <- function(x) { - x[[2]] + signature(x = "RDD"), + function(x) { + func <- function(v) { + v[[2]] } - lapply(rdd, func) + lapply(x, func) }) #' Applies a function to all values of the elements, without modifying the keys. @@ -176,7 +176,7 @@ setMethod("flatMapValues", #' For each element of this RDD, the partitioner is used to compute a hash #' function and the RDD is partitioned using this hash value. #' -#' @param rdd The RDD to partition. Should be an RDD where each element is +#' @param x The RDD to partition. Should be an RDD where each element is #' list(K, V) or c(K, V). #' @param numPartitions Number of partitions to create. #' @param ... Other optional arguments to partitionBy. @@ -195,15 +195,15 @@ setMethod("flatMapValues", #' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4) #'} setGeneric("partitionBy", - function(rdd, numPartitions, ...) { + function(x, numPartitions, ...) { standardGeneric("partitionBy") }) #' @rdname partitionBy #' @aliases partitionBy,RDD,integer-method setMethod("partitionBy", - signature(rdd = "RDD", numPartitions = "integer"), - function(rdd, numPartitions, partitionFunc = hashCode) { + signature(x = "RDD", numPartitions = "integer"), + function(x, numPartitions, partitionFunc = hashCode) { #if (missing(partitionFunc)) { # partitionFunc <- hashCode @@ -212,15 +212,13 @@ setMethod("partitionBy", depsBinArr <- getDependencies(partitionFunc) serializedHashFuncBytes <- serialize(as.character(substitute(partitionFunc)), - connection = NULL, - ascii = TRUE) + connection = NULL) packageNamesArr <- serialize(.sparkREnv$.packages, - connection = NULL, - ascii = TRUE) + connection = NULL) broadcastArr <- lapply(ls(.broadcastNames), function(name) { get(name, .broadcastNames) }) - jrdd <- getJRDD(rdd) + jrdd <- getJRDD(x) # We create a PairwiseRRDD that extends RDD[(Array[Byte], # Array[Byte])], where the key is the hashed split, the value is @@ -229,7 +227,7 @@ setMethod("partitionBy", callJMethod(jrdd, "rdd"), as.integer(numPartitions), serializedHashFuncBytes, - getSerializedMode(rdd), + getSerializedMode(x), depsBinArr, packageNamesArr, as.character(.sparkREnv$libname), @@ -256,7 +254,7 @@ setMethod("partitionBy", #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). #' and group values for each key in the RDD into a single sequence. 
#' -#' @param rdd The RDD to group. Should be an RDD where each element is +#' @param x The RDD to group. Should be an RDD where each element is #' list(K, V) or c(K, V). #' @param numPartitions Number of partitions to create. #' @return An RDD where each element is list(K, list(V)) @@ -273,27 +271,27 @@ setMethod("partitionBy", #' grouped[[1]] # Should be a list(1, list(2, 4)) #'} setGeneric("groupByKey", - function(rdd, numPartitions) { + function(x, numPartitions) { standardGeneric("groupByKey") }) #' @rdname groupByKey #' @aliases groupByKey,RDD,integer-method setMethod("groupByKey", - signature(rdd = "RDD", numPartitions = "integer"), - function(rdd, numPartitions) { - shuffled <- partitionBy(rdd, numPartitions) + signature(x = "RDD", numPartitions = "integer"), + function(x, numPartitions) { + shuffled <- partitionBy(x, numPartitions) groupVals <- function(part) { vals <- new.env() keys <- new.env() pred <- function(item) exists(item$hash, keys) - appendList <- function(acc, x) { - addItemToAccumulator(acc, x) + appendList <- function(acc, i) { + addItemToAccumulator(acc, i) acc } - makeList <- function(x) { + makeList <- function(i) { acc <- initAccumulator() - addItemToAccumulator(acc, x) + addItemToAccumulator(acc, i) acc } # Each item in the partition is list of (K, V) @@ -305,9 +303,9 @@ setMethod("groupByKey", }) # extract out data field vals <- eapply(vals, - function(x) { - length(x$data) <- x$counter - x$data + function(i) { + length(i$data) <- i$counter + i$data }) # Every key in the environment contains a list # Convert that to list(K, Seq[V]) @@ -321,7 +319,7 @@ setMethod("groupByKey", #' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). #' and merges the values for each key using an associative reduce function. #' -#' @param rdd The RDD to reduce by key. Should be an RDD where each element is +#' @param x The RDD to reduce by key. Should be an RDD where each element is #' list(K, V) or c(K, V). #' @param combineFunc The associative reduce function to use. #' @param numPartitions Number of partitions to create. @@ -340,15 +338,15 @@ setMethod("groupByKey", #' reduced[[1]] # Should be a list(1, 6) #'} setGeneric("reduceByKey", - function(rdd, combineFunc, numPartitions) { + function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey") }) #' @rdname reduceByKey #' @aliases reduceByKey,RDD,integer-method setMethod("reduceByKey", - signature(rdd = "RDD", combineFunc = "ANY", numPartitions = "integer"), - function(rdd, combineFunc, numPartitions) { + signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"), + function(x, combineFunc, numPartitions) { reduceVals <- function(part) { vals <- new.env() keys <- new.env() @@ -360,7 +358,7 @@ setMethod("reduceByKey", }) convertEnvsToList(keys, vals) } - locallyReduced <- lapplyPartition(rdd, reduceVals) + locallyReduced <- lapplyPartition(x, reduceVals) shuffled <- partitionBy(locallyReduced, numPartitions) lapplyPartition(shuffled, reduceVals) }) @@ -371,7 +369,7 @@ setMethod("reduceByKey", #' and merges the values for each key using an associative reduce function, but return the #' results immediately to the driver as an R list. #' -#' @param rdd The RDD to reduce by key. Should be an RDD where each element is +#' @param x The RDD to reduce by key. Should be an RDD where each element is #' list(K, V) or c(K, V). #' @param combineFunc The associative reduce function to use. 
#' @return A list of elements of type list(K, V') where V' is the merged value for each key @@ -387,15 +385,15 @@ setMethod("reduceByKey", #' reduced # list(list(1, 6), list(1.1, 3)) #'} setGeneric("reduceByKeyLocally", - function(rdd, combineFunc) { + function(x, combineFunc) { standardGeneric("reduceByKeyLocally") }) #' @rdname reduceByKeyLocally #' @aliases reduceByKeyLocally,RDD,integer-method setMethod("reduceByKeyLocally", - signature(rdd = "RDD", combineFunc = "ANY"), - function(rdd, combineFunc) { + signature(x = "RDD", combineFunc = "ANY"), + function(x, combineFunc) { reducePart <- function(part) { vals <- new.env() keys <- new.env() @@ -419,7 +417,7 @@ setMethod("reduceByKeyLocally", }) accum } - reduced <- mapPartitions(rdd, reducePart) + reduced <- mapPartitions(x, reducePart) merged <- reduce(reduced, mergeParts) convertEnvsToList(merged[[1]], merged[[2]]) }) @@ -439,7 +437,7 @@ setMethod("reduceByKeyLocally", #' two lists). #' } #' -#' @param rdd The RDD to combine. Should be an RDD where each element is +#' @param x The RDD to combine. Should be an RDD where each element is #' list(K, V) or c(K, V). #' @param createCombiner Create a combiner (C) given a value (V) #' @param mergeValue Merge the given value (V) with an existing combiner (C) @@ -460,16 +458,16 @@ setMethod("reduceByKeyLocally", #' combined[[1]] # Should be a list(1, 6) #'} setGeneric("combineByKey", - function(rdd, createCombiner, mergeValue, mergeCombiners, numPartitions) { + function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { standardGeneric("combineByKey") }) #' @rdname combineByKey #' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method setMethod("combineByKey", - signature(rdd = "RDD", createCombiner = "ANY", mergeValue = "ANY", + signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY", mergeCombiners = "ANY", numPartitions = "integer"), - function(rdd, createCombiner, mergeValue, mergeCombiners, numPartitions) { + function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { combineLocally <- function(part) { combiners <- new.env() keys <- new.env() @@ -481,7 +479,7 @@ setMethod("combineByKey", }) convertEnvsToList(keys, combiners) } - locallyCombined <- lapplyPartition(rdd, combineLocally) + locallyCombined <- lapplyPartition(x, combineLocally) shuffled <- partitionBy(locallyCombined, numPartitions) mergeAfterShuffle <- function(part) { combiners <- new.env() @@ -497,6 +495,88 @@ setMethod("combineByKey", lapplyPartition(shuffled, mergeAfterShuffle) }) +#' Aggregate a pair RDD by each key. +#' +#' Aggregate the values of each key in an RDD, using given combine functions +#' and a neutral "zero value". This function can return a different result type, +#' U, than the type of the values in this RDD, V. Thus, we need one operation +#' for merging a V into a U and one operation for merging two U's, The former +#' operation is used for merging values within a partition, and the latter is +#' used for merging values between partitions. To avoid memory allocation, both +#' of these functions are allowed to modify and return their first argument +#' instead of creating a new U. +#' +#' @param x An RDD. +#' @param zeroValue A neutral "zero value". +#' @param seqOp A function to aggregate the values of each key. It may return +#' a different result type from the type of the values. +#' @param combOp A function to aggregate results of seqOp. +#' @return An RDD containing the aggregation result. 
+#' @rdname aggregateByKey +#' @seealso foldByKey, combineByKey +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) +#' zeroValue <- list(0, 0) +#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } +#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } +#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) +#' # list(list(1, list(3, 2)), list(2, list(7, 2))) +#'} +setGeneric("aggregateByKey", + function(x, zeroValue, seqOp, combOp, numPartitions) { + standardGeneric("aggregateByKey") + }) + +#' @rdname aggregateByKey +#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method +setMethod("aggregateByKey", + signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", + combOp = "ANY", numPartitions = "integer"), + function(x, zeroValue, seqOp, combOp, numPartitions) { + createCombiner <- function(v) { + do.call(seqOp, list(zeroValue, v)) + } + + combineByKey(x, createCombiner, seqOp, combOp, numPartitions) + }) + +#' Fold a pair RDD by each key. +#' +#' Aggregate the values of each key in an RDD, using an associative function "func" +#' and a neutral "zero value" which may be added to the result an arbitrary +#' number of times, and must not change the result (e.g., 0 for addition, or +#' 1 for multiplication.). +#' +#' @param x An RDD. +#' @param zeroValue A neutral "zero value". +#' @param func An associative function for folding values of each key. +#' @return An RDD containing the aggregation result. +#' @rdname foldByKey +#' @seealso aggregateByKey, combineByKey +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) +#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7)) +#'} +setGeneric("foldByKey", + function(x, zeroValue, func, numPartitions) { + standardGeneric("foldByKey") + }) + +#' @rdname foldByKey +#' @aliases foldByKey,RDD,ANY,ANY,integer-method +setMethod("foldByKey", + signature(x = "RDD", zeroValue = "ANY", + func = "ANY", numPartitions = "integer"), + function(x, zeroValue, func, numPartitions) { + aggregateByKey(x, zeroValue, func, func, numPartitions) + }) + ############ Binary Functions ############# #' Join two RDDs @@ -505,9 +585,9 @@ setMethod("combineByKey", #' \code{join} This function joins two RDDs where every element is of the form list(K, V). #' The key types of the two RDDs should be the same. #' -#' @param rdd1 An RDD to be joined. Should be an RDD where each element is +#' @param x An RDD to be joined. Should be an RDD where each element is #' list(K, V). -#' @param rdd2 An RDD to be joined. Should be an RDD where each element is +#' @param y An RDD to be joined. Should be an RDD where each element is #' list(K, V). #' @param numPartitions Number of partitions to create. 
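
The two pair-RDD aggregations added above are closely related: `foldByKey` simply calls `aggregateByKey` with the same function for both the within-partition and cross-partition steps. A short sketch using the same data and zero values as the roxygen examples above (element order in the collected result may vary across partitions):

```
sc <- sparkR.init()
rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))

# foldByKey: one associative function plus a neutral zero value.
collect(foldByKey(rdd, 0, "+", 2L))   # list(list(1, 3), list(2, 7))

# aggregateByKey: separate seqOp/combOp, so the values of a key can be
# aggregated into a different type; here each key becomes list(sum, count).
zeroValue <- list(0, 0)
seqOp  <- function(acc, v) { list(acc[[1]] + v, acc[[2]] + 1) }
combOp <- function(a, b) { list(a[[1]] + b[[1]], a[[2]] + b[[2]]) }
collect(aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L))
# list(list(1, list(3, 2)), list(2, list(7, 2)))
```
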
#' @return a new RDD containing all pairs of elements with matching keys in @@ -521,21 +601,21 @@ setMethod("combineByKey", #' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) #' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)) #'} -setGeneric("join", function(rdd1, rdd2, numPartitions) { standardGeneric("join") }) +setGeneric("join", function(x, y, numPartitions) { standardGeneric("join") }) #' @rdname join-methods #' @aliases join,RDD,RDD-method setMethod("join", - signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"), - function(rdd1, rdd2, numPartitions) { - rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) }) - rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) }) + signature(x = "RDD", y = "RDD", numPartitions = "integer"), + function(x, y, numPartitions) { + xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) + yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) doJoin <- function(v) { joinTaggedList(v, list(FALSE, FALSE)) } - joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin) + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) #' Left outer join two RDDs @@ -544,12 +624,12 @@ setMethod("join", #' \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V). #' The key types of the two RDDs should be the same. #' -#' @param rdd1 An RDD to be joined. Should be an RDD where each element is +#' @param x An RDD to be joined. Should be an RDD where each element is #' list(K, V). -#' @param rdd2 An RDD to be joined. Should be an RDD where each element is +#' @param y An RDD to be joined. Should be an RDD where each element is #' list(K, V). #' @param numPartitions Number of partitions to create. -#' @return For each element (k, v) in rdd1, the resulting RDD will either contain +#' @return For each element (k, v) in x, the resulting RDD will either contain #' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) #' if no elements in rdd2 have key k. #' @rdname join-methods @@ -562,21 +642,21 @@ setMethod("join", #' leftOuterJoin(rdd1, rdd2, 2L) #' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL))) #'} -setGeneric("leftOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("leftOuterJoin") }) +setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") }) #' @rdname join-methods #' @aliases leftOuterJoin,RDD,RDD-method setMethod("leftOuterJoin", - signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"), - function(rdd1, rdd2, numPartitions) { - rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) }) - rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) }) + signature(x = "RDD", y = "RDD", numPartitions = "integer"), + function(x, y, numPartitions) { + xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) + yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) doJoin <- function(v) { joinTaggedList(v, list(FALSE, TRUE)) } - joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin) + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) #' Right outer join two RDDs @@ -585,14 +665,14 @@ setMethod("leftOuterJoin", #' \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V). 
#' The key types of the two RDDs should be the same. #' -#' @param rdd1 An RDD to be joined. Should be an RDD where each element is +#' @param x An RDD to be joined. Should be an RDD where each element is #' list(K, V). -#' @param rdd2 An RDD to be joined. Should be an RDD where each element is +#' @param y An RDD to be joined. Should be an RDD where each element is #' list(K, V). #' @param numPartitions Number of partitions to create. -#' @return For each element (k, w) in rdd2, the resulting RDD will either contain -#' all pairs (k, (v, w)) for (k, v) in rdd1, or the pair (k, (NULL, w)) -#' if no elements in rdd1 have key k. +#' @return For each element (k, w) in y, the resulting RDD will either contain +#' all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w)) +#' if no elements in x have key k. #' @rdname join-methods #' @export #' @examples @@ -603,21 +683,21 @@ setMethod("leftOuterJoin", #' rightOuterJoin(rdd1, rdd2, 2L) #' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4))) #'} -setGeneric("rightOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("rightOuterJoin") }) +setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") }) #' @rdname join-methods #' @aliases rightOuterJoin,RDD,RDD-method setMethod("rightOuterJoin", - signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"), - function(rdd1, rdd2, numPartitions) { - rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) }) - rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) }) + signature(x = "RDD", y = "RDD", numPartitions = "integer"), + function(x, y, numPartitions) { + xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) + yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) doJoin <- function(v) { joinTaggedList(v, list(TRUE, FALSE)) } - joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin) + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) #' Full outer join two RDDs @@ -626,15 +706,15 @@ setMethod("rightOuterJoin", #' \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V). #' The key types of the two RDDs should be the same. #' -#' @param rdd1 An RDD to be joined. Should be an RDD where each element is +#' @param x An RDD to be joined. Should be an RDD where each element is #' list(K, V). -#' @param rdd2 An RDD to be joined. Should be an RDD where each element is +#' @param y An RDD to be joined. Should be an RDD where each element is #' list(K, V). #' @param numPartitions Number of partitions to create. -#' @return For each element (k, v) in rdd1 and (k, w) in rdd2, the resulting RDD -#' will contain all pairs (k, (v, w)) for both (k, v) in rdd1 and and -#' (k, w) in rdd2, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements -#' in rdd1/rdd2 have key k. +#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD +#' will contain all pairs (k, (v, w)) for both (k, v) in x and +#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements +#' in x/y have key k. 
#' @rdname join-methods #' @export #' @examples @@ -647,22 +727,22 @@ setMethod("rightOuterJoin", #' # list(2, list(NULL, 4))) #' # list(3, list(3, NULL)), #'} -setGeneric("fullOuterJoin", function(rdd1, rdd2, numPartitions) { standardGeneric("fullOuterJoin") }) +setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") }) #' @rdname join-methods #' @aliases fullOuterJoin,RDD,RDD-method setMethod("fullOuterJoin", - signature(rdd1 = "RDD", rdd2 = "RDD", numPartitions = "integer"), - function(rdd1, rdd2, numPartitions) { - rdd1Tagged <- lapply(rdd1, function(x) { list(x[[1]], list(1L, x[[2]])) }) - rdd2Tagged <- lapply(rdd2, function(x) { list(x[[1]], list(2L, x[[2]])) }) + signature(x = "RDD", y = "RDD", numPartitions = "integer"), + function(x, y, numPartitions) { + xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) + yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) doJoin <- function(v) { joinTaggedList(v, list(TRUE, TRUE)) } - joined <- flatMapValues(groupByKey(unionRDD(rdd1Tagged, rdd2Tagged), numPartitions), doJoin) + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) #' For each key k in several RDDs, return a resulting RDD that @@ -729,7 +809,7 @@ setMethod("cogroup", #' Sort a (k, v) pair RDD by k. #' -#' @param rdd A (k, v) pair RDD to be sorted. +#' @param x A (k, v) pair RDD to be sorted. #' @param ascending A flag to indicate whether the sorting is ascending or descending. #' @param numPartitions Number of partitions to create. #' @return An RDD where all (k, v) pair elements are sorted. @@ -741,7 +821,7 @@ setMethod("cogroup", #' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3))) #' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1)) #'} -setGeneric("sortByKey", function(rdd, +setGeneric("sortByKey", function(x, ascending = TRUE, numPartitions = 1L) { standardGeneric("sortByKey") @@ -750,17 +830,17 @@ setGeneric("sortByKey", function(rdd, #' @rdname sortByKey #' @aliases sortByKey,RDD,RDD-method setMethod("sortByKey", - signature(rdd = "RDD"), - function(rdd, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) { + signature(x = "RDD"), + function(x, ascending = TRUE, numPartitions = SparkR::numPartitions(x)) { rangeBounds <- list() if (numPartitions > 1) { - rddSize <- count(rdd) + rddSize <- count(x) # constant from Spark's RangePartitioner maxSampleSize <- numPartitions * 20 fraction <- min(maxSampleSize / max(rddSize, 1), 1.0) - samples <- collect(keys(sampleRDD(rdd, FALSE, fraction, 1L))) + samples <- collect(keys(sampleRDD(x, FALSE, fraction, 1L))) # Note: the built-in R sort() function only works on atomic vectors samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending) @@ -793,7 +873,7 @@ setMethod("sortByKey", sortKeyValueList(part, decreasing = !ascending) } - newRDD <- partitionBy(rdd, numPartitions, rangePartitionFunc) + newRDD <- partitionBy(x, numPartitions, rangePartitionFunc) lapplyPartition(newRDD, partitionFunc) }) diff --git a/pkg/R/sparkR.R b/pkg/R/sparkR.R index f3a5dfa9745ec..757f830316f5e 100644 --- a/pkg/R/sparkR.R +++ b/pkg/R/sparkR.R @@ -4,7 +4,6 @@ assemblyJarName <- "sparkr-assembly-0.1.jar" sparkR.onLoad <- function(libname, pkgname) { assemblyJarPath <- paste(libname, "/SparkR/", assemblyJarName, sep = "") - assemblyJarPath <- gsub(" ", "\\ ", assemblyJarPath, fixed = T) packageStartupMessage("[SparkR] Initializing with classpath ", assemblyJarPath, "\n") .sparkREnv$libname <- 
@@ -83,14 +82,15 @@ sparkR.stop <- function(env = .sparkREnv) {
 #'}

 sparkR.init <- function(
-  master = "local",
+  master = "",
   appName = "SparkR",
   sparkHome = Sys.getenv("SPARK_HOME"),
   sparkEnvir = list(),
   sparkExecutorEnv = list(),
   sparkJars = "",
   sparkRLibDir = "",
-  sparkRBackendPort = 12345) {
+  sparkRBackendPort = as.integer(Sys.getenv("SPARKR_BACKEND_PORT", "12345")),
+  sparkRRetryCount = 6) {

   if (exists(".sparkRjsc", envir = .sparkREnv)) {
     cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
@@ -98,21 +98,68 @@ sparkR.init <- function(
   }

   sparkMem <- Sys.getenv("SPARK_MEM", "512m")
-  jars <- c(as.character(.sparkREnv$assemblyJarPath), as.character(sparkJars))
-
-  cp <- paste0(jars, collapse = ":")
+  jars <- suppressWarnings(
+    normalizePath(c(as.character(.sparkREnv$assemblyJarPath), as.character(sparkJars))))
+
+  # Classpath separator is ";" on Windows
+  # On Windows the file URI also needs four slashes ("file:////"); see http://stackoverflow.com/a/18522792
+  if (.Platform$OS.type == "unix") {
+    collapseChar <- ":"
+    uriSep <- "//"
+  } else {
+    collapseChar <- ";"
+    uriSep <- "////"
+  }
+  cp <- paste0(jars, collapse = collapseChar)

   yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
   if (yarn_conf_dir != "") {
     cp <- paste(cp, yarn_conf_dir, sep = ":")
   }
-  launchBackend(classPath = cp,
-                mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
-                args = as.character(sparkRBackendPort),
-                javaOpts = paste("-Xmx", sparkMem, sep = ""))
-  Sys.sleep(2) # Wait for backend to come up
+
+  sparkRExistingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
+  if (sparkRExistingPort != "") {
+    sparkRBackendPort <- sparkRExistingPort
+  } else {
+    if (Sys.getenv("SPARKR_USE_SPARK_SUBMIT", "") == "") {
+      launchBackend(classPath = cp,
+                    mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
+                    args = as.character(sparkRBackendPort),
+                    javaOpts = paste("-Xmx", sparkMem, sep = ""))
+    } else {
+      # TODO: We should deprecate sparkJars and ask users to add it to the
+      # command line (using --jars) which is picked up by SparkSubmit
+      launchBackendSparkSubmit(
+        mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
+        args = as.character(sparkRBackendPort),
+        appJar = .sparkREnv$assemblyJarPath,
+        sparkHome = sparkHome,
+        sparkSubmitOpts = Sys.getenv("SPARKR_SUBMIT_ARGS", ""))
+    }
+  }
+
   .sparkREnv$sparkRBackendPort <- sparkRBackendPort
-  connectBackend("localhost", sparkRBackendPort) # Connect to it
+  cat("Waiting for JVM to come up...\n")
+  tries <- 0
+  while (tries < sparkRRetryCount) {
+    if (!connExists(.sparkREnv)) {
+      Sys.sleep(2 ^ tries)
+      tryCatch({
+        connectBackend("localhost", .sparkREnv$sparkRBackendPort)
+      }, error = function(err) {
+        cat("Error in connection, retrying...\n")
+      }, warning = function(war) {
+        cat("No connection found, retrying...\n")
+      })
+      tries <- tries + 1
+    } else {
+      cat("Connection ok.\n")
+      break
+    }
+  }
+  if (tries == sparkRRetryCount) {
+    stop(sprintf("Failed to connect to JVM after %d tries.\n", sparkRRetryCount))
+  }

   if (nchar(sparkHome) != 0) {
     sparkHome <- normalizePath(sparkHome)
@@ -136,7 +183,7 @@ sparkR.init <- function(
   }

   nonEmptyJars <- Filter(function(x) { x != "" }, jars)
-  localJarPaths <- sapply(nonEmptyJars, function(j) { paste("file://", j, sep = "") })
+  localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })

   assign(
     ".sparkRjsc",
diff --git a/pkg/R/sparkRClient.R b/pkg/R/sparkRClient.R
index 61ddf03575325..1a747031586d2 100644
--- a/pkg/R/sparkRClient.R
+++ b/pkg/R/sparkRClient.R
@@ -35,7 +35,36 @@ launchBackend <- function(
   } else {
     java_bin <- java_bin_name
   }
+  # Quote the classpath to make sure it handles spaces on Windows
+  classPath <- shQuote(classPath)
   combinedArgs <- paste(javaOpts, "-cp", classPath, mainClass, args, sep = " ")
   cat("Launching java with command ", java_bin, " ", combinedArgs, "\n")
   invisible(system2(java_bin, combinedArgs, wait = F))
 }
+
+launchBackendSparkSubmit <- function(
+  mainClass,
+  args,
+  appJar,
+  sparkHome,
+  sparkSubmitOpts) {
+  if (.Platform$OS.type == "unix") {
+    sparkSubmitBinName = "spark-submit"
+  } else {
+    sparkSubmitBinName = "spark-submit.cmd"
+  }
+
+  if (sparkHome != "") {
+    sparkSubmitBin <- file.path(sparkHome, "bin", sparkSubmitBinName)
+  } else {
+    sparkSubmitBin <- sparkSubmitBinName
+  }
+
+  # Since this function is only used while launching the R shell using spark-submit,
+  # the command we need to construct is:
+  # spark-submit --class <mainClass> <sparkSubmitOpts> <appJar> <args>
+
+  combinedArgs <- paste("--class", mainClass, sparkSubmitOpts, appJar, args, sep = " ")
+  cat("Launching java with spark-submit command ", sparkSubmitBin, " ", combinedArgs, "\n")
+  invisible(system2(sparkSubmitBin, combinedArgs, wait = F))
+}
diff --git a/pkg/inst/sparkR-submit b/pkg/inst/sparkR-submit
new file mode 100755
index 0000000000000..9c451ab8e3712
--- /dev/null
+++ b/pkg/inst/sparkR-submit
@@ -0,0 +1,74 @@
+#!/bin/bash
+# This script launches SparkR through spark-submit. It accepts
+# the same set of options as spark-submit and requires SPARK_HOME
+# to be set.
+
+FWDIR="$(cd `dirname $0`; pwd)"
+
+export PROJECT_HOME="$FWDIR"
+
+export SPARKR_JAR_FILE="$FWDIR/sparkr-assembly-0.1.jar"
+
+# Exit if SPARK_HOME is not set or does not contain spark-submit
+if [ ! -f "$SPARK_HOME/bin/spark-submit" ]; then
+  echo "SPARK_HOME must be set to use sparkR-submit"
+  exit 1
+fi
+
+source "$SPARK_HOME/bin/utils.sh"
+
+function usage() {
+  echo "Usage: ./sparkR-submit [options]" 1>&2
+  "$SPARK_HOME"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+}
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  usage
+fi
+
+# Build up arguments list manually to preserve quotes and backslashes.
+SUBMIT_USAGE_FUNCTION=usage
+gatherSparkSubmitOpts "$@"
+
+SPARKR_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "${SUBMISSION_OPTS[@]}"
+do
+  if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
+  if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
+  SPARKR_SUBMIT_ARGS="$SPARKR_SUBMIT_ARGS $i"
+done
+export SPARKR_SUBMIT_ARGS
+export SPARKR_USE_SPARK_SUBMIT=1
+
+NUM_APPLICATION_OPTS=${#APPLICATION_OPTS[@]}
+
+# If an R file is provided, directly run spark-submit.
+if [[ $NUM_APPLICATION_OPTS -gt 0 && "${APPLICATION_OPTS[0]}" =~ \.R$ ]]; then
+
+  primary="${APPLICATION_OPTS[0]}"
+  shift
+  # Set the main class to SparkRRunner and add the primary R file to --files to make sure it's copied to the cluster
+  echo "Running $SPARK_HOME/bin/spark-submit --class edu.berkeley.cs.amplab.sparkr.SparkRRunner --files $primary ${SUBMISSION_OPTS[@]} $SPARKR_JAR_FILE $primary" "${APPLICATION_OPTS[@]:1}"
+  exec "$SPARK_HOME"/bin/spark-submit --class edu.berkeley.cs.amplab.sparkr.SparkRRunner --files "$primary" "${SUBMISSION_OPTS[@]}" "$SPARKR_JAR_FILE" "$primary" "${APPLICATION_OPTS[@]:1}"
+else
+
+  export R_PROFILE_USER="/tmp/sparkR.profile"
+
+  # If we don't have an R file to run, run the R shell
+cat > /tmp/sparkR.profile << EOF
+.First <- function() {
+  projecHome <- Sys.getenv("PROJECT_HOME")
+  Sys.setenv(NOAWT=1)
+  .libPaths(c(paste(projecHome,"/..", sep=""), .libPaths()))
+  require(SparkR)
+  sc <- sparkR.init()
+  assign("sc", sc, envir=.GlobalEnv)
+  cat("\n Welcome to SparkR!")
+  cat("\n Spark context is available as sc\n")
+}
+EOF
+  R
+
+fi
diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R
index 89d7890fbf685..f0c00d71d076c 100644
--- a/pkg/inst/tests/test_rdd.R
+++ b/pkg/inst/tests/test_rdd.R
@@ -336,6 +336,23 @@ test_that("values() on RDDs", {
   expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
 })

+test_that("pipeRDD() on RDDs", {
+  actual <- collect(pipeRDD(rdd, "more"))
+  expected <- as.list(as.character(1:10))
+  expect_equal(actual, expected)
+
+  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
+  actual <- collect(pipeRDD(trailed.rdd, "sort"))
+  expected <- list("", "1", "2", "3")
+  expect_equal(actual, expected)
+
+  rev.nums <- 9:0
+  rev.rdd <- parallelize(sc, rev.nums, 2L)
+  actual <- collect(pipeRDD(rev.rdd, "sort"))
+  expected <- as.list(as.character(c(5:9, 0:4)))
+  expect_equal(actual, expected)
+})
+
 test_that("join() on pairwise RDDs", {
   rdd1 <- parallelize(sc, list(list(1,1), list(2,4)))
   rdd2 <- parallelize(sc, list(list(1,2), list(1,3)))
diff --git a/pkg/inst/tests/test_shuffle.R b/pkg/inst/tests/test_shuffle.R
index 3683287fd0423..8df9b439a08cd 100644
--- a/pkg/inst/tests/test_shuffle.R
+++ b/pkg/inst/tests/test_shuffle.R
@@ -70,6 +70,77 @@ test_that("combineByKey for doubles", {
   expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
 })

+test_that("aggregateByKey", {
+  # test aggregateByKey for int keys
+  rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+
+  zeroValue <- list(0, 0)
+  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+  actual <- collect(aggregatedRDD)
+
+  expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test aggregateByKey for string keys
+  rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
+
+  zeroValue <- list(0, 0)
+  seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+  combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+  aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+
+  actual <- collect(aggregatedRDD)
+
+  expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+})
+
+test_that("foldByKey", {
+  # test foldByKey for int keys
+  folded <- foldByKey(intRdd, 0, "+", 2L)
+
+  actual <- collect(folded)
+
+  expected <- list(list(2L, 101), list(1L, 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test foldByKey for double keys
+  folded <- foldByKey(doubleRdd, 0, "+", 2L)
+
+  actual <- collect(folded)
+
+  expected <- list(list(1.5, 199), list(2.5, 101))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test foldByKey for string keys
+  stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
+
+  stringKeyRDD <- parallelize(sc, stringKeyPairs)
+  folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
+
+  actual <- collect(folded)
+
+  expected <- list(list("b", 101), list("a", 199))
+  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
+
+  # test foldByKey for empty pair RDD
+  rdd <- parallelize(sc, list())
+  folded <- foldByKey(rdd, 0, "+", 2L)
+  actual <- collect(folded)
+  expected <- list()
+  expect_equal(actual, expected)
+
+  # test foldByKey for RDD with only 1 pair
+  rdd <- parallelize(sc, list(list(1, 1)))
+  folded <- foldByKey(rdd, 0, "+", 2L)
+  actual <- collect(folded)
+  expected <- list(list(1, 1))
+  expect_equal(actual, expected)
+})
+
 test_that("partitionBy() partitions data correctly", {
   # Partition by magnitude
   partitionByMagnitude <- function(key) { if (key >= 3) 1 else 0 }
diff --git a/pkg/man/aggregateByKey.Rd b/pkg/man/aggregateByKey.Rd
new file mode 100644
index 0000000000000..7bcbd5cc69d12
--- /dev/null
+++ b/pkg/man/aggregateByKey.Rd
@@ -0,0 +1,50 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{aggregateByKey}
+\alias{aggregateByKey}
+\alias{aggregateByKey,RDD,ANY,ANY,ANY,integer-method}
+\title{Aggregate a pair RDD by each key.}
+\usage{
+aggregateByKey(rdd, zeroValue, seqOp, combOp, numPartitions)
+
+\S4method{aggregateByKey}{RDD,ANY,ANY,ANY,integer}(rdd, zeroValue, seqOp,
+  combOp, numPartitions)
+}
+\arguments{
+\item{rdd}{An RDD.}
+
+\item{zeroValue}{A neutral "zero value".}
+
+\item{seqOp}{A function to aggregate the values of each key. It may return
+a different result type from the type of the values.}
+
+\item{combOp}{A function to aggregate results of seqOp.}
+}
+\value{
+An RDD containing the aggregation result.
+}
+\description{
+Aggregate the values of each key in an RDD, using given combine functions
+and a neutral "zero value". This function can return a different result type,
+U, than the type of the values in this RDD, V. Thus, we need one operation
+for merging a V into a U and one operation for merging two U's. The former
+operation is used for merging values within a partition, and the latter is
+used for merging values between partitions. To avoid memory allocation, both
+of these functions are allowed to modify and return their first argument
+instead of creating a new U.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+zeroValue <- list(0, 0)
+seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
+combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
+aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
+  # list(list(1, list(3, 2)), list(2, list(7, 2)))
+}
+}
+\seealso{
+foldByKey, combineByKey
+}
+
diff --git a/pkg/man/foldByKey.Rd b/pkg/man/foldByKey.Rd
new file mode 100644
index 0000000000000..a2822c51a8ff4
--- /dev/null
+++ b/pkg/man/foldByKey.Rd
@@ -0,0 +1,38 @@
+% Generated by roxygen2 (4.0.2): do not edit by hand
+\docType{methods}
+\name{foldByKey}
+\alias{foldByKey}
+\alias{foldByKey,RDD,ANY,ANY,integer-method}
+\title{Fold a pair RDD by each key.}
+\usage{
+foldByKey(rdd, zeroValue, func, numPartitions)
+
+\S4method{foldByKey}{RDD,ANY,ANY,integer}(rdd, zeroValue, func, numPartitions)
+}
+\arguments{
+\item{rdd}{An RDD.}
+
+\item{zeroValue}{A neutral "zero value".}
+
+\item{func}{An associative function for folding values of each key.}
+}
+\value{
+An RDD containing the aggregation result.
+}
+\description{
+Aggregate the values of each key in an RDD, using an associative function "func"
+and a neutral "zero value" which may be added to the result an arbitrary
+number of times, and must not change the result (e.g., 0 for addition, or
+1 for multiplication).
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
+foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
+}
+}
+\seealso{
+aggregateByKey, combineByKey
+}
+
diff --git a/pkg/man/pipeRDD.Rd b/pkg/man/pipeRDD.Rd
new file mode 100644
index 0000000000000..0964d3415ecb8
--- /dev/null
+++ b/pkg/man/pipeRDD.Rd
@@ -0,0 +1,34 @@
+% Generated by roxygen2 (4.1.0): do not edit by hand
+% Please edit documentation in R/RDD.R
+\docType{methods}
+\name{pipeRDD}
+\alias{pipeRDD}
+\alias{pipeRDD,RDD,character-method}
+\title{Pipes elements to a forked external process.}
+\usage{
+pipeRDD(rdd, command, env = list())
+
+\S4method{pipeRDD}{RDD,character}(rdd, command, env = list())
+}
+\arguments{
+\item{rdd}{The RDD whose elements are piped to the forked external process.}
+
+\item{command}{The command to fork an external process.}
+
+\item{env}{A named list to set environment variables of the external process.}
+}
+\value{
+A new RDD created by piping all elements to a forked external process.
+}
+\description{
+The same as 'pipe()' in Spark.
+}
+\examples{
+\dontrun{
+sc <- sparkR.init()
+rdd <- parallelize(sc, 1:10)
+collect(pipeRDD(rdd, "more"))
+# Output: c("1", "2", ..., "10")
+}
+}
+
diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
index 959613697774a..bfe449786b76d 100644
--- a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/RRDD.scala
@@ -360,10 +360,19 @@ object RRDD {
       sparkEnvirMap: JMap[Object, Object],
       sparkExecutorEnvMap: JMap[Object, Object]): JavaSparkContext = {

-    val sparkConf = new SparkConf().setMaster(master)
-      .setAppName(appName)
+    val sparkConf = new SparkConf().setAppName(appName)
       .setSparkHome(sparkHome)
       .setJars(jars)
+
+    // Override `master` if we have a user-specified value
+    if (master != "") {
+      sparkConf.setMaster(master)
+    } else {
+      // If conf has no master set it to "local" to maintain
+      // backwards compatibility
+      sparkConf.setIfMissing("spark.master", "local")
+    }
+
     for ((name, value) <- sparkEnvirMap) {
       sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
     }
diff --git a/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SparkRRunner.scala b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SparkRRunner.scala
new file mode 100644
index 0000000000000..fb356f89b2d1d
--- /dev/null
+++ b/pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SparkRRunner.scala
@@ -0,0 +1,105 @@
+package edu.berkeley.cs.amplab.sparkr
+
+import java.io._
+import java.net.URI
+import java.util.concurrent.Semaphore
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+
+/**
+ * Main class used to launch SparkR applications using spark-submit. It executes R as a
+ * subprocess and then has it connect back to the JVM to access system properties etc.
+ */
+object SparkRRunner {
+  def main(args: Array[String]) {
+    val rFile = args(0)
+
+    val otherArgs = args.slice(1, args.length)
+
+    // Time to wait for SparkR backend to initialize in seconds
+    val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
+    // TODO: Can we get this from SparkConf ?
+    val sparkRBackendPort = sys.env.getOrElse("SPARKR_BACKEND_PORT", "12345").toInt
+    val rCommand = "Rscript"
+
+    // Check if the file path exists.
+    // If not, change directory to current working directory for YARN cluster mode
+    val rF = new File(rFile)
+    val rFileNormalized = if (!rF.exists()) {
+      new Path(rFile).getName
+    } else {
+      rFile
+    }
+
+    // Launch a SparkR backend server for the R process to connect to; this will let it see our
+    // Java system properties etc.
+    val sparkRBackend = new SparkRBackend()
+    val sparkRBackendThread = new Thread() {
+      val finishedInit = new Semaphore(0)
+
+      override def run() {
+        sparkRBackend.init(sparkRBackendPort)
+        finishedInit.release()
+        sparkRBackend.run()
+      }
+
+      def stopBackend() {
+        sparkRBackend.close()
+      }
+    }
+
+    sparkRBackendThread.start()
+    // Wait for SparkRBackend initialization to finish
+    if (sparkRBackendThread.finishedInit.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
+      // Launch R
+      val returnCode = try {
+        val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
+        val env = builder.environment()
+        env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
+        builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+        val process = builder.start()
+
+        new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+
+        process.waitFor()
+      } finally {
+        sparkRBackendThread.stopBackend()
+      }
+      System.exit(returnCode)
+    } else {
+      System.err.println("SparkR backend did not initialize in " + backendTimeout + " seconds")
+      System.exit(-1)
+    }
+  }
+
+  private class RedirectThread(
+      in: InputStream,
+      out: OutputStream,
+      name: String,
+      propagateEof: Boolean = false)
+    extends Thread(name) {
+
+    setDaemon(true)
+    override def run() {
+      // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
+      try {
+        val buf = new Array[Byte](1024)
+        var len = in.read(buf)
+        while (len != -1) {
+          out.write(buf, 0, len)
+          out.flush()
+          len = in.read(buf)
+        }
+      } finally {
+        if (propagateEof) {
+          out.close()
+        }
+      }
+    }
+  }
+}
diff --git a/sparkR b/sparkR
index 01ed365fd4b15..916523d57f846 100755
--- a/sparkR
+++ b/sparkR
@@ -28,7 +28,7 @@ cat > /tmp/sparkR.profile << EOF
   Sys.setenv(NOAWT=1)
   .libPaths(c(paste(projecHome,"/lib", sep=""), .libPaths()))
   require(SparkR)
-  sc <- sparkR.init(Sys.getenv("MASTER", unset = "local"))
+  sc <- sparkR.init(Sys.getenv("MASTER", unset = ""))
   assign("sc", sc, envir=.GlobalEnv)
   cat("\n Welcome to SparkR!")
   cat("\n Spark context is available as sc\n")
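
To see how the pieces above fit together, here is a minimal interactive sketch (not part of the patch) that assumes the SparkR package built from this branch is installed and that `SPARK_HOME` points at a compatible Spark distribution; the app name and sample data are illustrative only:

```
# Start the JVM backend and create a Spark context. With this change the
# default master is "", so spark-submit / the spark.master setting decide the
# master and the backend falls back to "local" only when nothing is configured.
# The backend port can be overridden through the SPARKR_BACKEND_PORT variable.
library(SparkR)
sc <- sparkR.init(appName = "SparkRExample")

# Exercise one of the newly exported pair-RDD operations.
pairs <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3)), 2L)
collect(foldByKey(pairs, 0, "+", 2L))
# expected: list(list("a", 3), list("b", 3)) -- ordering may vary across partitions

sparkR.stop()
```

For standalone scripts, the new `pkg/inst/sparkR-submit` wrapper forwards spark-submit options placed before the R file, e.g. something like `./sparkR-submit --master yarn-client examples/pi.R` (again assuming `SPARK_HOME` is set).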