add createTable, deprecate createExternalTable, fix invisible on API without return values
felixcheung committed Apr 2, 2017
1 parent 93dbfe7 commit b424a52
Showing 4 changed files with 73 additions and 21 deletions.
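In brief, the user-facing effect of this commit (a minimal sketch; the table names and paths are illustrative, not taken from the patch):

```r
library(SparkR)
sparkR.session()

# New in 2.2.0: createTable creates a managed table, or an external one
# when a path is supplied.
df <- createTable("myjson", path = "path/to/json", source = "json")

# Deprecated: createExternalTable still works, but now warns and delegates
# to createTable.
df2 <- createExternalTable("myjson2", path = "path/to/json", source = "json")

# insertInto and saveAsTable now return their result invisibly, so nothing
# is auto-printed at the console.
saveAsTable(df, "myjson_copy", "json", "error", path = "path/to/json_copy")
```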
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -358,6 +358,7 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
+"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
4 changes: 2 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -557,7 +557,7 @@ setMethod("insertInto",
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
-callJMethod(write, "insertInto", tableName)
+invisible(callJMethod(write, "insertInto", tableName))
})

#' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
-callJMethod(write, "saveAsTable", tableName)
+invisible(callJMethod(write, "saveAsTable", tableName))
})

#' summary
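For context on the invisible() change above: invisible() only suppresses auto-printing at the top level; the wrapped value is still returned to callers. A plain-R illustration (not part of the patch):

```r
loud <- function() "done"             # result auto-prints when called at the prompt
quiet <- function() invisible("done") # same result, nothing printed

quiet()       # no console output
x <- quiet()  # the value is still returned and assignable
print(x)      # [1] "done"
```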
69 changes: 55 additions & 14 deletions R/pkg/R/catalog.R
@@ -17,7 +17,7 @@

# catalog.R: SparkSession catalog functions

-#' Create an external table
+#' (Deprecated) Create an external table
#'
#' Creates an external table based on the dataset in a data source.
#' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
-#' @param schema the schema of the data for certain data source.
+#' @param schema the schema of the data required for some data sources.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
-#' @rdname createExternalTable
+#' @rdname createExternalTable-deprecated
+#' @seealso \link{createTable}
#' @export
#' @examples
#'\dontrun{
@@ -43,24 +44,64 @@
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+.Deprecated("createTable", old = "createExternalTable")
+createTable(tableName, path, source, schema, ...)
+}
+
+createExternalTable <- function(x, ...) {
+dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
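The deprecation shell above follows the standard base-R pattern: warn via .Deprecated(), then forward every argument to the replacement. A self-contained sketch of the same pattern (oldFn/newFn are illustrative names, not SparkR APIs):

```r
newFn <- function(x, ...) x * 2

oldFn <- function(x, ...) {
  .Deprecated("newFn", old = "oldFn")  # warns: 'oldFn' is deprecated. Use 'newFn' instead.
  newFn(x, ...)                        # forward all arguments unchanged
}

oldFn(21)  # returns 42, with the deprecation warning
```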

+#' Creates a table based on the dataset in a data source
+#'
+#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
+#' the table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
+#' created from the data at the given path. Otherwise a managed table is created.
+#'
+#' @param tableName a name of the table.
+#' @param path (optional) the path of files to load.
+#' @param source (optional) the name of the data source.
+#' @param schema (optional) the schema of the data required for some data sources.
+#' @param ... additional named parameters as options for the data source.
+#' @return A SparkDataFrame.
+#' @rdname createTable
+#' @seealso \link{createExternalTable-deprecated}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createTable("myjson", path="path/to/json", source="json", schema)
+#'
+#' createTable("people", source = "json", schema = schema)
+#' insertInto(df, "people")
+#' }
+#' @name createTable
+#' @method createTable
+#' @note createTable since 2.2.0
+createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
+if (is.null(source)) {
+source <- getDefaultSqlSource()
+}
catalog <- callJMethod(sparkSession, "catalog")
if (is.null(schema)) {
-sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+sdf <- callJMethod(catalog, "createTable", tableName, source, options)
+} else if (class(schema) == "structType") {
+sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
} else {
-sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+stop("schema must be a structType.")
}
dataFrame(sdf)
}

-createExternalTable <- function(x, ...) {
-dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}

#' Cache Table
#'
#' Caches the specified table in-memory.
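A hedged usage sketch of the two createTable modes documented above (the table names and path are illustrative):

```r
sparkR.session()

# Managed table: no path given, so Spark manages the storage; the schema
# defines the initially empty table.
schema <- structType(structField("name", "string"), structField("age", "integer"))
createTable("people", source = "json", schema = schema)

# External table: a path is given, so the table is defined over existing files.
df <- createTable("myjson", path = "path/to/json", source = "json")
```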
@@ -162,14 +203,14 @@ clearCache <- function() {
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
dropTempTable.default <- function(tableName) {
+.Deprecated("dropTempView", old = "dropTempTable")
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
dropTempView(tableName)
}

dropTempTable <- function(x, ...) {
-.Deprecated("dropTempView")
dispatchFunc("dropTempView(viewName)", x, ...)
}

@@ -430,11 +471,11 @@ recoverPartitions <- function(tableName) {
invisible(handledCallJMethod(catalog, "recoverPartitions", tableName))
}

-#' Invalidate and refresh all the cached metadata of the given table
+#' Invalidate and refresh all the cached data and metadata of the given table
#'
-#' Invalidate and refresh all the cached metadata of the given table. For performance reasons,
-#' Spark SQL or the external data source library it uses might cache certain metadata about a
-#' table, such as the location of blocks. When those change outside of Spark SQL, users should
+#' Invalidate and refresh all the cached data and metadata of the given table. For performance
+#' reasons, Spark SQL or the external data source library it uses might cache certain metadata about
+#' a table, such as the location of blocks. When those change outside of Spark SQL, users should
#' call this function to invalidate the cache.
#'
#' If this table is cached as an InMemoryRelation, drop the original cached version and make the
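A hedged sketch of when the refreshed doc text applies (table name and path are illustrative):

```r
df <- createTable("myjson", path = "path/to/json", source = "json")
count(df)  # Spark may cache metadata (and, if cached, data) for this table

# ...files under path/to/json are rewritten by a process outside Spark...

refreshTable("myjson")  # drop the cached data/metadata so the next scan re-reads
count(df)               # reflects the updated files
```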
20 changes: 15 additions & 5 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
setHiveContext(sc)
sql("CREATE TABLE people (name string, age double, height float)")
df <- read.df(jsonPathNa, "json", schema)
-invisible(insertInto(df, "people"))
+insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
c(16))
expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,33 +1268,43 @@ test_that("column calculation", {

test_that("test HiveContext", {
setHiveContext(sc)
-df <- createExternalTable("json", jsonPath, "json")
+
+schema <- structType(structField("name", "string"), structField("age", "integer"),
+structField("height", "float"))
+createTable("people", source = "json", schema = schema)
+df <- read.df(jsonPathNa, "json", schema)
+insertInto(df, "people")
+expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
+sql("DROP TABLE people")
+
+df <- createTable("json", jsonPath, "json")
expect_is(df, "SparkDataFrame")
expect_equal(count(df), 3)
df2 <- sql("select * from json")
expect_is(df2, "SparkDataFrame")
expect_equal(count(df2), 3)

jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
+saveAsTable(df, "json2", "json", "append", path = jsonPath2)
df3 <- sql("select * from json2")
expect_is(df3, "SparkDataFrame")
expect_equal(count(df3), 3)
unlink(jsonPath2)

hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+saveAsTable(df, "hivetestbl", path = hivetestDataPath)
df4 <- sql("select * from hivetestbl")
expect_is(df4, "SparkDataFrame")
expect_equal(count(df4), 3)
unlink(hivetestDataPath)

parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
+saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
df5 <- sql("select * from parquetest")
expect_is(df5, "SparkDataFrame")
expect_equal(count(df5), 3)
unlink(parquetDataPath)

unsetHiveContext()
})
