# [SPARK-19120] Refresh Metadata Cache After Loading Hive Tables
### What changes were proposed in this pull request?
```scala
sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")

// This table fetch is to fill the cache with zero leaf files
spark.table("tab").show()

sql(
  s"""
     |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE tab
   """.stripMargin)

spark.table("tab").show()
```

In the example above, the final query returns an empty result even though data was just loaded. The metadata cache can become outdated after new data is loaded into a table, because loading/inserting does not update the cache. So far, the metadata cache is only used for data source tables; thus, among Hive serde tables, only the `parquet` and `orc` formats are affected, because Hive serde tables stored as parquet/orc are converted to data source tables when `spark.sql.hive.convertMetastoreParquet`/`spark.sql.hive.convertMetastoreOrc` is enabled.
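Before this fix, the stale cache could be worked around by refreshing the table by hand. Below is a minimal sketch of that workaround, assuming a Hive-enabled session; the table name and the local path `/tmp/new_data` (a directory that already contains parquet files) are placeholders, not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// Workaround sketch (pre-fix). Table name and '/tmp/new_data' are placeholders.
val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

spark.sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
spark.table("tab").show()   // fills the metadata cache with zero leaf files

spark.sql("LOAD DATA LOCAL INPATH '/tmp/new_data' OVERWRITE INTO TABLE tab")

// Without the refresh added by this PR, the cached relation is still empty here;
// an explicit refresh (equivalently: spark.sql("REFRESH TABLE tab")) makes the
// newly loaded files visible again.
spark.catalog.refreshTable("tab")
spark.table("tab").show()
```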

This PR refreshes the metadata cache after processing the `LOAD DATA` command.

In addition, Spark SQL does not convert **partitioned** Hive tables (orc/parquet) to data source tables in the write path, while the read path uses the metadata cache for both **partitioned** and non-partitioned Hive tables (orc/parquet). That means writes to partitioned parquet/orc tables still go through `InsertIntoHiveTable` instead of `InsertIntoHadoopFsRelationCommand`. To avoid reading an outdated cache, `InsertIntoHiveTable` needs to refresh the metadata cache for partitioned tables. Note that the cache does not need to be refreshed for non-partitioned parquet/orc tables, because their writes do not go through `InsertIntoHiveTable` at all. Based on the review comments, this PR keeps the existing logic unchanged: the table is always refreshed, whether or not it is partitioned.
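As background for the paragraph above, here is a hedged sketch of how the conversion setting determines which read path (and therefore whether the metadata cache) is used. It assumes the `tab` table from the earlier snippet and only uses the configuration keys already named in this description; the comments describe the expected plans, not guaranteed output:

```scala
// Illustrative only: check which scan a Hive parquet table compiles to.
// With conversion on (the default), reads go through the data source path and
// its cached file listing; with it off, reads use the Hive serde scan and the
// metadata cache is not involved.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.table("tab").explain()   // expect a data source (HadoopFsRelation) scan

spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
spark.table("tab").explain()   // expect a Hive table scan
```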

### How was this patch tested?
Added test cases in parquetSuites.scala

Author: gatorsmile <[email protected]>

Closes #16500 from gatorsmile/refreshInsertIntoHiveTable.
gatorsmile authored and cloud-fan committed Jan 15, 2017
1 parent a5e651f · commit de62ddf
Showing 3 changed files with 75 additions and 14 deletions.
```diff
@@ -317,6 +317,10 @@ case class LoadDataCommand(
         holdDDLTime = false,
         isSrcLocal = isLocal)
     }
+
+    // Refresh the metadata cache to ensure the data visible to the users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
```
```diff
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
-        val fileCatalog = {
-          val catalog = new CatalogFileIndex(
+        val fileIndex = {
+          val index = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
-            catalog
+            index
           } else {
-            catalog.filterPartitions(Nil) // materialize all the partitions in memory
+            index.filterPartitions(Nil) // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@
             .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))

         val relation = HadoopFsRelation(
-          location = fileCatalog,
+          location = fileIndex,
           partitionSchema = partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = bucketSpec,
```
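The hunk above is a pure rename (`fileCatalog`/`catalog` become `fileIndex`/`index`) with no behavior change. As a side note, here is a hedged sketch of the setting that is believed to drive the `lazyPruningEnabled` branch; the configuration name is an assumption based on Spark 2.1 and is not part of this commit:

```scala
import org.apache.spark.sql.SparkSession

// Assumed knob: spark.sql.hive.manageFilesourcePartitions. When it is enabled,
// the CatalogFileIndex built above stays lazy and file listings are fetched only
// for the partitions that survive pruning; when disabled, filterPartitions(Nil)
// materializes every partition's listing up front.
val spark = SparkSession.builder()
  .enableHiveSupport()
  .config("spark.sql.hive.manageFilesourcePartitions", "true")
  .getOrCreate()
```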
```diff
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-      "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }

   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

     checkAnswer(
       sql("SELECT * FROM test_added_partitions"),
-      Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+      Seq(Row("foo", 0), Row("bar", 0)))

     // Create partition without data files and check whether it can be read
     sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
     checkAnswer(
       sql("SELECT * FROM test_added_partitions"),
-      Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+      Seq(Row("foo", 0), Row("bar", 0)))

     // Add data files to partition directory and check whether they can be read
     sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
     checkAnswer(
       sql("SELECT * FROM test_added_partitions"),
-      Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+      Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))

     // Check it with pruning predicates
     checkAnswer(
       sql("SELECT * FROM test_added_partitions where b = 0"),
-      Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+      Seq(Row("foo", 0), Row("bar", 0)))
     checkAnswer(
       sql("SELECT * FROM test_added_partitions where b = 1"),
-      Seq(("baz", 1)).toDF("a", "b"))
+      Seq(Row("baz", 1)))
     checkAnswer(
       sql("SELECT * FROM test_added_partitions where b = 2"),
-      Seq[(String, Int)]().toDF("a", "b"))
+      Seq.empty)

     // Also verify the inputFiles implementation
     assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }

+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))
```
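For readers who want to try the partitioned-table scenario from the new test outside the test harness, here is a hedged, standalone sketch using only public APIs; the table name mirrors the test, and the input directory is a scratch temp dir created on the fly:

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Write two parquet rows into a scratch directory that will be LOAD-ed below.
val newPartitionDir = Files.createTempDirectory("new_partition").toString
spark.range(2).selectExpr("cast(id as string) as a")
  .write.mode("overwrite").parquet(newPartitionDir)

spark.sql(
  """CREATE TABLE test_added_partitions (a STRING)
    |PARTITIONED BY (b INT)
    |STORED AS PARQUET""".stripMargin)

spark.sql("ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
spark.table("test_added_partitions").show()   // empty; fills the cache with zero leaf files

spark.sql(
  s"""LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
     |INTO TABLE test_added_partitions PARTITION (b='1')""".stripMargin)

// With the refresh added by this commit, the two loaded rows show up right away.
spark.table("test_added_partitions").show()
```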
