Merge pull request apache#182 from cafreeman/sparkr-sql
Merge changes to master and update DataFrame methods to use new generic args
shivaram committed Feb 26, 2015
2 parents 041d22b + a9bbe0b commit 785898b
Showing 17 changed files with 902 additions and 316 deletions.
15 changes: 5 additions & 10 deletions README.md
@@ -13,12 +13,6 @@ SparkR requires Scala 2.10 and Spark version >= 0.9.0. Current build by default
 Apache Spark 1.1.0. You can also build SparkR against a
 different Spark version (>= 0.9.0) by modifying `pkg/src/build.sbt`.
 
-SparkR also requires the R package `rJava` to be installed. To install `rJava`,
-you can run the following command in R:
-
-    install.packages("rJava")
-
-
 ### Package installation
 To develop SparkR, you can build the scala package and the R package using
 
@@ -31,9 +25,9 @@ If you wish to try out the package directly from github, you can use [`install_g
 
 SparkR by default uses Apache Spark 1.1.0. You can switch to a different Spark
 version by setting the environment variable `SPARK_VERSION`. For example, to
-use Apache Spark 1.2.0, you can run
+use Apache Spark 1.3.0, you can run
 
-    SPARK_VERSION=1.2.0 ./install-dev.sh
+    SPARK_VERSION=1.3.0 ./install-dev.sh
 
 SparkR by default links to Hadoop 1.0.4. To use SparkR with other Hadoop
 versions, you will need to rebuild SparkR with the same version that [Spark is
@@ -97,8 +91,9 @@ To run one of them, use `./sparkR <filename> <args>`. For example:
 
     ./sparkR examples/pi.R local[2]
 
-You can also run the unit-tests for SparkR by running
+You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first):
 
+    R -e 'install.packages("testthat", repos="http://cran.us.r-project.org")'
     ./run-tests.sh
 
 ## Running on EC2
@@ -110,7 +105,7 @@ Instructions for running SparkR on EC2 can be found in the
 Currently, SparkR supports running on YARN with the `yarn-client` mode. These steps show how to build SparkR with YARN support and run SparkR programs on a YARN cluster:
 
 ```
-# assumes Java, R, rJava, yarn, spark etc. are installed on the whole cluster.
+# assumes Java, R, yarn, spark etc. are installed on the whole cluster.
 cd SparkR-pkg/
 USE_YARN=1 SPARK_YARN_VERSION=2.4.0 SPARK_HADOOP_VERSION=2.4.0 ./install-dev.sh
 ```
3 changes: 3 additions & 0 deletions pkg/NAMESPACE
@@ -2,6 +2,7 @@
 exportClasses("RDD")
 exportClasses("Broadcast")
 exportMethods(
+  "aggregateByKey",
   "aggregateRDD",
   "cache",
   "checkpoint",
@@ -19,6 +20,7 @@ exportMethods(
   "flatMap",
   "flatMapValues",
   "fold",
+  "foldByKey",
   "foreach",
   "foreachPartition",
   "fullOuterJoin",
@@ -41,6 +43,7 @@ exportMethods(
   "numPartitions",
   "partitionBy",
   "persist",
+  "pipeRDD",
   "reduce",
   "reduceByKey",
   "reduceByKeyLocally",
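The three new exports above (`aggregateByKey`, `foldByKey`, `pipeRDD`) are RDD and pair-RDD methods whose implementations live outside this diff. The following is a rough usage sketch only; the argument order is an assumption modeled on the existing `aggregateRDD` and `fold` generics and on the Scala pair-RDD API, not something this commit shows.

```r
# Sketch only (not part of this diff): aggregateByKey/foldByKey are assumed to
# take (rdd, zeroValue, op(s)..., numPartitions) and pipeRDD (rdd, command).
library(SparkR)

sc    <- sparkR.init("local[2]", "NamespaceExample")
pairs <- parallelize(sc, list(list("a", 1), list("b", 4), list("a", 2)), 2L)

# aggregateByKey: per-key aggregation with a zero value, a within-partition
# seqOp and a cross-partition combOp; here it tracks (sum, count) per key.
seqOp  <- function(acc, v) list(acc[[1]] + v, acc[[2]] + 1)
combOp <- function(a, b)   list(a[[1]] + b[[1]], a[[2]] + b[[2]])
sumCounts <- aggregateByKey(pairs, list(0, 0), seqOp, combOp, 2L)

# foldByKey: per-key fold with a zero value and a binary function.
totals <- foldByKey(pairs, 0, function(a, b) a + b, 2L)

# pipeRDD: stream each partition's elements through an external command.
piped <- pipeRDD(parallelize(sc, 1:4, 2L), "cat")

collect(sumCounts)
collect(totals)
collect(piped)
```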
30 changes: 15 additions & 15 deletions pkg/R/DataFrame.R
@@ -128,18 +128,18 @@ setMethod("count",
 #' }
 
 setMethod("collect",
-          signature(rdd = "DataFrame"),
-          function(rdd) {
+          signature(x = "DataFrame"),
+          function(x) {
             # listCols is a list of raw vectors, one per column
-            listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", rdd@sdf)
+            listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", x@sdf)
             cols <- lapply(listCols, function(col) {
               objRaw <- rawConnection(col)
               numRows <- readInt(objRaw)
               col <- readCol(objRaw, numRows)
               close(objRaw)
               col
             })
-            colNames <- callJMethod(rdd@sdf, "columns")
+            colNames <- callJMethod(x@sdf, "columns")
             names(cols) <- colNames
             dfOut <- do.call(cbind.data.frame, cols)
             dfOut
@@ -187,9 +187,9 @@ setMethod("limit",
 #' }
 
 setMethod("take",
-          signature(rdd = "DataFrame", num = "numeric"),
-          function(rdd, num) {
-            limited <- limit(rdd, num)
+          signature(x = "DataFrame", num = "numeric"),
+          function(x, num) {
+            limited <- limit(x, num)
             collect(limited)
           })
 
@@ -264,15 +264,15 @@ setMethod("mapPartitions",
           })
 
 setMethod("foreach",
-          signature(rdd = "DataFrame", func = "function"),
-          function(rdd, func) {
-            rddIn <- toRDD(rdd)
-            foreach(rddIn, func)
+          signature(x = "DataFrame", func = "function"),
+          function(x, func) {
+            rdd <- toRDD(x)
+            foreach(rdd, func)
           })
 
 setMethod("foreachPartition",
-          signature(rdd = "DataFrame", func = "function"),
-          function(rdd, func) {
-            rddIn <- toRDD(rdd)
-            foreachPartition(rddIn, func)
+          signature(x = "DataFrame", func = "function"),
+          function(x, func) {
+            rdd <- toRDD(x)
+            foreachPartition(rdd, func)
           })
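For orientation, the net effect of this file's changes is that the DataFrame methods now dispatch on a generic first argument named `x` rather than `rdd`, and `foreach`/`foreachPartition` first convert the DataFrame to an RDD before delegating. A minimal usage sketch follows; `sparkRSQL.init` and `createDataFrame` are assumptions about the SQL API introduced on this branch and are not part of this commit.

```r
# Hypothetical sketch: only collect/take/foreach/foreachPartition appear in this
# diff; the SQLContext helpers used to build `df` are assumed, not shown here.
library(SparkR)

sc     <- sparkR.init("local[2]", "DataFrameExample")
sqlCtx <- sparkRSQL.init(sc)
df     <- createDataFrame(sqlCtx, faithful)  # faithful: built-in R data.frame

localDF  <- collect(df)   # collect(x) returns the whole DataFrame as a data.frame
firstTwo <- take(df, 2)   # take(x, num) applies limit(x, num), then collects

# foreach/foreachPartition convert the DataFrame to an RDD and delegate to the
# RDD implementations; the functions run on the workers for their side effects.
foreach(df, function(row) { length(row) })
foreachPartition(df, function(rows) { length(rows) })
```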
(The remaining 14 changed files in this commit are not shown here.)
