Skip to content

Commit

Permalink
Merge branch 'master' into SPARK-7387
Browse files Browse the repository at this point in the history
  • Loading branch information
Ram Sriharsha committed Jun 2, 2015
2 parents 615e91c + 0f80990 commit 54a500c
Show file tree
Hide file tree
Showing 208 changed files with 2,359 additions and 889 deletions.
46 changes: 46 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,52 @@ and

Vis.js may be distributed under either license.

========================================================================
For dagre-d3 (core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js):
========================================================================
Copyright (c) 2013 Chris Pettitt

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

========================================================================
For graphlib-dot (core/src/main/resources/org/apache/spark/ui/static/graphlib-dot.min.js):
========================================================================
Copyright (c) 2012-2013 Chris Pettitt

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

========================================================================
BSD-style licenses
========================================================================
Expand Down
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ exportMethods("arrange",
"count",
"describe",
"distinct",
"dropna",
"dtypes",
"except",
"explain",
"fillna",
"filter",
"first",
"group_by",
Expand Down
125 changes: 125 additions & 0 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1429,3 +1429,128 @@ setMethod("describe",
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
dataFrame(sdf)
})

#' dropna
#'
#' Returns a new DataFrame omitting rows with null values.
#'
#' @param x A SparkSQL DataFrame.
#' @param how "any" or "all".
#' if "any", drop a row if it contains any nulls.
#' if "all", drop a row only if all its values are null.
#' if minNonNulls is specified, how is ignored.
#' @param minNonNulls If specified, drop rows that have less than
#' minNonNulls non-null values.
#' This overwrites the how parameter.
#' @param cols Optional list of column names to consider.
#' @return A DataFrame
#'
#' @rdname nafunctions
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' dropna(df)
#' }
setMethod("dropna",
signature(x = "DataFrame"),
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
how <- match.arg(how)
if (is.null(cols)) {
cols <- columns(x)
}
if (is.null(minNonNulls)) {
minNonNulls <- if (how == "any") { length(cols) } else { 1 }
}

naFunctions <- callJMethod(x@sdf, "na")
sdf <- callJMethod(naFunctions, "drop",
as.integer(minNonNulls), listToSeq(as.list(cols)))
dataFrame(sdf)
})

#' @aliases dropna
#' @export
setMethod("na.omit",
signature(x = "DataFrame"),
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
dropna(x, how, minNonNulls, cols)
})

#' fillna
#'
#' Replace null values.
#'
#' @param x A SparkSQL DataFrame.
#' @param value Value to replace null values with.
#' Should be an integer, numeric, character or named list.
#' If the value is a named list, then cols is ignored and
#' value must be a mapping from column name (character) to
#' replacement value. The replacement value must be an
#' integer, numeric or character.
#' @param cols optional list of column names to consider.
#' Columns specified in cols that do not have matching data
#' type are ignored. For example, if value is a character, and
#' subset contains a non-character column, then the non-character
#' column is simply ignored.
#' @return A DataFrame
#'
#' @rdname nafunctions
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' fillna(df, 1)
#' fillna(df, list("age" = 20, "name" = "unknown"))
#' }
setMethod("fillna",
signature(x = "DataFrame"),
function(x, value, cols = NULL) {
if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
stop("value should be an integer, numeric, charactor or named list.")
}

if (class(value) == "list") {
# Check column names in the named list
colNames <- names(value)
if (length(colNames) == 0 || !all(colNames != "")) {
stop("value should be an a named list with each name being a column name.")
}

# Convert to the named list to an environment to be passed to JVM
valueMap <- new.env()
for (col in colNames) {
# Check each item in the named list is of valid type
v <- value[[col]]
if (!(class(v) %in% c("integer", "numeric", "character"))) {
stop("Each item in value should be an integer, numeric or charactor.")
}
valueMap[[col]] <- v
}

# When value is a named list, caller is expected not to pass in cols
if (!is.null(cols)) {
warning("When value is a named list, cols is ignored!")
cols <- NULL
}

value <- valueMap
} else if (is.integer(value)) {
# Cast an integer to a numeric
value <- as.numeric(value)
}

naFunctions <- callJMethod(x@sdf, "na")
sdf <- if (length(cols) == 0) {
callJMethod(naFunctions, "fill", value)
} else {
callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
}
dataFrame(sdf)
})
18 changes: 18 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
#' @export
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

