[SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable #17511

Closed · 3 commits
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -358,6 +358,7 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -557,7 +557,7 @@ setMethod("insertInto",
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
callJMethod(write, "insertInto", tableName)
invisible(callJMethod(write, "insertInto", tableName))
Member:
Just wondering what is the reason why we add invisible in this PR? Is it related to this deprecation?

@felixcheung (Member, Author), Apr 5, 2017:
It's an R usability thing: for methods not returning any value (in these cases the Scala method is defined as foo: Unit), calling them will print NULL.

So we wrap the call in invisible to keep it from printing the value, that's all.

So no, it's not really related to this deprecation; the behavior has probably been around for quite a while.
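
A minimal, Spark-free sketch of the behavior described above; f and g below are hypothetical stand-ins for a wrapped JVM call that returns Unit/NULL:

f <- function() NULL             # mimics callJMethod on a void Scala method: returns NULL
g <- function() invisible(NULL)  # same return value, but auto-printing is suppressed

f()          # prints NULL at the console
g()          # prints nothing
x <- g()     # the value is still returned and can be assigned
is.null(x)   # TRUE

Wrapping the callJMethod result in invisible() only suppresses the console output; callers can still capture the return value if they need it.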

})

#' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
callJMethod(write, "saveAsTable", tableName)
invisible(callJMethod(write, "saveAsTable", tableName))
})

#' summary
69 changes: 55 additions & 14 deletions R/pkg/R/catalog.R
@@ -17,7 +17,7 @@

# catalog.R: SparkSession catalog functions

#' Create an external table
#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source,
#' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param schema the schema of the data for certain data source.
#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable
#' @rdname createExternalTable-deprecated
#' @seealso \link{createTable}
#' @export
#' @examples
#'\dontrun{
@@ -43,24 +44,64 @@
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
.Deprecated("createTable", old = "createExternalTable")
createTable(tableName, path, source, schema, ...)
}

createExternalTable <- function(x, ...) {
dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}

#' Creates a table based on the dataset in a data source
#'
#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
#' the table.
#'
#' The data source is specified by the \code{source} and a set of options(...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
#' created from the data at the given path. Otherwise a managed table is created.
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data sources.
#' @param ... additional named parameters as options for the data source.
#' @return A SparkDataFrame.
#' @rdname createTable
#' @seealso \link{createExternalTable}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createTable("myjson", path="path/to/json", source="json", schema)
#'
#' createTable("people", source = "json", schema = schema)
#' insertInto(df, "people")
#' }
#' @name createTable
#' @note createTable since 2.2.0
createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
if (is.null(source)) {
source <- getDefaultSqlSource()
}
catalog <- callJMethod(sparkSession, "catalog")
if (is.null(schema)) {
sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
sdf <- callJMethod(catalog, "createTable", tableName, source, options)
} else if (class(schema) == "structType") {
sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
} else {
sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
stop("schema must be a structType.")
}
dataFrame(sdf)
}

createExternalTable <- function(x, ...) {
dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}

#' Cache Table
#'
#' Caches the specified table in-memory.
@@ -162,14 +203,14 @@ clearCache <- function() {
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
dropTempTable.default <- function(tableName) {
.Deprecated("dropTempView", old = "dropTempTable")
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
dropTempView(tableName)
}

dropTempTable <- function(x, ...) {
.Deprecated("dropTempView")
dispatchFunc("dropTempView(viewName)", x, ...)
}

@@ -430,11 +471,11 @@ recoverPartitions <- function(tableName) {
invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
}

#' Invalidate and refresh all the cached metadata of the given table
#' Invalidates and refreshes all the cached data and metadata of the given table
#'
#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
#' Spark SQL or the external data source library it uses might cache certain metadata about a
#' table, such as the location of blocks. When those change outside of Spark SQL, users should
#' Invalidates and refreshes all the cached data and metadata of the given table. For performance
#' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
#' a table, such as the location of blocks. When those change outside of Spark SQL, users should
#' call this function to invalidate the cache.
#'
#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
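
For readers skimming the diff, a short usage sketch of the createTable API added above; the table names, paths, and columns are illustrative only:

library(SparkR)
sparkR.session()

# Managed table: no path is given, so Spark manages the table's storage.
schema <- structType(structField("name", "string"), structField("age", "integer"))
people <- createTable("people", source = "json", schema = schema)

# External table: a path is given, so the table points at existing files.
sales <- createTable("sales", path = "/data/sales.json", source = "json")

# The old entry point still works, but now emits a deprecation warning and
# forwards to createTable.
old <- createExternalTable("sales_old", path = "/data/sales.json", source = "json")

Leaving out source falls back to the default configured by "spark.sql.sources.default", and passing anything other than a structType for schema raises an error, as the new code path above shows.
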
20 changes: 15 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
df <- read.df(jsonPathNa, "json", schema)
invisible(insertInto(df, "people"))
insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,33 +1268,43 @@ test_that("column calculation", {

test_that("test HiveContext", {
setHiveContext(sc)
df <- createExternalTable("json", jsonPath, "json")

schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
createTable("people", source = "json", schema = schema)
df <- read.df(jsonPathNa, "json", schema)
insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
sql("DROP TABLE people")

df <- createTable("json", jsonPath, "json")
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
df2 <- sql("select * from json")
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)

jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
saveAsTable(df, "json2", "json", "append", path = jsonPath2)
df3 <- sql("select * from json2")
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
unlink(jsonPath2)

hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
saveAsTable(df, "hivetestbl", path = hivetestDataPath)
df4 <- sql("select * from hivetestbl")
expect_is(df4, "SparkDataFrame")
expect_equal(count(df4), 3)
unlink(hivetestDataPath)

parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
df5 <- sql("select * from parquetest")
expect_is(df5, "SparkDataFrame")
expect_equal(count(df5), 3)
unlink(parquetDataPath)

unsetHiveContext()
})
