From aebdfc6863a0a049b1062218a882a5ae486558ad Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 20 Feb 2017 21:01:28 +0800 Subject: [PATCH 01/28] [SPARK-19667][SQL]create table with hiveenabled in default database use warehouse path instead of the location of default database --- .../apache/spark/sql/internal/SQLConf.scala | 8 ++++ .../spark/sql/hive/HiveExternalCatalog.scala | 10 ++++- .../sql/hive/execution/HiveDDLSuite.scala | 45 +++++++++++++++++++ 3 files changed, 61 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc0f130406932..60d21ea79944e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -672,6 +672,14 @@ object SQLConf { .stringConf .createWithDefault(TimeZone.getDefault().getID()) + // for test + val HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH = + buildConf("spark.hive.createTable.defaultDB.location.useWarehousePath") + .doc("Enables test case to use warehouse path instead of db location when " + + "create table in default database.") + .booleanConf + .createWithDefault(false) + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ea48256147857..db649ee9295b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.HiveSerDe +import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types.{DataType, StructType} @@ -407,8 +407,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties } + // if the database is default, the value of WAREHOUSE_PATH in conf returned private def defaultTablePath(tableIdent: TableIdentifier): String = { - val dbLocation = getDatabase(tableIdent.database.get).locationUri + val dbLocation = if (tableIdent.database.get == SessionCatalog.DEFAULT_DATABASE + || conf.get(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { + conf.get(WAREHOUSE_PATH) + } else { + getDatabase(tableIdent.database.get).locationUri + } new Path(new Path(dbLocation), tableIdent.table).toString } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c04b9ee0f2cd5..72f25ff722988 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1587,4 +1587,49 @@ class HiveDDLSuite } } } + + test("create table with default database use warehouse path instead of database location") { + withTable("t") { + withTempDir { dir => + spark.sparkContext.conf + .set(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true") + + spark.sql(s"CREATE DATABASE default_test LOCATION '$dir'" ) + val db = 
spark.sessionState.catalog.getDatabaseMetadata("default_test") + assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}") + spark.sql("USE default_test") + val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t" + + spark.sql("CREATE TABLE t(a string)") + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location.stripSuffix("/") == sparkWarehouseTablePath ) + + // clear + spark.sparkContext.conf + .remove(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) + spark.sql("DROP TABLE t") + spark.sql("DROP DATABASE default_test") + spark.sql("USE DEFAULT") + } + + withTempDir { dir => + spark.sql(s"CREATE DATABASE test_not_default LOCATION '$dir'" ) + val db = spark.sessionState.catalog.getDatabaseMetadata("test_not_default") + assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}") + spark.sql("USE test_not_default") + val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t" + + spark.sql("CREATE TABLE t(a string)") + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.location.stripSuffix("/") == s"${db.locationUri.stripSuffix("/")}/t" ) + + // clear + spark.sparkContext.conf + .remove(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) + spark.sql("DROP TABLE t") + spark.sql("DROP DATABASE test_not_default") + spark.sql("USE DEFAULT") + } + } + } } From 825c0ade864e7ab65f06a91c1c2d18eeeadea2aa Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 20 Feb 2017 22:51:47 +0800 Subject: [PATCH 02/28] rename a conf name --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 6 ++---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 60d21ea79944e..1aa4867573cd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -673,8 +673,8 @@ object SQLConf { .createWithDefault(TimeZone.getDefault().getID()) // for test - val HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH = - buildConf("spark.hive.createTable.defaultDB.location.useWarehousePath") + val TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH = + buildConf("spark.hive.test.createTable.defaultDB.location.useWarehousePath") .doc("Enables test case to use warehouse path instead of db location when " + "create table in default database.") .booleanConf diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index db649ee9295b9..5c1563d763699 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -410,7 +410,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // if the database is default, the value of WAREHOUSE_PATH in conf returned private def defaultTablePath(tableIdent: TableIdentifier): String = { val dbLocation = if (tableIdent.database.get == SessionCatalog.DEFAULT_DATABASE - || conf.get(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { + || 
conf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { conf.get(WAREHOUSE_PATH) } else { getDatabase(tableIdent.database.get).locationUri diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 72f25ff722988..2c3eee7d2ce80 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1592,7 +1592,7 @@ class HiveDDLSuite withTable("t") { withTempDir { dir => spark.sparkContext.conf - .set(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true") + .set(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true") spark.sql(s"CREATE DATABASE default_test LOCATION '$dir'" ) val db = spark.sessionState.catalog.getDatabaseMetadata("default_test") @@ -1606,7 +1606,7 @@ class HiveDDLSuite // clear spark.sparkContext.conf - .remove(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) + .remove(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) spark.sql("DROP TABLE t") spark.sql("DROP DATABASE default_test") spark.sql("USE DEFAULT") @@ -1624,8 +1624,6 @@ class HiveDDLSuite assert(table.location.stripSuffix("/") == s"${db.locationUri.stripSuffix("/")}/t" ) // clear - spark.sparkContext.conf - .remove(SQLConf.HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) spark.sql("DROP TABLE t") spark.sql("DROP DATABASE test_not_default") spark.sql("USE DEFAULT") From a2c91682b3824160bd1095e5b61e932a022f3672 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 21 Feb 2017 13:46:38 +0800 Subject: [PATCH 03/28] fix test faile --- .../scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 4bfab0f9cfbfb..2356a1ea3ff02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -344,6 +344,8 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing ) table.copy( + storage = table.storage.copy( + locationUri = table.storage.locationUri.map(_.stripPrefix("file:"))), createTime = 0L, lastAccessTime = 0L, properties = table.properties.filterKeys(!nondeterministicProps.contains(_)) From bacd528749ef26b99bf813081bef480ab4c24f97 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 12:53:04 +0800 Subject: [PATCH 04/28] process default database location when create/get database from metastore --- .../spark/sql/internal/SharedState.scala | 4 +++- .../spark/sql/hive/HiveExternalCatalog.scala | 10 ++-------- .../spark/sql/hive/client/HiveClientImpl.scala | 14 ++++++++++++-- .../sql/hive/execution/HiveDDLSuite.scala | 18 ++++++++++++------ 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 8de95fe64e663..ae99dd840b36c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -90,8 +90,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { // Create the default database if it 
doesn't exist. { + // default database set to empty string, + // when reload from metastore using warehouse path to replace it val defaultDbDefinition = CatalogDatabase( - SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map()) + SessionCatalog.DEFAULT_DATABASE, "default database", "", Map()) // Initialize default database if it doesn't exist if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { // There may be another Spark application creating default database at the same time, here we diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 5c1563d763699..ea48256147857 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types.{DataType, StructType} @@ -407,14 +407,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat properties } - // if the database is default, the value of WAREHOUSE_PATH in conf returned private def defaultTablePath(tableIdent: TableIdentifier): String = { - val dbLocation = if (tableIdent.database.get == SessionCatalog.DEFAULT_DATABASE - || conf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { - conf.get(WAREHOUSE_PATH) - } else { - getDatabase(tableIdent.database.get).locationUri - } + val dbLocation = getDatabase(tableIdent.database.get).locationUri new Path(new Path(dbLocation), tableIdent.table).toString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index dc9c3ff33542d..7f8d1cdd77c16 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -48,6 +48,8 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -311,11 +313,12 @@ private[hive] class HiveClientImpl( override def createDatabase( database: CatalogDatabase, ignoreIfExists: Boolean): Unit = withHiveState { + // default database's location always use the warehouse path, here set to emtpy string client.createDatabase( new HiveDatabase( database.name, database.description, - database.locationUri, + if (database.name == SessionCatalog.DEFAULT_DATABASE) "" else database.locationUri, Option(database.properties).map(_.asJava).orNull), ignoreIfExists) } @@ -339,10 +342,17 @@ private[hive] class HiveClientImpl( override def getDatabase(dbName: String): CatalogDatabase = withHiveState { Option(client.getDatabase(dbName)).map { d => + // 
default database's location always use the warehouse path + // TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH is a flag fro test + val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE + || sparkConf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { + sparkConf.get(WAREHOUSE_PATH) + } else d.getLocationUri + CatalogDatabase( name = d.getName, description = d.getDescription, - locationUri = d.getLocationUri, + locationUri = dbLocation, properties = Option(d.getParameters).map(_.asScala.toMap).orNull) }.getOrElse(throw new NoSuchDatabaseException(dbName)) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 2c3eee7d2ce80..14062be653195 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1590,38 +1590,42 @@ class HiveDDLSuite test("create table with default database use warehouse path instead of database location") { withTable("t") { + // default database use warehouse path as its location withTempDir { dir => spark.sparkContext.conf .set(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true") + val sparkWarehousePath = spark.sharedState.warehousePath.stripSuffix("/") spark.sql(s"CREATE DATABASE default_test LOCATION '$dir'" ) val db = spark.sessionState.catalog.getDatabaseMetadata("default_test") - assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}") + assert(db.locationUri.stripSuffix("/") == sparkWarehousePath) spark.sql("USE default_test") - val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t" spark.sql("CREATE TABLE t(a string)") val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == sparkWarehouseTablePath ) + assert(table.location.stripSuffix("/").stripPrefix("file:") == + new File(sparkWarehousePath, "t").getAbsolutePath.stripSuffix("/")) // clear spark.sparkContext.conf .remove(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) + spark.sql("DROP TABLE t") spark.sql("DROP DATABASE default_test") spark.sql("USE DEFAULT") } + // not default database use its's location from the create command withTempDir { dir => + val dirPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" spark.sql(s"CREATE DATABASE test_not_default LOCATION '$dir'" ) val db = spark.sessionState.catalog.getDatabaseMetadata("test_not_default") - assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}") + assert(db.locationUri.stripSuffix("/") == dirPath) spark.sql("USE test_not_default") - val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t" spark.sql("CREATE TABLE t(a string)") val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == s"${db.locationUri.stripSuffix("/")}/t" ) + assert(table.location.stripSuffix("/") == s"$dirPath/t" ) // clear spark.sql("DROP TABLE t") @@ -1631,3 +1635,5 @@ class HiveDDLSuite } } } + +case class cc(a: Array[Byte]) From 3f6e06195412bad45d84114df68c729d7b0fa237 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 12:57:04 +0800 Subject: [PATCH 05/28] remove an redundant line --- .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 14062be653195..8c4034d9e5aac 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1635,5 +1635,3 @@ class HiveDDLSuite } } } - -case class cc(a: Array[Byte]) From 96dcc7ddb0de7c903f3fa8373ada317a760057d7 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 14:31:01 +0800 Subject: [PATCH 06/28] fix empty string location of database --- .../scala/org/apache/spark/sql/internal/SharedState.scala | 4 +--- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index ae99dd840b36c..8de95fe64e663 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -90,10 +90,8 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { // Create the default database if it doesn't exist. { - // default database set to empty string, - // when reload from metastore using warehouse path to replace it val defaultDbDefinition = CatalogDatabase( - SessionCatalog.DEFAULT_DATABASE, "default database", "", Map()) + SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map()) // Initialize default database if it doesn't exist if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { // There may be another Spark application creating default database at the same time, here we diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7f8d1cdd77c16..22aa2e55dadfb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -313,12 +313,11 @@ private[hive] class HiveClientImpl( override def createDatabase( database: CatalogDatabase, ignoreIfExists: Boolean): Unit = withHiveState { - // default database's location always use the warehouse path, here set to emtpy string client.createDatabase( new HiveDatabase( database.name, database.description, - if (database.name == SessionCatalog.DEFAULT_DATABASE) "" else database.locationUri, + database.locationUri, Option(database.properties).map(_.asJava).orNull), ignoreIfExists) } From f329387a6e5c083abd19314f00648f8a855d6a70 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 17:08:50 +0800 Subject: [PATCH 07/28] modify the test case --- .../apache/spark/sql/internal/SQLConf.scala | 8 -- .../sql/hive/client/HiveClientImpl.scala | 5 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 117 ++++++++++++++++++ .../spark/sql/hive/client/VersionsSuite.scala | 4 +- .../sql/hive/execution/HiveDDLSuite.scala | 47 ------- 5 files changed, 120 insertions(+), 61 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1aa4867573cd8..dc0f130406932 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -672,14 +672,6 @@ object SQLConf { .stringConf .createWithDefault(TimeZone.getDefault().getID()) - // for test - val TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH = - buildConf("spark.hive.test.createTable.defaultDB.location.useWarehousePath") - .doc("Enables test case to use warehouse path instead of db location when " + - "create table in default database.") - .booleanConf - .createWithDefault(false) - object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 22aa2e55dadfb..6a1e36bc513c2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -48,7 +48,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -342,9 +341,7 @@ private[hive] class HiveClientImpl( override def getDatabase(dbName: String): CatalogDatabase = withHiveState { Option(client.getDatabase(dbName)).map { d => // default database's location always use the warehouse path - // TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH is a flag fro test - val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE - || sparkConf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) { + val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { sparkConf.get(WAREHOUSE_PATH) } else d.getLocationUri diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 8f0d5d886c9d5..ef96c3e649d8d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -334,6 +334,35 @@ class HiveSparkSubmitSuite runSparkSubmit(argsForShowTables) } + test("SPARK-19667: create table in default database with HiveEnabled use warehouse path " + + "instead of the location of default database") { + val unusedJar = TestUtils.createJarWithClasses(Seq.empty) + val warehousePath1 = Utils.createTempDir("wh1") + val argsForCreateTable = Seq( + "--class", SPARK_19667_CREATE_TABLE.getClass.getName.stripSuffix("$"), + "--name", "SPARK-19667", + "--master", "local-cluster[2,1,1024]", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + "--conf", s"spark.sql.warehouse.dir=$warehousePath1", + unusedJar.toString) + runSparkSubmit(argsForCreateTable) + + val warehousePath2 = Utils.createTempDir("wh2") + val argsForShowTables = Seq( + "--class", SPARK_19667_VERIFY_TABLE_PATH.getClass.getName.stripSuffix("$"), + "--name", "SPARK-19667", + "--master", "local-cluster[2,1,1024]", + "--conf", "spark.ui.enabled=false", + "--conf", "spark.master.rest.enabled=false", + "--conf", s"spark.sql.warehouse.dir=$warehousePath2", + unusedJar.toString) + runSparkSubmit(argsForShowTables) + + Utils.deleteRecursively(warehousePath1) + 
Utils.deleteRecursively(warehousePath2) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. // This is copied from org.apache.spark.deploy.SparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { @@ -905,3 +934,91 @@ object SPARK_18989_DESC_TABLE { } } } + +object SPARK_19667_CREATE_TABLE { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder().enableHiveSupport().getOrCreate() + try { + val warehousePath = spark.sharedState.warehousePath.stripSuffix("/") + val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") + // default database use warehouse path as its location + assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) + spark.sql("CREATE TABLE t(a string)") + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + // table in default database use the location of default database which is also warehouse path + assert(table.location.stripSuffix("/") == s"file:$warehousePath/t") + spark.sql("INSERT INTO TABLE t SELECT 1") + assert(spark.sql("SELECT * FROM t").count == 1) + + spark.sql("CREATE DATABASE not_default") + spark.sql("USE not_default") + spark.sql("CREATE TABLE t1(b string)") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + // table in not default database use the location of its own database + assert(table1.location.stripSuffix("/") == s"file:$warehousePath/not_default.db/t1") + } finally { + spark.sql("USE default") + } + } +} + +object SPARK_19667_VERIFY_TABLE_PATH { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder().enableHiveSupport().getOrCreate() + try { + val warehousePath = spark.sharedState.warehousePath.stripSuffix("/") + val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") + // default database use warehouse path as its location + assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + // the table in default database created in job(SPARK_19667_CREATE_TABLE) above, + // which has different warehouse path from this job, its location still equals to + // the location when it's created. + assert(table.location.stripSuffix("/") != s"file:$warehousePath/t") + assert(spark.sql("SELECT * FROM t").count == 1) + + spark.sql("CREATE TABLE t3(d string)") + val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3")) + // the table in default database created here in this job, it will use the warehouse path + // of this job as its location + assert(table3.location.stripSuffix("/") == s"file:$warehousePath/t3") + + spark.sql("USE not_default") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + // the table in not default database create in job(SPARK_19667_CREATE_TABLE) above, + // which has different warehouse path from this job, its location still equals to + // the location when it's created. 
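+      // so its path should differ from the one this job's warehouse would give it,
+      // and no t1 directory should exist under that warehouse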
+ assert(table1.location.stripSuffix("/") != s"$warehousePath/not_default.db/t1") + assert(!new File(s"$warehousePath/not_default.db/t1").exists()) + + spark.sql("CREATE TABLE t2(c string)") + val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) + // the table in not default database created here in this job, it will use the location + // of the database as its location, not the warehouse path in this job + assert(table2.location.stripSuffix("/") != s"file:$warehousePath/not_default.db/t2") + + spark.sql("CREATE DATABASE not_default_1") + spark.sql("USE not_default_1") + spark.sql("CREATE TABLE t4(e string)") + val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4")) + // the table created in the database which created in this job, it will use the location + // of the database. + assert(table4.location.stripSuffix("/") == s"file:$warehousePath/not_default_1.db/t4") + + } finally { + spark.sql("DROP TABLE IF EXISTS t4") + spark.sql("DROP DATABASE not_default_1") + + spark.sql("USE not_default") + spark.sql("DROP TABLE IF EXISTS t1") + spark.sql("DROP TABLE IF EXISTS t2") + spark.sql("DROP DATABASE not_default") + + spark.sql("USE default") + spark.sql("DROP TABLE IF EXISTS t") + spark.sql("DROP TABLE IF EXISTS t3") + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index fe14824cf0967..0390e284ff353 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -655,7 +655,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w val tPath = new Path(spark.sessionState.conf.warehousePath, "t") Seq("1").toDF("a").write.saveAsTable("t") - val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}" + val expectedPath = tPath.toUri.getPath.stripSuffix("/") val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.location.stripSuffix("/") == expectedPath) @@ -665,7 +665,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1") spark.sql("create table t1 using parquet as select 2 as a") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}" + val expectedPath1 = t1Path.toUri.getPath.stripSuffix("/") assert(table1.location.stripSuffix("/") == expectedPath1) assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 8c4034d9e5aac..c04b9ee0f2cd5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1587,51 +1587,4 @@ class HiveDDLSuite } } } - - test("create table with default database use warehouse path instead of database location") { - withTable("t") { - // default database use warehouse path as its location - withTempDir { dir => - spark.sparkContext.conf - .set(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true") - - val sparkWarehousePath = spark.sharedState.warehousePath.stripSuffix("/") - spark.sql(s"CREATE 
DATABASE default_test LOCATION '$dir'" ) - val db = spark.sessionState.catalog.getDatabaseMetadata("default_test") - assert(db.locationUri.stripSuffix("/") == sparkWarehousePath) - spark.sql("USE default_test") - - spark.sql("CREATE TABLE t(a string)") - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/").stripPrefix("file:") == - new File(sparkWarehousePath, "t").getAbsolutePath.stripSuffix("/")) - - // clear - spark.sparkContext.conf - .remove(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key) - - spark.sql("DROP TABLE t") - spark.sql("DROP DATABASE default_test") - spark.sql("USE DEFAULT") - } - - // not default database use its's location from the create command - withTempDir { dir => - val dirPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}" - spark.sql(s"CREATE DATABASE test_not_default LOCATION '$dir'" ) - val db = spark.sessionState.catalog.getDatabaseMetadata("test_not_default") - assert(db.locationUri.stripSuffix("/") == dirPath) - spark.sql("USE test_not_default") - - spark.sql("CREATE TABLE t(a string)") - val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) - assert(table.location.stripSuffix("/") == s"$dirPath/t" ) - - // clear - spark.sql("DROP TABLE t") - spark.sql("DROP DATABASE test_not_default") - spark.sql("USE DEFAULT") - } - } - } } From 58a0020b8aa46ae98558594f42e7d16635383ede Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 17:15:28 +0800 Subject: [PATCH 08/28] fix test failed --- .../scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 9082261af7b00..2fd532d631d86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,7 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier))) + new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier)}")) } /** From 1dce2d7d597a33a368709392a51895370ca1cc05 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 19:35:19 +0800 Subject: [PATCH 09/28] add log to find out why jenkins failed --- .../org/apache/spark/sql/sources/BucketedWriteSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 2fd532d631d86..b2aba3ffac2f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,6 +92,11 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") + val x = spark.sessionState.catalog.defaultTablePath(identifier) + val y = s"file:$x" + val z = URI.create(y) + println(s"tableDir $x,$y,$z") + new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier)}")) } From 
12f81d3b2daf914fea7439f519664f5e5d893365 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 19:48:16 +0800 Subject: [PATCH 10/28] add scalastyle:off for println --- .../scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index b2aba3ffac2f5..fda940c4ed9b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -95,6 +95,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { val x = spark.sessionState.catalog.defaultTablePath(identifier) val y = s"file:$x" val z = URI.create(y) + // scalastyle:off println println(s"tableDir $x,$y,$z") new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier)}")) From 56e83d5f3da4078098617441967ac62c82797e20 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 22 Feb 2017 22:08:35 +0800 Subject: [PATCH 11/28] fix test faile --- .../apache/spark/sql/sources/BucketedWriteSuite.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index fda940c4ed9b3..36d73005d3912 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,13 +92,8 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - val x = spark.sessionState.catalog.defaultTablePath(identifier) - val y = s"file:$x" - val z = URI.create(y) - // scalastyle:off println - println(s"tableDir $x,$y,$z") - - new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier)}")) + new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier) + .stripPrefix("file:")}")) } /** From 901bb1c37ae510008b51d260750b96b88def93f8 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 11:30:21 +0800 Subject: [PATCH 12/28] make warehouse path qualified for default database --- .../sql/catalyst/catalog/SessionCatalog.scala | 26 +++++++++---------- .../sql/sources/BucketedWriteSuite.scala | 3 +-- .../sql/hive/client/HiveClientImpl.scala | 4 ++- .../spark/sql/hive/ShowCreateTableSuite.scala | 2 -- .../spark/sql/hive/client/VersionsSuite.scala | 4 +-- 5 files changed, 19 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 73ef0e6a1869e..95ee8810f5f5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -37,6 +37,18 @@ import org.apache.spark.sql.catalyst.util.StringUtils object SessionCatalog { val DEFAULT_DATABASE = "default" + + /** + * This method is used to make the given path qualified before we + * store this path in the underlying external catalog. 
So, when a path + * does not contain a scheme, this path will not be changed after the default + * FileSystem is changed. + */ + def makeQualifiedPath(path: String, conf: Configuration): Path = { + val hadoopPath = new Path(path) + val fs = hadoopPath.getFileSystem(conf) + fs.makeQualified(hadoopPath) + } } /** @@ -125,18 +137,6 @@ class SessionCatalog( CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]() } - /** - * This method is used to make the given path qualified before we - * store this path in the underlying external catalog. So, when a path - * does not contain a scheme, this path will not be changed after the default - * FileSystem is changed. - */ - private def makeQualifiedPath(path: String): Path = { - val hadoopPath = new Path(path) - val fs = hadoopPath.getFileSystem(hadoopConf) - fs.makeQualified(hadoopPath) - } - private def requireDbExists(db: String): Unit = { if (!databaseExists(db)) { throw new NoSuchDatabaseException(db) @@ -170,7 +170,7 @@ class SessionCatalog( "you cannot create a database with this name.") } validateName(dbName) - val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString + val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri, hadoopConf).toString externalCatalog.createDatabase( dbDefinition.copy(name = dbName, locationUri = qualifiedPath), ignoreIfExists) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 36d73005d3912..0423f343d577e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,8 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(s"file:${spark.sessionState.catalog.defaultTablePath(identifier) - .stripPrefix("file:")}")) + new File(URI.create(s"${spark.sessionState.catalog.defaultTablePath(identifier)}")) } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6a1e36bc513c2..d7e3c8d0bf4ae 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -341,8 +341,10 @@ private[hive] class HiveClientImpl( override def getDatabase(dbName: String): CatalogDatabase = withHiveState { Option(client.getDatabase(dbName)).map { d => // default database's location always use the warehouse path + // since the location of database stored in metastore is qualified, + // here we also make qualify for warehouse location val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { - sparkConf.get(WAREHOUSE_PATH) + SessionCatalog.makeQualifiedPath(sparkConf.get(WAREHOUSE_PATH), hadoopConf).toString } else d.getLocationUri CatalogDatabase( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala index 2356a1ea3ff02..4bfab0f9cfbfb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala @@ -344,8 +344,6 @@ class 
ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing ) table.copy( - storage = table.storage.copy( - locationUri = table.storage.locationUri.map(_.stripPrefix("file:"))), createTime = 0L, lastAccessTime = 0L, properties = table.properties.filterKeys(!nondeterministicProps.contains(_)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 0390e284ff353..fe14824cf0967 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -655,7 +655,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w val tPath = new Path(spark.sessionState.conf.warehousePath, "t") Seq("1").toDF("a").write.saveAsTable("t") - val expectedPath = tPath.toUri.getPath.stripSuffix("/") + val expectedPath = s"file:${tPath.toUri.getPath.stripSuffix("/")}" val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.location.stripSuffix("/") == expectedPath) @@ -665,7 +665,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w val t1Path = new Path(spark.sessionState.conf.warehousePath, "t1") spark.sql("create table t1 using parquet as select 2 as a") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val expectedPath1 = t1Path.toUri.getPath.stripSuffix("/") + val expectedPath1 = s"file:${t1Path.toUri.getPath.stripSuffix("/")}" assert(table1.location.stripSuffix("/") == expectedPath1) assert(t1Path.getFileSystem(spark.sessionState.newHadoopConf()).exists(t1Path)) From 99d9746b7ee9a212f520e704f238e16387299027 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 11:31:49 +0800 Subject: [PATCH 13/28] remove a string s --- .../scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala index 0423f343d577e..9082261af7b00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala @@ -92,7 +92,7 @@ abstract class BucketedWriteSuite extends QueryTest with SQLTestUtils { def tableDir: File = { val identifier = spark.sessionState.sqlParser.parseTableIdentifier("bucketed_table") - new File(URI.create(s"${spark.sessionState.catalog.defaultTablePath(identifier)}")) + new File(URI.create(spark.sessionState.catalog.defaultTablePath(identifier))) } /** From db555e380bff7d50eaecd6954980544213f7a569 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 11:39:17 +0800 Subject: [PATCH 14/28] modify a comment --- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d7e3c8d0bf4ae..6be876d539916 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -340,8 +340,8 @@ private[hive] class HiveClientImpl( override def getDatabase(dbName: String): CatalogDatabase = 
withHiveState { Option(client.getDatabase(dbName)).map { d => - // default database's location always use the warehouse path - // since the location of database stored in metastore is qualified, + // default database's location always use the warehouse path, + // and since the location of database stored in metastore is qualified, // here we also make qualify for warehouse location val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { SessionCatalog.makeQualifiedPath(sparkConf.get(WAREHOUSE_PATH), hadoopConf).toString From d327994593395a69465717a2d401672f025cac36 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 13:22:08 +0800 Subject: [PATCH 15/28] fix test failed --- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index ef96c3e649d8d..0817d567a51ca 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -939,7 +939,7 @@ object SPARK_19667_CREATE_TABLE { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().enableHiveSupport().getOrCreate() try { - val warehousePath = spark.sharedState.warehousePath.stripSuffix("/") + val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}" val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") // default database use warehouse path as its location assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) @@ -947,7 +947,7 @@ object SPARK_19667_CREATE_TABLE { val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) // table in default database use the location of default database which is also warehouse path - assert(table.location.stripSuffix("/") == s"file:$warehousePath/t") + assert(table.location.stripSuffix("/") == s"$warehousePath/t") spark.sql("INSERT INTO TABLE t SELECT 1") assert(spark.sql("SELECT * FROM t").count == 1) @@ -956,7 +956,7 @@ object SPARK_19667_CREATE_TABLE { spark.sql("CREATE TABLE t1(b string)") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) // table in not default database use the location of its own database - assert(table1.location.stripSuffix("/") == s"file:$warehousePath/not_default.db/t1") + assert(table1.location.stripSuffix("/") == s"$warehousePath/not_default.db/t1") } finally { spark.sql("USE default") } @@ -967,7 +967,7 @@ object SPARK_19667_VERIFY_TABLE_PATH { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().enableHiveSupport().getOrCreate() try { - val warehousePath = spark.sharedState.warehousePath.stripSuffix("/") + val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}" val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") // default database use warehouse path as its location assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) @@ -976,14 +976,14 @@ object SPARK_19667_VERIFY_TABLE_PATH { // the table in default database created in job(SPARK_19667_CREATE_TABLE) above, // which has different warehouse path from this job, its location still equals to // the location when it's created. 
- assert(table.location.stripSuffix("/") != s"file:$warehousePath/t") + assert(table.location.stripSuffix("/") != s"$warehousePath/t") assert(spark.sql("SELECT * FROM t").count == 1) spark.sql("CREATE TABLE t3(d string)") val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3")) // the table in default database created here in this job, it will use the warehouse path // of this job as its location - assert(table3.location.stripSuffix("/") == s"file:$warehousePath/t3") + assert(table3.location.stripSuffix("/") == s"$warehousePath/t3") spark.sql("USE not_default") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) @@ -997,7 +997,7 @@ object SPARK_19667_VERIFY_TABLE_PATH { val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) // the table in not default database created here in this job, it will use the location // of the database as its location, not the warehouse path in this job - assert(table2.location.stripSuffix("/") != s"file:$warehousePath/not_default.db/t2") + assert(table2.location.stripSuffix("/") != s"$warehousePath/not_default.db/t2") spark.sql("CREATE DATABASE not_default_1") spark.sql("USE not_default_1") @@ -1005,7 +1005,7 @@ object SPARK_19667_VERIFY_TABLE_PATH { val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4")) // the table created in the database which created in this job, it will use the location // of the database. - assert(table4.location.stripSuffix("/") == s"file:$warehousePath/not_default_1.db/t4") + assert(table4.location.stripSuffix("/") == s"$warehousePath/not_default_1.db/t4") } finally { spark.sql("DROP TABLE IF EXISTS t4") From 73c880234badebfd1ee25708529a8b36e05b5901 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 22:26:09 +0800 Subject: [PATCH 16/28] move to sessioncatalog --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 11 ++++++++++- .../apache/spark/sql/hive/HiveExternalCatalog.scala | 8 +++++++- .../apache/spark/sql/hive/client/HiveClientImpl.scala | 9 +-------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 95ee8810f5f5e..1436089b23261 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -193,7 +193,16 @@ class SessionCatalog( def getDatabaseMetadata(db: String): CatalogDatabase = { val dbName = formatDatabaseName(db) requireDbExists(dbName) - externalCatalog.getDatabase(dbName) + val database = externalCatalog.getDatabase(dbName) + + // default database's location always use the warehouse path, + // and since the location of database stored in metastore is qualified, + // here we also make qualify for warehouse location + val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { + SessionCatalog.makeQualifiedPath(conf.warehousePath, hadoopConf).toString + } else database.locationUri + + database.copy(locationUri = dbLocation) } def databaseExists(db: String): Boolean = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ea48256147857..002efbe1cca3f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -408,7 +408,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def defaultTablePath(tableIdent: TableIdentifier): String = { - val dbLocation = getDatabase(tableIdent.database.get).locationUri + // default database's location always use the warehouse path, + // and since the location of database stored in metastore is qualified, + // here we also make qualify for warehouse location + val dbLocation = if (tableIdent.database.orNull == SessionCatalog.DEFAULT_DATABASE) { + SessionCatalog.makeQualifiedPath(conf.get(WAREHOUSE_PATH), hadoopConf).toString + } else getDatabase(tableIdent.database.get).locationUri + new Path(new Path(dbLocation), tableIdent.table).toString } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 6be876d539916..84880f37dd4cf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -340,17 +340,10 @@ private[hive] class HiveClientImpl( override def getDatabase(dbName: String): CatalogDatabase = withHiveState { Option(client.getDatabase(dbName)).map { d => - // default database's location always use the warehouse path, - // and since the location of database stored in metastore is qualified, - // here we also make qualify for warehouse location - val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { - SessionCatalog.makeQualifiedPath(sparkConf.get(WAREHOUSE_PATH), hadoopConf).toString - } else d.getLocationUri - CatalogDatabase( name = d.getName, description = d.getDescription, - locationUri = dbLocation, + locationUri = d.getLocationUri, properties = Option(d.getParameters).map(_.asScala.toMap).orNull) }.getOrElse(throw new NoSuchDatabaseException(dbName)) } From 747b31abed7e972068ffcad40c4114a93a6c91cd Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 22:43:53 +0800 Subject: [PATCH 17/28] remove import --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 84880f37dd4cf..dc9c3ff33542d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -48,7 +48,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ -import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} From 8f8063f8c4e11cb4f6158c90d21d8a0985d38584 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 23 Feb 2017 23:09:24 +0800 Subject: [PATCH 18/28] remove an import --- .../scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 84880f37dd4cf..dc9c3ff33542d 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -48,7 +48,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.hive.client.HiveClientImpl._ -import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} From 4dc11c1e5b4fbf4f293d4f7003090ccd39e8fd68 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 24 Feb 2017 12:19:50 +0800 Subject: [PATCH 19/28] modify some codestyle and some comment --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 10 ++++++---- .../apache/spark/sql/hive/HiveExternalCatalog.scala | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 1436089b23261..ac6e16bba83c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -195,12 +195,14 @@ class SessionCatalog( requireDbExists(dbName) val database = externalCatalog.getDatabase(dbName) - // default database's location always use the warehouse path, - // and since the location of database stored in metastore is qualified, - // here we also make qualify for warehouse location + // The default database's location always uses the warehouse path. + // Since the location of database stored in metastore is qualified, + // we also make the warehouse location qualified. val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { SessionCatalog.makeQualifiedPath(conf.warehousePath, hadoopConf).toString - } else database.locationUri + } else { + database.locationUri + } database.copy(locationUri = dbLocation) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 002efbe1cca3f..e7bc3d119c134 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -408,12 +408,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def defaultTablePath(tableIdent: TableIdentifier): String = { - // default database's location always use the warehouse path, - // and since the location of database stored in metastore is qualified, - // here we also make qualify for warehouse location + // The default database's location always uses the warehouse path. + // Since the location of database stored in metastore is qualified, + // we also make the warehouse location qualified. 
val dbLocation = if (tableIdent.database.orNull == SessionCatalog.DEFAULT_DATABASE) { SessionCatalog.makeQualifiedPath(conf.get(WAREHOUSE_PATH), hadoopConf).toString - } else getDatabase(tableIdent.database.get).locationUri + } else { + getDatabase(tableIdent.database.get).locationUri + } new Path(new Path(dbLocation), tableIdent.table).toString } From 80b8133fa5caf8f17e07526fb89f321d51cb1a5e Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 11:56:40 +0800 Subject: [PATCH 20/28] mv defaultdb path logic to ExternalCatalog --- .../catalyst/catalog/ExternalCatalog.scala | 22 +++++++++++++++++-- .../catalyst/catalog/InMemoryCatalog.scala | 6 +++-- .../sql/catalyst/catalog/SessionCatalog.scala | 13 +---------- .../spark/sql/hive/HiveExternalCatalog.scala | 15 ++++--------- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 5233699facae0..1207f79a55b6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.catalyst.catalog +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.SparkConf import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException} import org.apache.spark.sql.catalyst.expressions.Expression @@ -30,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog { +abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { import CatalogTypes.TablePartitionSpec protected def requireDbExists(db: String): Unit = { @@ -74,7 +77,19 @@ abstract class ExternalCatalog { */ def alterDatabase(dbDefinition: CatalogDatabase): Unit - def getDatabase(db: String): CatalogDatabase + def getDatabase(db: String): CatalogDatabase = { + val database = getDatabaseInternal(db) + // The default database's location always uses the warehouse path. + // Since the location of database stored in metastore is qualified, + // we also make the warehouse location qualified. 
+ if (db == SessionCatalog.DEFAULT_DATABASE) { + val qualifiedWarehousePath = SessionCatalog + .makeQualifiedPath(warehousePath, hadoopConf).toString + database.copy(locationUri = qualifiedWarehousePath) + } else { + database + } + } def databaseExists(db: String): Boolean @@ -269,4 +284,7 @@ abstract class ExternalCatalog { def listFunctions(db: String, pattern: String): Seq[String] + protected def getDatabaseInternal(db: String): CatalogDatabase + + protected def warehousePath: String } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 15aed5f9b1bdf..5d6410029e336 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils class InMemoryCatalog( conf: SparkConf = new SparkConf, hadoopConfig: Configuration = new Configuration) - extends ExternalCatalog { + extends ExternalCatalog(conf, hadoopConfig) { import CatalogTypes.TablePartitionSpec @@ -93,6 +93,8 @@ class InMemoryCatalog( } } + protected override def warehousePath: String = + catalog(SessionCatalog.DEFAULT_DATABASE).db.locationUri // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -156,7 +158,7 @@ class InMemoryCatalog( catalog(dbDefinition.name).db = dbDefinition } - override def getDatabase(db: String): CatalogDatabase = synchronized { + protected override def getDatabaseInternal(db: String): CatalogDatabase = synchronized { requireDbExists(db) catalog(db).db } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index ac6e16bba83c8..95ee8810f5f5e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -193,18 +193,7 @@ class SessionCatalog( def getDatabaseMetadata(db: String): CatalogDatabase = { val dbName = formatDatabaseName(db) requireDbExists(dbName) - val database = externalCatalog.getDatabase(dbName) - - // The default database's location always uses the warehouse path. - // Since the location of database stored in metastore is qualified, - // we also make the warehouse location qualified. - val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE) { - SessionCatalog.makeQualifiedPath(conf.warehousePath, hadoopConf).toString - } else { - database.locationUri - } - - database.copy(locationUri = dbLocation) + externalCatalog.getDatabase(dbName) } def databaseExists(db: String): Boolean = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e7bc3d119c134..b6d7d78590aee 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -52,7 +52,7 @@ import org.apache.spark.sql.types.{DataType, StructType} * All public methods must be synchronized for thread-safety. 
*/ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration) - extends ExternalCatalog with Logging { + extends ExternalCatalog(conf, hadoopConf) with Logging { import CatalogTypes.TablePartitionSpec import HiveExternalCatalog._ @@ -129,6 +129,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } + protected override def warehousePath: String = conf.get(WAREHOUSE_PATH) // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -162,7 +163,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterDatabase(dbDefinition) } - override def getDatabase(db: String): CatalogDatabase = withClient { + protected override def getDatabaseInternal(db: String): CatalogDatabase = withClient { client.getDatabase(db) } @@ -408,15 +409,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } private def defaultTablePath(tableIdent: TableIdentifier): String = { - // The default database's location always uses the warehouse path. - // Since the location of database stored in metastore is qualified, - // we also make the warehouse location qualified. - val dbLocation = if (tableIdent.database.orNull == SessionCatalog.DEFAULT_DATABASE) { - SessionCatalog.makeQualifiedPath(conf.get(WAREHOUSE_PATH), hadoopConf).toString - } else { - getDatabase(tableIdent.database.get).locationUri - } - + val dbLocation = getDatabase(tableIdent.database.get).locationUri new Path(new Path(dbLocation), tableIdent.table).toString } From 41ea1157729aa70f7a74048dfbe86a3c0b7e6ba0 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 12:00:34 +0800 Subject: [PATCH 21/28] modify a comment --- .../org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 5d6410029e336..aef3734acbf79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -93,6 +93,7 @@ class InMemoryCatalog( } } + // For InMemoryCatatlog, default database location is equal to warehouse path protected override def warehousePath: String = catalog(SessionCatalog.DEFAULT_DATABASE).db.locationUri // -------------------------------------------------------------------------- From 13245e4474115b41880224d43cd7b4b8613bd6ac Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 27 Feb 2017 12:02:12 +0800 Subject: [PATCH 22/28] modify a comment --- .../org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index aef3734acbf79..f89b245451e06 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -93,7 +93,7 @@ class InMemoryCatalog( } } - // For InMemoryCatatlog, default database location is equal to warehouse path + // For InMemoryCatalog, default database location is equal to 
warehouse path protected override def warehousePath: String = catalog(SessionCatalog.DEFAULT_DATABASE).db.locationUri // -------------------------------------------------------------------------- From 096ae63d5c33b4d8e21cfb95370477abf164eaf7 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 1 Mar 2017 11:59:11 +0800 Subject: [PATCH 23/28] add final def --- .../org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 1207f79a55b6f..9003d6737d2af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -77,7 +77,7 @@ abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { */ def alterDatabase(dbDefinition: CatalogDatabase): Unit - def getDatabase(db: String): CatalogDatabase = { + final def getDatabase(db: String): CatalogDatabase = { val database = getDatabaseInternal(db) // The default database's location always uses the warehouse path. // Since the location of database stored in metastore is qualified, From badd61b29e4ea8f02d5ff8c93dc7c0dbf1293621 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 2 Mar 2017 18:15:33 +0800 Subject: [PATCH 24/28] modify some code --- .../sql/catalyst/catalog/ExternalCatalog.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 9003d6737d2af..2e56deb38feb4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -36,6 +36,17 @@ import org.apache.spark.sql.catalyst.expressions.Expression abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { import CatalogTypes.TablePartitionSpec + val defaultDB: CatalogDatabase = { + val qualifiedWarehousePath = SessionCatalog + .makeQualifiedPath(warehousePath, hadoopConf).toString + CatalogDatabase( + SessionCatalog.DEFAULT_DATABASE, + "The default database created by Spark using current warehouse path", + qualifiedWarehousePath, + Map.empty + ) + } + protected def requireDbExists(db: String): Unit = { if (!databaseExists(db)) { throw new NoSuchDatabaseException(db) @@ -83,9 +94,7 @@ abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { // Since the location of database stored in metastore is qualified, // we also make the warehouse location qualified. 
if (db == SessionCatalog.DEFAULT_DATABASE) { - val qualifiedWarehousePath = SessionCatalog - .makeQualifiedPath(warehousePath, hadoopConf).toString - database.copy(locationUri = qualifiedWarehousePath) + defaultDB } else { database } From 35d2b59ce0014c919ea284723da4e3677ba7cca8 Mon Sep 17 00:00:00 2001 From: windpiger Date: Thu, 2 Mar 2017 19:27:12 +0800 Subject: [PATCH 25/28] add lazy flag --- .../org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 2e56deb38feb4..79758c5c08ba3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { import CatalogTypes.TablePartitionSpec - val defaultDB: CatalogDatabase = { + lazy val defaultDB: CatalogDatabase = { val qualifiedWarehousePath = SessionCatalog .makeQualifiedPath(warehousePath, hadoopConf).toString CatalogDatabase( From e3a467e52b73dc1f67fb2b669d551a7b9bb904b6 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Mar 2017 15:43:59 +0800 Subject: [PATCH 26/28] modify test case --- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 0817d567a51ca..37941ce430389 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -939,15 +939,16 @@ object SPARK_19667_CREATE_TABLE { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().enableHiveSupport().getOrCreate() try { - val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}" + val warehousePath = new Path(spark.sharedState.warehousePath) + val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf()) val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") // default database use warehouse path as its location - assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) + assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath)) spark.sql("CREATE TABLE t(a string)") val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) // table in default database use the location of default database which is also warehouse path - assert(table.location.stripSuffix("/") == s"$warehousePath/t") + assert(new Path(table.location) == fs.makeQualified(new Path(warehousePath, "t"))) spark.sql("INSERT INTO TABLE t SELECT 1") assert(spark.sql("SELECT * FROM t").count == 1) @@ -956,7 +957,8 @@ object SPARK_19667_CREATE_TABLE { spark.sql("CREATE TABLE t1(b string)") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) // table in not default database use the location of its own database - assert(table1.location.stripSuffix("/") == s"$warehousePath/not_default.db/t1") + assert(new Path(table1.location) == fs.makeQualified( + new Path(warehousePath, "not_default.db/t1"))) } finally { 
spark.sql("USE default") } @@ -967,37 +969,40 @@ object SPARK_19667_VERIFY_TABLE_PATH { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().enableHiveSupport().getOrCreate() try { - val warehousePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}" + val warehousePath = new Path(spark.sharedState.warehousePath) + val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf()) val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") // default database use warehouse path as its location - assert(defaultDB.locationUri.stripSuffix("/") == warehousePath) + assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath)) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) // the table in default database created in job(SPARK_19667_CREATE_TABLE) above, // which has different warehouse path from this job, its location still equals to // the location when it's created. - assert(table.location.stripSuffix("/") != s"$warehousePath/t") + assert(new Path(table.location) != fs.makeQualified(new Path(warehousePath, "t"))) assert(spark.sql("SELECT * FROM t").count == 1) spark.sql("CREATE TABLE t3(d string)") val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3")) // the table in default database created here in this job, it will use the warehouse path // of this job as its location - assert(table3.location.stripSuffix("/") == s"$warehousePath/t3") + assert(new Path(table3.location) == fs.makeQualified(new Path(warehousePath, "t3"))) spark.sql("USE not_default") val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) // the table in not default database create in job(SPARK_19667_CREATE_TABLE) above, // which has different warehouse path from this job, its location still equals to // the location when it's created. - assert(table1.location.stripSuffix("/") != s"$warehousePath/not_default.db/t1") + assert(new Path(table1.location) != fs.makeQualified( + new Path(warehousePath, "not_default.db/t1"))) assert(!new File(s"$warehousePath/not_default.db/t1").exists()) spark.sql("CREATE TABLE t2(c string)") val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) // the table in not default database created here in this job, it will use the location // of the database as its location, not the warehouse path in this job - assert(table2.location.stripSuffix("/") != s"$warehousePath/not_default.db/t2") + assert(new Path(table2.location) != fs.makeQualified( + new Path(warehousePath, "not_default.db/t2"))) spark.sql("CREATE DATABASE not_default_1") spark.sql("USE not_default_1") @@ -1005,7 +1010,8 @@ object SPARK_19667_VERIFY_TABLE_PATH { val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4")) // the table created in the database which created in this job, it will use the location // of the database. 
- assert(table4.location.stripSuffix("/") == s"$warehousePath/not_default_1.db/t4") + assert(new Path(table4.location) == fs.makeQualified( + new Path(warehousePath, "not_default_1.db/t4"))) } finally { spark.sql("DROP TABLE IF EXISTS t4") From ae9938a44da02a5d4afdcf2ceeb252624a67d464 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Mar 2017 16:09:47 +0800 Subject: [PATCH 27/28] modify test case --- .../scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 37941ce430389..68812676c218f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -995,7 +995,7 @@ object SPARK_19667_VERIFY_TABLE_PATH { // the location when it's created. assert(new Path(table1.location) != fs.makeQualified( new Path(warehousePath, "not_default.db/t1"))) - assert(!new File(s"$warehousePath/not_default.db/t1").exists()) + assert(!new File(warehousePath.toString, "not_default.db/t1").exists()) spark.sql("CREATE TABLE t2(c string)") val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2")) From 7739ccd4b898cfb124cc55890a7c11e416f3ea6d Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 3 Mar 2017 17:17:36 +0800 Subject: [PATCH 28/28] mv getdatabase --- .../apache/spark/sql/catalyst/catalog/ExternalCatalog.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 79758c5c08ba3..4bd276d9f514f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -89,14 +89,13 @@ abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) { def alterDatabase(dbDefinition: CatalogDatabase): Unit final def getDatabase(db: String): CatalogDatabase = { - val database = getDatabaseInternal(db) // The default database's location always uses the warehouse path. // Since the location of database stored in metastore is qualified, // we also make the warehouse location qualified. if (db == SessionCatalog.DEFAULT_DATABASE) { defaultDB } else { - database + getDatabaseInternal(db) } }
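
Editor's note: patches 20 through 28 above reach the final shape of this change in several small steps, so a condensed view may help. The sketch below is a minimal, self-contained illustration of the resulting pattern, not the actual Spark classes: ExternalCatalogSketch, InMemoryCatalogSketch, the qualify helper and the literal warehouse paths are invented for the example, and where the sketch merely strips a trailing slash the real code calls SessionCatalog.makeQualifiedPath with the Hadoop configuration and reads the warehouse location from StaticSQLConf.WAREHOUSE_PATH. What it shows is the division of labour the patches converge on: the abstract catalog owns a lazily built defaultDB pinned to the current warehouse path, exposes a final getDatabase that short-circuits the default database to that value, and leaves only the raw metastore lookup (getDatabaseInternal) and the warehouse path to concrete implementations, so defaultTablePath no longer needs any default-database special casing of its own.

// Standalone sketch (simplified types, illustrative names); compiles on its own.
case class CatalogDatabase(
    name: String,
    description: String,
    locationUri: String,
    properties: Map[String, String])

abstract class ExternalCatalogSketch(warehousePath: String) {

  // The default database always points at the (qualified) warehouse path,
  // regardless of what the underlying metastore has recorded for it.
  lazy val defaultDB: CatalogDatabase = CatalogDatabase(
    "default",
    "The default database created by Spark using current warehouse path",
    qualify(warehousePath),
    Map.empty)

  // In Spark this qualification goes through the Hadoop FileSystem
  // (SessionCatalog.makeQualifiedPath); here we only normalize a trailing
  // slash to keep the sketch dependency-free.
  protected def qualify(path: String): String = path.stripSuffix("/")

  // Subclasses implement only the raw metastore lookup...
  protected def getDatabaseInternal(db: String): CatalogDatabase

  // ...while the default-database special case lives in one final method.
  final def getDatabase(db: String): CatalogDatabase =
    if (db == "default") defaultDB else getDatabaseInternal(db)

  // A table created without an explicit location lands under its database,
  // which for "default" is now always the current warehouse path.
  def defaultTablePath(db: String, table: String): String =
    s"${getDatabase(db).locationUri.stripSuffix("/")}/$table"
}

class InMemoryCatalogSketch(warehousePath: String)
    extends ExternalCatalogSketch(warehousePath) {

  private val databases = scala.collection.mutable.Map[String, CatalogDatabase]()

  def createDatabase(db: CatalogDatabase): Unit = databases(db.name) = db

  protected override def getDatabaseInternal(db: String): CatalogDatabase =
    databases(db)
}

object Spark19667Sketch {
  def main(args: Array[String]): Unit = {
    val catalog = new InMemoryCatalogSketch("file:/tmp/spark-warehouse")
    // Even if the metastore recorded a stale location for "default",
    // getDatabase returns the current warehouse path:
    catalog.createDatabase(
      CatalogDatabase("default", "", "file:/old/warehouse", Map.empty))
    assert(catalog.getDatabase("default").locationUri == "file:/tmp/spark-warehouse")
    assert(catalog.defaultTablePath("default", "t") == "file:/tmp/spark-warehouse/t")
  }
}

Run as a plain Scala program, the asserts in main pass even though the recorded location of the default database is stale, which mirrors the behaviour the HiveSparkSubmitSuite jobs (SPARK_19667_CREATE_TABLE and SPARK_19667_VERIFY_TABLE_PATH) verify against a real metastore with differing warehouse paths.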