[SPARK-18788][SPARKR] Add API for getNumPartitions
## What changes were proposed in this pull request?

Add a `getNumPartitions` API for SparkDataFrame, with documentation noting that the call converts the DataFrame to an RDD in order to get its partition count.
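
For illustration, a minimal SparkR session exercising the new API (this mirrors the roxygen example added in `R/pkg/R/DataFrame.R` below; `cars` is the built-in R dataset used throughout the tests):

```r
sparkR.session()
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(df)   # 2
```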

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <[email protected]>

Closes #16668 from felixcheung/rgetnumpartitions.
felixcheung authored and Felix Cheung committed Jan 27, 2017
1 parent c0ba284 commit 90817a6
Showing 7 changed files with 59 additions and 31 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -95,6 +95,7 @@ exportMethods("arrange",
"freqItems",
"gapply",
"gapplyCollect",
"getNumPartitions",
"group_by",
"groupBy",
"head",
23 changes: 23 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -3428,3 +3428,26 @@ setMethod("randomSplit",
}
sapply(sdfs, dataFrame)
})

+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })
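
The two nested `callJMethod` calls correspond to `df.rdd.getNumPartitions` on the JVM side: the inner call materializes the Dataset's RDD, and the outer call asks that RDD for its partition count, which is why the doc note says the call converts the DataFrame into an RDD. A sketch of the same call chain written out step by step (`callJMethod` is an internal SparkR helper; `df` is assumed to be an existing SparkDataFrame):

```r
jrdd <- SparkR:::callJMethod(df@sdf, "rdd")      # JVM: Dataset.rdd
SparkR:::callJMethod(jrdd, "getNumPartitions")   # JVM: RDD.getNumPartitions
```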
30 changes: 15 additions & 15 deletions R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
setMethod("getNumPartitions",
setMethod("getNumPartitionsRDD",
signature(x = "RDD"),
function(x) {
callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
signature(x = "RDD"),
function(x) {
.Deprecated("getNumPartitions")
-getNumPartitions(x)
+getNumPartitionsRDD(x)
})
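
For context, the deprecated RDD-level `numPartitions` now forwards to the renamed internal generic, so existing callers keep working but see a base-R deprecation warning. A sketch, assuming a SparkContext handle `sc` is already available (all of these are internal, non-exported APIs):

```r
rdd <- SparkR:::parallelize(sc, 1:10, 2L)  # `sc`: assumed SparkContext handle
SparkR:::numPartitions(rdd)                # warns to use getNumPartitions; returns 2
```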

#' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
-collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
})

#' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
-numPartitions <- getNumPartitions(x)
+numPartitions <- getNumPartitionsRDD(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
#' @noRd
setMethod("distinctRDD",
signature(x = "RDD"),
-function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
@@ -1053,7 +1053,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
-if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
+if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
@@ -1143,7 +1143,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
-function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})

@@ -1175,7 +1175,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
-numPartitions <- getNumPartitions(newRdd)
+numPartitions <- getNumPartitionsRDD(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
@@ -1407,7 +1407,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
-n <- getNumPartitions(x)
+n <- getNumPartitionsRDD(x)

partitionFunc <- function(partIndex, part) {
mapply(
@@ -1450,7 +1450,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
-n <- getNumPartitions(x)
+n <- getNumPartitionsRDD(x)
if (n > 1) {
nums <- collectRDD(lapplyPartition(x,
function(part) {
@@ -1566,8 +1566,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
-n1 <- getNumPartitions(x)
-n2 <- getNumPartitions(other)
+n1 <- getNumPartitionsRDD(x)
+n2 <- getNumPartitionsRDD(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
@@ -1637,7 +1637,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
-function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
@@ -1671,7 +1671,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
-function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

@@ -1714,7 +1714,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
-nPart <- sapply(rrdds, getNumPartitions)
+nPart <- sapply(rrdds, getNumPartitionsRDD)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
8 changes: 6 additions & 2 deletions R/pkg/R/generics.R
@@ -138,9 +138,9 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
# @export
setGeneric("name", function(x) { standardGeneric("name") })

-# @rdname getNumPartitions
+# @rdname getNumPartitionsRDD
# @export
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })
setGeneric("getNumPartitionsRDD", function(x) { standardGeneric("getNumPartitionsRDD") })

# @rdname getNumPartitions
# @export
@@ -492,6 +492,10 @@ setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
#' @export
setGeneric("gapplyCollect", function(x, ...) { standardGeneric("gapplyCollect") })

+# @rdname getNumPartitions
+# @export
+setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })

#' @rdname summary
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
@@ -780,7 +780,7 @@ setMethod("cogroup",
#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
-function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
rangeBounds <- list()

if (numPartitions > 1) {
@@ -850,7 +850,7 @@ setMethod("sortByKey",
#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
-function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
+function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
filterFunction <- function(elem) {
iters <- elem[[2]]
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/testthat/test_rdd.R
@@ -29,8 +29,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)

test_that("get number of partitions in RDD", {
-expect_equal(getNumPartitions(rdd), 2)
-expect_equal(getNumPartitions(intRdd), 2)
+expect_equal(getNumPartitionsRDD(rdd), 2)
+expect_equal(getNumPartitionsRDD(intRdd), 2)
})

test_that("first on RDD", {
@@ -305,18 +305,18 @@ test_that("repartition/coalesce on RDDs", {

# repartition
r1 <- repartitionRDD(rdd, 2)
-expect_equal(getNumPartitions(r1), 2L)
+expect_equal(getNumPartitionsRDD(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)

r2 <- repartitionRDD(rdd, 6)
-expect_equal(getNumPartitions(r2), 6L)
+expect_equal(getNumPartitionsRDD(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)

# coalesce
r3 <- coalesce(rdd, 1)
-expect_equal(getNumPartitions(r3), 1L)
+expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
})
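
These expectations rely on standard Spark semantics: `repartitionRDD` always shuffles and yields exactly the requested number of partitions, while `coalesce` narrows to fewer partitions without a shuffle unless `shuffle = TRUE` is passed (the `coalesce` hunk in `RDD.R` above also forces the shuffle path when growing the partition count). A sketch using the renamed internal API (like the tests, it assumes the internal helpers are visible, e.g. via `SparkR:::`):

```r
r2 <- repartitionRDD(rdd, 6)   # full shuffle, exactly 6 partitions
r3 <- coalesce(rdd, 1)         # narrow dependency, no shuffle by default
getNumPartitionsRDD(r3)        # 1
```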
14 changes: 7 additions & 7 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,26 +196,26 @@ test_that("create DataFrame from RDD", {
expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
expect_equal(as.list(collect(where(df, df$name == "John"))),
list(name = "John", age = 19L, height = 176.5))
-expect_equal(getNumPartitions(toRDD(df)), 1)
+expect_equal(getNumPartitions(df), 1)

df <- as.DataFrame(cars, numPartitions = 2)
-expect_equal(getNumPartitions(toRDD(df)), 2)
+expect_equal(getNumPartitions(df), 2)
df <- createDataFrame(cars, numPartitions = 3)
-expect_equal(getNumPartitions(toRDD(df)), 3)
+expect_equal(getNumPartitions(df), 3)
# validate limit by num of rows
df <- createDataFrame(cars, numPartitions = 60)
-expect_equal(getNumPartitions(toRDD(df)), 50)
+expect_equal(getNumPartitions(df), 50)
# validate when 1 < (length(coll) / numSlices) << length(coll)
df <- createDataFrame(cars, numPartitions = 20)
-expect_equal(getNumPartitions(toRDD(df)), 20)
+expect_equal(getNumPartitions(df), 20)

df <- as.DataFrame(data.frame(0))
expect_is(df, "SparkDataFrame")
df <- createDataFrame(list(list(1)))
expect_is(df, "SparkDataFrame")
df <- as.DataFrame(data.frame(0), numPartitions = 2)
# no data to partition, goes to 1
-expect_equal(getNumPartitions(toRDD(df)), 1)
+expect_equal(getNumPartitions(df), 1)

setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
@@ -234,7 +234,7 @@ test_that("createDataFrame uses files for large objects", {
conf <- callJMethod(sparkSession, "conf")
callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
-expect_equal(getNumPartitions(toRDD(df)), 3)
+expect_equal(getNumPartitions(df), 3)

# Resetting the conf back to default value
callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
