From c154f3a8e9d33270e73678764b1f3dc384648e1b Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Thu, 4 May 2017 15:11:12 +0800 Subject: [PATCH 1/9] [SPARK-20594] The staging directory should have ".hive-staging" appended to avoid being deleted if we set hive.exec.stagingdir under the table directory without a leading "." --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3682dc850790e..b0c46ba8567dc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -222,7 +222,7 @@ case class InsertIntoHiveTable( val externalCatalog = sparkSession.sharedState.externalCatalog val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version val hadoopConf = sessionState.newHadoopConf() - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + "/.hive-staging" val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") val hiveQlTable = HiveClientImpl.toHiveTable(table) From de938ed91f6e62b02fed32c7123f3aae5d51d9f7 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Mon, 8 May 2017 15:35:21 +0800 Subject: [PATCH 2/9] add a test case for SPARK-20594 --- .../sql/hive/InsertIntoHiveTableSuite.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index d6999af84eac0..aedc01290b981 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -494,4 +494,50 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef spark.table("t").write.insertInto(tableName) } } + + /** + * Drop named tables if they exist + * + * @param tableNames tables to drop + */ + def dropTables(tableNames: String*): Unit = { + tableNames.foreach { name => + sql(s"DROP TABLE IF EXISTS $name") + } + } + + test( + """SPARK-20594: The staging directory should be appended with ".hive-staging" + |to avoid being deleted if we set hive.exec.stagingdir under the table directory + |without start with "."""".stripMargin) { + + dropTables("test_table", "test_table1") + + sql("CREATE TABLE test_table (key int, value string)") + + // Add some data. + testData.write.mode(SaveMode.Append).insertInto("test_table") + + // Make sure the table has also been updated. + checkAnswer( + sql("SELECT * FROM test_table"), + testData.collect().toSeq + ) + + sql("CREATE TABLE test_table1 (key int, value string)") + + // Set hive.exec.stagingdir under the table directory without start with ".". + sql("set hive.exec.stagingdir=./test") + + // Now overwrite. + sql("INSERT OVERWRITE TABLE test_table1 SELECT * FROM test_table") + + // Make sure the table has also been updated. + checkAnswer( + sql("SELECT * FROM test_table1"), + testData.collect().toSeq + ) + + dropTables("test_table", "test_table1") + } } From 6b22d3ea694c4133965ddface73c52c3566cd156 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Tue, 9 May 2017 12:07:49 +0800 Subject: [PATCH 3/9] [SPARK-20594] The staging directory should be a child directory whose name starts with "." to avoid being deleted if we set hive.exec.stagingdir under the table directory.
--- .../sql/hive/execution/InsertIntoHiveTable.scala | 15 +++++++++++++-- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 7 +------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index b0c46ba8567dc..6fcd6b1586db0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -97,12 +97,23 @@ case class InsertIntoHiveTable( val inputPathUri: URI = inputPath.toUri val inputPathName: String = inputPathUri.getPath val fs: FileSystem = inputPath.getFileSystem(hadoopConf) - val stagingPathName: String = + var stagingPathName: String = if (inputPathName.indexOf(stagingDir) == -1) { new Path(inputPathName, stagingDir).toString } else { inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) } + + // SPARK-20594: The staging directory should be a child directory starts with "." to avoid + // being deleted if we set hive.exec.stagingdir under the table directory. + if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) + && !stagingPathName.stripPrefix(inputPathName).startsWith(".")) { + logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + + s"with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + + s"directory.") + stagingPathName = new Path(inputPathName, ".hive-staging").toString + } + val dir: Path = fs.makeQualified( new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) @@ -222,7 +233,7 @@ case class InsertIntoHiveTable( val externalCatalog = sparkSession.sharedState.externalCatalog val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version val hadoopConf = sessionState.newHadoopConf() - val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") + "/.hive-staging" + val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") val hiveQlTable = HiveClientImpl.toHiveTable(table) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index aedc01290b981..11274fc6e42f9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -495,12 +495,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } - /** - * Drop named tables if they exist - * - * @param tableNames tables to drop - */ - def dropTables(tableNames: String*): Unit = { + private def dropTables(tableNames: String*): Unit = { tableNames.foreach { name => sql(s"DROP TABLE IF EXISTS $name") } From 6b1b153e1ee9ec3e7830158d8f8eb274970929ae Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Tue, 9 May 2017 14:38:49 +0800 Subject: [PATCH 4/9] should add stripPrefix(File.separator) --- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 6fcd6b1586db0..c7395de633df4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -17,20 +17,18 @@ package org.apache.spark.sql.hive.execution -import java.io.IOException +import java.io.{File, IOException} import java.net.URI import java.text.SimpleDateFormat import java.util.{Date, Locale, Random} import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc - import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable @@ -107,7 +105,7 @@ case class InsertIntoHiveTable( // SPARK-20594: The staging directory should be a child directory starts with "." to avoid // being deleted if we set hive.exec.stagingdir under the table directory. if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) - && !stagingPathName.stripPrefix(inputPathName).startsWith(".")) { + && !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + s"with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + s"directory.") From 2a542e4248b52e2e5a7a9bc651598b755a81ca3a Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Tue, 9 May 2017 15:08:27 +0800 Subject: [PATCH 5/9] Fix scala style checks --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c7395de633df4..ebf9050a28c36 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -23,12 +23,14 @@ import java.text.SimpleDateFormat import java.util.{Date, Locale, Random} import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.plan.TableDesc + import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} import org.apache.spark.sql.catalyst.catalog.CatalogTable From 9f414366c67e4fd6d46b4ec4d241de73e0921f86 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Wed, 10 May 2017 16:53:18 +0800 Subject: [PATCH 6/9] Fix code review --- .../hive/execution/InsertIntoHiveTable.scala | 5 +- .../sql/hive/InsertIntoHiveTableSuite.scala | 53 ++++++++----------- 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ebf9050a28c36..85c843037dcd7 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -104,8 +104,9 @@ case class InsertIntoHiveTable( inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) } - // SPARK-20594: The staging directory should be a child directory starts with "." to avoid - // being deleted if we set hive.exec.stagingdir under the table directory. + // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the + // staging directory needs to avoid being deleted when users set hive.exec.stagingdir + // under the table directory. if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) && !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 11274fc6e42f9..569687cc3d32d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -495,44 +495,33 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } - private def dropTables(tableNames: String*): Unit = { - tableNames.foreach { name => - sql(s"DROP TABLE IF EXISTS $name") - } - } - test( - """SPARK-20594: The staging directory should be appended with ".hive-staging" - |to avoid being deleted if we set hive.exec.stagingdir under the table directory - |without start with "."""".stripMargin) { + """SPARK-20594: This is a walk-around fix to resolve a Hive bug. 
Hive requires that the + |staging directory needs to avoid being deleted when users set hive.exec.stagingdir + |under the table directory.""".stripMargin) { - dropTables("test_table", "test_table1") + withTable("test_table", "test_table1") { + spark.range(1).write.saveAsTable("test_table") - sql("CREATE TABLE test_table (key int, value string)") - - // Add some data. - testData.write.mode(SaveMode.Append).insertInto("test_table") + // Make sure the table has also been updated. + checkAnswer( + sql("SELECT * FROM test_table"), + Row(0) + ) - // Make sure the table has also been updated. - checkAnswer( - sql("SELECT * FROM test_table"), - testData.collect().toSeq - ) + sql("CREATE TABLE test_table1 (key int)") - sql("CREATE TABLE test_table1 (key int, value string)") + // Set hive.exec.stagingdir under the table directory without start with ".". + sql("set hive.exec.stagingdir=./test") - // Set hive.exec.stagingdir under the table directory without start with ".". - sql("set hive.exec.stagingdir=./test") + // Now overwrite. + sql("INSERT OVERWRITE TABLE test_table1 SELECT * FROM test_table") - // Now overwrite. - sql("INSERT OVERWRITE TABLE test_table1 SELECT * FROM test_table") - - // Make sure the table has also been updated. - checkAnswer( - sql("SELECT * FROM test_table1"), - testData.collect().toSeq - ) - - dropTables("test_table", "test_table1") + // Make sure the table has also been updated. 
+ checkAnswer( + sql("SELECT * FROM test_table1"), + Row(0) + ) + } } } From bf1b4eca9b9692d2dd11a5c166446cc2d5258e86 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Thu, 11 May 2017 10:36:08 +0800 Subject: [PATCH 7/9] Fix code review --- .../sql/hive/execution/InsertIntoHiveTable.scala | 8 ++++---- .../spark/sql/hive/InsertIntoHiveTableSuite.scala | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 85c843037dcd7..3facf9f67be9f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -107,11 +107,11 @@ case class InsertIntoHiveTable( // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the // staging directory needs to avoid being deleted when users set hive.exec.stagingdir // under the table directory. - if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) - && !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { + if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs) && + !stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")) { logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + - s"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + - s"directory.") + "with '.' 
to avoid being deleted if we set hive.exec.stagingdir under the table " + + "directory.") stagingPathName = new Path(inputPathName, ".hive-staging").toString } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index 569687cc3d32d..bda7d2a88fb5c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -500,7 +500,7 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef |staging directory needs to avoid being deleted when users set hive.exec.stagingdir |under the table directory.""".stripMargin) { - withTable("test_table", "test_table1") { + withTable("test_table") { spark.range(1).write.saveAsTable("test_table") // Make sure the table has also been updated. @@ -509,19 +509,19 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef Row(0) ) - sql("CREATE TABLE test_table1 (key int)") - // Set hive.exec.stagingdir under the table directory without start with ".". sql("set hive.exec.stagingdir=./test") // Now overwrite. - sql("INSERT OVERWRITE TABLE test_table1 SELECT * FROM test_table") + sql("INSERT OVERWRITE TABLE test_table SELECT 1") // Make sure the table has also been updated. 
checkAnswer( - sql("SELECT * FROM test_table1"), - Row(0) + sql("SELECT * FROM test_table"), + Row(1) ) + + sql("reset") } } } From 4e1b6a037070dcbcb956f600abf152ad2d6dcca2 Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Thu, 11 May 2017 20:01:53 +0800 Subject: [PATCH 8/9] Fix test case --- .../apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index bda7d2a88fb5c..eae28fe37a4f6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -501,13 +501,8 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef |under the table directory.""".stripMargin) { withTable("test_table") { - spark.range(1).write.saveAsTable("test_table") - // Make sure the table has also been updated. - checkAnswer( - sql("SELECT * FROM test_table"), - Row(0) - ) + sql("CREATE TABLE test_table (key int)") // Set hive.exec.stagingdir under the table directory without start with ".". 
sql("set hive.exec.stagingdir=./test") From 639d63a20e94523f3443bc83b272fc60c1f5627a Mon Sep 17 00:00:00 2001 From: zuotingbing Date: Fri, 12 May 2017 09:25:27 +0800 Subject: [PATCH 9/9] simplify the test case --- .../sql/hive/InsertIntoHiveTableSuite.scala | 30 +++++-------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala index eae28fe37a4f6..2c724f8388693 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala @@ -495,28 +495,14 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef } } - test( - """SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the - |staging directory needs to avoid being deleted when users set hive.exec.stagingdir - |under the table directory.""".stripMargin) { - - withTable("test_table") { - - sql("CREATE TABLE test_table (key int)") - - // Set hive.exec.stagingdir under the table directory without start with ".". - sql("set hive.exec.stagingdir=./test") - - // Now overwrite. - sql("INSERT OVERWRITE TABLE test_table SELECT 1") - - // Make sure the table has also been updated. - checkAnswer( - sql("SELECT * FROM test_table"), - Row(1) - ) - - sql("reset") + test("SPARK-20594: hive.exec.stagingdir was deleted by Hive") { + // Set hive.exec.stagingdir under the table directory without start with ".". + withSQLConf("hive.exec.stagingdir" -> "./test") { + withTable("test_table") { + sql("CREATE TABLE test_table (key int)") + sql("INSERT OVERWRITE TABLE test_table SELECT 1") + checkAnswer(sql("SELECT * FROM test_table"), Row(1)) + } } } }