Skip to content

Commit

Permalink
Merge pull request #10 from apache/master
Browse files Browse the repository at this point in the history
merge lastest spark
  • Loading branch information
pzzs committed Apr 21, 2015
2 parents f61210c + 8136810 commit f12fa50
Show file tree
Hide file tree
Showing 137 changed files with 5,311 additions and 1,144 deletions.
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Collate:
'jobj.R'
'RDD.R'
'pairRDD.R'
'SQLTypes.R'
'schema.R'
'column.R'
'group.R'
'DataFrame.R'
Expand Down
20 changes: 17 additions & 3 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ exportMethods(
"aggregateByKey",
"aggregateRDD",
"cache",
"cartesian",
"checkpoint",
"coalesce",
"cogroup",
Expand All @@ -28,6 +29,7 @@ exportMethods(
"fullOuterJoin",
"glom",
"groupByKey",
"intersection",
"join",
"keyBy",
"keys",
Expand All @@ -52,11 +54,14 @@ exportMethods(
"reduceByKeyLocally",
"repartition",
"rightOuterJoin",
"sampleByKey",
"sampleRDD",
"saveAsTextFile",
"saveAsObjectFile",
"sortBy",
"sortByKey",
"subtract",
"subtractByKey",
"sumRDD",
"take",
"takeOrdered",
Expand Down Expand Up @@ -95,6 +100,7 @@ exportClasses("DataFrame")
exportMethods("columns",
"distinct",
"dtypes",
"except",
"explain",
"filter",
"groupBy",
Expand All @@ -118,7 +124,6 @@ exportMethods("columns",
"show",
"showDF",
"sortDF",
"subtract",
"toJSON",
"toRDD",
"unionAll",
Expand Down Expand Up @@ -178,5 +183,14 @@ export("cacheTable",
"toDF",
"uncacheTable")

export("print.structType",
"print.structField")
export("sparkRSQL.init",
"sparkRHive.init")

export("structField",
"structField.jobj",
"structField.character",
"print.structField",
"structType",
"structType.jobj",
"structType.structField",
"print.structType")
18 changes: 10 additions & 8 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

# DataFrame.R - DataFrame class and methods implemented in S4 OO classes

#' @include generics.R jobj.R SQLTypes.R RDD.R pairRDD.R column.R group.R
#' @include generics.R jobj.R schema.R RDD.R pairRDD.R column.R group.R
NULL

setOldClass("jobj")
Expand Down Expand Up @@ -1141,29 +1141,31 @@ setMethod("intersect",
dataFrame(intersected)
})

#' Subtract
#' except
#'
#' Return a new DataFrame containing rows in this DataFrame
#' but not in another DataFrame. This is equivalent to `EXCEPT` in SQL.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
#' @return A DataFrame containing the result of the subtract operation.
#' @rdname subtract
#' @return A DataFrame containing the result of the except operation.
#' @rdname except
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' df1 <- jsonFile(sqlCtx, path)
#' df2 <- jsonFile(sqlCtx, path2)
#' subtractDF <- subtract(df, df2)
#' exceptDF <- except(df, df2)
#' }
setMethod("subtract",
#' @rdname except
#' @export
setMethod("except",
signature(x = "DataFrame", y = "DataFrame"),
function(x, y) {
subtracted <- callJMethod(x@sdf, "except", y@sdf)
dataFrame(subtracted)
excepted <- callJMethod(x@sdf, "except", y@sdf)
dataFrame(excepted)
})

#' Save the contents of the DataFrame to a data source
Expand Down
205 changes: 133 additions & 72 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ setMethod("take",
index <- -1
jrdd <- getJRDD(x)
numPartitions <- numPartitions(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
# estimates similar to the scala version of `take`.
Expand All @@ -748,13 +749,14 @@ setMethod("take",
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = size,
serializedMode = getSerializedMode(x))
# TODO: Check if this append is O(n^2)?
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
})


#' First
#'
#' Return the first element of an RDD
Expand Down Expand Up @@ -1092,21 +1094,42 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
if (num < length(part)) {
# R limitation: order works only on primitive types!
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
list(part[ord[1:num]])
part[ord[1:num]]
} else {
list(part)
part
}
}

reduceFunc <- function(elems, part) {
newElems <- append(elems, part)
# R limitation: order works only on primitive types!
ord <- order(unlist(newElems, recursive = FALSE), decreasing = !ascending)
newElems[ord[1:num]]
}

newRdd <- mapPartitions(x, partitionFunc)
reduce(newRdd, reduceFunc)

resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- numPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
index <- index + 1

if (index >= numPartitions) {
ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
resList <- resList[ord[1:num]]
break
}

# a JList of byte arrays
partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
partition <- partitionArr[[1]]

# elems is capped to have at most `num` elements
elems <- convertJListToRList(partition,
flatten = TRUE,
logicalUpperBound = num,
serializedMode = serializedModeRDD)

resList <- append(resList, elems)
}
resList
}

#' Returns the first N elements from an RDD in ascending order.
Expand Down Expand Up @@ -1465,67 +1488,105 @@ setMethod("zipRDD",
stop("Can only zip RDDs which have the same number of partitions.")
}

if (getSerializedMode(x) != getSerializedMode(other) ||
getSerializedMode(x) == "byte") {
# Append the number of elements in each partition to that partition so that we can later
# check if corresponding partitions of both RDDs have the same number of elements.
#
# Note that this appending also serves the purpose of reserialization, because even if
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
# may be encoded as multiple byte arrays.
appendLength <- function(part) {
part[[length(part) + 1]] <- length(part) + 1
part
}
x <- lapplyPartition(x, appendLength)
other <- lapplyPartition(other, appendLength)
}
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

zippedJRDD <- callJMethod(getJRDD(x), "zip", getJRDD(other))
# The zippedRDD's elements are of scala Tuple2 type. The serialized
# flag Here is used for the elements inside the tuples.
serializerMode <- getSerializedMode(x)
zippedRDD <- RDD(zippedJRDD, serializerMode)
mergePartitions(rdd, TRUE)
})

