[SPARK-19399][SPARKR] Add R coalesce API for DataFrame and Column
## What changes were proposed in this pull request?

Add coalesce on SparkDataFrame for reducing the number of partitions without a shuffle, and coalesce on Column for returning the first non-NA column among its arguments.
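
For illustration, a minimal sketch of the two new entry points (assumes an active SparkR session; the data set and column choices are hypothetical):

library(SparkR)
sparkR.session()

df <- as.DataFrame(cars, numPartitions = 5)

# DataFrame variant: reduce the number of partitions without a shuffle
df3 <- coalesce(df, 3L)

# Column variant: the first non-NA value among the given columns
head(select(df, coalesce(df$speed, df$dist)))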

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <[email protected]>

Closes apache#16739 from felixcheung/rcoalesce.
felixcheung authored and cmonkey committed Feb 16, 2017
1 parent 98fe604 commit ad16bf2
Showing 11 changed files with 135 additions and 18 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
"as.data.frame",
"attach",
"cache",
"coalesce",
"collect",
"colnames",
"colnames<-",
46 changes: 43 additions & 3 deletions R/pkg/R/DataFrame.R
@@ -678,14 +678,53 @@ setMethod("storageLevel",
storageLevelToString(callJMethod(x@sdf, "storageLevel"))
})

#' Coalesce
#'
#' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
#' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
#' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
#' the current partitions. If a larger number of partitions is requested, it will stay at the
#' current number of partitions.
#'
#' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
#' this may result in your computation taking place on fewer nodes than
#' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
#' call \code{repartition}. This will add a shuffle step, but means the
#' current upstream partitions will be executed in parallel (per whatever
#' the current partitioning is).
#'
#' @param numPartitions the number of partitions to use.
#'
#' @family SparkDataFrame functions
#' @rdname coalesce
#' @name coalesce
#' @aliases coalesce,SparkDataFrame-method
#' @seealso \link{repartition}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- coalesce(df, 1L)
#'}
#' @note coalesce(SparkDataFrame) since 2.1.1
setMethod("coalesce",
signature(x = "SparkDataFrame"),
function(x, numPartitions) {
stopifnot(is.numeric(numPartitions))
sdf <- callJMethod(x@sdf, "coalesce", numToInt(numPartitions))
dataFrame(sdf)
})
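
A quick interactive check of the behavior documented above, sketched with getNumPartitions (which this commit's own tests also use); the 5-partition data frame is just an example:

df <- as.DataFrame(cars, numPartitions = 5)
getNumPartitions(coalesce(df, 2L))   # 2: narrow dependency, no shuffle
getNumPartitions(coalesce(df, 10L))  # stays at 5: coalesce never increases partitions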

#' Repartition
#'
#' The following options for repartition are possible:
#' \itemize{
-#' \item{1.} {Return a new SparkDataFrame partitioned by
+#' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
+#' \item{2.} {Return a new SparkDataFrame hash partitioned by
#'            the given columns into \code{numPartitions}.}
-#' \item{2.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
-#' \item{3.} {Return a new SparkDataFrame partitioned by the given column(s),
+#' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#' @param x a SparkDataFrame.
@@ -697,6 +736,7 @@ setMethod("storageLevel",
#' @rdname repartition
#' @name repartition
#' @aliases repartition,SparkDataFrame-method
#' @seealso \link{coalesce}
#' @export
#' @examples
#'\dontrun{
4 changes: 2 additions & 2 deletions R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartitionRDD",
signature(x = "RDD"),
function(x, numPartitions) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
-coalesce(x, numPartitions, TRUE)
+coalesceRDD(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
@@ -1049,7 +1049,7 @@
#' @rdname coalesce
#' @aliases coalesce,RDD
#' @noRd
setMethod("coalesce",
setMethod("coalesceRDD",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
26 changes: 24 additions & 2 deletions R/pkg/R/functions.R
@@ -286,6 +286,28 @@ setMethod("ceil",
column(jc)
})

#' Returns the first column that is not NA
#'
#' Returns the first column that is not NA, or NA if all inputs are.
#'
#' @rdname coalesce
#' @name coalesce
#' @family normal_funcs
#' @export
#' @aliases coalesce,Column-method
#' @examples \dontrun{coalesce(df$c, df$d, df$e)}
#' @note coalesce(Column) since 2.1.1
setMethod("coalesce",
signature(x = "Column"),
function(x, ...) {
jcols <- lapply(list(x, ...), function (x) {
stopifnot(class(x) == "Column")
x@jc
})
jc <- callJStatic("org.apache.spark.sql.functions", "coalesce", jcols)
column(jc)
})
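
A small usage sketch for the Column variant (hypothetical columns a and b; SparkR converts R's NA to SQL NULL):

df <- createDataFrame(data.frame(a = c(1, NA, NA), b = c(NA, 2, NA)))
head(select(df, coalesce(df$a, df$b)))  # 1, 2, NA: first non-NA value per row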

#' Though Scala's functions API has a "col" function, we don't expose it in SparkR
#' because we don't want to conflict with the "col" function in the R base
#' package, and we also export a "column" function, which is an alias of "col".
@@ -297,15 +319,15 @@ col <- function(x) {
#' Returns a Column based on the given column name
#'
#' Returns a Column based on the given column name.
-#
+#'
#' @param x Character column name.
#'
#' @rdname column
#' @name column
#' @family normal_funcs
#' @export
#' @aliases column,character-method
-#' @examples \dontrun{column(df)}
+#' @examples \dontrun{column("name")}
#' @note column since 1.6.0
setMethod("column",
signature(x = "character"),
9 changes: 8 additions & 1 deletion R/pkg/R/generics.R
@@ -28,7 +28,7 @@ setGeneric("cacheRDD", function(x) { standardGeneric("cacheRDD") })
# @rdname coalesce
# @seealso repartition
# @export
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
setGeneric("coalesceRDD", function(x, numPartitions, ...) { standardGeneric("coalesceRDD") })

# @rdname checkpoint-methods
# @export
@@ -406,6 +406,13 @@ setGeneric("attach")
#' @export
setGeneric("cache", function(x) { standardGeneric("cache") })

#' @rdname coalesce
#' @param x a Column or a SparkDataFrame.
#' @param ... additional argument(s). If \code{x} is a Column, additional Columns can be optionally
#' provided.
#' @export
setGeneric("coalesce", function(x, ...) { standardGeneric("coalesce") })

#' @rdname collect
#' @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") })
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_rdd.R
@@ -315,7 +315,7 @@ test_that("repartition/coalesce on RDDs", {
expect_true(count >= 0 && count <= 4)

# coalesce
-r3 <- coalesce(rdd, 1)
+r3 <- coalesceRDD(rdd, 1)
expect_equal(getNumPartitionsRDD(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
32 changes: 27 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -725,7 +725,7 @@ test_that("objectFile() works with row serialization", {
objectPath <- tempfile(pattern = "spark-test", fileext = ".tmp")
df <- read.json(jsonPath)
dfRDD <- toRDD(df)
-saveAsObjectFile(coalesce(dfRDD, 1L), objectPath)
+saveAsObjectFile(coalesceRDD(dfRDD, 1L), objectPath)
objectIn <- objectFile(sc, objectPath)

expect_is(objectIn, "RDD")
@@ -1236,7 +1236,7 @@ test_that("column functions", {
c16 <- is.nan(c) + isnan(c) + isNaN(c)
c17 <- cov(c, c1) + cov("c", "c1") + covar_samp(c, c1) + covar_samp("c", "c1")
c18 <- covar_pop(c, c1) + covar_pop("c", "c1")
-c19 <- spark_partition_id()
+c19 <- spark_partition_id() + coalesce(c) + coalesce(c1, c2, c3)
c20 <- to_timestamp(c) + to_timestamp(c, "yyyy") + to_date(c, "yyyy")

# Test if base::is.nan() is exposed
@@ -2491,15 +2491,18 @@ test_that("repartition by columns on DataFrame", {
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

# repartition by column and number of partitions
-actual <- repartition(df, 3L, col = df$"a")
+actual <- repartition(df, 3, col = df$"a")

-# since we cannot access the number of partitions from dataframe, checking
-# that at least the dimensions are identical
+# Checking that at least the dimensions are identical
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 3L)

# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))
expect_equal(getNumPartitions(actual), 13L)

expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)

# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
@@ -2515,6 +2518,25 @@
expect_equal(nrow(df1), 2)
})

test_that("coalesce, repartition, numPartitions", {
df <- as.DataFrame(cars, numPartitions = 5)
expect_equal(getNumPartitions(df), 5)
expect_equal(getNumPartitions(coalesce(df, 3)), 3)
expect_equal(getNumPartitions(coalesce(df, 6)), 5)

df1 <- coalesce(df, 3)
expect_equal(getNumPartitions(df1), 3)
expect_equal(getNumPartitions(coalesce(df1, 6)), 5)
expect_equal(getNumPartitions(coalesce(df1, 4)), 4)
expect_equal(getNumPartitions(coalesce(df1, 2)), 2)

df2 <- repartition(df1, 10)
expect_equal(getNumPartitions(df2), 10)
expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
})
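
The df2 expectations above are subtler than they look: df2 reports 10 partitions, yet coalescing it to 13 or 7 yields 5. A plausible explanation is Catalyst's CollapseRepartition rule folding the no-shuffle coalesce into the preceding repartition, so the coalesce effectively applies to the original 5-partition data and only a target below 5 (here 3) takes effect, e.g.:

df2 <- repartition(as.DataFrame(cars, numPartitions = 5), 10)
getNumPartitions(coalesce(df2, 7))  # 5, not 7: the shuffle to 10 is collapsed away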

test_that("gapply() and gapplyCollect() on a DataFrame", {
df <- createDataFrame (
list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -459,7 +459,8 @@ abstract class RDD[T: ClassTag](
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
* new partitions will claim 10 of the current partitions. If a larger number
* of partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
10 changes: 9 additions & 1 deletion python/pyspark/sql/dataframe.py
@@ -515,7 +515,15 @@ def coalesce(self, numPartitions):
Similar to coalesce defined on an :class:`RDD`, this operation results in a
narrow dependency, e.g. if you go from 1000 partitions to 100 partitions,
there will not be a shuffle, instead each of the 100 new partitions will
claim 10 of the current partitions.
claim 10 of the current partitions. If a larger number of partitions is requested,
it will stay at the current number of partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can call repartition(). This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).
>>> df.coalesce(1).rdd.getNumPartitions()
1
10 changes: 9 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2432,7 +2432,15 @@ class Dataset[T] private[sql](
* Returns a new Dataset that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an `RDD`, this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
* the 100 new partitions will claim 10 of the current partitions. If a larger number of
* partitions is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can call repartition. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* @group typedrel
* @since 1.6.0
@@ -541,7 +541,15 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
* Physical plan for returning a new RDD that has exactly `numPartitions` partitions.
* Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
* if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
* the 100 new partitions will claim 10 of the current partitions.
* the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
* is requested, it will stay at the current number of partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can use [[ShuffleExchange]] instead. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*/
case class CoalesceExec(numPartitions: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
