From e4808b656172e5d2994e0159ac4c0e326de1cb8a Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Thu, 30 Mar 2017 09:25:17 -0700 Subject: [PATCH 1/8] Move catalog-base method into a new file, add catalog APIs, tests --- R/pkg/DESCRIPTION | 1 + R/pkg/NAMESPACE | 9 + R/pkg/R/SQLContext.R | 233 ----------- R/pkg/R/catalog.R | 478 ++++++++++++++++++++++ R/pkg/R/utils.R | 18 + R/pkg/inst/tests/testthat/test_sparkSQL.R | 52 ++- 6 files changed, 555 insertions(+), 236 deletions(-) create mode 100644 R/pkg/R/catalog.R diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 2ea90f7d3666e..00dde64324ae7 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -32,6 +32,7 @@ Collate: 'pairRDD.R' 'DataFrame.R' 'SQLContext.R' + 'catalog.R' 'WindowSpec.R' 'backend.R' 'broadcast.R' diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 8be7875ad2d5f..c02046c94bf4d 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -358,9 +358,14 @@ export("as.DataFrame", "clearCache", "createDataFrame", "createExternalTable", + "currentDatabase", "dropTempTable", "dropTempView", "jsonFile", + "listColumns", + "listDatabases", + "listFunctions", + "listTables", "loadDF", "parquetFile", "read.df", @@ -370,7 +375,11 @@ export("as.DataFrame", "read.parquet", "read.stream", "read.text", + "recoverPartitions", + "refreshByPath", + "refreshTable", "setCheckpointDir", + "setCurrentDatabase", "spark.lapply", "spark.addFile", "spark.getSparkFilesRootDirectory", diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index b75fb0159d503..a1edef7608fa1 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -569,200 +569,6 @@ tableToDF <- function(tableName) { dataFrame(sdf) } -#' Tables -#' -#' Returns a SparkDataFrame containing names of tables in the given database. -#' -#' @param databaseName name of the database -#' @return a SparkDataFrame -#' @rdname tables -#' @export -#' @examples -#'\dontrun{ -#' sparkR.session() -#' tables("hive") -#' } -#' @name tables -#' @method tables default -#' @note tables since 1.4.0 -tables.default <- function(databaseName = NULL) { - sparkSession <- getSparkSession() - jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName) - dataFrame(jdf) -} - -tables <- function(x, ...) { - dispatchFunc("tables(databaseName = NULL)", x, ...) -} - -#' Table Names -#' -#' Returns the names of tables in the given database as an array. -#' -#' @param databaseName name of the database -#' @return a list of table names -#' @rdname tableNames -#' @export -#' @examples -#'\dontrun{ -#' sparkR.session() -#' tableNames("hive") -#' } -#' @name tableNames -#' @method tableNames default -#' @note tableNames since 1.4.0 -tableNames.default <- function(databaseName = NULL) { - sparkSession <- getSparkSession() - callJStatic("org.apache.spark.sql.api.r.SQLUtils", - "getTableNames", - sparkSession, - databaseName) -} - -tableNames <- function(x, ...) { - dispatchFunc("tableNames(databaseName = NULL)", x, ...) -} - -#' Cache Table -#' -#' Caches the specified table in-memory. 
-#' -#' @param tableName The name of the table being cached -#' @return SparkDataFrame -#' @rdname cacheTable -#' @export -#' @examples -#'\dontrun{ -#' sparkR.session() -#' path <- "path/to/file.json" -#' df <- read.json(path) -#' createOrReplaceTempView(df, "table") -#' cacheTable("table") -#' } -#' @name cacheTable -#' @method cacheTable default -#' @note cacheTable since 1.4.0 -cacheTable.default <- function(tableName) { - sparkSession <- getSparkSession() - catalog <- callJMethod(sparkSession, "catalog") - invisible(callJMethod(catalog, "cacheTable", tableName)) -} - -cacheTable <- function(x, ...) { - dispatchFunc("cacheTable(tableName)", x, ...) -} - -#' Uncache Table -#' -#' Removes the specified table from the in-memory cache. -#' -#' @param tableName The name of the table being uncached -#' @return SparkDataFrame -#' @rdname uncacheTable -#' @export -#' @examples -#'\dontrun{ -#' sparkR.session() -#' path <- "path/to/file.json" -#' df <- read.json(path) -#' createOrReplaceTempView(df, "table") -#' uncacheTable("table") -#' } -#' @name uncacheTable -#' @method uncacheTable default -#' @note uncacheTable since 1.4.0 -uncacheTable.default <- function(tableName) { - sparkSession <- getSparkSession() - catalog <- callJMethod(sparkSession, "catalog") - invisible(callJMethod(catalog, "uncacheTable", tableName)) -} - -uncacheTable <- function(x, ...) { - dispatchFunc("uncacheTable(tableName)", x, ...) -} - -#' Clear Cache -#' -#' Removes all cached tables from the in-memory cache. -#' -#' @rdname clearCache -#' @export -#' @examples -#' \dontrun{ -#' clearCache() -#' } -#' @name clearCache -#' @method clearCache default -#' @note clearCache since 1.4.0 -clearCache.default <- function() { - sparkSession <- getSparkSession() - catalog <- callJMethod(sparkSession, "catalog") - invisible(callJMethod(catalog, "clearCache")) -} - -clearCache <- function() { - dispatchFunc("clearCache()") -} - -#' (Deprecated) Drop Temporary Table -#' -#' Drops the temporary table with the given table name in the catalog. -#' If the table has been cached/persisted before, it's also unpersisted. -#' -#' @param tableName The name of the SparkSQL table to be dropped. -#' @seealso \link{dropTempView} -#' @rdname dropTempTable-deprecated -#' @export -#' @examples -#' \dontrun{ -#' sparkR.session() -#' df <- read.df(path, "parquet") -#' createOrReplaceTempView(df, "table") -#' dropTempTable("table") -#' } -#' @name dropTempTable -#' @method dropTempTable default -#' @note dropTempTable since 1.4.0 -dropTempTable.default <- function(tableName) { - if (class(tableName) != "character") { - stop("tableName must be a string.") - } - dropTempView(tableName) -} - -dropTempTable <- function(x, ...) { - .Deprecated("dropTempView") - dispatchFunc("dropTempView(viewName)", x, ...) -} - -#' Drops the temporary view with the given view name in the catalog. -#' -#' Drops the temporary view with the given view name in the catalog. -#' If the view has been cached before, then it will also be uncached. -#' -#' @param viewName the name of the view to be dropped. -#' @return TRUE if the view is dropped successfully, FALSE otherwise. 
-#' @rdname dropTempView -#' @name dropTempView -#' @export -#' @examples -#' \dontrun{ -#' sparkR.session() -#' df <- read.df(path, "parquet") -#' createOrReplaceTempView(df, "table") -#' dropTempView("table") -#' } -#' @note since 2.0.0 - -dropTempView <- function(viewName) { - sparkSession <- getSparkSession() - if (class(viewName) != "character") { - stop("viewName must be a string.") - } - catalog <- callJMethod(sparkSession, "catalog") - callJMethod(catalog, "dropTempView", viewName) -} - #' Load a SparkDataFrame #' #' Returns the dataset in a data source as a SparkDataFrame @@ -841,45 +647,6 @@ loadDF <- function(x = NULL, ...) { dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...) } -#' Create an external table -#' -#' Creates an external table based on the dataset in a data source, -#' Returns a SparkDataFrame associated with the external table. -#' -#' The data source is specified by the \code{source} and a set of options(...). -#' If \code{source} is not specified, the default data source configured by -#' "spark.sql.sources.default" will be used. -#' -#' @param tableName a name of the table. -#' @param path the path of files to load. -#' @param source the name of external data source. -#' @param ... additional argument(s) passed to the method. -#' @return A SparkDataFrame. -#' @rdname createExternalTable -#' @export -#' @examples -#'\dontrun{ -#' sparkR.session() -#' df <- createExternalTable("myjson", path="path/to/json", source="json") -#' } -#' @name createExternalTable -#' @method createExternalTable default -#' @note createExternalTable since 1.4.0 -createExternalTable.default <- function(tableName, path = NULL, source = NULL, ...) { - sparkSession <- getSparkSession() - options <- varargsToStrEnv(...) - if (!is.null(path)) { - options[["path"]] <- path - } - catalog <- callJMethod(sparkSession, "catalog") - sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options) - dataFrame(sdf) -} - -createExternalTable <- function(x, ...) { - dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...) -} - #' Create a SparkDataFrame representing the database table accessible via JDBC URL #' #' Additional JDBC database connection properties can be set (...) diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R new file mode 100644 index 0000000000000..9da2587bf1b01 --- /dev/null +++ b/R/pkg/R/catalog.R @@ -0,0 +1,478 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# catalog.R: SparkSession catalog functions + +#' Create an external table +#' +#' Creates an external table based on the dataset in a data source, +#' Returns a SparkDataFrame associated with the external table. +#' +#' The data source is specified by the \code{source} and a set of options(...). 
+#' If \code{source} is not specified, the default data source configured by +#' "spark.sql.sources.default" will be used. +#' +#' @param tableName a name of the table. +#' @param path the path of files to load. +#' @param source the name of external data source. +#' @param schema the schema of the data for certain data source. +#' @param ... additional argument(s) passed to the method. +#' @return A SparkDataFrame. +#' @rdname createExternalTable +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- createExternalTable("myjson", path="path/to/json", source="json", schema) +#' } +#' @name createExternalTable +#' @method createExternalTable default +#' @note createExternalTable since 1.4.0 +createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) { + sparkSession <- getSparkSession() + options <- varargsToStrEnv(...) + if (!is.null(path)) { + options[["path"]] <- path + } + catalog <- callJMethod(sparkSession, "catalog") + if (!is.null(schema)) { + sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options) + } else { + sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options) + } + dataFrame(sdf) +} + +createExternalTable <- function(x, ...) { + dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...) +} + +#' Cache Table +#' +#' Caches the specified table in-memory. +#' +#' @param tableName The name of the table being cached +#' @return SparkDataFrame +#' @rdname cacheTable +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' createOrReplaceTempView(df, "table") +#' cacheTable("table") +#' } +#' @name cacheTable +#' @method cacheTable default +#' @note cacheTable since 1.4.0 +cacheTable.default <- function(tableName) { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(callJMethod(catalog, "cacheTable", tableName)) +} + +cacheTable <- function(x, ...) { + dispatchFunc("cacheTable(tableName)", x, ...) +} + +#' Uncache Table +#' +#' Removes the specified table from the in-memory cache. +#' +#' @param tableName The name of the table being uncached +#' @return SparkDataFrame +#' @rdname uncacheTable +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' path <- "path/to/file.json" +#' df <- read.json(path) +#' createOrReplaceTempView(df, "table") +#' uncacheTable("table") +#' } +#' @name uncacheTable +#' @method uncacheTable default +#' @note uncacheTable since 1.4.0 +uncacheTable.default <- function(tableName) { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(callJMethod(catalog, "uncacheTable", tableName)) +} + +uncacheTable <- function(x, ...) { + dispatchFunc("uncacheTable(tableName)", x, ...) +} + +#' Clear Cache +#' +#' Removes all cached tables from the in-memory cache. +#' +#' @rdname clearCache +#' @export +#' @examples +#' \dontrun{ +#' clearCache() +#' } +#' @name clearCache +#' @method clearCache default +#' @note clearCache since 1.4.0 +clearCache.default <- function() { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(callJMethod(catalog, "clearCache")) +} + +clearCache <- function() { + dispatchFunc("clearCache()") +} + +#' (Deprecated) Drop Temporary Table +#' +#' Drops the temporary table with the given table name in the catalog. +#' If the table has been cached/persisted before, it's also unpersisted. 
+#' +#' @param tableName The name of the SparkSQL table to be dropped. +#' @seealso \link{dropTempView} +#' @rdname dropTempTable-deprecated +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' df <- read.df(path, "parquet") +#' createOrReplaceTempView(df, "table") +#' dropTempTable("table") +#' } +#' @name dropTempTable +#' @method dropTempTable default +#' @note dropTempTable since 1.4.0 +dropTempTable.default <- function(tableName) { + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + dropTempView(tableName) +} + +dropTempTable <- function(x, ...) { + .Deprecated("dropTempView") + dispatchFunc("dropTempView(viewName)", x, ...) +} + +#' Drops the temporary view with the given view name in the catalog. +#' +#' Drops the temporary view with the given view name in the catalog. +#' If the view has been cached before, then it will also be uncached. +#' +#' @param viewName the name of the view to be dropped. +#' @return TRUE if the view is dropped successfully, FALSE otherwise. +#' @rdname dropTempView +#' @name dropTempView +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' df <- read.df(path, "parquet") +#' createOrReplaceTempView(df, "table") +#' dropTempView("table") +#' } +#' @note since 2.0.0 +dropTempView <- function(viewName) { + sparkSession <- getSparkSession() + if (class(viewName) != "character") { + stop("viewName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "dropTempView", viewName) +} + +#' Tables +#' +#' Returns a SparkDataFrame containing names of tables in the given database. +#' +#' @param databaseName name of the database +#' @return a SparkDataFrame +#' @rdname tables +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' tables("hive") +#' } +#' @name tables +#' @method tables default +#' @note tables since 1.4.0 +tables.default <- function(databaseName = NULL) { + .Deprecated("listTables", old = "tables") + sparkSession <- getSparkSession() + jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName) + dataFrame(jdf) +} + +tables <- function(x, ...) { + dispatchFunc("tables(databaseName = NULL)", x, ...) +} + +#' Table Names +#' +#' Returns the names of tables in the given database as an array. +#' +#' @param databaseName name of the database +#' @return a list of table names +#' @rdname tableNames +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' tableNames("hive") +#' } +#' @name tableNames +#' @method tableNames default +#' @note tableNames since 1.4.0 +tableNames.default <- function(databaseName = NULL) { + sparkSession <- getSparkSession() + callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "getTableNames", + sparkSession, + databaseName) +} + +tableNames <- function(x, ...) { + dispatchFunc("tableNames(databaseName = NULL)", x, ...) +} + +#' Returns the current default database +#' +#' Returns the current default database. +#' +#' @return name of the current default database. +#' @rdname currentDatabase +#' @name currentDatabase +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' currentDatabase() +#' } +#' @note since 2.2.0 +currentDatabase <- function() { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "currentDatabase") +} + +#' Sets the current default database +#' +#' Sets the current default database. 
+#' +#' @param databaseName name of the database +#' @rdname setCurrentDatabase +#' @name setCurrentDatabase +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' setCurrentDatabase("default") +#' } +#' @note since 2.2.0 +setCurrentDatabase <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName)) +} + +#' Returns a list of databases available +#' +#' Returns a list of databases available. +#' +#' @return a SparkDataFrame of the list of databases. +#' @rdname listDatabases +#' @name listDatabases +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' listDatabases() +#' } +#' @note since 2.2.0 +listDatabases <- function() { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF")) +} + +#' Returns a list of tables in the specified database +#' +#' Returns a list of tables in the specified database. +#' This includes all temporary tables. +#' +#' @param databaseName (optional) name of the database +#' @return a SparkDataFrame of the list of tables. +#' @rdname listTables +#' @name listTables +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' listTables() +#' listTables("default") +#' } +#' @note since 2.2.0 +listTables <- function(databaseName = NULL) { + sparkSession <- getSparkSession() + if (!is.null(databaseName) && class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jdst <- if (is.null(databaseName)) { + callJMethod(catalog, "listTables") + } else { + handledCallJMethod(catalog, "listTables", databaseName) + } + dataFrame(callJMethod(jdst, "toDF")) +} + +#' Returns a list of columns for the given table in the specified database +#' +#' Returns a list of columns for the given table in the specified database. +#' +#' @param tableName a name of the table. +#' @param databaseName (optional) name of the database +#' @return a SparkDataFrame of the list of column descriptions. +#' @rdname listColumns +#' @name listColumns +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' listColumns("mytable") +#' } +#' @note since 2.2.0 +listColumns <- function(tableName, databaseName = NULL) { + sparkSession <- getSparkSession() + if (!is.null(databaseName) && class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jdst <- if (is.null(databaseName)) { + handledCallJMethod(catalog, "listColumns", tableName) + } else { + handledCallJMethod(catalog, "listColumns", databaseName, tableName) + } + dataFrame(callJMethod(jdst, "toDF")) +} + +#' Returns a list of functions registered in the specified database +#' +#' Returns a list of functions registered in the specified database. +#' This includes all temporary functions. +#' +#' @param databaseName (optional) name of the database +#' @return a SparkDataFrame of the list of function descriptions. 
+#' @rdname listFunctions +#' @name listFunctions +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' listFunctions() +#' } +#' @note since 2.2.0 +listFunctions <- function(databaseName = NULL) { + sparkSession <- getSparkSession() + if (!is.null(databaseName) && class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jdst <- if (is.null(databaseName)) { + callJMethod(catalog, "listFunctions") + } else { + handledCallJMethod(catalog, "listFunctions", databaseName) + } + dataFrame(callJMethod(jdst, "toDF")) +} + +#' Recover all the partitions in the directory of a table and update the catalog +#' +#' Recover all the partitions in the directory of a table and update the catalog. +#' +#' @param tableName a name of the table. +#' @rdname recoverPartitions +#' @name recoverPartitions +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' recoverPartitions("myTable") +#' } +#' @note since 2.2.0 +recoverPartitions <- function(tableName) { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(handledCallJMethod(catalog, "recoverPartitions", tableName)) +} + +#' Invalidate and refresh all the cached metadata of the given table +#' +#' Invalidate and refresh all the cached metadata of the given table. For performance reasons, +#' Spark SQL or the external data source library it uses might cache certain metadata about a +#' table, such as the location of blocks. When those change outside of Spark SQL, users should +#' call this function to invalidate the cache. +#' +#' If this table is cached as an InMemoryRelation, drop the original cached version and make the +#' new version cached lazily. +#' +#' @param tableName a name of the table. +#' @rdname refreshTable +#' @name refreshTable +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' refreshTable("myTable") +#' } +#' @note since 2.2.0 +refreshTable <- function(tableName) { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(handledCallJMethod(catalog, "refreshTable", tableName)) +} + +#' Invalidate and refresh all the cached data and metadata for SparkDataFrame containing path +#' +#' Invalidate and refresh all the cached data (and the associated metadata) for any SparkDataFrame +#' that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate +#' everything that is cached. +#' +#' @param path the path of the data source. +#' @rdname refreshByPath +#' @name refreshByPath +#' @export +#' @examples +#' \dontrun{ +#' sparkR.session() +#' refreshByPath("/path") +#' } +#' @note since 2.2.0 +refreshByPath <- function(path) { + sparkSession <- getSparkSession() + catalog <- callJMethod(sparkSession, "catalog") + invisible(handledCallJMethod(catalog, "refreshByPath", path)) +} diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 810de9917e0ba..fbc89e98847bf 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -846,6 +846,24 @@ captureJVMException <- function(e, method) { # Extract the first message of JVM exception. first <- strsplit(msg[2], "\r?\n\tat")[[1]][1] stop(paste0(rmsg, "analysis error - ", first), call. = FALSE) + } else + if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", stacktrace))) { + msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", + fixed = TRUE)[[1]] + # Extract "Error in ..." message. 
+ rmsg <- msg[1] + # Extract the first message of JVM exception. + first <- strsplit(msg[2], "\r?\n\tat")[[1]][1] + stop(paste0(rmsg, "no such database - ", first), call. = FALSE) + } else + if (any(grep("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", stacktrace))) { + msg <- strsplit(stacktrace, "org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", + fixed = TRUE)[[1]] + # Extract "Error in ..." message. + rmsg <- msg[1] + # Extract the first message of JVM exception. + first <- strsplit(msg[2], "\r?\n\tat")[[1]][1] + stop(paste0(rmsg, "no such table - ", first), call. = FALSE) } else { stop(stacktrace, call. = FALSE) } diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 5acf8719d1201..fea1ff0c7d598 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -645,16 +645,17 @@ test_that("test tableNames and tables", { df <- read.json(jsonPath) createOrReplaceTempView(df, "table1") expect_equal(length(tableNames()), 1) - tables <- tables() + tables <- listTables() expect_equal(count(tables), 1) + expect_equal(count(suppressWarnings(tables())), count(tables)) suppressWarnings(registerTempTable(df, "table2")) - tables <- tables() + tables <- listTables() expect_equal(count(tables), 2) suppressWarnings(dropTempTable("table1")) expect_true(dropTempView("table2")) - tables <- tables() + tables <- listTables() expect_equal(count(tables), 0) }) @@ -2977,6 +2978,51 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt")) }) +test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { + expect_equal(currentDatabase(), "default") + expect_error(setCurrentDatabase("default"), NA) + expect_error(setCurrentDatabase("foo"), + "Error in setCurrentDatabase : analysis error - Database 'foo' does not exist") + dbs <- collect(listDatabases()) + expect_equal(names(dbs), c("name", "description", "locationUri")) + expect_equal(dbs[[1]], "default") +}) + +test_that("catalog APIs, listTables, listColumns, listFunctions", { + tb <- listTables() + count <- count(suppressWarnings(tables())) + expect_equal(nrow(tb), count) + expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary")) + + createOrReplaceTempView(as.DataFrame(cars), "cars") + + tb <- listTables() + expect_equal(nrow(tb), count + 1) + tbs <- collect(tb) + expect_true(nrow(tbs[tbs$name == "cars", ]) > 0) + expect_error(listTables("bar"), + "Error in listTables : no such database - Database 'bar' not found") + + c <- listColumns("cars") + expect_equal(nrow(c), 2) + expect_equal(colnames(c), + c("name", "description", "dataType", "nullable", "isPartition", "isBucket")) + expect_equal(collect(c)[[1]][[1]], "speed") + expect_error(listColumns("foo", "default"), + "Error in listColumns : no such table - Table or view 'foo' not found in database 'default'") + + dropTempView("cars") + + f <- listFunctions() + expect_equal(nrow(f) >= 200) # 250 + expect_equal(colnames(f), + c("name", "database", "description", "className", "isTemporary")) + expect_equal(take(orderBy(f, "className"), 1)$className, + "org.apache.spark.sql.catalyst.expressions.Abs") + expect_error(listFunctions("foo_db"), + "Error in listFunctions : analysis error - Database 'foo_db' does not exist") +}) + compare_list <- function(list1, list2) { # get testthat to show the diff by first making the 2 lists equal in length 
expect_equal(length(list1), length(list2)) From 5ab583443d60f6bf1d85608552962b46ac088633 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Thu, 30 Mar 2017 20:43:19 -0700 Subject: [PATCH 2/8] fix test failure --- R/pkg/R/catalog.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 9da2587bf1b01..829adea135eb5 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -49,7 +49,7 @@ createExternalTable.default <- function(tableName, path = NULL, source = NULL, s options[["path"]] <- path } catalog <- callJMethod(sparkSession, "catalog") - if (!is.null(schema)) { + if (is.null(schema)) { sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options) } else { sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index fea1ff0c7d598..fc2df651c6a70 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -687,6 +687,9 @@ test_that("test cache, uncache and clearCache", { uncacheTable("table1") clearCache() expect_true(dropTempView("table1")) + + expect_error(uncacheTable("foo"), + "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'") }) test_that("insertInto() on a registered table", { @@ -3009,12 +3012,12 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { c("name", "description", "dataType", "nullable", "isPartition", "isBucket")) expect_equal(collect(c)[[1]][[1]], "speed") expect_error(listColumns("foo", "default"), - "Error in listColumns : no such table - Table or view 'foo' not found in database 'default'") + "Error in listColumns : analysis error - Table 'foo' does not exist in database 'default'") dropTempView("cars") f <- listFunctions() - expect_equal(nrow(f) >= 200) # 250 + expect_true(nrow(f) >= 200) # 250 expect_equal(colnames(f), c("name", "database", "description", "className", "isTemporary")) expect_equal(take(orderBy(f, "className"), 1)$className, From 28195b98bf71c36b47e3e191b24715806f8aed6e Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Thu, 30 Mar 2017 20:54:59 -0700 Subject: [PATCH 3/8] off by one char --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index fc2df651c6a70..31b7b24e518bd 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -689,7 +689,7 @@ test_that("test cache, uncache and clearCache", { expect_true(dropTempView("table1")) expect_error(uncacheTable("foo"), - "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'") + "Error in uncacheTable : no such table - Table or view 'foo' not found in database 'default'") }) test_that("insertInto() on a registered table", { From 9c768ae983f8fbeed11b7c308ca7f5662f88d809 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Thu, 30 Mar 2017 22:13:26 -0700 Subject: [PATCH 4/8] improve exception handling that fixes the test too --- R/pkg/R/catalog.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 829adea135eb5..c9afcba107737 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -83,7 +83,7 @@ createExternalTable <- function(x, ...) 
{ cacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - invisible(callJMethod(catalog, "cacheTable", tableName)) + invisible(handledCallJMethod(catalog, "cacheTable", tableName)) } cacheTable <- function(x, ...) { @@ -112,7 +112,7 @@ cacheTable <- function(x, ...) { uncacheTable.default <- function(tableName) { sparkSession <- getSparkSession() catalog <- callJMethod(sparkSession, "catalog") - invisible(callJMethod(catalog, "uncacheTable", tableName)) + invisible(handledCallJMethod(catalog, "uncacheTable", tableName)) } uncacheTable <- function(x, ...) { From 5093891e5a8fc0f299ebb4303ddb488e86f87221 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Thu, 30 Mar 2017 22:52:19 -0700 Subject: [PATCH 5/8] address deprecation --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 31b7b24e518bd..618c1819f60d0 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2825,7 +2825,7 @@ test_that("createDataFrame sqlContext parameter backward compatibility", { # more tests for SPARK-16538 createOrReplaceTempView(df, "table") - SparkR::tables() + SparkR::listTables() SparkR::sql("SELECT 1") suppressWarnings(SparkR::sql(sqlContext, "SELECT * FROM table")) suppressWarnings(SparkR::dropTempTable(sqlContext, "table")) From 3c6693035b023d7c9c9e2caa3014247c130eb037 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sat, 1 Apr 2017 11:10:24 -0700 Subject: [PATCH 6/8] make tables() alias instead of deprecate, error handling in tableNames(), test for recoverPartitions/refresh* --- R/pkg/R/catalog.R | 18 ++++++++---------- R/pkg/inst/tests/testthat/test_sparkSQL.R | 15 +++++++++++---- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index c9afcba107737..48d3e7fdfe9db 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -204,7 +204,7 @@ dropTempView <- function(viewName) { #' #' Returns a SparkDataFrame containing names of tables in the given database. #' -#' @param databaseName name of the database +#' @param databaseName (optional) name of the database #' @return a SparkDataFrame #' @rdname tables #' @export @@ -217,10 +217,8 @@ dropTempView <- function(viewName) { #' @method tables default #' @note tables since 1.4.0 tables.default <- function(databaseName = NULL) { - .Deprecated("listTables", old = "tables") - sparkSession <- getSparkSession() - jdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getTables", sparkSession, databaseName) - dataFrame(jdf) + # rename column to match previous output schema + withColumnRenamed(listTables(databaseName), "name", "tableName") } tables <- function(x, ...) { @@ -231,7 +229,7 @@ tables <- function(x, ...) { #' #' Returns the names of tables in the given database as an array. #' -#' @param databaseName name of the database +#' @param databaseName (optional) name of the database #' @return a list of table names #' @rdname tableNames #' @export @@ -245,10 +243,10 @@ tables <- function(x, ...) 
{ #' @note tableNames since 1.4.0 tableNames.default <- function(databaseName = NULL) { sparkSession <- getSparkSession() - callJStatic("org.apache.spark.sql.api.r.SQLUtils", - "getTableNames", - sparkSession, - databaseName) + handledCallJMethod("org.apache.spark.sql.api.r.SQLUtils", + "getTableNames", + sparkSession, + databaseName) } tableNames <- function(x, ...) { diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 618c1819f60d0..005a9f640b004 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -645,9 +645,12 @@ test_that("test tableNames and tables", { df <- read.json(jsonPath) createOrReplaceTempView(df, "table1") expect_equal(length(tableNames()), 1) + expect_equal(length(tableNames("default")), 1) tables <- listTables() expect_equal(count(tables), 1) - expect_equal(count(suppressWarnings(tables())), count(tables)) + expect_equal(count(tables()), count(tables)) + expect_true("tableName" %in% colnames(tables())) + expect_true(all(c("tableName", "database", "isTemporary") %in% colnames(tables()))) suppressWarnings(registerTempTable(df, "table2")) tables <- listTables() @@ -2993,7 +2996,7 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { test_that("catalog APIs, listTables, listColumns, listFunctions", { tb <- listTables() - count <- count(suppressWarnings(tables())) + count <- count(tables()) expect_equal(nrow(tb), count) expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary")) @@ -3014,8 +3017,6 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { expect_error(listColumns("foo", "default"), "Error in listColumns : analysis error - Table 'foo' does not exist in database 'default'") - dropTempView("cars") - f <- listFunctions() expect_true(nrow(f) >= 200) # 250 expect_equal(colnames(f), @@ -3024,6 +3025,12 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { "org.apache.spark.sql.catalyst.expressions.Abs") expect_error(listFunctions("foo_db"), "Error in listFunctions : analysis error - Database 'foo_db' does not exist") + + expect_error(recoverPartitions("cars"), NA) + expect_error(refreshTable("cars"), NA) + expect_error(refreshByPath("/"), NA) + + dropTempView("cars") }) compare_list <- function(list1, list2) { From 2aea0cb0f9b55acf741788aa72a573853560f3d9 Mon Sep 17 00:00:00 2001 From: Felix Cheung Date: Sat, 1 Apr 2017 15:51:43 -0700 Subject: [PATCH 7/8] fix test --- R/pkg/R/catalog.R | 11 ++++++----- R/pkg/inst/tests/testthat/test_sparkSQL.R | 4 +++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index 48d3e7fdfe9db..d7c8cf26492ca 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -243,10 +243,10 @@ tables <- function(x, ...) { #' @note tableNames since 1.4.0 tableNames.default <- function(databaseName = NULL) { sparkSession <- getSparkSession() - handledCallJMethod("org.apache.spark.sql.api.r.SQLUtils", - "getTableNames", - sparkSession, - databaseName) + callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "getTableNames", + sparkSession, + databaseName) } tableNames <- function(x, ...) { @@ -409,7 +409,8 @@ listFunctions <- function(databaseName = NULL) { #' Recover all the partitions in the directory of a table and update the catalog #' -#' Recover all the partitions in the directory of a table and update the catalog. +#' Recover all the partitions in the directory of a table and update the catalog. 
The name should
+#' reference a partitioned table, and not a temporary view.
 #'
 #' @param tableName a name of the table.
 #' @rdname recoverPartitions
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 005a9f640b004..ad06711a79a78 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -3026,7 +3026,9 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", {
   expect_error(listFunctions("foo_db"),
                "Error in listFunctions : analysis error - Database 'foo_db' does not exist")
 
-  expect_error(recoverPartitions("cars"), NA)
+  # recoverPartitions does not work with a temporary view
+  expect_error(recoverPartitions("cars"),
+               "no such table - Table or view 'cars' not found in database 'default'")
   expect_error(refreshTable("cars"), NA)
   expect_error(refreshByPath("/"), NA)
 
From aff13a860282375a650f6323987c73364ed439cd Mon Sep 17 00:00:00 2001
From: Felix Cheung
Date: Sat, 1 Apr 2017 15:56:11 -0700
Subject: [PATCH 8/8] add doc links

---
 R/pkg/R/catalog.R | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index d7c8cf26492ca..07a89f763cde1 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -207,6 +207,7 @@ dropTempView <- function(viewName) {
 #' @param databaseName (optional) name of the database
 #' @return a SparkDataFrame
 #' @rdname tables
+#' @seealso \link{listTables}
 #' @export
 #' @examples
 #'\dontrun{
@@ -325,6 +326,7 @@ listDatabases <- function() {
 #' @return a SparkDataFrame of the list of tables.
 #' @rdname listTables
 #' @name listTables
+#' @seealso \link{tables}
 #' @export
 #' @examples
 #' \dontrun{
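
For reference, a minimal end-to-end sketch of the catalog APIs this series adds, as exercised by the tests above. It is illustrative only and not part of any patch: it assumes SparkR is installed, a session can be started against the default warehouse (database "default"), and it reuses the "cars" temporary view name from the tests; "/path" is a hypothetical data source path.

# Usage sketch for the new SparkR catalog APIs (assumptions as stated above).
library(SparkR)
sparkR.session()

currentDatabase()                     # returns "default"
setCurrentDatabase("default")         # errors on a nonexistent database

collect(listDatabases())              # columns: name, description, locationUri

createOrReplaceTempView(as.DataFrame(cars), "cars")
collect(listTables("default"))        # columns: name, database, description, tableType, isTemporary
collect(listColumns("cars"))          # columns: name, description, dataType, nullable, isPartition, isBucket
collect(listFunctions())              # includes temporary functions

cacheTable("cars")                    # pin the view in memory
uncacheTable("cars")                  # remove it from the in-memory cache
clearCache()                          # remove all cached tables

refreshTable("cars")                  # invalidate cached metadata for a table
refreshByPath("/path")                # hypothetical path; matching is by prefix
# recoverPartitions("cars") would error here: it only applies to a
# partitioned catalog table, not a temporary view (see PATCH 7/8).

dropTempView("cars")                  # returns TRUE on success
sparkR.session.stop()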