diff --git a/pkg/NAMESPACE b/pkg/NAMESPACE index 4d583387d8fb6..812e85238e9c6 100644 --- a/pkg/NAMESPACE +++ b/pkg/NAMESPACE @@ -46,6 +46,8 @@ exportMethods( "sampleRDD", "saveAsTextFile", "saveAsObjectFile", + "sortBy", + "sortByKey", "take", "takeSample", "unionRDD", diff --git a/pkg/R/RDD.R b/pkg/R/RDD.R index f321f874c8abf..3f6d96251365b 100644 --- a/pkg/R/RDD.R +++ b/pkg/R/RDD.R @@ -1264,6 +1264,36 @@ setMethod("flatMapValues", flatMap(X, flatMapFunc) }) +#' Sort an RDD by the given key function. +#' +#' @param rdd An RDD to be sorted. +#' @param func A function used to compute the sort key for each element. +#' @param ascending A flag to indicate whether the sorting is ascending or descending. +#' @param numPartitions Number of partitions to create. +#' @return An RDD where all elements are sorted. +#' @rdname sortBy +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, list(3, 2, 1)) +#' collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3) +#'} +setGeneric("sortBy", function(rdd, + func, + ascending = TRUE, + numPartitions = 1L) { + standardGeneric("sortBy") + }) + +#' @rdname sortBy +#' @aliases sortBy,RDD,RDD-method +setMethod("sortBy", + signature(rdd = "RDD", func = "function"), + function(rdd, func, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) { + values(sortByKey(keyBy(rdd, func), ascending, numPartitions)) + }) + ############ Shuffle Functions ############ #' Partition an RDD by key @@ -1858,6 +1888,76 @@ setMethod("cogroup", group.func) }) +#' Sort a (k, v) pair RDD by k. +#' +#' @param rdd A (k, v) pair RDD to be sorted. +#' @param ascending A flag to indicate whether the sorting is ascending or descending. +#' @param numPartitions Number of partitions to create. +#' @return An RDD where all (k, v) pair elements are sorted. +#' @rdname sortByKey +#' @export +#' @examples +#'\dontrun{ +#' sc <- sparkR.init() +#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3))) +#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1)) +#'} +setGeneric("sortByKey", function(rdd, + ascending = TRUE, + numPartitions = 1L) { + standardGeneric("sortByKey") + }) + +#' @rdname sortByKey +#' @aliases sortByKey,RDD,RDD-method +setMethod("sortByKey", + signature(rdd = "RDD"), + function(rdd, ascending = TRUE, numPartitions = SparkR::numPartitions(rdd)) { + rangeBounds <- list() + + if (numPartitions > 1) { + rddSize <- count(rdd) + # constant from Spark's RangePartitioner + maxSampleSize <- numPartitions * 20 + fraction <- min(maxSampleSize / max(rddSize, 1), 1.0) + + samples <- collect(keys(sampleRDD(rdd, FALSE, fraction, 1L))) + + # Note: the built-in R sort() function only works on atomic vectors + samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending) + + if (length(samples) > 0) { + rangeBounds <- lapply(seq_len(numPartitions - 1), + function(i) { + j <- ceiling(length(samples) * i / numPartitions) + samples[j] + }) + } + } + + rangePartitionFunc <- function(key) { + partition <- 0 + + # TODO: Use binary search instead of linear search, similar with Spark + while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) { + partition <- partition + 1 + } + + if (ascending) { + partition + } else { + numPartitions - partition - 1 + } + } + + partitionFunc <- function(part) { + sortKeyValueList(part, decreasing = !ascending) + } + + newRDD <- partitionBy(rdd, numPartitions, rangePartitionFunc) + lapplyPartition(newRDD, partitionFunc) + }) + # TODO: Consider caching the name in the RDD's environment #' Return an RDD's name. #' diff --git a/pkg/R/utils.R b/pkg/R/utils.R index 820866ad80fe1..778ae67def047 100644 --- a/pkg/R/utils.R +++ b/pkg/R/utils.R @@ -197,9 +197,9 @@ initAccumulator <- function() { # Utility function to sort a list of key value pairs # Used in unit tests -sortKeyValueList <- function(kv_list) { +sortKeyValueList <- function(kv_list, decreasing = FALSE) { keys <- sapply(kv_list, function(x) x[[1]]) - kv_list[order(keys)] + kv_list[order(keys, decreasing = decreasing)] } # Utility function to generate compact R lists from grouped rdd diff --git a/pkg/inst/tests/test_rdd.R b/pkg/inst/tests/test_rdd.R index 5d6b128cd51b1..2f48db61020fd 100644 --- a/pkg/inst/tests/test_rdd.R +++ b/pkg/inst/tests/test_rdd.R @@ -267,6 +267,17 @@ test_that("keyBy on RDDs", { expect_equal(actual, lapply(nums, function(x) { list(func(x), x) })) }) +test_that("sortBy() on RDDs", { + sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE) + actual <- collect(sortedRdd) + expect_equal(actual, as.list(sort(nums, decreasing = TRUE))) + + rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L) + sortedRdd2 <- sortBy(rdd2, function(x) { x * x }) + actual <- collect(sortedRdd2) + expect_equal(actual, as.list(nums)) +}) + test_that("keys() on RDDs", { keys <- keys(intRdd) actual <- collect(keys) @@ -387,6 +398,55 @@ test_that("fullOuterJoin() on pairwise RDDs", { sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3))))) }) +test_that("sortByKey() on pairwise RDDs", { + numPairsRdd <- map(rdd, function(x) { list (x, x) }) + sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE) + actual <- collect(sortedRdd) + numPairs <- lapply(nums, function(x) { list (x, x) }) + expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE)) + + rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L) + numPairsRdd2 <- map(rdd2, function(x) { list (x, x) }) + sortedRdd2 <- sortByKey(numPairsRdd2) + actual <- collect(sortedRdd2) + expect_equal(actual, numPairs) + + # sort by string keys + l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5)) + rdd3 <- parallelize(sc, l, 2L) + sortedRdd3 <- sortByKey(rdd3) + actual <- collect(sortedRdd3) + expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) + + # test on the boundary cases + + # boundary case 1: the RDD to be sorted has only 1 partition + rdd4 <- parallelize(sc, l, 1L) + sortedRdd4 <- sortByKey(rdd4) + actual <- collect(sortedRdd4) + expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) + + # boundary case 2: the sorted RDD has only 1 partition + rdd5 <- parallelize(sc, l, 2L) + sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L) + actual <- collect(sortedRdd5) + expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4))) + + # boundary case 3: the RDD to be sorted has only 1 element + l2 <- list(list("a", 1)) + rdd6 <- parallelize(sc, l2, 2L) + sortedRdd6 <- sortByKey(rdd6) + actual <- collect(sortedRdd6) + expect_equal(actual, l2) + + # boundary case 4: the RDD to be sorted has 0 element + l3 <- list() + rdd7 <- parallelize(sc, l3, 2L) + sortedRdd7 <- sortByKey(rdd7) + actual <- collect(sortedRdd7) + expect_equal(actual, l3) +}) + test_that("collectAsMap() on a pairwise RDD", { rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) vals <- collectAsMap(rdd) @@ -404,4 +464,3 @@ test_that("collectAsMap() on a pairwise RDD", { vals <- collectAsMap(rdd) expect_equal(vals, list(`1` = "a", `2` = "b")) }) - diff --git a/pkg/man/sortBy.Rd b/pkg/man/sortBy.Rd new file mode 100644 index 0000000000000..d3a231c745240 --- /dev/null +++ b/pkg/man/sortBy.Rd @@ -0,0 +1,36 @@ +% Generated by roxygen2 (4.0.2): do not edit by hand +\docType{methods} +\name{sortBy} +\alias{sortBy} +\alias{sortBy,RDD,RDD-method} +\alias{sortBy,RDD,function,missingOrLogical,missingOrInteger-method} +\title{Sort an RDD by the given key function.} +\usage{ +sortBy(rdd, func, ascending, numPartitions) + +\S4method{sortBy}{RDD,`function`,missingOrLogical,missingOrInteger}(rdd, func, + ascending, numPartitions) +} +\arguments{ +\item{rdd}{An RDD to be sorted.} + +\item{func}{A function used to compute the sort key for each element.} + +\item{ascending}{A flag to indicate whether the sorting is ascending or descending.} + +\item{numPartitions}{Number of partitions to create.} +} +\value{ +An RDD where all elements are sorted. +} +\description{ +Sort an RDD by the given key function. +} +\examples{ +\dontrun{ +sc <- sparkR.init() +rdd <- parallelize(sc, list(3, 2, 1)) +collect(sortBy(rdd, function(x) { x })) # list (1, 2, 3) +} +} + diff --git a/pkg/man/sortByKey.Rd b/pkg/man/sortByKey.Rd new file mode 100644 index 0000000000000..b39aff6ca8757 --- /dev/null +++ b/pkg/man/sortByKey.Rd @@ -0,0 +1,34 @@ +% Generated by roxygen2 (4.0.2): do not edit by hand +\docType{methods} +\name{sortByKey} +\alias{sortByKey} +\alias{sortByKey,RDD,RDD-method} +\alias{sortByKey,RDD,missingOrLogical,missingOrInteger-method} +\title{Sort a (k, v) pair RDD by k.} +\usage{ +sortByKey(rdd, ascending, numPartitions) + +\S4method{sortByKey}{RDD,missingOrLogical,missingOrInteger}(rdd, ascending, + numPartitions) +} +\arguments{ +\item{rdd}{A (k, v) pair RDD to be sorted.} + +\item{ascending}{A flag to indicate whether the sorting is ascending or descending.} + +\item{numPartitions}{Number of partitions to create.} +} +\value{ +An RDD where all (k, v) pair elements are sorted. +} +\description{ +Sort a (k, v) pair RDD by k. +} +\examples{ +\dontrun{ +sc <- sparkR.init() +rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3))) +collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1)) +} +} +