Skip to content

Commit

Permalink
Update existing SparkSQL functions
Browse files Browse the repository at this point in the history
Updated methods to work with new `x` argument in the RDD methods.
  • Loading branch information
cafreeman committed Feb 26, 2015
1 parent 8c241a3 commit a9bbe0b
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,18 @@ setMethod("count",
#' }

setMethod("collect",
signature(rdd = "DataFrame"),
function(rdd) {
signature(x = "DataFrame"),
function(x) {
# listCols is a list of raw vectors, one per column
listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", rdd@sdf)
listCols <- callJStatic("edu.berkeley.cs.amplab.sparkr.SQLUtils", "dfToCols", x@sdf)
cols <- lapply(listCols, function(col) {
objRaw <- rawConnection(col)
numRows <- readInt(objRaw)
col <- readCol(objRaw, numRows)
close(objRaw)
col
})
colNames <- callJMethod(rdd@sdf, "columns")
colNames <- callJMethod(x@sdf, "columns")
names(cols) <- colNames
dfOut <- do.call(cbind.data.frame, cols)
dfOut
Expand Down Expand Up @@ -187,9 +187,9 @@ setMethod("limit",
#' }

setMethod("take",
signature(rdd = "DataFrame", num = "numeric"),
function(rdd, num) {
limited <- limit(rdd, num)
signature(x = "DataFrame", num = "numeric"),
function(x, num) {
limited <- limit(x, num)
collect(limited)
})

Expand Down Expand Up @@ -264,15 +264,15 @@ setMethod("mapPartitions",
})

setMethod("foreach",
signature(rdd = "DataFrame", func = "function"),
function(rdd, func) {
rddIn <- toRDD(rdd)
foreach(rddIn, func)
signature(x = "DataFrame", func = "function"),
function(x, func) {
rdd <- toRDD(x)
foreach(rdd, func)
})

setMethod("foreachPartition",
signature(rdd = "DataFrame", func = "function"),
function(rdd, func) {
rddIn <- toRDD(rdd)
foreachPartition(rddIn, func)
signature(x = "DataFrame", func = "function"),
function(x, func) {
rdd <- toRDD(x)
foreachPartition(rdd, func)
})

0 comments on commit a9bbe0b

Please sign in to comment.