From bacd528749ef26b99bf813081bef480ab4c24f97 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Wed, 22 Feb 2017 12:53:04 +0800
Subject: [PATCH] process the default database location when creating/getting a
 database from the metastore

---
 .../spark/sql/internal/SharedState.scala        |  4 +++-
 .../spark/sql/hive/HiveExternalCatalog.scala    | 10 ++--------
 .../spark/sql/hive/client/HiveClientImpl.scala  | 14 ++++++++++++--
 .../sql/hive/execution/HiveDDLSuite.scala       | 18 ++++++++++++------
 4 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 8de95fe64e663..ae99dd840b36c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -90,8 +90,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
   // Create the default database if it doesn't exist.
   {
+    // The default database's location is stored as an empty string; when it is
+    // reloaded from the metastore, the warehouse path is substituted for it.
     val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
+      SessionCatalog.DEFAULT_DATABASE, "default database", "", Map())
     // Initialize default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
       // There may be another Spark application creating default database at the same time, here we
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c1563d763699..ea48256147857 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -407,14 +407,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     properties
   }
 
-  // if the database is default, the value of WAREHOUSE_PATH in conf returned
   private def defaultTablePath(tableIdent: TableIdentifier): String = {
-    val dbLocation = if (tableIdent.database.get == SessionCatalog.DEFAULT_DATABASE
-      || conf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) {
-      conf.get(WAREHOUSE_PATH)
-    } else {
-      getDatabase(tableIdent.database.get).locationUri
-    }
+    val dbLocation = getDatabase(tableIdent.database.get).locationUri
     new Path(new Path(dbLocation), tableIdent.table).toString
   }
 
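Note (not part of the patch): with the special case removed, defaultTablePath simply joins the database location returned by getDatabase with the table name; the default-database handling now lives entirely in HiveClientImpl.getDatabase below. A minimal sketch of the resulting path composition, assuming only Hadoop's org.apache.hadoop.fs.Path; the object name and paths are illustrative:

import org.apache.hadoop.fs.Path

object DefaultTablePathSketch {
  // Mirrors the one-liner kept in HiveExternalCatalog.defaultTablePath.
  def defaultTablePath(dbLocation: String, table: String): String =
    new Path(new Path(dbLocation), table).toString

  def main(args: Array[String]): Unit = {
    // Default database: getDatabase substitutes the current warehouse path,
    // so the table lands under the warehouse.
    println(defaultTablePath("file:/user/hive/warehouse", "t"))  // file:/user/hive/warehouse/t
    // Any other database: the stored location is used as-is.
    println(defaultTablePath("file:/tmp/mydb", "t"))             // file:/tmp/mydb/t
  }
}
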
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index dc9c3ff33542d..7f8d1cdd77c16 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -48,6 +48,8 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
 
@@ -311,11 +313,12 @@ private[hive] class HiveClientImpl(
   override def createDatabase(
       database: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = withHiveState {
+    // The default database's location always uses the warehouse path, so store an empty string here.
     client.createDatabase(
       new HiveDatabase(
         database.name,
         database.description,
-        database.locationUri,
+        if (database.name == SessionCatalog.DEFAULT_DATABASE) "" else database.locationUri,
         Option(database.properties).map(_.asJava).orNull),
       ignoreIfExists)
   }
@@ -339,10 +342,17 @@ private[hive] class HiveClientImpl(
   override def getDatabase(dbName: String): CatalogDatabase = withHiveState {
     Option(client.getDatabase(dbName)).map { d =>
+      // The default database's location always uses the warehouse path.
+      // TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH is a test-only flag.
+      val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE
+        || sparkConf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) {
+        sparkConf.get(WAREHOUSE_PATH)
+      } else d.getLocationUri
+
       CatalogDatabase(
         name = d.getName,
         description = d.getDescription,
-        locationUri = d.getLocationUri,
+        locationUri = dbLocation,
         properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
     }.getOrElse(throw new NoSuchDatabaseException(dbName))
   }
 
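Note (not part of the patch): a minimal sketch of the read-side substitution above in isolation. Because the default database's location is stored empty and resolved against the current warehouse path on every read, changing spark.sql.warehouse.dir moves the default database accordingly. The object and function names are illustrative, and the test-only flag is omitted:

object ResolveDbLocationSketch {
  val DEFAULT_DATABASE = "default"

  // Stand-in for the branch added to HiveClientImpl.getDatabase.
  def resolveDbLocation(dbName: String, storedLocation: String, warehousePath: String): String =
    if (dbName == DEFAULT_DATABASE) warehousePath else storedLocation

  def main(args: Array[String]): Unit = {
    // The default database follows the current conf, whatever was stored.
    assert(resolveDbLocation("default", "", "file:/new/warehouse") == "file:/new/warehouse")
    // Other databases keep the location recorded in the metastore.
    assert(resolveDbLocation("mydb", "file:/tmp/mydb", "file:/new/warehouse") == "file:/tmp/mydb")
  }
}
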
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2c3eee7d2ce80..14062be653195 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1590,38 +1590,42 @@ class HiveDDLSuite
 
   test("create table with default database use warehouse path instead of database location") {
     withTable("t") {
+      // The default database uses the warehouse path as its location.
      withTempDir { dir =>
        spark.sparkContext.conf
          .set(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true")
+        val sparkWarehousePath = spark.sharedState.warehousePath.stripSuffix("/")
        spark.sql(s"CREATE DATABASE default_test LOCATION '$dir'" )
 
        val db = spark.sessionState.catalog.getDatabaseMetadata("default_test")
-        assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}")
+        assert(db.locationUri.stripSuffix("/") == sparkWarehousePath)
        spark.sql("USE default_test")
 
-        val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t"
        spark.sql("CREATE TABLE t(a string)")
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location.stripSuffix("/") == sparkWarehouseTablePath )
+        assert(table.location.stripSuffix("/").stripPrefix("file:") ==
+          new File(sparkWarehousePath, "t").getAbsolutePath.stripSuffix("/"))
 
        // clear
        spark.sparkContext.conf
          .remove(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key)
+        spark.sql("DROP TABLE t")
        spark.sql("DROP DATABASE default_test")
        spark.sql("USE DEFAULT")
      }
 
+      // A non-default database uses the location given in its CREATE DATABASE command.
      withTempDir { dir =>
+        val dirPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
        spark.sql(s"CREATE DATABASE test_not_default LOCATION '$dir'" )
 
        val db = spark.sessionState.catalog.getDatabaseMetadata("test_not_default")
-        assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}")
+        assert(db.locationUri.stripSuffix("/") == dirPath)
        spark.sql("USE test_not_default")
 
-        val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t"
        spark.sql("CREATE TABLE t(a string)")
        val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-        assert(table.location.stripSuffix("/") == s"${db.locationUri.stripSuffix("/")}/t" )
+        assert(table.location.stripSuffix("/") == s"$dirPath/t")
 
        // clear
        spark.sql("DROP TABLE t")
@@ -1631,3 +1635,5 @@ class HiveDDLSuite
     }
   }
 }
+
+case class cc(a: Array[Byte])
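
Note (not part of the patch): the write/read round trip the patch establishes for the default database, as a self-contained sketch with illustrative names. The invariant the HiveDDLSuite test checks falls out directly: whatever location the metastore recorded, the default database always resolves to the current warehouse path:

object DefaultDbRoundTripSketch {
  case class StoredDb(name: String, location: String)

  // Write side (createDatabase): the default database is stored with an empty location.
  def store(name: String, requestedLocation: String): StoredDb =
    StoredDb(name, if (name == "default") "" else requestedLocation)

  // Read side (getDatabase): the empty location is resolved to the current warehouse path.
  def load(db: StoredDb, warehousePath: String): String =
    if (db.name == "default") warehousePath else db.location

  def main(args: Array[String]): Unit = {
    val d = store("default", "file:/old/warehouse")  // any location given at create time is ignored
    assert(load(d, "file:/new/warehouse") == "file:/new/warehouse")  // follows the current conf
    val m = store("mydb", "file:/tmp/mydb")
    assert(load(m, "file:/new/warehouse") == "file:/tmp/mydb")       // non-default: unchanged
  }
}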