From de62ddf7ff42bdc383da127e6b1155897565354c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 15 Jan 2017 20:40:44 +0800
Subject: [PATCH] [SPARK-19120] Refresh Metadata Cache After Loading Hive Tables

### What changes were proposed in this pull request?

```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the example above, the query returns an empty result after the data is loaded: the metadata cache becomes stale because loading/inserting data does not update it. So far, the metadata cache is only used for data source tables. Among Hive serde tables, therefore, only the `parquet` and `orc` formats are affected, because Hive serde tables stored as parquet/orc are converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is on. This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, while the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writing to a partitioned parquet/orc table still goes through `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading a stale cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. It would not need to do so for non-partitioned parquet/orc tables, since writes to those never reach `InsertIntoHiveTable`. Based on the review comments, this PR keeps the existing logic unchanged: the table is always refreshed, whether it is partitioned or not.

### How was this patch tested?

Added test cases in parquetSuites.scala

Author: gatorsmile

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.
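For reference, before this fix the stale cache could be worked around manually by refreshing the table after the load. A rough spark-shell sketch (not part of the patch; the path `/tmp/new_data` is illustrative only):

```Scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
// Fill the metadata cache while the table still has zero leaf files
spark.table("tab").show()

// '/tmp/new_data' is a hypothetical directory that already contains parquet files
sql("LOAD DATA LOCAL INPATH '/tmp/new_data' OVERWRITE INTO TABLE tab")

// Manual workaround: invalidate the cached relation before reading again
spark.catalog.refreshTable("tab")   // or: sql("REFRESH TABLE tab")
spark.table("tab").show()
```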
---
 .../spark/sql/execution/command/tables.scala  |  4 +
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 10 +--
 .../apache/spark/sql/hive/parquetSuites.scala | 75 ++++++++++++++++---
 3 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ac6c3a89dbd08..246894813c3b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -317,6 +317,10 @@ case class LoadDataCommand(
         holdDDLTime = false,
         isSrcLocal = isLocal)
     }
+
+    // Refresh the metadata cache to ensure the loaded data is visible to users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c110d3500540..e4b1f6ae3e49e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
-        val fileCatalog = {
-          val catalog = new CatalogFileIndex(
+        val fileIndex = {
+          val index = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
-            catalog
+            index
           } else {
-            catalog.filterPartitions(Nil) // materialize all the partitions in memory
+            index.filterPartitions(Nil) // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
             .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
         val relation = HadoopFsRelation(
-          location = fileCatalog,
+          location = fileIndex,
           partitionSchema = partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = bucketSpec,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d3e04d4bf2e0e..aa4a150a4b807 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-      "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }
 
   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))
 
       // Create partition without data files and check whether it can be read
       sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))
 
       // Add data files to partition directory and check whether they can be read
       sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
       checkAnswer(
         sql("SELECT * FROM test_added_partitions"),
-        Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
 
       // Check it with pruning predicates
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 0"),
-        Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+        Seq(Row("foo", 0), Row("bar", 0)))
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 1"),
-        Seq(("baz", 1)).toDF("a", "b"))
+        Seq(Row("baz", 1)))
       checkAnswer(
         sql("SELECT * FROM test_added_partitions where b = 2"),
-        Seq[(String, Int)]().toDF("a", "b"))
+        Seq.empty)
 
       // Also verify the inputFiles implementation
       assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))