
Commit

process default database location when create/get database from metastore
windpiger committed Feb 22, 2017
1 parent a2c9168 commit bacd528
Showing 4 changed files with 29 additions and 17 deletions.
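Taken together, the hunks below (in SharedState, HiveExternalCatalog, HiveClientImpl, and the HiveDDLSuite test, going by the hunk headers) apply one rule: the default database's location is persisted to the metastore as an empty string, and the current warehouse path is substituted whenever the database is read back. A minimal sketch of that rule in plain Scala; the object and method names are illustrative only, not Spark APIs:

// Hedged sketch of the location rule this commit applies; not Spark code.
object DefaultDbLocationRule {
  val DefaultDatabase = "default"

  // createDatabase side: never pin the default database to a concrete path,
  // so the metastore entry stays valid if the warehouse later moves.
  def locationToStore(dbName: String, requestedLocation: String): String =
    if (dbName == DefaultDatabase) "" else requestedLocation

  // getDatabase side: the default database always reports the session's
  // current warehouse path, regardless of what the metastore recorded.
  def locationToReport(dbName: String, storedLocation: String, warehousePath: String): String =
    if (dbName == DefaultDatabase) warehousePath else storedLocation

  def main(args: Array[String]): Unit = {
    val warehouse = "file:/tmp/spark-warehouse"
    println(locationToStore("default", "file:/ignored"))            // prints an empty line
    println(locationToReport("default", "", warehouse))             // file:/tmp/spark-warehouse
    println(locationToReport("mydb", "file:/data/mydb", warehouse)) // file:/data/mydb
  }
}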
@@ -90,8 +90,10 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {

// Create the default database if it doesn't exist.
{
// The default database's location is set to an empty string;
// when it is reloaded from the metastore, the warehouse path replaces it.
val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE, "default database", warehousePath, Map())
SessionCatalog.DEFAULT_DATABASE, "default database", "", Map())
// Initialize default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
// There may be another Spark application creating default database at the same time, here we
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types.{DataType, StructType}

@@ -407,14 +407,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
properties
}

// if the database is the default one, the value of WAREHOUSE_PATH in conf is returned
private def defaultTablePath(tableIdent: TableIdentifier): String = {
val dbLocation = if (tableIdent.database.get == SessionCatalog.DEFAULT_DATABASE
|| conf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) {
conf.get(WAREHOUSE_PATH)
} else {
getDatabase(tableIdent.database.get).locationUri
}
val dbLocation = getDatabase(tableIdent.database.get).locationUri
new Path(new Path(dbLocation), tableIdent.table).toString
}
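With the default-database special case gone, defaultTablePath always derives a table's location from its database's locationUri. A small sketch of that composition with Hadoop's Path, mirroring the new Path(new Path(dbLocation), tableIdent.table).toString call above (the helper name is illustrative):

import org.apache.hadoop.fs.Path

// Illustrative helper, not a Spark API: build a default table location by
// appending the table name to the database's location URI, as defaultTablePath does.
def defaultTablePathSketch(dbLocationUri: String, table: String): String =
  new Path(new Path(dbLocationUri), table).toString

// defaultTablePathSketch("file:/tmp/spark-warehouse/mydb.db", "t")
//   returns "file:/tmp/spark-warehouse/mydb.db/t"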

@@ -48,6 +48,8 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.client.HiveClientImpl._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.sql.types._
import org.apache.spark.util.{CircularBuffer, Utils}

@@ -311,11 +313,12 @@ private[hive] class HiveClientImpl(
override def createDatabase(
database: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withHiveState {
// The default database's location always uses the warehouse path, so it is stored as an empty string here.
client.createDatabase(
new HiveDatabase(
database.name,
database.description,
database.locationUri,
if (database.name == SessionCatalog.DEFAULT_DATABASE) "" else database.locationUri,
Option(database.properties).map(_.asJava).orNull),
ignoreIfExists)
}
@@ -339,10 +342,17 @@

override def getDatabase(dbName: String): CatalogDatabase = withHiveState {
Option(client.getDatabase(dbName)).map { d =>
// The default database's location always uses the warehouse path.
// TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH is a test-only flag.
val dbLocation = if (dbName == SessionCatalog.DEFAULT_DATABASE
|| sparkConf.get(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH)) {
sparkConf.get(WAREHOUSE_PATH)
} else d.getLocationUri

CatalogDatabase(
name = d.getName,
description = d.getDescription,
locationUri = d.getLocationUri,
locationUri = dbLocation,
properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
}.getOrElse(throw new NoSuchDatabaseException(dbName))
}
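Seen from the catalog API, the getDatabase change above means the default database's reported location tracks the session's warehouse path rather than whatever URI the metastore holds. A usage sketch mirroring the assertions in the test below; it assumes a Hive-enabled SparkSession named spark and access to the internal sessionState/sharedState accessors, as the test has:

// Usage sketch only; `spark` is assumed to be a Hive-enabled SparkSession with
// the internal accessors visible (as in Spark's own test code).
val reportedLocation =
  spark.sessionState.catalog.getDatabaseMetadata("default").locationUri

// After this change both lines print the same warehouse location, even if the
// metastore entry for the default database holds a different (or empty) URI.
println(reportedLocation)
println(spark.sharedState.warehousePath)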
@@ -1590,38 +1590,42 @@ class HiveDDLSuite

test("create table with default database use warehouse path instead of database location") {
withTable("t") {
// the default database uses the warehouse path as its location
withTempDir { dir =>
spark.sparkContext.conf
.set(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key, "true")

val sparkWarehousePath = spark.sharedState.warehousePath.stripSuffix("/")
spark.sql(s"CREATE DATABASE default_test LOCATION '$dir'" )
val db = spark.sessionState.catalog.getDatabaseMetadata("default_test")
assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}")
assert(db.locationUri.stripSuffix("/") == sparkWarehousePath)
spark.sql("USE default_test")
val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t"

spark.sql("CREATE TABLE t(a string)")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location.stripSuffix("/") == sparkWarehouseTablePath )
assert(table.location.stripSuffix("/").stripPrefix("file:") ==
new File(sparkWarehousePath, "t").getAbsolutePath.stripSuffix("/"))

// clear
spark.sparkContext.conf
.remove(SQLConf.TEST_HIVE_CREATETABLE_DEFAULTDB_USEWAREHOUSE_PATH.key)

spark.sql("DROP TABLE t")
spark.sql("DROP DATABASE default_test")
spark.sql("USE DEFAULT")
}

// a non-default database uses the location given in the CREATE command
withTempDir { dir =>
val dirPath = s"file:${dir.getAbsolutePath.stripSuffix("/")}"
spark.sql(s"CREATE DATABASE test_not_default LOCATION '$dir'" )
val db = spark.sessionState.catalog.getDatabaseMetadata("test_not_default")
assert(db.locationUri.stripSuffix("/") == s"file:${dir.getAbsolutePath.stripSuffix("/")}")
assert(db.locationUri.stripSuffix("/") == dirPath)
spark.sql("USE test_not_default")
val sparkWarehouseTablePath = s"file:${spark.sharedState.warehousePath.stripSuffix("/")}/t"

spark.sql("CREATE TABLE t(a string)")
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(table.location.stripSuffix("/") == s"${db.locationUri.stripSuffix("/")}/t" )
assert(table.location.stripSuffix("/") == s"$dirPath/t" )

// clear
spark.sql("DROP TABLE t")
@@ -1631,3 +1635,5 @@ class HiveDDLSuite
}
}
}

case class cc(a: Array[Byte])
