[SPARK-15248][SQL] Make MetastoreFileCatalog consider directories from partition specs of a partitioned metastore table

Table partitions can be added at locations different from the default warehouse location of a Hive table:

`CREATE TABLE parquetTable (a int) PARTITIONED BY (b int) STORED AS parquet`
`ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'`

Querying such a table throws an error, because the MetastoreFileCatalog does not list the explicitly added partition directory; it only lists the table's default base location:

```
[info] - SPARK-15248: explicitly added partitions should be readable *** FAILED *** (1 second, 8 milliseconds)
[info]   java.util.NoSuchElementException: key not found: file:/Users/tdas/Projects/Spark/spark2/target/tmp/spark-b39ad224-c5d1-4966-8981-fb45a2066d61/partition
[info]   at scala.collection.MapLike$class.default(MapLike.scala:228)
[info]   at scala.collection.AbstractMap.default(Map.scala:59)
[info]   at scala.collection.MapLike$class.apply(MapLike.scala:141)
[info]   at scala.collection.AbstractMap.apply(Map.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:59)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog$$anonfun$listFiles$1.apply(PartitioningAwareFileCatalog.scala:55)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
[info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
[info]   at org.apache.spark.sql.execution.datasources.PartitioningAwareFileCatalog.listFiles(PartitioningAwareFileCatalog.scala:55)
[info]   at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:93)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:55)
[info]   at org.apache.spark.sql.execution.SparkStrategies$SpecialLimits$.apply(SparkStrategies.scala:55)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:59)
[info]   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
[info]   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
[info]   at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:60)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:77)
[info]   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:82)
[info]   at org.apache.spark.sql.QueryTest.assertEmptyMissingInput(QueryTest.scala:330)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:146)
[info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:159)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:554)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7$$anonfun$apply$mcV$sp$25.apply(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:125)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTempDir(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12$$anonfun$apply$mcV$sp$7.apply$mcV$sp(parquetSuites.scala:535)
[info]   at org.apache.spark.sql.test.SQLTestUtils$class.withTable(SQLTestUtils.scala:166)
[info]   at org.apache.spark.sql.hive.ParquetPartitioningTest.withTable(parquetSuites.scala:726)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply$mcV$sp(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
[info]   at org.apache.spark.sql.hive.ParquetMetastoreSuite$$anonfun$12.apply(parquetSuites.scala:534)
```
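The `key not found` failure at the top of the trace is the behavior of Scala's `Map.apply` on a missing key: `PartitioningAwareFileCatalog.listFiles` indexed `leafDirToChildrenFiles` directly, but the explicitly added (and still empty) partition directory was never listed, so the map has no entry for it. A minimal standalone sketch of the distinction the fix relies on (plain `String` paths stand in for Hadoop `Path`s; the map name just mirrors the field in `PartitioningAwareFileCatalog`):

```scala
object MapApplyVsGet extends App {
  // Toy stand-in for leafDirToChildrenFiles: only the default table
  // location was listed; the explicitly added partition was not.
  val leafDirToChildrenFiles: Map[String, Seq[String]] =
    Map("/warehouse/parquetTable/b=0" -> Seq("part-00000.parquet"))

  val addedPartition = "/partition"

  // Map.apply throws for a missing key -- the failure in the stack trace:
  // leafDirToChildrenFiles(addedPartition)
  //   => java.util.NoSuchElementException: key not found: /partition

  // Map.get returns an Option, so a missing (empty) partition directory
  // can be treated as "no files" instead of failing the whole query.
  val files: Seq[String] =
    leafDirToChildrenFiles.get(addedPartition).getOrElse(Nil)
  assert(files.isEmpty)
}
```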

The solution in this PR is to derive the paths to list from the partition spec of the table, instead of relying on the default table path alone.
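In essence, the listing paths are chosen like this (a standalone sketch of the idea only; the actual change is the `getPaths` helper on the new `MetaStorePartitionedTableFileCatalog` in the diff below, which works on Hadoop `Path`s and Spark's `PartitionSpec`):

```scala
// Sketch: which directories should be listed for a partitioned table?
def pathsToList(tableBasePath: String, partitionLocations: Seq[String]): Seq[String] =
  if (partitionLocations.isEmpty) {
    // No partitions registered yet: fall back to the table's base path.
    Seq(tableBasePath)
  } else {
    // Otherwise list exactly the metastore-registered partition locations,
    // which may lie outside the table's default warehouse directory.
    partitionLocations
  }
```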

Tested with new unit tests (see the `ParquetMetastoreSuite` changes below).

Author: Tathagata Das <[email protected]>

Closes #13022 from tdas/SPARK-15248.

(cherry picked from commit 81c68ec)
Signed-off-by: Yin Huai <[email protected]>
tdas authored and yhuai committed May 11, 2016
1 parent 56e1e2f commit 6b36185
Showing 3 changed files with 67 additions and 11 deletions.
PartitioningAwareFileCatalog.scala:

```diff
@@ -54,9 +54,16 @@ abstract class PartitioningAwareFileCatalog(
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
-          Partition(
-            values,
-            leafDirToChildrenFiles(path).filterNot(_.getPath.getName startsWith "_"))
+          val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
+            case Some(existingDir) =>
+              // Directory has children files in it, return them
+              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+
+            case None =>
+              // Directory does not exist, or has no children files
+              Nil
+          }
+          Partition(values, files)
       }
     }
     logTrace("Selected files after partition pruning:\n\t" + selectedPartitions.mkString("\n\t"))
```
HiveMetastoreCatalog.scala:

```diff
@@ -271,8 +271,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         Some(partitionSpec))

       val hadoopFsRelation = cached.getOrElse {
-        val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
-        val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
+        val fileCatalog = new MetaStorePartitionedTableFileCatalog(
+          sparkSession,
+          new Path(metastoreRelation.catalogTable.storage.locationUri.get),
+          partitionSpec)

         val inferredSchema = if (fileType.equals("parquet")) {
           val inferredSchema =
@@ -537,18 +539,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
 /**
  * An override of the standard HDFS listing based catalog, that overrides the partition spec with
  * the information from the metastore.
+ * @param tableBasePath The default base path of the Hive metastore table
+ * @param partitionSpec The partition specifications from Hive metastore
  */
-private[hive] class MetaStoreFileCatalog(
+private[hive] class MetaStorePartitionedTableFileCatalog(
     sparkSession: SparkSession,
-    paths: Seq[Path],
-    partitionSpecFromHive: PartitionSpec)
+    tableBasePath: Path,
+    override val partitionSpec: PartitionSpec)
   extends ListingFileCatalog(
     sparkSession,
-    paths,
+    MetaStorePartitionedTableFileCatalog.getPaths(tableBasePath, partitionSpec),
     Map.empty,
-    Some(partitionSpecFromHive.partitionColumns)) {
+    Some(partitionSpec.partitionColumns)) {
+}

-  override def partitionSpec(): PartitionSpec = partitionSpecFromHive
+private[hive] object MetaStorePartitionedTableFileCatalog {
+  /** Get the list of paths to list files in the for a metastore table */
+  def getPaths(tableBasePath: Path, partitionSpec: PartitionSpec): Seq[Path] = {
+    // If there are no partitions currently specified then use base path,
+    // otherwise use the paths corresponding to the partitions.
+    if (partitionSpec.partitions.isEmpty) {
+      Seq(tableBasePath)
+    } else {
+      partitionSpec.partitions.map(_.path)
+    }
+  }
 }

 /**
```
parquetSuites.scala:

```diff
@@ -529,6 +529,40 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
   }
+
+  test("SPARK-15248: explicitly added partitions should be readable") {
+    withTable("test_added_partitions", "test_temp") {
+      withTempDir { src =>
+        val partitionDir = new File(src, "partition").getCanonicalPath
+        sql(
+          """
+            |CREATE TABLE test_added_partitions (a STRING)
+            |PARTITIONED BY (b INT)
+            |STORED AS PARQUET
+          """.stripMargin)
+
+        // Temp table to insert data into partitioned table
+        Seq("foo", "bar").toDF("a").registerTempTable("test_temp")
+        sql("INSERT INTO test_added_partitions PARTITION(b='0') SELECT a FROM test_temp")
+
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Create partition without data files and check whether it can be read
+        sql(s"ALTER TABLE test_added_partitions ADD PARTITION (b='1') LOCATION '$partitionDir'")
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0)).toDF("a", "b"))
+
+        // Add data files to partition directory and check whether they can be read
+        Seq("baz").toDF("a").write.mode(SaveMode.Overwrite).parquet(partitionDir)
+        checkAnswer(
+          sql("SELECT * FROM test_added_partitions"),
+          Seq(("foo", 0), ("bar", 0), ("baz", 1)).toDF("a", "b"))
+      }
+    }
+  }
 }

 /**
```
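For reference, the scenario from the description in hypothetical `spark-shell` terms (assumes a session with Hive support enabled, where `spark` is the `SparkSession`):

```scala
spark.sql("CREATE TABLE parquetTable (a INT) PARTITIONED BY (b INT) STORED AS PARQUET")
spark.sql("ALTER TABLE parquetTable ADD PARTITION (b=1) LOCATION '/partition'")

// Before this fix: listFiles() threw NoSuchElementException for '/partition'.
// After this fix: the empty partition simply contributes no rows.
spark.sql("SELECT * FROM parquetTable").show()
```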