#' @rdname nafunctions
#' @export
setGeneric("dropna",
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
standardGeneric("dropna")
})

#' @rdname nafunctions
#' @export
setGeneric("na.omit",
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
standardGeneric("na.omit")
})

#' @rdname schema
#' @export
setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
Expand All @@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @export
setGeneric("except", function(x, y) { standardGeneric("except") })

#' @rdname nafunctions
#' @export
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })

#' @rdname filter
#' @export
setGeneric("filter", function(x, condition) { standardGeneric("filter") })
Expand Down
10 changes: 9 additions & 1 deletion R/pkg/R/serialize.R
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ writeList <- function(con, arr) {
}
}

# Used to pass arrays where the elements can be of different types
writeGenericList <- function(con, list) {
writeInt(con, length(list))
for (elem in list) {
writeObject(con, elem)
}
}

# Used to pass in hash maps required on Java side.
writeEnv <- function(con, env) {
len <- length(env)
Expand All @@ -168,7 +176,7 @@ writeEnv <- function(con, env) {
if (len > 0) {
writeList(con, as.list(ls(env)))
vals <- lapply(ls(env), function(x) { env[[x]] })
writeList(con, as.list(vals))
writeGenericList(con, as.list(vals))
}
}

Expand Down
109 changes: 109 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
writeLines(mockLines, jsonPath)

# For test nafunctions, like dropna(), fillna(),...
mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
"{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
"{\"name\":\"David\",\"age\":60,\"height\":null}",
"{\"name\":\"Amy\",\"age\":null,\"height\":null}",
"{\"name\":null,\"age\":null,\"height\":null}")
jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(mockLinesNa, jsonPathNa)

test_that("infer types", {
expect_equal(infer_type(1L), "integer")
expect_equal(infer_type(1.0), "double")
Expand Down Expand Up @@ -765,5 +774,105 @@ test_that("describe() on a DataFrame", {
expect_equal(collect(stats)[5, "age"], "30")
})

test_that("dropna() on a DataFrame", {
df <- jsonFile(sqlContext, jsonPathNa)
rows <- collect(df)

# drop with columns

expected <- rows[!is.na(rows$name),]
actual <- collect(dropna(df, cols = "name"))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age),]
actual <- collect(dropna(df, cols = "age"))
row.names(expected) <- row.names(actual)
# identical on two dataframes does not work here. Don't know why.
# use identical on all columns as a workaround.
expect_true(identical(expected$age, actual$age))
expect_true(identical(expected$height, actual$height))
expect_true(identical(expected$name, actual$name))

expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
actual <- collect(dropna(df, cols = c("age", "height")))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
actual <- collect(dropna(df))
expect_true(identical(expected, actual))

# drop with how

expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
actual <- collect(dropna(df))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
actual <- collect(dropna(df, "all"))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
actual <- collect(dropna(df, "any"))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
actual <- collect(dropna(df, "any", cols = c("age", "height")))
expect_true(identical(expected, actual))

expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
actual <- collect(dropna(df, "all", cols = c("age", "height")))
expect_true(identical(expected, actual))

# drop with threshold

expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
expect_true(identical(expected, actual))

expected <- rows[as.integer(!is.na(rows$age)) +
as.integer(!is.na(rows$height)) +
as.integer(!is.na(rows$name)) >= 3,]
actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
expect_true(identical(expected, actual))
})

test_that("fillna() on a DataFrame", {
df <- jsonFile(sqlContext, jsonPathNa)
rows <- collect(df)

# fill with value

expected <- rows
expected$age[is.na(expected$age)] <- 50
expected$height[is.na(expected$height)] <- 50.6
actual <- collect(fillna(df, 50.6))
expect_true(identical(expected, actual))

expected <- rows
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, "unknown"))
expect_true(identical(expected, actual))

expected <- rows
expected$age[is.na(expected$age)] <- 50
actual <- collect(fillna(df, 50.6, "age"))
expect_true(identical(expected, actual))

expected <- rows
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, "unknown", c("age", "name")))
expect_true(identical(expected, actual))

# fill with named list

expected <- rows
expected$age[is.na(expected$age)] <- 50
expected$height[is.na(expected$height)] <- 50.6
expected$name[is.na(expected$name)] <- "unknown"
actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
expect_true(identical(expected, actual))
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
Loading

0 comments on commit 54a500c

Please sign in to comment.