[SPARK-20159][SPARKR][SQL] Support all catalog API in R #17483

Closed
wants to merge 8 commits into from
Changes from all commits
1 change: 1 addition & 0 deletions R/pkg/DESCRIPTION
@@ -32,6 +32,7 @@ Collate:
     'pairRDD.R'
     'DataFrame.R'
     'SQLContext.R'
+    'catalog.R'
     'WindowSpec.R'
     'backend.R'
     'broadcast.R'
9 changes: 9 additions & 0 deletions R/pkg/NAMESPACE
@@ -358,9 +358,14 @@ export("as.DataFrame",
"clearCache",
"createDataFrame",
"createExternalTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
"jsonFile",
"listColumns",
"listDatabases",
"listFunctions",
"listTables",
"loadDF",
"parquetFile",
"read.df",
@@ -370,7 +375,11 @@ export("as.DataFrame",
"read.parquet",
"read.stream",
"read.text",
"recoverPartitions",
"refreshByPath",
"refreshTable",
"setCheckpointDir",
"setCurrentDatabase",
"spark.lapply",
"spark.addFile",
"spark.getSparkFilesRootDirectory",
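Taken together, the new exports cover the session catalog end to end. A quick sketch of how they are called from R, assuming an active SparkR session ("mytable" and the path are placeholders):

sparkR.session()

currentDatabase()               # name of the active database, e.g. "default"
setCurrentDatabase("default")   # switch the active database
listDatabases()                 # SparkDataFrame of available databases
listTables()                    # SparkDataFrame of tables in the current database
listFunctions()                 # SparkDataFrame of registered functions
listColumns("mytable")          # SparkDataFrame describing the table's columns
recoverPartitions("mytable")    # re-discover partitions of a partitioned table
refreshTable("mytable")         # invalidate cached metadata/data for the table
refreshByPath("/tmp/mytable")   # invalidate caches for data under a path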
233 changes: 0 additions & 233 deletions R/pkg/R/SQLContext.R
@@ -569,200 +569,6 @@ tableToDF <- function(tableName) {
  dataFrame(sdf)
}

#' Tables
#'
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName name of the database
#' @return a SparkDataFrame
#' @rdname tables
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tables("hive")
#' }
#' @name tables
#' @method tables default
#' @note tables since 1.4.0
tables.default <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName)
@MaxGekk (Member) commented on Nov 28, 2020:

If I am not mistaken, the method getTables() is not used anymore in R. Can we remove it from r.SQLUtils?

def getTables(sparkSession: SparkSession, databaseName: String): DataFrame = {
  databaseName match {
    case n: String if n != null && n.trim.nonEmpty =>
      Dataset.ofRows(sparkSession, ShowTablesCommand(Some(n), None))
    case _ =>
      Dataset.ofRows(sparkSession, ShowTablesCommand(None, None))
  }
}

cc @HyukjinKwon

A reply from another member did not load; a later reply:

Yeah, I saw the PR. LGTM

  dataFrame(jdf)
}
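For context on the review thread above: once the catalog API from this PR is available, tables() can be implemented in R on top of listTables() rather than the JVM-side helper, which is what makes getTables() dead code. A minimal sketch (the withColumnRenamed call is an assumption, to keep the old tableName column in the output schema):

tables.default <- function(databaseName = NULL) {
  # listTables() goes through sparkSession.catalog directly, so the
  # r.SQLUtils.getTables() helper is no longer needed
  withColumnRenamed(listTables(databaseName), "name", "tableName")
}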

tables <- function(x, ...) {
  dispatchFunc("tables(databaseName = NULL)", x, ...)
}
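A note on the dispatchFunc wrappers that appear throughout this file: they preserve the pre-2.0 calling convention, in which an SQLContext was passed as the first argument. As I read the wrapper, both forms below end up in tables.default(); the legacy form is detected from the class of the first argument and only triggers a deprecation warning (sqlContext here stands for a legacy context object):

tables()             # Spark >= 2.0 style: no context argument
tables(sqlContext)   # pre-2.0 style: still dispatched, with a deprecation warning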

#' Table Names
#'
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName name of the database
#' @return a list of table names
#' @rdname tableNames
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' tableNames("hive")
#' }
#' @name tableNames
#' @method tableNames default
#' @note tableNames since 1.4.0
tableNames.default <- function(databaseName = NULL) {
  sparkSession <- getSparkSession()
  callJStatic("org.apache.spark.sql.api.r.SQLUtils",
              "getTableNames",
              sparkSession,
              databaseName)
}

tableNames <- function(x, ...) {
  dispatchFunc("tableNames(databaseName = NULL)", x, ...)
}

#' Cache Table
#'
#' Caches the specified table in-memory.
#'
#' @param tableName The name of the table being cached
#' @return SparkDataFrame
#' @rdname cacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' cacheTable("table")
#' }
#' @name cacheTable
#' @method cacheTable default
#' @note cacheTable since 1.4.0
cacheTable.default <- function(tableName) {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "cacheTable", tableName))
}

cacheTable <- function(x, ...) {
  dispatchFunc("cacheTable(tableName)", x, ...)
}

#' Uncache Table
#'
#' Removes the specified table from the in-memory cache.
#'
#' @param tableName The name of the table being uncached
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' createOrReplaceTempView(df, "table")
#' uncacheTable("table")
#' }
#' @name uncacheTable
#' @method uncacheTable default
#' @note uncacheTable since 1.4.0
uncacheTable.default <- function(tableName) {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "uncacheTable", tableName))
}

uncacheTable <- function(x, ...) {
  dispatchFunc("uncacheTable(tableName)", x, ...)
}

#' Clear Cache
#'
#' Removes all cached tables from the in-memory cache.
#'
#' @rdname clearCache
#' @export
#' @examples
#' \dontrun{
#' clearCache()
#' }
#' @name clearCache
#' @method clearCache default
#' @note clearCache since 1.4.0
clearCache.default <- function() {
  sparkSession <- getSparkSession()
  catalog <- callJMethod(sparkSession, "catalog")
  invisible(callJMethod(catalog, "clearCache"))
}

clearCache <- function() {
  dispatchFunc("clearCache()")
}

#' (Deprecated) Drop Temporary Table
#'
#' Drops the temporary table with the given table name in the catalog.
#' If the table has been cached/persisted before, it's also unpersisted.
#'
#' @param tableName The name of the SparkSQL table to be dropped.
#' @seealso \link{dropTempView}
#' @rdname dropTempTable-deprecated
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempTable("table")
#' }
#' @name dropTempTable
#' @method dropTempTable default
#' @note dropTempTable since 1.4.0
dropTempTable.default <- function(tableName) {
  if (class(tableName) != "character") {
    stop("tableName must be a string.")
  }
  dropTempView(tableName)
}

dropTempTable <- function(x, ...) {
  .Deprecated("dropTempView")
  dispatchFunc("dropTempView(viewName)", x, ...)
}

#' Drops the temporary view with the given view name in the catalog.
#'
#' Drops the temporary view with the given view name in the catalog.
#' If the view has been cached before, then it will also be uncached.
#'
#' @param viewName the name of the view to be dropped.
#' @return TRUE if the view is dropped successfully, FALSE otherwise.
#' @rdname dropTempView
#' @name dropTempView
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' createOrReplaceTempView(df, "table")
#' dropTempView("table")
#' }
#' @note since 2.0.0
dropTempView <- function(viewName) {
  sparkSession <- getSparkSession()
  if (class(viewName) != "character") {
    stop("viewName must be a string.")
  }
  catalog <- callJMethod(sparkSession, "catalog")
  callJMethod(catalog, "dropTempView", viewName)
}

#' Load a SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame
@@ -841,45 +647,6 @@ loadDF <- function(x = NULL, ...) {
  dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' Create an external table
#'
#' Creates an external table based on the dataset in a data source and
#' returns a SparkDataFrame associated with the external table.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param tableName a name of the table.
#' @param path the path of files to load.
#' @param source the name of external data source.
#' @param ... additional argument(s) passed to the method.
#' @return A SparkDataFrame.
#' @rdname createExternalTable
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- createExternalTable("myjson", path="path/to/json", source="json")
#' }
#' @name createExternalTable
#' @method createExternalTable default
#' @note createExternalTable since 1.4.0
createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) {
  sparkSession <- getSparkSession()
  options <- varargsToStrEnv(...)
  if (!is.null(path)) {
    options[["path"]] <- path
  }
  catalog <- callJMethod(sparkSession, "catalog")
  sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
  dataFrame(sdf)
}

createExternalTable <- function(x, ...) {
  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
}
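None of the removals in this file change the user-facing API: together with the DESCRIPTION change that adds 'catalog.R' to Collate, the same definitions appear to be re-homed in the new R/pkg/R/catalog.R, so existing code keeps working unchanged. For example:

df <- read.json("path/to/file.json")
createOrReplaceTempView(df, "table")
cacheTable("table")      # now resolved via the definition in catalog.R
uncacheTable("table")
dropTempView("table")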

#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)