[SPARK-19667][SQL] Create table with Hive enabled in the default database uses the warehouse path instead of the location of the default database #17001
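For context before the diff: with Hive support enabled, the default database's location is recorded in the metastore when it is first created, so a later session started with a different spark.sql.warehouse.dir would otherwise keep creating default-database tables under the stale path. A minimal, hypothetical sketch of the scenario this PR fixes; the warehouse path /tmp/new_warehouse and the table name demo are made up for illustration:

import org.apache.spark.sql.SparkSession

object Spark19667Demo {
  def main(args: Array[String]): Unit = {
    // Point the session at a warehouse dir that differs from the location
    // recorded for the default database in the existing Hive metastore.
    val spark = SparkSession.builder()
      .config("spark.sql.warehouse.dir", "/tmp/new_warehouse") // made-up path
      .enableHiveSupport()
      .getOrCreate()

    // With this fix, the new table in the default database lands under the
    // session's warehouse path (/tmp/new_warehouse/demo), not under the
    // default database's stale metastore location.
    spark.sql("CREATE TABLE demo(a STRING)")
    spark.sql("DESCRIBE FORMATTED demo").show(100, truncate = false)
  }
}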
Changes from all commits
@@ -334,6 +334,35 @@ class HiveSparkSubmitSuite
    runSparkSubmit(argsForShowTables)
  }

  test("SPARK-19667: create table in default database with HiveEnabled use warehouse path " +
    "instead of the location of default database") {
    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
    val warehousePath1 = Utils.createTempDir("wh1")
    val argsForCreateTable = Seq(
      "--class", SPARK_19667_CREATE_TABLE.getClass.getName.stripSuffix("$"),
      "--name", "SPARK-19667",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--conf", s"spark.sql.warehouse.dir=$warehousePath1",
      unusedJar.toString)
    runSparkSubmit(argsForCreateTable)

    val warehousePath2 = Utils.createTempDir("wh2")
    val argsForShowTables = Seq(
      "--class", SPARK_19667_VERIFY_TABLE_PATH.getClass.getName.stripSuffix("$"),
      "--name", "SPARK-19667",
      "--master", "local-cluster[2,1,1024]",
      "--conf", "spark.ui.enabled=false",
      "--conf", "spark.master.rest.enabled=false",
      "--conf", s"spark.sql.warehouse.dir=$warehousePath2",
      unusedJar.toString)
    runSparkSubmit(argsForShowTables)

    Utils.deleteRecursively(warehousePath1)
    Utils.deleteRecursively(warehousePath2)
  }

  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
  private def runSparkSubmit(args: Seq[String]): Unit = {
@@ -907,3 +936,97 @@ object SPARK_18989_DESC_TABLE {
    }
  }
}

object SPARK_19667_CREATE_TABLE {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    try {
      val warehousePath = new Path(spark.sharedState.warehousePath)
      val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf())
      val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default")
      // the default database uses the warehouse path as its location
      assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath))
      spark.sql("CREATE TABLE t(a string)")

      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      // a table in the default database uses the location of the default
      // database, which is also the warehouse path
      assert(new Path(table.location) == fs.makeQualified(new Path(warehousePath, "t")))
      spark.sql("INSERT INTO TABLE t SELECT 1")
      assert(spark.sql("SELECT * FROM t").count == 1)

      spark.sql("CREATE DATABASE not_default")
      spark.sql("USE not_default")
      spark.sql("CREATE TABLE t1(b string)")
      val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
      // a table in a non-default database uses the location of its own database
      assert(new Path(table1.location) == fs.makeQualified(
        new Path(warehousePath, "not_default.db/t1")))
    } finally {
      spark.sql("USE default")
    }
  }
}
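An aside on the fs.makeQualified calls: both sides of each assert are qualified so that a bare path and its scheme-qualified form compare equal. A tiny illustrative sketch, with a made-up path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object MakeQualifiedDemo {
  def main(args: Array[String]): Unit = {
    // makeQualified resolves a path against the filesystem's scheme and
    // authority, so "/tmp/wh" prints as e.g. "file:/tmp/wh" on a local
    // filesystem; without it, "/tmp/wh" and "file:/tmp/wh" would not
    // compare equal in the assertions above.
    val path = new Path("/tmp/wh")
    val fs = path.getFileSystem(new Configuration())
    println(fs.makeQualified(path))
  }
}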
object SPARK_19667_VERIFY_TABLE_PATH {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    try {
      val warehousePath = new Path(spark.sharedState.warehousePath)
      val fs = warehousePath.getFileSystem(spark.sessionState.newHadoopConf())
      val defaultDB = spark.sessionState.catalog.getDatabaseMetadata("default")
      // the default database uses the warehouse path as its location
      assert(new Path(defaultDB.locationUri) == fs.makeQualified(warehousePath))

      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
      // the table in the default database was created by the job above
      // (SPARK_19667_CREATE_TABLE), which had a different warehouse path from
      // this job; its location still equals the location at creation time
      assert(new Path(table.location) != fs.makeQualified(new Path(warehousePath, "t")))
      assert(spark.sql("SELECT * FROM t").count == 1)

      spark.sql("CREATE TABLE t3(d string)")
      val table3 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
      // a table created in the default database by this job uses the warehouse
      // path of this job as its location
      assert(new Path(table3.location) == fs.makeQualified(new Path(warehousePath, "t3")))

      spark.sql("USE not_default")
      val table1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
      // the table in the non-default database was created by the job above
      // (SPARK_19667_CREATE_TABLE), which had a different warehouse path from
      // this job; its location still equals the location at creation time
      assert(new Path(table1.location) != fs.makeQualified(
        new Path(warehousePath, "not_default.db/t1")))
      assert(!new File(warehousePath.toString, "not_default.db/t1").exists())

Inline review comment: This scenario (line 993-1000) is not needed to test, IMO. Most of the test cases already cover it.

      spark.sql("CREATE TABLE t2(c string)")
      val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
      // a table created in the non-default database by this job uses the
      // location of its database, not the warehouse path of this job
      assert(new Path(table2.location) != fs.makeQualified(
        new Path(warehousePath, "not_default.db/t2")))

      spark.sql("CREATE DATABASE not_default_1")
      spark.sql("USE not_default_1")
      spark.sql("CREATE TABLE t4(e string)")
      val table4 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t4"))
      // a table created in a database that was itself created by this job uses
      // the location of that database
      assert(new Path(table4.location) == fs.makeQualified(
        new Path(warehousePath, "not_default_1.db/t4")))
    } finally {
      spark.sql("DROP TABLE IF EXISTS t4")
      spark.sql("DROP DATABASE not_default_1")

      spark.sql("USE not_default")
      spark.sql("DROP TABLE IF EXISTS t1")
      spark.sql("DROP TABLE IF EXISTS t2")
      spark.sql("DROP DATABASE not_default")

      spark.sql("USE default")
      spark.sql("DROP TABLE IF EXISTS t")
      spark.sql("DROP TABLE IF EXISTS t3")
    }
  }
}
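The invariant these two jobs pin down (a table's location is written to the metastore at creation time and is not affected by a later session's spark.sql.warehouse.dir) can also be eyeballed from SQL. A hedged sketch, assuming a spark session and the table t from the jobs above:

// The Location row printed below still shows the warehouse path of the job
// that created t; only tables and databases created by the current session
// pick up the current spark.sql.warehouse.dir.
spark.sql("DESCRIBE FORMATTED t").show(100, truncate = false)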
Reviewer: How about we just pass in a defaultDB: CatalogDatabase? Then we don't need to add the protected def warehousePath: String.
Author: I think conf/hadoopConf is more useful; later logic can use it, and its subclasses also have these two confs.
Reviewer: We still have conf/hadoopConf in InMemoryCatalog and HiveExternalCatalog; we can just add one more parameter.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we pass a defaultDB, it seems like we introduce an instance of defaultDB as we discussed above
Reviewer: But it will only be used in getDatabase, and we can save a metastore call to get the default database.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok~ let me fix it~
Author: @cloud-fan I found that if we add a parameter defaultDB to ExternalCatalog and its subclasses InMemoryCatalog and HiveExternalCatalog, the change forces a lot of related code to be modified, such as test cases and other places that create an InMemoryCatalog or HiveExternalCatalog.

For example: currently all the parameters of InMemoryCatalog have their own default values, so we can create it with no arguments. If we add a defaultDB parameter, it would also need a default value, but we cannot construct a legal default there because the warehouse path for it is not yet available. And if we don't give defaultDB a default value, that causes even more code change, which I think is not proper.

What about keeping the protected def warehousePath in ExternalCatalog and adding:

lazy val defaultDB = {
  val qualifiedWarehousePath =
    SessionCatalog.makeQualifiedPath(warehousePath, hadoopConf).toString
  CatalogDatabase("default", "", qualifiedWarehousePath, Map.empty)
}

This can also avoid calling getDatabase.
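For reference, a simplified stand-in (not the real class) for the constructor shape described above; in Spark 2.x, InMemoryCatalog's parameters all carry defaults, so call sites can write new InMemoryCatalog with no arguments:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Simplified, hypothetical stand-in for InMemoryCatalog's constructor shape.
class InMemoryCatalogShape(
    conf: SparkConf = new SparkConf,
    hadoopConfig: Configuration = new Configuration) {
  // A defaultDB: CatalogDatabase parameter added here would also need a
  // default value to keep no-argument construction compiling, but a legal
  // default requires the qualified warehouse path, which is only derivable
  // from conf/hadoopConfig at runtime -- hence the lazy val proposed above.
}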
Author: I have modified the code by adding it in ExternalCatalog. If it is not OK, I will revert it, thanks~
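Putting the thread together, a hedged sketch of the shape being described (simplified, not necessarily the merged code; doGetDatabase is a hypothetical hook standing in for the real metastore lookup):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, SessionCatalog}

abstract class ExternalCatalogSketch {
  protected def warehousePath: String    // provided by each subclass
  protected def hadoopConf: Configuration

  // Built once from the session's warehouse path, so the default database
  // can be served without a metastore round trip.
  lazy val defaultDB: CatalogDatabase = {
    val qualifiedWarehousePath =
      SessionCatalog.makeQualifiedPath(warehousePath, hadoopConf).toString
    CatalogDatabase("default", "", qualifiedWarehousePath, Map.empty)
  }

  def getDatabase(db: String): CatalogDatabase =
    if (db == "default") defaultDB else doGetDatabase(db)

  // hypothetical hook for the real lookup in InMemoryCatalog/HiveExternalCatalog
  protected def doGetDatabase(db: String): CatalogDatabase
}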