#' Cartesian product of this RDD and another one.
#'
#' Return the Cartesian product of this RDD and another one,
#' that is, the RDD of all pairs of elements (a, b) where a
#' is in this and b is in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @return A new RDD which is the Cartesian product of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2)
#' sortByKey(cartesian(rdd, rdd))
#' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
#'}
#' @rdname cartesian
#' @aliases cartesian,RDD,RDD-method
setMethod("cartesian",
signature(x = "RDD", other = "RDD"),
function(x, other) {
rdds <- appendPartitionLengths(x, other)
jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
# The jrdd's elements are of scala Tuple2 type. The serialized
# flag here is used for the elements inside the tuples.
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))

partitionFunc <- function(split, part) {
len <- length(part)
if (len > 0) {
if (serializerMode == "byte") {
lengthOfValues <- part[[len]]
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# check if corresponding partitions of both RDDs have the same number of elements.
if (lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
}

if (lengthOfKeys > 1) {
keys <- part[1 : (lengthOfKeys - 1)]
values <- part[(lengthOfKeys + 1) : (len - 1)]
} else {
keys <- list()
values <- list()
}
} else {
# Keys, values must have same length here, because this has
# been validated inside the JavaRDD.zip() function.
keys <- part[c(TRUE, FALSE)]
values <- part[c(FALSE, TRUE)]
}
mapply(
function(k, v) {
list(k, v)
},
keys,
values,
SIMPLIFY = FALSE,
USE.NAMES = FALSE)
} else {
part
}
mergePartitions(rdd, FALSE)
})

#' Subtract an RDD with another RDD.
#'
#' Return an RDD with the elements from this that are not in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the elements from this that are not in other.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
#' rdd2 <- parallelize(sc, list(2, 4))
#' collect(subtract(rdd1, rdd2))
#' # list(1, 1, 3)
#'}
#' @rdname subtract
#' @aliases subtract,RDD
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR::numPartitions(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
keys(subtractByKey(rdd1, rdd2, numPartitions))
})

#' Intersection of this RDD and another one.
#'
#' Return the intersection of this RDD and another one.
#' The output will not contain any duplicate elements,
#' even if the input RDDs did. Performs a hash partition
#' across the cluster.
#' Note that this method performs a shuffle internally.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions The number of partitions in the result RDD.
#' @return An RDD which is the intersection of these two RDDs.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
#' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
#' collect(sortBy(intersection(rdd1, rdd2), function(x) { x }))
#' # list(1, 2, 3)
#'}
#' @rdname intersection
#' @aliases intersection,RDD
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR::numPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

filterFunction <- function(elem) {
iters <- elem[[2]]
all(as.vector(
lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
}
PipelinedRDD(zippedRDD, partitionFunc)

keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
})
Loading

0 comments on commit f12fa50

Please sign in to comment.