From f6b66ef2e9e08de1b3f48afd8c5d4b715eb030cb Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Tue, 8 Dec 2015 20:09:14 +0800
Subject: [PATCH 1/9] [SPARK-12204][SPARKR] Implement drop method for DataFrame in SparkR.

---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 75 ++++++++++++++++-------
 R/pkg/R/generics.R                        |  4 ++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 28 +++++++--
 4 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 00634c1a70c26..2cc1544bef080 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -39,6 +39,7 @@ exportMethods("arrange",
               "describe",
               "dim",
               "distinct",
+              "drop",
               "dropDuplicates",
               "dropna",
               "dtypes",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 629c1ce2eddc1..ba2f9d2b98e77 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1192,23 +1192,10 @@ setMethod("$", signature(x = "DataFrame"),
 setMethod("$<-", signature(x = "DataFrame"),
           function(x, name, value) {
             stopifnot(class(value) == "Column" || is.null(value))
-            cols <- columns(x)
-            if (name %in% cols) {
-              if (is.null(value)) {
-                cols <- Filter(function(c) { c != name }, cols)
-              }
-              cols <- lapply(cols, function(c) {
-                if (c == name) {
-                  alias(value, name)
-                } else {
-                  col(c)
-                }
-              })
-              nx <- select(x, cols)
+
+            if (is.null(value)) {
+              nx <- drop(x, name)
             } else {
-              if (is.null(value)) {
-                return(x)
-              }
               nx <- withColumn(x, name, value)
             }
             x@sdf <- nx@sdf
@@ -1386,12 +1373,13 @@ setMethod("selectExpr",
 
 #' WithColumn
 #'
-#' Return a new DataFrame with the specified column added.
+#' Return a new DataFrame by adding a column or replacing the existing column
+#' that has the same name.
 #'
 #' @param x A DataFrame
-#' @param colName A string containing the name of the new column.
+#' @param colName A column name.
 #' @param col A Column expression.
-#' @return A DataFrame with the new column added.
+#' @return A DataFrame with the new column added or the existing column replaced.
 #' @family DataFrame functions
 #' @rdname withColumn
 #' @name withColumn
@@ -1404,12 +1392,16 @@ setMethod("selectExpr",
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' newDF <- withColumn(df, "newCol", df$col1 * 5)
+#' # Replace an existing column
+#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
 #' }
 setMethod("withColumn",
           signature(x = "DataFrame", colName = "character", col = "Column"),
           function(x, colName, col) {
-            select(x, x$"*", alias(col, colName))
+            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
+            dataFrame(sdf)
           })
+
 #' Mutate
 #'
 #' Return a new DataFrame with the specified columns added.
@@ -2401,4 +2393,45 @@ setMethod("str",
             cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
           }
         }
-      })
\ No newline at end of file
+      })
+
+#' drop
+#'
+#' Returns a new DataFrame with columns dropped.
+#' This is a no-op if schema doesn't contain column name(s).
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param col A character vector of column names or a Column.
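+#' @return A DataFrame
+#'
+#' @family DataFrame functions
+#' @rdname drop
+#' @name drop
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' drop(df, "col1")
+#' drop(df, c("col1", "col2"))
+#' drop(df, df$col1)
+#' }
+setMethod("drop",
+          signature(x = "DataFrame", col = "character"),
+          function(x, col) {
+            sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            dataFrame(sdf)
+          })
+
+#' @rdname drop
+#' @name drop
+#' @export
+setMethod("drop",
+          signature(x = "DataFrame", col = "Column"),
+          function(x, col) {
+            sdf <- callJMethod(x@sdf, "drop", col@jc)
+            dataFrame(sdf)
+          })
+
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d616266ead41b..b0513cf94957a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -428,6 +428,10 @@ setGeneric("corr", function(x, ...) {standardGeneric("corr") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname drop
+#' @export
+setGeneric("drop", function(x, col) { standardGeneric("drop") })
+
 #' @rdname dropduplicates
 #' @export
 setGeneric("dropDuplicates",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 14d40d5066e78..b528ca08d0ed4 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -824,11 +824,6 @@ test_that("select operators", {
   df$age2 <- df$age * 2
   expect_equal(columns(df), c("name", "age", "age2"))
   expect_equal(count(where(df, df$age2 == df$age * 2)), 2)
-
-  df$age2 <- NULL
-  expect_equal(columns(df), c("name", "age"))
-  df$age3 <- NULL
-  expect_equal(columns(df), c("name", "age"))
 })
 
 test_that("select with column", {
@@ -854,6 +849,24 @@ test_that("select with column", {
     "To select multiple columns, use a character vector or list for col")
 })
 
+test_that("drop column", {
+  df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+  df1 <- drop(df, "name")
+  expect_equal(columns(df1), c("age"))
+
+  df$age2 <- df$age
+  df1 <- drop(df, c("name", "age"))
+  expect_equal(columns(df1), c("age2"))
+
+  df1 <- drop(df, df$age)
+  expect_equal(columns(df1), c("name", "age2"))
+
+  df$age2 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+  df$age3 <- NULL
+  expect_equal(columns(df), c("name", "age"))
+})
+
 test_that("subsetting", {
   # read.json returns columns in random order
   df <- select(read.json(sqlContext, jsonPath), "name", "age")
@@ -1462,6 +1475,11 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_equal(columns(newDF)[3], "newAge")
   expect_equal(first(filter(newDF, df$name != "Michael"))$newAge, 32)
 
+  # Replace existing column
+  newDF <- withColumn(df, "age", df$age + 2)
+  expect_equal(length(columns(newDF)), 2)
+  expect_equal(first(filter(newDF, df$name != "Michael"))$age, 32)
+
   newDF2 <- withColumnRenamed(df, "age", "newerAge")
   expect_equal(length(columns(newDF2)), 2)
   expect_equal(columns(newDF2)[1], "newerAge")

[Reviewer note — not part of the patch] A quick usage sketch of the API this commit adds. The session setup and JSON path are illustrative stand-ins for the test fixtures; `sqlContext` is assumed to come from `sparkRSQL.init(sc)` as in the roxygen examples.

```r
# Sketch only: assumes a running SparkR 1.6 session and a JSON file with
# "name" and "age" fields, like the people fixture used by the test suite.
df <- read.json(sqlContext, "path/to/people.json")

df1 <- drop(df, "name")            # drop a single column by name
df2 <- drop(df, c("name", "age"))  # drop several columns in one call
df3 <- drop(df, df$age)            # drop by Column reference

# `$<-` with NULL is now routed through drop() instead of a select() rebuild:
df$age <- NULL
columns(df)                        # "name"
```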
From 6416d3cb14a4372cb8cd67d2c84322e8dc5a1c65 Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Fri, 11 Dec 2015 13:28:34 +0800
Subject: [PATCH 2/9] Expose base::drop.

---
 R/pkg/R/DataFrame.R                       | 21 ++++++++++++---------
 R/pkg/R/generics.R                        |  2 +-
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  3 +++
 3 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ba2f9d2b98e77..edcb92fe095d7 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2419,19 +2419,22 @@ setMethod("str",
 #' drop(df, df$col1)
 #' }
 setMethod("drop",
-          signature(x = "DataFrame", col = "character"),
+          signature(x = "DataFrame"),
           function(x, col) {
-            sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            stopifnot(class(col) == "character" || class(col) == "Column")
+
+            if (class(col) == "character") {
+              sdf <- callJMethod(x@sdf, "drop", as.list(col))
+            } else {
+              sdf <- callJMethod(x@sdf, "drop", col@jc)
+            }
             dataFrame(sdf)
           })
 
-#' @rdname drop
-#' @name drop
-#' @export
+# Expose base::drop
 setMethod("drop",
-          signature(x = "DataFrame", col = "Column"),
-          function(x, col) {
-            sdf <- callJMethod(x@sdf, "drop", col@jc)
-            dataFrame(sdf)
+          signature(x = "ANY"),
+          function(x) {
+            base::drop(x)
           })
 
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b0513cf94957a..9a8ab97bb8f9a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -430,7 +430,7 @@ setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
 #' @rdname drop
 #' @export
-setGeneric("drop", function(x, col) { standardGeneric("drop") })
+setGeneric("drop", function(x, ...) { standardGeneric("drop") })
 
 #' @rdname dropduplicates
 #' @export
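diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index b528ca08d0ed4..65a4429c99c36 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -865,6 +865,9 @@ test_that("drop column", {
   expect_equal(columns(df), c("name", "age"))
   df$age3 <- NULL
   expect_equal(columns(df), c("name", "age"))
+
+  # Test to make sure base::drop is not masked
+  expect_equal(drop(1:3 %*% 2:4), 20)
 })
 
 test_that("subsetting", {

[Reviewer note — not part of the patch] Widening the generic from `(x, col)` to `(x, ...)` is what lets a single `"ANY"` method keep one-argument calls like `drop(m)` working after SparkR masks `base::drop`. A standalone sketch of the dispatch pattern, runnable in plain R with no Spark; `data.frame` is a hypothetical stand-in for SparkR's `DataFrame`:

```r
# Same S4 pattern as the patch, reproduced outside SparkR.
setGeneric("drop", function(x, ...) { standardGeneric("drop") })

# Catch-all method keeps base::drop reachable for ordinary R objects.
setMethod("drop", signature(x = "ANY"), function(x) {
  base::drop(x)
})

# Stand-in for the DataFrame method, for illustration only.
setMethod("drop", signature(x = "data.frame"), function(x, col) {
  x[, setdiff(names(x), col), drop = FALSE]
})

drop(1:3 %*% 2:4)                    # 20 via base::drop, as the new test asserts
drop(data.frame(a = 1, b = 2), "a")  # data.frame with only column b
```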
From 6f44594e8f7f7551a4ce4fb96472ff8949fa40bc Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Fri, 11 Dec 2015 17:05:54 +0800
Subject: [PATCH 3/9] Fix comments.

---
 R/pkg/R/DataFrame.R | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index edcb92fe095d7..d18d1d0534c52 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2423,10 +2423,10 @@ setMethod("drop",
           function(x, col) {
             stopifnot(class(col) == "character" || class(col) == "Column")
 
-            if (class(col) == "character") {
-              sdf <- callJMethod(x@sdf, "drop", as.list(col))
-            } else {
+            if (class(col) == "Column") {
               sdf <- callJMethod(x@sdf, "drop", col@jc)
+            } else {
+              sdf <- callJMethod(x@sdf, "drop", as.list(col))
             }
             dataFrame(sdf)
           })

From 9bd4ef191e6cd4243f1c7a60a26ce70b9f06b22f Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Mon, 14 Dec 2015 14:29:31 +0800
Subject: [PATCH 4/9] Update documentation for withColumn.

---
 docs/sql-programming-guide.md | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index bc89c781562bd..1d126a12b25fd 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2150,6 +2150,8 @@ options.
      --conf spark.sql.hive.thriftServer.singleSession=true \
      ...
    {% endhighlight %}
+ - Since 1.6.1, withColumn method in sparkR supports adding new a column to or replacing existing columns
+   of the same name of a DataFrame.
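 
 - From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This
   change was made to match the behavior of Hive 1.2 for more consistent type casting to TimestampType
@@ -2183,6 +2185,7 @@ options.
    users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method
    to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate
    the DataFrame and the new DataFrame will include new files.
+ - DataFrame.withColumn method in pySpark supports adding new a column or replacing existing columns of the same name.
 
 ## Upgrading from Spark SQL 1.3 to 1.4
 
@@ -2262,6 +2265,15 @@ sqlContext.setConf("spark.sql.retainGroupColumns", "false")
 </div>
 </div>
 
+#### Behavior change on DataFrame.withColumn
+
+Prior to 1.4, DataFrame.withColumn() supports adding new a column only. The new column will always be added no
+matter whether there is any existing column of the same name or not. Since 1.4, DataFrame.withColumn() supports
+adding new a column or replacing existing columns of the same name.
+
+Note that this change is only for Scala API, not for PySpark and SparkR.
+
+
 ## Upgrading from Spark SQL 1.0-1.2 to 1.3
 
 In Spark 1.3 we removed the "Alpha" label from Spark SQL and as part of this did a cleanup of the

[Reviewer note — not part of the patch] The SparkR behavior the new guide text describes, as a sketch under the same illustrative session as above:

```r
# With this series applied, withColumn() replaces a column of the same name
# instead of appending a duplicate.
df  <- read.json(sqlContext, "path/to/people.json")      # columns: name, age
df1 <- withColumn(df, "age2", df$age * 2)    # "age2" is new    -> column added
df2 <- withColumn(df1, "age2", df1$age + 1)  # "age2" exists    -> column replaced
length(columns(df2)) == length(columns(df1)) # TRUE: column count unchanged
```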
From 464676003b62eceb39fc9ecdfb6ec10e9bfbbcd1 Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Mon, 14 Dec 2015 15:21:47 +0800
Subject: [PATCH 5/9] Use read.json() instead of deprecated jsonFile().

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 65a4429c99c36..f9b8bd9940c84 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -850,7 +850,7 @@ test_that("select with column", {
 })
 
 test_that("drop column", {
-  df <- select(jsonFile(sqlContext, jsonPath), "name", "age")
+  df <- select(read.json(sqlContext, jsonPath), "name", "age")
   df1 <- drop(df, "name")
   expect_equal(columns(df1), c("age"))
 

From 9ebae5981a8c074b683e0d0de80bfaf20b42d4c9 Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Tue, 15 Dec 2015 14:46:13 +0800
Subject: [PATCH 6/9] Refine documentation update.

---
 docs/sql-programming-guide.md | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 1d126a12b25fd..fddc51379406b 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -2150,7 +2150,7 @@ options.
      --conf spark.sql.hive.thriftServer.singleSession=true \
      ...
    {% endhighlight %}
- - Since 1.6.1, withColumn method in sparkR supports adding new a column to or replacing existing columns
+ - Since 1.6.1, withColumn method in sparkR supports adding a new column to or replacing existing columns
    of the same name of a DataFrame.
 
 - From Spark 1.6, LongType casts to TimestampType expect seconds instead of microseconds. This
@@ -2185,7 +2185,7 @@ options.
    users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method
    to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate
    the DataFrame and the new DataFrame will include new files.
- - DataFrame.withColumn method in pySpark supports adding new a column or replacing existing columns of the same name.
+ - DataFrame.withColumn method in pySpark supports adding a new column or replacing existing columns of the same name.
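 
 ## Upgrading from Spark SQL 1.3 to 1.4
 
@@ -2267,9 +2267,10 @@ sqlContext.setConf("spark.sql.retainGroupColumns", "false")
 
 #### Behavior change on DataFrame.withColumn
 
-Prior to 1.4, DataFrame.withColumn() supports adding new a column only. The new column will always be added no
-matter whether there is any existing column of the same name or not. Since 1.4, DataFrame.withColumn() supports
-adding new a column or replacing existing columns of the same name.
+Prior to 1.4, DataFrame.withColumn() supports adding a column only. The column will always be added
+as a new column with its specified name in the result DataFrame even if there may be any existing
+columns of the same name. Since 1.4, DataFrame.withColumn() supports adding a column of a different
+name from names of all existing columns or replacing existing columns of the same name.
 
 Note that this change is only for Scala API, not for PySpark and SparkR.

[Reviewer note — not part of the patch] For contrast, the pre-change append semantics can still be reproduced explicitly with the `select`/`alias` idiom the old R implementation of withColumn used; sketch under the same illustrative session:

```r
# select("*") plus an aliased expression appends a second "age" column
# rather than replacing the existing one -- the ambiguity the guide warns about.
df  <- read.json(sqlContext, "path/to/people.json")
dup <- select(df, df$"*", alias(df$age + 2, "age"))
columns(dup)   # "name" "age" "age"
```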
From 89657f8f059ea91b83cace02314b1b6865eea80d Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Wed, 16 Dec 2015 15:47:10 +0800
Subject: [PATCH 7/9] Use read.json() instead of jsonFile() in function description of drop().

---
 R/pkg/R/DataFrame.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index d18d1d0534c52..51510e4101e44 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2413,7 +2413,7 @@ setMethod("str",
 #' sc <- sparkR.init()
 #' sqlCtx <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
-#' df <- jsonFile(sqlCtx, path)
+#' df <- read.json(sqlCtx, path)
 #' drop(df, "col1")
 #' drop(df, c("col1", "col2"))
 #' drop(df, df$col1)

From c08c1ea9979c25aded74dcb505b9f03302b2c3cd Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Wed, 20 Jan 2016 17:07:50 +0800
Subject: [PATCH 8/9] Fix R style.

---
 R/pkg/R/DataFrame.R | 1 -
 1 file changed, 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 51510e4101e44..4653a73e11be3 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2437,4 +2437,3 @@ setMethod("drop",
           function(x) {
             base::drop(x)
           })
-

From 5eb30044e3655d884de2aebaa39b7245d099fbdb Mon Sep 17 00:00:00 2001
From: Sun Rui
Date: Thu, 21 Jan 2016 10:58:37 +0800
Subject: [PATCH 9/9] Fix test break.

---
 R/pkg/inst/tests/testthat/test_context.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3b14a497b487a..ad3f9722a4802 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -26,7 +26,7 @@ test_that("Check masked functions", {
   maskedBySparkR <- masked[funcSparkROrEmpty]
   namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var",
                      "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
-                     "summary", "transform")
+                     "summary", "transform", "drop")
   expect_equal(length(maskedBySparkR), length(namesOfMasked))
   expect_equal(sort(maskedBySparkR), sort(namesOfMasked))
   # above are those reported as masked when `library(SparkR)`
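
[Reviewer note — not part of the patch] One quick way to eyeball the masking set the updated `test_context.R` assertion pins down (this sketch uses base R's `conflicts()`, not the helper the test itself uses, and assumes SparkR is installed):

```r
# After attaching SparkR, "drop" should appear among the masked names.
library(SparkR)
conflicts(detail = TRUE)[["package:SparkR"]]
```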