diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index ac6c3a89dbd08..246894813c3b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -317,6 +317,10 @@ case class LoadDataCommand(
         holdDDLTime = false,
         isSrcLocal = isLocal)
     }
+
+    // Refresh the metadata cache to ensure the data is visible to the users
+    catalog.refreshTable(targetTable.identifier)
+
     Seq.empty[Row]
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0c110d3500540..e4b1f6ae3e49e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -246,13 +246,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
-        val fileCatalog = {
-          val catalog = new CatalogFileIndex(
+        val fileIndex = {
+          val index = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
-            catalog
+            index
           } else {
-            catalog.filterPartitions(Nil)  // materialize all the partitions in memory
+            index.filterPartitions(Nil)  // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
@@ -261,7 +261,7 @@
             .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
         val relation = HadoopFsRelation(
-          location = fileCatalog,
+          location = fileIndex,
           partitionSchema = partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = bucketSpec,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index d3e04d4bf2e0e..aa4a150a4b807 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,8 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -187,7 +186,8 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       "normal_parquet",
       "jt",
       "jt_array",
-      "test_parquet")
+      "test_parquet")
+    super.afterAll()
   }
 
   test(s"conversion is working") {
@@ -575,30 +575,30 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
        checkAnswer(
          sql("SELECT * FROM test_added_partitions"),
-         Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+         Seq(Row("foo", 0), Row("bar", 0)))
 
        // Create partition without data files and check whether it can be read
        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
        checkAnswer(
          sql("SELECT * FROM test_added_partitions"),
-         Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+         Seq(Row("foo", 0), Row("bar", 0)))
 
        // Add data files to partition directory and check whether they can be read
        sql("INSERT INTO TABLE test_added_partitions PARTITION (b=1) select 'baz' as a")
        checkAnswer(
          sql("SELECT * FROM test_added_partitions"),
-         Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+         Seq(Row("foo", 0), Row("bar", 0), Row("baz", 1)))
 
        // Check it with pruning predicates
        checkAnswer(
          sql("SELECT * FROM test_added_partitions where b = 0"),
-         Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+         Seq(Row("foo", 0), Row("bar", 0)))
        checkAnswer(
          sql("SELECT * FROM test_added_partitions where b = 1"),
-         Seq(("baz", 1)).toDF("a", "b"))
+         Seq(Row("baz", 1)))
        checkAnswer(
          sql("SELECT * FROM test_added_partitions where b = 2"),
-         Seq[(String, Int)]().toDF("a", "b"))
+         Seq.empty)
 
        // Also verify the inputFiles implementation
        assert(sql("select * from test_added_partitions").inputFiles.length == 2)
@@ -609,6 +609,63 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     }
   }
 
+  test("Explicitly added partitions should be readable after load") {
+    withTable("test_added_partitions") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1')")
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("test_added_partitions"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE test_added_partitions PARTITION(b='1')
+           """.stripMargin)
+
+        checkAnswer(
+          spark.table("test_added_partitions"),
+          Seq(Row("0", 1), Row("1", 1)))
+      }
+    }
+  }
+
+  test("Non-partitioned table readable after load") {
+    withTable("tab") {
+      withTempDir { src =>
+        val newPartitionDir = src.getCanonicalPath
+        spark.range(2).selectExpr("cast(id as string)").toDF("a").write
+          .mode("overwrite")
+          .parquet(newPartitionDir)
+
+        sql("CREATE TABLE tab (a STRING) STORED AS PARQUET")
+
+        // This table fetch is to fill the cache with zero leaf files
+        checkAnswer(spark.table("tab"), Seq.empty)
+
+        sql(
+          s"""
+             |LOAD DATA LOCAL INPATH '$newPartitionDir' OVERWRITE
+             |INTO TABLE tab
+           """.stripMargin)
+
+        checkAnswer(spark.table("tab"), Seq(Row("0"), Row("1")))
+      }
+    }
+  }
+
   test("self-join") {
     val table = spark.table("normal_parquet")
     val selfJoin = table.as("t1").crossJoin(table.as("t2"))