From 6b36185d0eece2c438ac9892cc34547da90c3c1c Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 11 May 2016 12:35:41 -0700
Subject: [PATCH] [SPARK-15248][SQL] Make MetastoreFileCatalog consider
 directories from partition specs of a partitioned metastore table

Table partitions can be added with locations different from the default warehouse location of a Hive table.

`CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet`
`ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'`

Querying such a table throws an error, because the MetastoreFileCatalog does not list the added partition directory; it only lists the default base location.

```
[info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds)
[info]   java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition
[info]   at scala.collection.MapLike$class.default(MapLike.scala:228)
[info]   at scala.collection.AbstractMap.default(Map.scala:59)
[info]   at scala.collection.MapLike$class.apply(MapLike.scala:141)
[info]   at scala.collection.AbstractMap.apply(Map.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55)
[info]   at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
```
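The failure boils down to the following sequence; this is a minimal reproduction sketch, assuming a Hive-enabled `SparkSession` named `spark` (as in `spark-shell`) and a writable `/partition` directory, with the table and location taken from the example above:

```
// Reproduction sketch; names come from the example in this commit message.
spark.sql("CREATE TABLE parquetTable (a INT) PARTITIONED BY (b INT) STORED AS PARQUET")

// Add a partition whose location lies outside the table's base directory.
spark.sql("ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'")

// Before this fix, the scan listed only the table's base location, so resolving
// the added partition's directory failed with the NoSuchElementException above.
spark.sql("SELECT * FROM parquetTable").show()
```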
The solution in this PR is to get the paths to list from the partition spec, rather than relying on the default table path alone.

Tested with unit tests.

Author: Tathagata Das

Closes #13022 from tdas/SPARK-15248.

(cherry picked from commit 81c68eceba3a857ba7349c6892dc336c3ebd11dc)
Signed-off-by: Yin Huai
---
 .../PartitioningAwareFileCatalog.scala        | 13 +++++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 31 ++++++++++++-----
 .../apache/spark/sql/hive/parquetSuites.scala | 34 +++++++++++++++++++
 3 files changed, 67 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 27f23c855da6e..e0e4ddc30b0f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog(
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
-          Partition(
-            values,
-            leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          Partition(values, files)
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
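The essence of this hunk is replacing `Map#apply`, which throws the `java.util.NoSuchElementException` seen in the report when a key is missing, with `Map#get`, which returns an `Option`. A self-contained sketch of the same pattern, using an illustrative stand-in for `leafDirToChildrenFiles`:

```
// Stand-in for leafDirToChildrenFiles: a map from leaf directories to the
// names of the files found under them (values are illustrative).
val leafDirToChildrenFiles: Map[String, Seq[String]] =
  Map("/warehouse/table/b=0" -> Seq("part-00000", "_SUCCESS"))

def filesIn(dir: String): Seq[String] =
  leafDirToChildrenFiles.get(dir) match {
    case Some(files) =>
      // The directory was listed: return its files, minus metadata files like _SUCCESS.
      files.filterNot(_.startsWith("_"))
    case None =>
      // The directory is absent from the listing (e.g. an explicitly added,
      // still-empty partition): yield no files instead of throwing.
      Nil
  }

assert(filesIn("/warehouse/table/b=0") == Seq("part-00000"))
assert(filesIn("/partition").isEmpty) // Map#apply would have thrown here
```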
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 607f0a10ec8f6..b0a3a803d299f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           Some(partitionSpec))
 
         val hadoopFsRelation = cached.getOrElse {
-          val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
-          val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
+          val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+            sparkSession,
+            new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+            partitionSpec)
 
           val inferredSchema = if (fileType.equals("parquet")) {
             val inferredSchema =
@@ -537,18 +539,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
  */
-private[hive] class MetaStoreFileCatalog(
+private[hive] class MetaStorePartitionedTableFileCatalog(
     sparkSession: SparkSession,
-    paths: Seq[Path],
-    partitionSpecFromHive: PartitionSpec)
+    tableBasePath: Path,
+    override val partitionSpec: PartitionSpec)
   extends ListingFileCatalog(
     sparkSession,
-    paths,
+    MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec),
     Map.empty,
-    Some(partitionSpecFromHive.partitionColumns)) {
+    Some(partitionSpec.partitionColumns)) {
+}
 
-  override def partitionSpec(): PartitionSpec = partitionSpecFromHive
+private[hive] object MetaStorePartitionedTableFileCatalog {
+  /** Get the list of paths to list files in for a metastore table */
+  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
+    // If there are no partitions currently specified then use base path,
+    // otherwise use the paths corresponding to the partitions.
+    if (partitionSpec.partitions.isEmpty) {
+      Seq(tableBasePath)
+    } else {
+      partitionSpec.partitions.map(_.path)
+    }
+  }
 }
 
 /**
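The new `getPaths` helper carries the core of the fix: once the metastore reports partitions, their own locations drive the file listing, and the table's base path is used only when no partitions exist yet. A standalone sketch of that decision, with simplified stand-ins for Hadoop's `Path` and Spark's partition types:

```
// Simplified stand-ins for org.apache.hadoop.fs.Path and Spark's partition
// types; only the structure needed to show the path selection is kept.
case class PartitionDirectory(path: String)
case class PartitionSpec(partitions: Seq[PartitionDirectory])

def getPaths(tableBasePath: String, partitionSpec: PartitionSpec): Seq[String] =
  if (partitionSpec.partitions.isEmpty) {
    Seq(tableBasePath) // no partitions yet: list the table's base directory
  } else {
    partitionSpec.partitions.map(_.path) // honor each partition's own location
  }

// A partition added with an explicit LOCATION now contributes its directory:
val spec = PartitionSpec(Seq(
  PartitionDirectory("/warehouse/parquetTable/b=0"),
  PartitionDirectory("/partition")))
assert(getPaths("/warehouse/parquetTable", spec) ==
  Seq("/warehouse/parquetTable/b=0", "/partition"))
```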
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 6e93bbde26583..f52c6e48c5760 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
     }
   }
+
+  test("SPARK-15248: explicitly added partitions should be readable") {
+    withTable("test_added_partitions", "test_temp") {
+      withTempDir { src =>
+        val partitionDir = new File(src, "partition").getCanonicalPath
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Temp table to insert data into partitioned table
+        Seq("foo", "bar").toDF("a").registerTempTable("test_temp")
+        sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")
+
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Add data files to partition directory and check whether they can be read
+        Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+      }
+    }
+  }
 }
 
 /**