# [SPARK-20159][SPARKR][SQL] Support all catalog API in R
## What changes were proposed in this pull request?

Add a set of catalog APIs in R (a usage sketch follows the link below):

```
"currentDatabase",
"listColumns",
"listDatabases",
"listFunctions",
"listTables",
"recoverPartitions",
"refreshByPath",
"refreshTable",
"setCurrentDatabase",
```
https://github.com/apache/spark/pull/17483/files#diff-6929e6c5e59017ff954e110df20ed7ff
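
A brief usage sketch of the new functions, with signatures inferred from the API names above (table names and paths are placeholders, not part of this change):

```r
# Usage sketch only; signatures are inferred from the API list above and may
# differ in detail from the merged code. Table names and paths are placeholders.
library(SparkR)
sparkR.session()

currentDatabase()                        # name of the current database
setCurrentDatabase("default")            # switch the current database
head(listDatabases())                    # databases, as a SparkDataFrame
head(listTables("default"))              # tables and views in a database
head(listFunctions())                    # registered functions
head(listColumns("people"))              # columns of a table ("people" is a placeholder)

recoverPartitions("people_partitioned")  # recover partitions of a partitioned table
refreshTable("people")                   # invalidate cached metadata for a table
refreshByPath("/tmp/people_parquet")     # invalidate caches of data under a path
```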

## How was this patch tested?

manual tests, unit tests

Author: Felix Cheung <[email protected]>

Closes #17483 from felixcheung/rcatalog.
felixcheung authored and Felix Cheung committed Apr 2, 2017
1 parent 657cb95 commit 93dbfe7
Showing 6 changed files with 569 additions and 237 deletions.
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -32,6 +32,7 @@ Collate:
    'pairRDD.R'
    'DataFrame.R'
    'SQLContext.R'
    'catalog.R'
    'WindowSpec.R'
    'backend.R'
    'broadcast.R'
9 changes: 9 additions & 0 deletions R/pkg/NAMESPACE
@@ -358,9 +358,14 @@ export("as.DataFrame",
       "clearCache",
       "createDataFrame",
       "createExternalTable",
       "currentDatabase",
       "dropTempTable",
       "dropTempView",
       "jsonFile",
       "listColumns",
       "listDatabases",
       "listFunctions",
       "listTables",
       "loadDF",
       "parquetFile",
       "read.df",
@@ -370,7 +375,11 @@ export("as.DataFrame",
       "read.parquet",
       "read.stream",
       "read.text",
       "recoverPartitions",
       "refreshByPath",
       "refreshTable",
       "setCheckpointDir",
       "setCurrentDatabase",
       "spark.lapply",
       "spark.addFile",
       "spark.getSparkFilesRootDirectory",
233 changes: 0 additions & 233 deletions R/pkg/R/SQLContext.R
@@ -569,200 +569,6 @@ tableToDF <- function(tableName) {
  dataFrame(sdf)
}

#' Tables
#'
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName name of the database
#' @return a SparkDataFrame
#' @rdname tables
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tables("hive")
#' }
#' @name tables
#' @method tables default
#' @note tables since 1.4.0
tables.default <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName)
  dataFrame(jdf)
}

tables <- function(x, ...) {
  dispatchFunc("tables(databaseName = NULL)", x, ...)
}

#' Table Names
#'
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName name of the database
#' @return a list of table names
#' @rdname tableNames
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tableNames("hive")
#' }
#' @name tableNames
#' @method tableNames default
#' @note tableNames since 1.4.0
tableNames.default <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  callJStatic("org.apache.spark.sql.api.r.SQLUtils",
              "getTableNames",
              sparkSession,
              databaseName)
}

tableNames <- function(x, ...) {
  dispatchFunc("tableNames(databaseName = NULL)", x, ...)
}

#' Cache Table
#'
#' Caches the specified table in-memory.
#'
#' @param tableName The name of the table being cached
#' @return SparkDataFrame
#' @rdname cacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' cacheTable("table")
#' }
#' @name cacheTable
#' @method cacheTable default
#' @note cacheTable since 1.4.0
cacheTable.default <- function(tableName) {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "cacheTable", tableName))
}

cacheTable <- function(x, ...) {
  dispatchFunc("cacheTable(tableName)", x, ...)
}

#' Uncache Table
#'
#' Removes the specified table from the in-memory cache.
#'
#' @param tableName The name of the table being uncached
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' uncacheTable("table")
#' }
#' @name uncacheTable
#' @method uncacheTable default
#' @note uncacheTable since 1.4.0
uncacheTable.default <- function(tableName) {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "uncacheTable", tableName))
}

uncacheTable <- function(x, ...) {
  dispatchFunc("uncacheTable(tableName)", x, ...)
}

#' Clear Cache
#'
#' Removes all cached tables from the in-memory cache.
#'
#' @rdname clearCache
#' @export
#' @examples
#' \dontrun{
#' clearCache()
#' }
#' @name clearCache
#' @method clearCache default
#' @note clearCache since 1.4.0
clearCache.default <- function() {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "clearCache"))
}

clearCache <- function() {
  dispatchFunc("clearCache()")
}

#' (Deprecated) Drop Temporary Table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
#' @param tableName The name of the SparkSQL table to be dropped.
#' @seealso \link{dropTempView}
#' @rdname dropTempTable-deprecated
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempTable("table")
#' }
#' @name dropTempTable
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
dropTempTable.default <- function(tableName) {
  if (class(tableName) != "character") {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}

dropTempTable <- function(x, ...) {
  .Deprecated("dropTempView")
  dispatchFunc("dropTempView(viewName)", x, ...)
}
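
The removed blocks above all follow the same SparkR convention: an exported wrapper forwards its arguments through `dispatchFunc` to a `.default` method, which is how the deprecated SQLContext-first calling style keeps working. A minimal, self-contained sketch of that shape (hypothetical names; not SparkR's actual `dispatchFunc`):

```r
# Minimal sketch of the wrapper-plus-".default" dispatch pattern seen above.
# Names are hypothetical; SparkR's real dispatchFunc lives in SQLContext.R.
listThings.default <- function(databaseName = NULL) {
  paste("listing things in:", if (is.null(databaseName)) "<current>" else databaseName)
}

listThings <- function(x, ...) {
  if (missing(x)) {
    listThings.default(...)
  } else if (inherits(x, "legacyContext")) {
    # old calling style: first argument was a SQLContext-like object; ignore it
    .Deprecated(msg = "Passing a context object is deprecated; it is ignored.")
    listThings.default(...)
  } else {
    listThings.default(x, ...)
  }
}

listThings()                                            # new style, no arguments
listThings("default")                                   # new style, database name
listThings(structure(list(), class = "legacyContext"))  # old style, context ignored
```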

#' Drops the temporary view with the given view name in the catalog.
#'
#' Drops the temporary view with the given view name in the catalog.
#' If the view has been cached before, then it will also be uncached.
#'
#' @param viewName the name of the view to be dropped.
#' @return TRUE if the view is dropped successfully, FALSE otherwise.
#' @rdname dropTempView
#' @name dropTempView
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempView("table")
#' }
#' @note since 2.0.0

dropTempView <- function(viewName) {
  sparkSession <- getSparkSession()
  if (class(viewName) != "character") {
    stop("viewName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  callJMethod(catalog, "dropTempView", viewName)
}

#' Load a SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame
@@ -841,45 +647,6 @@ loadDF <- function(x = NULL, ...) {
  dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' Create an external table
#'
#' Creates an external table based on the dataset in a data source,
#' Returns a SparkDataFrame associated with the external table.
#'
#' The data source is specified by the \code{source} and a set of options(...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createExternalTable("myjson", path="path/to/json", source="json")
#' }
#' @name createExternalTable
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    options[["path"]] <- path
  }
  catalog <- callJMethod(sparkSession, "catalog")
  sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
  dataFrame(sdf)
}

createExternalTable <- function(x, ...) {
  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}

#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)
