[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
Add coalesce on DataFrame, for reducing the number of partitions without a shuffle, and coalesce on Column, for returning the first non-NA value among its arguments.

Tested manually and with unit tests.

Author: Felix Cheung <[email protected]>

Closes #16739 from felixcheung/rcoalesce.

(cherry picked from commit 671bc08)
Signed-off-by: Felix Cheung <[email protected]>
felixcheung authored and Felix Cheung committed Feb 15, 2017
1 parent 8ee4ec8 commit 6c35399
Showing 11 changed files with 136 additions and 18 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -80,6 +80,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
"coalesce",
"collect",
"colnames",
"colnames<-",
46 changes: 43 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -680,14 +680,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

#' Coalesce
#'
#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
#' the current partitions. If a larger number of partitions is requested, it will stay at the
#' current number of partitions.
#'
#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
#' this may result in your computation taking place on fewer nodes than
#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
#' call \code{repartition}. This will add a shuffle step, but means the
#' current upstream partitions will be executed in parallel (per whatever
#' the current partitioning is).
#'
#' @param numPartitions the number of partitions to use.
#'
#' @family SparkDataFrame functions
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- coalesce(df, 1L)
#'}
#' @note coalesce(SparkDataFrame) since 2.1.1
setMethod("coalesce",
signature(x = "SparkDataFrame"),
function(x, numPartitions) {
stopifnot(is.numeric(numPartitions))
sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
dataFrame(sdf)
})
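To make the narrow-dependency behaviour described in the roxygen doc above concrete, here is a small SparkR usage sketch (it assumes an active SparkR session; the data set and partition counts simply mirror the new unit test added further down):

df <- as.DataFrame(cars, numPartitions = 5)
getNumPartitions(df)                    # 5
getNumPartitions(coalesce(df, 3L))      # 3 -- narrow dependency, no shuffle
getNumPartitions(coalesce(df, 6L))      # 5 -- requesting more partitions keeps the current count
getNumPartitions(repartition(df, 6L))   # 6 -- repartition adds a shuffle and can increase the count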

#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
#' \item{1.} {Return a new SparkDataFrame partitioned by
#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame hash partitioned by
#' the given columns into \code{numPartitions}.}
#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
#' \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -699,6 +738,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
coalesce(x, numPartitions, TRUE)
coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
@@ -1049,7 +1049,7 @@ setMethod("repartitionRDD",
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
setMethod("coalesce",
setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
26 changes: 24 additions & 2 deletions R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
column(jc)
})

#' Returns the first column that is not NA
#'
#' Returns the first column that is not NA, or NA if all inputs are.
#'
#' @rdname coalesce
#' @name coalesce
#' @family normal_funcs
#' @export
#' @aliases coalesce,Column-method
#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
#' @note coalesce(Column) since 2.1.1
setMethod("coalesce",
signature(x = "Column"),
function(x, ...) {
jcols <- lapply(list(x, ...), function (x) {
stopifnot(class(x) == "Column")
x@jc
})
jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
column(jc)
})
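As a hedged illustration of the Column variant (the toy data frame and its columns a and b are made up for this sketch, not part of the change):

df <- createDataFrame(data.frame(a = c(1, NA, NA), b = c(NA, 2, NA)))
head(select(df, coalesce(df$a, df$b)))
# per row, the first non-NA input is returned: 1, 2, then NA (row 3 has no non-NA input)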

#' Though scala functions has "col" function, we don't expose it in SparkR
#' because we don't want to conflict with the "col" function in the R base
#' package and we also have "column" function exported which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
#' Returns a Column based on the given column name
#'
#' Returns a Column based on the given column name.
#
#'
#' @param x Character column name.
#'
#' @rdname column
#' @name column
#' @family normal_funcs
#' @export
#' @aliases column,character-method
#' @examples \dontrun{column(df)}
#' @examples \dontrun{column("name")}
#' @note column since 1.6.0
setMethod("column",
signature(x = "character"),
9 changes: 8 additions & 1 deletion R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
# @rdname coalesce
# @seealso repartition
# @export
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })

# @rdname checkpoint-methods
# @export
@@ -406,6 +406,13 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })

#' @rdname coalesce
#' @param x a Column or a SparkDataFrame.
#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
#' provided.
#' @export
setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })

#' @rdname collect
#' @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") })
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
expect_true(count >= 0 && count <= 4)

# coalesce
r3 <- coalesce(rdd, 1)
r3 <- coalesceRDD(rdd, 1)
expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
33 changes: 28 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(jsonPath)
dfRDD <- toRDD(df)
saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)

expect_is(objectIn, "RDD")
@@ -1228,7 +1228,8 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
c19 <- spark_partition_id()
c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")

# Test if base::is.nan() is exposed
expect_equal(is.nan(c("a", "b")), c(FALSE, FALSE))
@@ -2481,15 +2482,18 @@ test_that("repartition by columns on DataFrame", {
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

# repartition by column and number of partitions
actual <- repartition(df, 3L, col = df$"a")
actual <- repartition(df, 3, col = df$"a")

# since we cannot access the number of partitions from dataframe, checking
# that at least the dimensions are identical
# Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 3L)

# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 13L)

expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2505,6 +2509,25 @@
expect_equal(nrow(df1), 2)
})

test_that("coalesce, repartition, numPartitions", {
df <- as.DataFrame(cars, numPartitions = 5)
expect_equal(getNumPartitions(df), 5)
expect_equal(getNumPartitions(coalesce(df, 3)), 3)
expect_equal(getNumPartitions(coalesce(df, 6)), 5)

df1 <- coalesce(df, 3)
expect_equal(getNumPartitions(df1), 3)
expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
expect_equal(getNumPartitions(coalesce(df1, 2)), 2)

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
})

test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -423,7 +423,8 @@ abstract class RDD[T: ClassTag](
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
10 changes: 9 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -516,7 +516,15 @@ def coalesce(self, numPartitions):
Similar to coalesce defined on an :class:`RDD`, this operation results in a
narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
there will not be a shuffle, instead each of the 100 new partitions will
claim 10 of the current partitions.
claim 10 of the current partitions. If a larger number of partitions is requested,
it will stay at the current number of partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can call repartition(). This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).
>>> df.coalesce(1).rdd.getNumPartitions()
1
10 changes: 9 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2453,7 +2453,15 @@ class Dataset[T] private[sql](
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
* the 100 new partitions will claim 10 of the current partitions. If a larger number of
* partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 1.6.0
10 changes: 9 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -495,7 +495,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
* Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
* the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
* is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you see ShuffleExchange. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*/
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output