Skip to content

Commit

Permalink
Fix: only optimize for unserialized dataset case.
Browse files Browse the repository at this point in the history
  • Loading branch information
concretevitamin committed Nov 15, 2014
1 parent d399aeb commit c06fc90
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 16 deletions.
5 changes: 4 additions & 1 deletion pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,10 @@ setMethod("take",

size <- num - length(resList)
# elems is capped to have at most `size` elements
elems <- convertJListToRList(partition, flatten = TRUE, size = size)
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = size,
serialized = rdd@env$serialized)
# TODO: Check if this append is O(n^2)?
resList <- append(resList, elems)
}
Expand Down
38 changes: 23 additions & 15 deletions pkg/R/utils.R
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
# Utilities and Helpers

# Given a JList<T>, returns an R list containing the same elements, the number
# of which is optionally upper bounded by `size` (by default, return all elements).
# Takes care of deserializations and type conversions.
convertJListToRList <- function(jList, flatten, size = NULL) {
# of which is optionally upper bounded by `logicalUpperBound` (by default,
# return all elements). Takes care of deserializations and type conversions.
convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL, serialized = TRUE) {
arrSize <- .jcall(jList, "I", "size")

# Unserialized datasets (such as an RDD directly generated by textFile()):
# each partition is not dense-packed into one Array[Byte], and `arrSize`
# here corresponds to number of logical elements. Thus we can prune here.
if (!serialized && !is.null(logicalUpperBound)) {
arrSize <- min(arrSize, logicalUpperBound)
}

results <- if (arrSize > 0) {
lapply(0:(arrSize - 1),
function(index) {
Expand All @@ -22,6 +30,16 @@ convertJListToRList <- function(jList, flatten, size = NULL) {
rRaw <- .jevalArray(.jcastToArray(jElem))
res <- unserialize(rRaw)

# For serialized datasets, `obj` (and `rRaw`) here corresponds to
# one whole partition dense-packed together. We deserialize the
# whole partition first, then cap the number of elements to be returned.

# TODO: is it possible to distinguish element boundary so that we can
# unserialize only what we need?
if (!is.null(logicalUpperBound)) {
res <- head(res, n = logicalUpperBound)
}

} else if (inherits(obj, "jobjRef") &&
.jinstanceof(obj, "scala.Tuple2")) {
# JavaPairRDD[Array[Byte], Array[Byte]].
Expand Down Expand Up @@ -50,19 +68,9 @@ convertJListToRList <- function(jList, flatten, size = NULL) {
}

if (flatten) {
r <- as.list(unlist(results, recursive = FALSE))
} else {
r <- as.list(results)
}

if (!is.null(size)) {
# Invariant: whenever `size` is passed in, it applies to the
# logical representation of the data, namely the user doesn't
# and shouldn't think about byte arrays and/or serde. Hence we
# apply the upper bound directly after the flatten semantics.
r <- head(r, n = size)
as.list(unlist(results, recursive = FALSE))
} else {
r
as.list(results)
}
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/src/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.9.1")

addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.6.0")

0 comments on commit c06fc90

Please sign in to comment.