diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
index 31eded4deba7d..cf9a3259088e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Expression
 
@@ -30,9 +33,20 @@ import org.apache.spark.sql.catalyst.expressions.Expression
  *
  * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist.
  */
-abstract class ExternalCatalog {
+abstract class ExternalCatalog(conf: SparkConf, hadoopConf: Configuration) {
   import CatalogTypes.TablePartitionSpec
 
+  lazy val defaultDB: CatalogDatabase = {
+    val qualifiedWarehousePath = SessionCatalog
+      .makeQualifiedPath(CatalogUtils.stringToURI(warehousePath), hadoopConf)
+    CatalogDatabase(
+      SessionCatalog.DEFAULT_DATABASE,
+      "The default database created by Spark using the current warehouse path",
+      qualifiedWarehousePath,
+      Map.empty
+    )
+  }
+
   protected def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
       throw new NoSuchDatabaseException(db)
@@ -74,7 +88,16 @@ abstract class ExternalCatalog {
    */
   def alterDatabase(dbDefinition: CatalogDatabase): Unit
 
-  def getDatabase(db: String): CatalogDatabase
+  final def getDatabase(db: String): CatalogDatabase = {
+    // The default database's location always uses the warehouse path.
+    // Since the database locations stored in the metastore are qualified,
+    // we also make the warehouse location qualified here.
+    if (db == SessionCatalog.DEFAULT_DATABASE) {
+      defaultDB
+    } else {
+      getDatabaseInternal(db)
+    }
+  }
 
   def databaseExists(db: String): Boolean
 
@@ -268,4 +291,7 @@ abstract class ExternalCatalog {
 
   def listFunctions(db: String, pattern: String): Seq[String]
 
+  protected def getDatabaseInternal(db: String): CatalogDatabase
+
+  protected def warehousePath: String
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 80aba4af9436c..24760065e4a60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
 class InMemoryCatalog(
     conf: SparkConf = new SparkConf,
     hadoopConfig: Configuration = new Configuration)
-  extends ExternalCatalog {
+  extends ExternalCatalog(conf, hadoopConfig) {
 
   import CatalogTypes.TablePartitionSpec
 
@@ -93,6 +93,9 @@ class InMemoryCatalog(
     }
   }
 
+  // For InMemoryCatalog, the default database location is the warehouse path
+  protected override def warehousePath: String =
+    CatalogUtils.URIToString(catalog(SessionCatalog.DEFAULT_DATABASE).db.locationUri)
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
@@ -156,7 +159,7 @@ class InMemoryCatalog(
     catalog(dbDefinition.name).db = dbDefinition
   }
 
-  override def getDatabase(db: String): CatalogDatabase = synchronized {
+  protected override def getDatabaseInternal(db: String): CatalogDatabase = synchronized {
     requireDbExists(db)
     catalog(db).db
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 498bfbde9d7a1..1f7f56d8de5f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -38,6 +38,18 @@ import org.apache.spark.sql.catalyst.util.StringUtils
 
 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
+
+  /**
+   * This method is used to make the given path qualified before we
+   * store this path in the underlying external catalog. So, when a path
+   * does not contain a scheme, this path will not be changed after the default
+   * FileSystem is changed.
+   */
+  def makeQualifiedPath(path: URI, conf: Configuration): URI = {
+    val hadoopPath = new Path(path)
+    val fs = hadoopPath.getFileSystem(conf)
+    fs.makeQualified(hadoopPath).toUri
+  }
 }
 
 /**
@@ -126,18 +138,6 @@ class SessionCatalog(
     CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
   }
 
-  /**
-   * This method is used to make the given path qualified before we
-   * store this path in the underlying external catalog. So, when a path
-   * does not contain a scheme, this path will not be changed after the default
-   * FileSystem is changed.
-   */
-  private def makeQualifiedPath(path: URI): URI = {
-    val hadoopPath = new Path(path)
-    val fs = hadoopPath.getFileSystem(hadoopConf)
-    fs.makeQualified(hadoopPath).toUri
-  }
-
   private def requireDbExists(db: String): Unit = {
     if (!databaseExists(db)) {
       throw new NoSuchDatabaseException(db)
@@ -171,7 +171,7 @@ class SessionCatalog(
         "you cannot create a database with this name.")
     }
     validateName(dbName)
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri)
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri, hadoopConf)
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9ab4624594924..586e08a521582 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -52,7 +52,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
  * All public methods must be synchronized for thread-safety.
  */
 private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
-  extends ExternalCatalog with Logging {
+  extends ExternalCatalog(conf, hadoopConf) with Logging {
 
   import CatalogTypes.TablePartitionSpec
   import HiveExternalCatalog._
@@ -129,6 +129,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
   }
 
+  protected override def warehousePath: String = conf.get(WAREHOUSE_PATH)
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
 
@@ -162,7 +163,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     client.alterDatabase(dbDefinition)
   }
 
-  override def getDatabase(db: String): CatalogDatabase = withClient {
+  protected override def getDatabaseInternal(db: String): CatalogDatabase = withClient {
     client.getDatabase(db)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 5f15a705a2e99..6b08f75f9c7de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -334,6 +334,35 @@ class HiveSparkSubmitSuite
     runSparkSubmit(argsForShowTables)
   }
 
+  test("SPARK-19667: creating a table in the default database with HiveEnabled uses the " +
+    "warehouse path instead of the location of the default database") {
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val warehousePath1 = Utils.createTempDir("wh1")
+    val argsForCreateTable = Seq(
+      "--class", SPARK_19667_CREATE_TABLE.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-19667",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.warehouse.dir=$warehousePath1",
+      unusedJar.toString)
+    runSparkSubmit(argsForCreateTable)
+
+    val warehousePath2 = Utils.createTempDir("wh2")
+    val argsForShowTables = Seq(
+      "--class", SPARK_19667_VERIFY_TABLE_PATH.getClass.getName.stripSuffix("$"),
+      "--name", "SPARK-19667",
+      "--master", "local-cluster[2,1,1024]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.warehouse.dir=$warehousePath2",
s"spark.sql.warehouse.dir=$warehousePath2", + unusedJar.toString) + runSparkSubmit(argsForShowTables) + + Utils.deleteRecursively(warehousePath1) + Utils.deleteRecursively(warehousePath2) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. // This is copied from org.apache.spark.deploy.SparkSubmitSuite private def runSparkSubmit(args: Seq[String]): Unit = { @@ -907,3 +936,97 @@ object SPARK_18989_DESC_TABLE { } } } + +object SPARK_19667_CREATE_TABLE { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder().enableHiveSupport().getOrCreate() + try { + val warehousePath = new Path(spark.sharedState.warehousePath) + val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf()) + val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") + // default database use warehouse path as its location + assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath)) + spark.sql("CREATE TABLE t(a string)") + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + // table in default database use the location of default database which is also warehouse path + assert(new Path(table.location) == fs.makeQualified(new Path(warehousePath, "t"))) + spark.sql("INSERT INTO TABLE t SELECT 1") + assert(spark.sql("SELECT * FROM t").count == 1) + + spark.sql("CREATE DATABASE not_default") + spark.sql("USE not_default") + spark.sql("CREATE TABLE t1(b string)") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + // table in not default database use the location of its own database + assert(new Path(table1.location) == fs.makeQualified( + new Path(warehousePath, "not_default.db/t1"))) + } finally { + spark.sql("USE default") + } + } +} + +object SPARK_19667_VERIFY_TABLE_PATH { + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder().enableHiveSupport().getOrCreate() + try { + val warehousePath = new Path(spark.sharedState.warehousePath) + val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf()) + val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default") + // default database use warehouse path as its location + assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath)) + + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + // the table in default database created in job(SPARK_19667_CREATE_TABLE) above, + // which has different warehouse path from this job, its location still equals to + // the location when it's created. + assert(new Path(table.location) != fs.makeQualified(new Path(warehousePath, "t"))) + assert(spark.sql("SELECT * FROM t").count == 1) + + spark.sql("CREATE TABLE t3(d string)") + val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3")) + // the table in default database created here in this job, it will use the warehouse path + // of this job as its location + assert(new Path(table3.location) == fs.makeQualified(new Path(warehousePath, "t3"))) + + spark.sql("USE not_default") + val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + // the table in not default database create in job(SPARK_19667_CREATE_TABLE) above, + // which has different warehouse path from this job, its location still equals to + // the location when it's created. 
+      assert(new Path(table1.location) != fs.makeQualified(
+        new Path(warehousePath, "not_default.db/t1")))
+      assert(!new File(warehousePath.toString, "not_default.db/t1").exists())
+
+      spark.sql("CREATE TABLE t2(c string)")
+      val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
+      // a table created in a non-default database by this job uses the location of that
+      // database, not this job's warehouse path
+      assert(new Path(table2.location) != fs.makeQualified(
+        new Path(warehousePath, "not_default.db/t2")))
+
+      spark.sql("CREATE DATABASE not_default_1")
+      spark.sql("USE not_default_1")
+      spark.sql("CREATE TABLE t4(e string)")
+      val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4"))
+      // a table created in a database that was created by this job uses the location
+      // of that database
+      assert(new Path(table4.location) == fs.makeQualified(
+        new Path(warehousePath, "not_default_1.db/t4")))
+
+    } finally {
+      spark.sql("DROP TABLE IF EXISTS t4")
+      spark.sql("DROP DATABASE not_default_1")
+
+      spark.sql("USE not_default")
+      spark.sql("DROP TABLE IF EXISTS t1")
+      spark.sql("DROP TABLE IF EXISTS t2")
+      spark.sql("DROP DATABASE not_default")
+
+      spark.sql("USE default")
+      spark.sql("DROP TABLE IF EXISTS t")
+      spark.sql("DROP TABLE IF EXISTS t3")
+    }
+  }
+}
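
Reviewer note (not part of the patch): the behaviour tested above hinges on path qualification. The standalone sketch below mirrors the makeQualifiedPath helper that this change moves into object SessionCatalog, so it can be tried in isolation; the object name QualifiedPathSketch and the example paths are made up for illustration.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object QualifiedPathSketch {
  // Mirrors SessionCatalog.makeQualifiedPath from this patch: resolve a possibly
  // scheme-less URI against its FileSystem, so the location stored in the metastore
  // keeps a scheme/authority even if fs.defaultFS changes later.
  def makeQualifiedPath(path: URI, conf: Configuration): URI = {
    val hadoopPath = new Path(path)
    val fs = hadoopPath.getFileSystem(conf)
    fs.makeQualified(hadoopPath).toUri
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()  // fs.defaultFS is file:/// unless configured otherwise
    // A scheme-less warehouse dir gets the local-file scheme attached.
    println(makeQualifiedPath(new URI("/tmp/spark-warehouse"), conf))       // file:/tmp/spark-warehouse
    // An already-qualified path is left unchanged.
    println(makeQualifiedPath(new URI("file:/tmp/spark-warehouse"), conf))  // file:/tmp/spark-warehouse
  }
}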