From ac0508985154e600bb8a313f2d151a035790c9f1 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 14 Dec 2016 17:47:34 -0800 Subject: [PATCH 1/8] Set default warehouse dir to tempdir To do this we introduce a new SQL config that is set to tempdir from SparkR. --- R/pkg/R/sparkR.R | 7 +++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++++++++ .../org/apache/spark/sql/internal/SQLConf.scala | 12 +++++++++--- .../org/apache/spark/sql/internal/SharedState.scala | 13 +++++++++---- .../apache/spark/sql/internal/SQLConfSuite.scala | 13 +++++++++++++ 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index c57cc8f285613..8c5b6093b4a71 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -363,6 +363,13 @@ sparkR.session <- function( ...) { sparkConfigMap <- convertNamedListToEnv(sparkConfig) + + # NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements + # See SPARK-18817 for more details + if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) { + assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap) + } + namedParams <- list(...) if (length(namedParams) > 0) { paramMap <- convertNamedListToEnv(namedParams) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index e8ccff81222d0..08d37d48f7717 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2165,6 +2165,14 @@ test_that("SQL error message is returned from JVM", { expect_equal(grepl("blah", retError), TRUE) }) +test_that("Default warehouse dir should be set to tempdir", { + # nothing should be written outside tempdir() without explicit user permission + inital_working_directory_files <- list.files() + result <- sql("CREATE TABLE warehouse") + expect_equal(inital_working_directory_files, list.files()) + result <- sql("DELETE TABLE warehouse") +}) + irisDF <- suppressWarnings(createDataFrame(iris)) test_that("Method as.data.frame as a synonym for collect()", { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 4d25f54caa130..14fa3f79992dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -819,7 +819,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH) - def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString + def warehousePath: String = new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES) @@ -964,11 +964,17 @@ object StaticSQLConf { } } - val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir") - .doc("The default location for managed databases and tables.") + val DEFAULT_WAREHOUSE_PATH = buildConf("spark.sql.default.warehouse.dir") + .doc("The default location for managed databases and tables. 
" + + "Used if spark.sql.warehouse.dir is not set") .stringConf .createWithDefault(Utils.resolveURI("spark-warehouse").toString) + val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir") + .doc("The location for managed databases and tables.") + .stringConf + .createOptional + val CATALOG_IMPLEMENTATION = buildConf("spark.sql.catalogImplementation") .internal() .stringConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 8de95fe64e663..c297007940300 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -55,14 +55,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " + s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').") hiveWarehouseDir - } else { + } else if (sparkContext.conf.contains(WAREHOUSE_PATH.key) && + sparkContext.conf.get(WAREHOUSE_PATH).isDefined) { // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using // the value of spark.sql.warehouse.dir. - // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set, - // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir. - val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH) + val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH).get sparkContext.conf.set("hive.metastore.warehouse.dir", sparkWarehouseDir) sparkWarehouseDir + } else { + // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set, + // we will set hive.metastore.warehouse.dir to the value of spark.sql.default.warehouse.dir. 
+ val sparkDefaultWarehouseDir = sparkContext.conf.get(DEFAULT_WAREHOUSE_PATH) + sparkContext.conf.set("hive.metastore.warehouse.dir", sparkDefaultWarehouseDir) + sparkDefaultWarehouseDir } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index a283ff971adcd..faeea1a25daf4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -221,6 +221,19 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { .sessionState.conf.warehousePath.stripSuffix("/")) } + test("changing default value of warehouse path") { + try { + val newWarehouseDefault = "spark-warehouse2" + val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString + sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath) + val spark = new SparkSession(sparkContext) + assert(newWarehouseDefaultPath.stripSuffix("/") === spark + .sessionState.conf.warehousePath.stripSuffix("/")) + } finally { + sparkContext.conf.remove("spark.sql.default.warehouse.dir") + } + } + test("MAX_CASES_BRANCHES") { withTable("tab1") { spark.range(10).write.saveAsTable("tab1") From 25834109588e8e545deafb1da162958766a057e2 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 14 Dec 2016 17:51:27 -0800 Subject: [PATCH 2/8] Fix drop table command --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 08d37d48f7717..c07d50803f851 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2170,7 +2170,7 @@ test_that("Default warehouse dir should be set to tempdir", { inital_working_directory_files <- list.files() result <- sql("CREATE TABLE warehouse") expect_equal(inital_working_directory_files, list.files()) - result <- sql("DELETE TABLE warehouse") + result <- sql("DROP TABLE warehouse") }) irisDF <- suppressWarnings(createDataFrame(iris)) From 1d0d1d219f392721e9be73e21752100db0ce065f Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Wed, 14 Dec 2016 22:22:43 -0800 Subject: [PATCH 3/8] Handle default warehouse path in SQLConf --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 14fa3f79992dc..8d778ed8dd514 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -819,7 +819,13 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH) - def warehousePath: String = new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString + def warehousePath: String = { + if (contains(StaticSQLConf.WAREHOUSE_PATH.key)) { + new Path(getConf(StaticSQLConf.WAREHOUSE_PATH).get).toString + } else { + new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString + } + } def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES) From 014d7e1666e89940b66fb42e4cf0f93bfff455d9 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Thu, 15 Dec 2016 21:42:31 -0800 Subject: 
[PATCH 4/8] Update unit test. Address comments --- R/pkg/R/sparkR.R | 12 ++++++------ R/pkg/inst/tests/testthat/test_context.R | 14 ++++++++++++++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 -------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 8c5b6093b4a71..eb02b152a69e2 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -364,12 +364,6 @@ sparkR.session <- function( sparkConfigMap <- convertNamedListToEnv(sparkConfig) - # NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements - # See SPARK-18817 for more details - if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) { - assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap) - } - namedParams <- list(...) if (length(namedParams) > 0) { paramMap <- convertNamedListToEnv(namedParams) @@ -383,6 +377,12 @@ sparkR.session <- function( overrideEnvs(sparkConfigMap, paramMap) } + # NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements + # See SPARK-18817 for more details + if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) { + assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap) + } + deployMode <- "" if (exists("spark.submit.deployMode", envir = sparkConfigMap)) { deployMode <- sparkConfigMap[["spark.submit.deployMode"]] diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index caca06933952b..0b05f9a7aecb5 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -72,6 +72,20 @@ test_that("repeatedly starting and stopping SparkSession", { } }) +test_that("Default warehouse dir should be set to tempdir", { + sparkR.session.stop() + sparkR.session(enableHiveSupport = FALSE) + + # Create a temporary table + sql("CREATE TABLE people_warehouse_test") + # spark-warehouse should be written only tempdir() and not current working directory + res <- list.files(path = ".", pattern = ".*spark-warehouse.*", + recursive = TRUE, include.dirs = TRUE) + expect_equal(length(res), 0) + result <- sql("DROP TABLE people_warehouse_test") + sparkR.session.stop() +}) + test_that("rdd GC across sparkR.stop", { sc <- sparkR.sparkContext() # sc should get id 0 rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1 diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index c07d50803f851..e8ccff81222d0 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2165,14 +2165,6 @@ test_that("SQL error message is returned from JVM", { expect_equal(grepl("blah", retError), TRUE) }) -test_that("Default warehouse dir should be set to tempdir", { - # nothing should be written outside tempdir() without explicit user permission - inital_working_directory_files <- list.files() - result <- sql("CREATE TABLE warehouse") - expect_equal(inital_working_directory_files, list.files()) - result <- sql("DROP TABLE warehouse") -}) - irisDF <- suppressWarnings(createDataFrame(iris)) test_that("Method as.data.frame as a synonym for collect()", { From 6eec97d463b027c93b48621b55e3bd4005dc0f7e Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Fri, 16 Dec 2016 12:03:46 -0800 Subject: [PATCH 5/8] Update comments, fix style --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../scala/org/apache/spark/sql/internal/SharedState.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8d778ed8dd514..5208feaeef79c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -971,13 +971,13 @@ object StaticSQLConf {
   }
 
   val DEFAULT_WAREHOUSE_PATH = buildConf("spark.sql.default.warehouse.dir")
-    .doc("The default location for managed databases and tables. " +
-      "Used if spark.sql.warehouse.dir is not set")
+    .doc("Default location used for managed databases and tables " +
+      "if spark.sql.warehouse.dir is not set")
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
   val WAREHOUSE_PATH = buildConf("spark.sql.warehouse.dir")
-    .doc("The location for managed databases and tables.")
+    .doc("The default location for managed databases and tables.")
     .stringConf
     .createOptional
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index c297007940300..769b982071825 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -56,7 +56,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
         s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
       hiveWarehouseDir
     } else if (sparkContext.conf.contains(WAREHOUSE_PATH.key) &&
-      sparkContext.conf.get(WAREHOUSE_PATH).isDefined) {
+        sparkContext.conf.get(WAREHOUSE_PATH).isDefined) {
       // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
       // the value of spark.sql.warehouse.dir.
val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH).get From f7b4772a4b72c28047afb7d614e3af3317af896d Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sat, 17 Dec 2016 17:19:27 -0800 Subject: [PATCH 6/8] Address code review comments --- .../apache/spark/sql/internal/SQLConf.scala | 4 +- .../spark/sql/internal/SQLConfSuite.scala | 44 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5208feaeef79c..e9fb7adbedfbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -820,7 +820,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH) def warehousePath: String = { - if (contains(StaticSQLConf.WAREHOUSE_PATH.key)) { + if (contains(StaticSQLConf.WAREHOUSE_PATH.key) && + getConf(StaticSQLConf.WAREHOUSE_PATH).isDefined) { new Path(getConf(StaticSQLConf.WAREHOUSE_PATH).get).toString } else { new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString @@ -971,6 +972,7 @@ object StaticSQLConf { } val DEFAULT_WAREHOUSE_PATH = buildConf("spark.sql.default.warehouse.dir") + .internal() .doc("Default location used for managed databases and tables " + "if spark.sql.warehouse.dir is not set") .stringConf diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index faeea1a25daf4..4b73c2e56050d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -222,6 +222,7 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { } test("changing default value of warehouse path") { + // Set sql.default.warehouse.dir but not sql.warehouse.dir try { val newWarehouseDefault = "spark-warehouse2" val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString @@ -229,9 +230,52 @@ class SQLConfSuite extends QueryTest with SharedSQLContext { val spark = new SparkSession(sparkContext) assert(newWarehouseDefaultPath.stripSuffix("/") === spark .sessionState.conf.warehousePath.stripSuffix("/")) + assert(newWarehouseDefaultPath.stripSuffix("/") === spark + .sharedState.warehousePath.stripSuffix("/")) + } finally { + sparkContext.conf.remove("spark.sql.default.warehouse.dir") + } + + // Set sql.warehouse.dir and sql.default.warehouse.dir. 
The first one should be used + try { + val newWarehouseDefault = "spark-warehouse2" + val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString + sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath) + + val newWarehouse = "spark-warehouse3" + val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString + sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath) + val spark = new SparkSession(sparkContext) + assert(newWarehousePath.stripSuffix("/") === spark + .sessionState.conf.warehousePath.stripSuffix("/")) + assert(newWarehousePath.stripSuffix("/") === spark + .sharedState.warehousePath.stripSuffix("/")) } finally { sparkContext.conf.remove("spark.sql.default.warehouse.dir") + sparkContext.conf.remove("spark.sql.warehouse.dir") } + + // Set sql.warehouse.dir but not sql.default.warehouse.dir + try { + val newWarehouse = "spark-warehouse4" + val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString + sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath) + val spark = new SparkSession(sparkContext) + assert(newWarehousePath.stripSuffix("/") === spark + .sessionState.conf.warehousePath.stripSuffix("/")) + assert(newWarehousePath.stripSuffix("/") === spark + .sharedState.warehousePath.stripSuffix("/")) + } finally { + sparkContext.conf.remove("spark.sql.warehouse.dir") + } + + // Set neither of the two configs. The default value should be "spark-warehouse" + + val spark = new SparkSession(sparkContext) + assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark + .sessionState.conf.warehousePath.stripSuffix("/")) + assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark + .sharedState.warehousePath.stripSuffix("/")) } test("MAX_CASES_BRANCHES") { From b14c3028e325c7803353aafa81ba37706eba537f Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 5 Mar 2017 22:59:40 -0800 Subject: [PATCH 7/8] Revise PR to set tmpdir in SQLUtils Also add a unit test that checks if new table is created in tmpdir --- R/pkg/R/sparkR.R | 13 ++--- R/pkg/inst/tests/testthat/test_context.R | 14 ----- R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++++ .../org/apache/spark/sql/api/r/SQLUtils.scala | 13 ++++- .../apache/spark/sql/internal/SQLConf.scala | 11 +--- .../spark/sql/api/r/SQLUtilsSuite.scala | 28 ++++++++- .../spark/sql/internal/SQLConfSuite.scala | 57 ------------------- 7 files changed, 59 insertions(+), 91 deletions(-) diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 388561507aa3f..7955cc54d4b56 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -377,12 +377,6 @@ sparkR.session <- function( overrideEnvs(sparkConfigMap, paramMap) } - # NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements - # See SPARK-18817 for more details - if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) { - assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap) - } - deployMode <- "" if (exists("spark.submit.deployMode", envir = sparkConfigMap)) { deployMode <- sparkConfigMap[["spark.submit.deployMode"]] @@ -407,11 +401,16 @@ sparkR.session <- function( sparkConfigMap) } else { jsc <- get(".sparkRjsc", envir = .sparkREnv) + # NOTE(shivaram): Pass in a tempdir that is optionally used if the user has not + # overridden this. 
See SPARK-18817 for more details + warehouseTmpDir <- file.path(tempdir(), "spark-warehouse") + sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getOrCreateSparkSession", jsc, sparkConfigMap, - enableHiveSupport) + enableHiveSupport, + warehouseTmpDir) assign(".sparkRsession", sparkSession, envir = .sparkREnv) } sparkSession diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 0b05f9a7aecb5..caca06933952b 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -72,20 +72,6 @@ test_that("repeatedly starting and stopping SparkSession", { } }) -test_that("Default warehouse dir should be set to tempdir", { - sparkR.session.stop() - sparkR.session(enableHiveSupport = FALSE) - - # Create a temporary table - sql("CREATE TABLE people_warehouse_test") - # spark-warehouse should be written only tempdir() and not current working directory - res <- list.files(path = ".", pattern = ".*spark-warehouse.*", - recursive = TRUE, include.dirs = TRUE) - expect_equal(length(res), 0) - result <- sql("DROP TABLE people_warehouse_test") - sparkR.session.stop() -}) - test_that("rdd GC across sparkR.stop", { sc <- sparkR.sparkContext() # sc should get id 0 rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1 diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 7c096597fea66..3403410a7d12d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2890,6 +2890,20 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt")) }) +test_that("Default warehouse dir should be set to tempdir", { + setHiveContext(sc) + + # Create a temporary database and a table in it + sql("CREATE DATABASE db1") + sql("USE db1") + sql("CREATE TABLE boxes (width INT, length INT, height INT)") + # spark-warehouse should be written only tempdir() and not current working directory + expect_true(file.exists(file.path(tempdir(), "spark-warehouse", "db1.db", "boxes"))) + sql("DROP TABLE boxes") + sql("DROP DATABASE db1") + unsetHiveContext(sc) +}) + unlink(parquetPath) unlink(orcPath) unlink(jsonPath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index a4c5bf756cd5a..e34f35e4d5b70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION +import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH import org.apache.spark.sql.types._ private[sql] object SQLUtils extends Logging { @@ -46,7 +47,17 @@ private[sql] object SQLUtils extends Logging { def getOrCreateSparkSession( jsc: JavaSparkContext, sparkConfigMap: JMap[Object, Object], - enableHiveSupport: Boolean): SparkSession = { + enableHiveSupport: Boolean, + warehouseDir: String): SparkSession = { + + // Check if SparkContext of sparkConfigMap contains spark.sql.warehouse.dir + // If not, set it to warehouseDir chosen by the R process. + // NOTE: We need to do this before creating the SparkSession. 
+    val sqlWarehouseKey = WAREHOUSE_PATH.key
+    if (!jsc.sc.conf.contains(sqlWarehouseKey) && !sparkConfigMap.containsKey(sqlWarehouseKey)) {
+      jsc.sc.conf.set(sqlWarehouseKey, warehouseDir)
+    }
+
     val spark = if (SparkSession.hiveClassesArePresent && enableHiveSupport &&
       jsc.sc.conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
       SparkSession.builder().sparkContext(withHiveExternalCatalog(jsc.sc)).getOrCreate()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8cac2b8df0d2d..461dfe3a66e1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -864,14 +864,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
-  def warehousePath: String = {
-    if (contains(StaticSQLConf.WAREHOUSE_PATH.key) &&
-        getConf(StaticSQLConf.WAREHOUSE_PATH).isDefined) {
-      new Path(getConf(StaticSQLConf.WAREHOUSE_PATH).get).toString
-    } else {
-      new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString
-    }
-  }
+  def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
 
   def hiveThriftServerSingleSession: Boolean =
     getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)
@@ -1019,7 +1012,7 @@ object StaticSQLConf {
   val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir")
     .doc("The default location for managed databases and tables.")
     .stringConf
-    .createOptional
+    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
index f54e23e3aa6cb..9b2932409c1b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
@@ -17,13 +17,23 @@
 package org.apache.spark.sql.api.r
 
-import org.apache.spark.sql.test.SharedSQLContext
+import java.util.HashMap
 
-class SQLUtilsSuite extends SharedSQLContext {
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SharedSQLContext
 
-  import testImplicits._
+class SQLUtilsSuite extends SparkFunSuite {
 
   test("dfToCols should collect and transpose a data frame") {
+    val sparkSession = SparkSession.builder()
+      .master("local")
+      .config("spark.ui.enabled", value = false)
+      .getOrCreate()
+
+    import sparkSession.implicits._
+
     val df = Seq(
       (1, 2, 3),
       (4, 5, 6)
@@ -33,6 +43,18 @@ class SQLUtilsSuite extends SharedSQLContext {
       Array(2, 5),
       Array(3, 6)
     ))
+    sparkSession.stop()
   }
 
+  test("warehouse path is set correctly by R constructor") {
+    SparkSession.clearDefaultSession()
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    val sparkContext2 = new SparkContext(conf)
+    val jsc = new JavaSparkContext(sparkContext2)
+    val warehouseDir = "/tmp/test-warehouse-dir"
+    val session = SQLUtils.getOrCreateSparkSession(
+      jsc, new HashMap[Object, Object], false, warehouseDir)
+    assert(session.sessionState.conf.warehousePath == warehouseDir)
+    session.stop()
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 4b73c2e56050d..a283ff971adcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -221,63 +221,6 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
       .sessionState.conf.warehousePath.stripSuffix("/"))
   }
 
-  test("changing default value of warehouse path") {
-    // Set sql.default.warehouse.dir but not sql.warehouse.dir
-    try {
-      val newWarehouseDefault = "spark-warehouse2"
-      val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString
-      sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath)
-      val spark = new SparkSession(sparkContext)
-      assert(newWarehouseDefaultPath.stripSuffix("/") === spark
-        .sessionState.conf.warehousePath.stripSuffix("/"))
-      assert(newWarehouseDefaultPath.stripSuffix("/") === spark
-        .sharedState.warehousePath.stripSuffix("/"))
-    } finally {
-      sparkContext.conf.remove("spark.sql.default.warehouse.dir")
-    }
-
-    // Set sql.warehouse.dir and sql.default.warehouse.dir. The first one should be used
-    try {
-      val newWarehouseDefault = "spark-warehouse2"
-      val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString
-      sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath)
-
-      val newWarehouse = "spark-warehouse3"
-      val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString
-      sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath)
-      val spark = new SparkSession(sparkContext)
-      assert(newWarehousePath.stripSuffix("/") === spark
-        .sessionState.conf.warehousePath.stripSuffix("/"))
-      assert(newWarehousePath.stripSuffix("/") === spark
-        .sharedState.warehousePath.stripSuffix("/"))
-    } finally {
-      sparkContext.conf.remove("spark.sql.default.warehouse.dir")
-      sparkContext.conf.remove("spark.sql.warehouse.dir")
-    }
-
-    // Set sql.warehouse.dir but not sql.default.warehouse.dir
-    try {
-      val newWarehouse = "spark-warehouse4"
-      val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString
-      sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath)
-      val spark = new SparkSession(sparkContext)
-      assert(newWarehousePath.stripSuffix("/") === spark
-        .sessionState.conf.warehousePath.stripSuffix("/"))
-      assert(newWarehousePath.stripSuffix("/") === spark
-        .sharedState.warehousePath.stripSuffix("/"))
-    } finally {
-      sparkContext.conf.remove("spark.sql.warehouse.dir")
-    }
-
-    // Set neither of the two configs. The default value should be "spark-warehouse"
-
-    val spark = new SparkSession(sparkContext)
-    assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
-      .sessionState.conf.warehousePath.stripSuffix("/"))
-    assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
-      .sharedState.warehousePath.stripSuffix("/"))
-  }
-
   test("MAX_CASES_BRANCHES") {
     withTable("tab1") {
       spark.range(10).write.saveAsTable("tab1")

From 7a98b91274e6108a2fcbccf6e6e47d49964cf22c Mon Sep 17 00:00:00 2001
From: Shivaram Venkataraman
Date: Mon, 6 Mar 2017 22:27:38 -0800
Subject: [PATCH 8/8] Clear default session after test

---
 .../test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
index 9b2932409c1b4..40feb0c99c07f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/api/r/SQLUtilsSuite.scala
@@ -56,5 +56,6 @@ class SQLUtilsSuite extends SparkFunSuite {
       jsc, new HashMap[Object, Object], false, warehouseDir)
     assert(session.sessionState.conf.warehousePath == warehouseDir)
     session.stop()
+    SparkSession.clearDefaultSession()
   }
 }
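
Note (illustrative, not part of the patches above): a minimal, self-contained sketch of the precedence rule the final revision (patches 7-8) settles on: an explicitly configured spark.sql.warehouse.dir always wins, and the tempdir-based fallback that SparkR passes down (file.path(tempdir(), "spark-warehouse")) is only installed when the user has not set the key, before the SparkSession is created. The object name, the resolveWarehouseDir helper, and the example paths are invented for illustration; only SparkConf and its contains/set/get methods come from Spark itself.

import org.apache.spark.SparkConf

object WarehouseDirSketch {
  private val WarehouseKey = "spark.sql.warehouse.dir"

  // Install the caller-supplied fallback only when the user has not configured
  // spark.sql.warehouse.dir, then return the effective value.
  def resolveWarehouseDir(conf: SparkConf, fallbackDir: String): String = {
    if (!conf.contains(WarehouseKey)) {
      conf.set(WarehouseKey, fallbackDir)
    }
    conf.get(WarehouseKey)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical paths, for illustration only.
    val rTempWarehouse = "/tmp/RtmpXYZ123/spark-warehouse"

    // User has set the key explicitly: the fallback is ignored.
    val userConf = new SparkConf(loadDefaults = false).set(WarehouseKey, "/data/warehouse")
    assert(resolveWarehouseDir(userConf, rTempWarehouse) == "/data/warehouse")

    // User has not set the key: the tempdir-based fallback becomes the effective
    // value, so nothing is written into the current working directory.
    val defaultConf = new SparkConf(loadDefaults = false)
    assert(resolveWarehouseDir(defaultConf, rTempWarehouse) == rTempWarehouse)

    println("warehouse dir precedence behaves as expected")
  }
}

This is also why the final revision passes the directory from R instead of keeping the separate spark.sql.default.warehouse.dir config tried in patches 1-6: the JVM side only fills the existing key when it is unset, so precedence stays with the user's own setting.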