From 3b031c75ba35324242993e8742961d23ce087445 Mon Sep 17 00:00:00 2001
From: Adrian Ionescu
Date: Sun, 2 Apr 2017 15:01:41 +0200
Subject: [PATCH 1/3] Implement listPartitionsByFilter() for InMemoryCatalog

---
 .../catalog/ExternalCatalogUtils.scala        | 32 ++++++++++++++++++
 .../catalyst/catalog/InMemoryCatalog.scala    |  8 ++---
 .../catalog/ExternalCatalogSuite.scala        | 33 +++++++++++++++++++
 .../datasources/DataSourceStrategy.scala      |  5 +--
 .../spark/sql/hive/HiveExternalCatalog.scala  | 33 +++----------------
 .../sql/hive/HiveExternalCatalogSuite.scala   |  7 ----
 6 files changed, 75 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index a8693dcca539d..20139213ce4c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.util.Shell
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate}
 
 object ExternalCatalogUtils {
   // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since catalyst doesn't
@@ -125,6 +126,37 @@ object ExternalCatalogUtils {
     }
     escapePathName(col) + "=" + partitionString
   }
+
+  def prunePartitionsByFilter(
+      catalogTable: CatalogTable,
+      inputPartitions: Seq[CatalogTablePartition],
+      predicates: Seq[Expression],
+      defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
+    if (predicates.isEmpty) {
+      inputPartitions
+    } else {
+      val partitionSchema = catalogTable.partitionSchema
+      val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+      val nonPartitionPruningPredicates = predicates.filterNot {
+        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+      }
+      if (nonPartitionPruningPredicates.nonEmpty) {
+        sys.error("Expected only partition pruning predicates: " + nonPartitionPruningPredicates)
+      }
+
+      val boundPredicate =
+        InterpretedPredicate.create(predicates.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
+            BoundReference(index, partitionSchema(index).dataType, nullable = true)
+        })
+
+      inputPartitions.filter { p =>
+        boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
+      }
+    }
+  }
 }
 
 object CatalogUtils {
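The helper above prunes by rewriting every partition-column AttributeReference into a BoundReference over the partition schema, then evaluating the conjunction of all predicates against each partition's spec rendered as a row via toRow. A minimal standalone sketch of that bind-and-filter step, with a hypothetical one-column schema and hand-built rows (not part of the patch; at this patch's vintage InterpretedPredicate is an InternalRow => Boolean function):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, InterpretedPredicate, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Predicate "a = 1", with the reference to partition column "a" already
    // bound to ordinal 0 of the partition schema, as the transform above does.
    val bound = InterpretedPredicate.create(
      EqualTo(BoundReference(0, IntegerType, nullable = true), Literal(1)))

    // Stand-ins for two partition specs converted via toRow: a=1 and a=3.
    val rows = Seq(InternalRow(1), InternalRow(3))
    val kept = rows.filter(bound)
    assert(kept.length == 1)  // only the a=1 partition survives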
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index cdf618aef97c3..9ca1c71d1dcb1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.types.StructType
@@ -556,9 +556,9 @@ class InMemoryCatalog(
       table: String,
       predicates: Seq[Expression],
       defaultTimeZoneId: String): Seq[CatalogTablePartition] = {
-    // TODO: Provide an implementation
-    throw new UnsupportedOperationException(
-      "listPartitionsByFilter is not implemented for InMemoryCatalog")
+    val catalogTable = getTable(db, table)
+    val allPartitions = listPartitions(db, table)
+    prunePartitionsByFilter(catalogTable, allPartitions, predicates, defaultTimeZoneId)
   }
 
   // --------------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 7820f39d96426..5d696226479b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog
 
 import java.net.URI
+import java.util.TimeZone
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -28,6 +29,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -436,6 +438,37 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
   }
 
+  test("list partitions by filter") {
+    val tz = TimeZone.getDefault().getID()
+    val catalog = newBasicCatalog()
+
+    def checkAnswer(table: CatalogTable, filters: Seq[Expression],
+        expected: Set[CatalogTablePartition]): Unit = {
+      assertResult(expected.map(_.spec)) {
+        catalog.listPartitionsByFilter(table.database, table.identifier.identifier, filters, tz)
+          .map(_.spec).toSet
+      }
+    }
+
+    def pcol(table: CatalogTable, name: String): Expression = {
+      val col = table.partitionSchema(name)
+      AttributeReference(col.name, col.dataType, col.nullable)()
+    }
+    val tbl2 = catalog.getTable("db2", "tbl2")
+
+    checkAnswer(tbl2, Seq.empty, Set(part1, part2))
+    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(1))), Set(part1))
+    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(2))), Set.empty)
+    checkAnswer(tbl2, Seq(In(pcol(tbl2, "a"), Seq(Literal(3)))), Set(part2))
+    checkAnswer(tbl2, Seq(Not(In(pcol(tbl2, "a"), Seq(Literal(4))))), Set(part1, part2))
+    checkAnswer(tbl2, Seq(
+      EqualTo(pcol(tbl2, "a"), Literal(1)),
+      EqualTo(pcol(tbl2, "b"), Literal("2"))), Set(part1))
+    checkAnswer(tbl2, Seq(
+      EqualTo(pcol(tbl2, "a"), Literal(1)),
+      EqualTo(pcol(tbl2, "b"), Literal("x"))), Set.empty)
+  }
+
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
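InMemoryCatalog thus answers filtered listings by fetching all partitions and pruning them on the driver; there is no external metastore to push predicates into, which is acceptable for a catalog used chiefly in tests and Hive-less deployments. A hypothetical call against the suite's db2.tbl2 fixture (partitioned by a int, b string; a catalog instance is assumed in scope):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.IntegerType

    // Keep only partitions satisfying a = 1. The timezone id is used when
    // casting the string-valued partition specs to typed rows.
    val matched = catalog.listPartitionsByFilter(
      "db2", "tbl2",
      Seq(EqualTo(AttributeReference("a", IntegerType)(), Literal(1))),
      "UTC")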
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index bddf5af23e060..c350d8bcbae97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -217,8 +217,6 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     val table = r.tableMeta
     val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table)
     val cache = sparkSession.sessionState.catalog.tableRelationCache
-    val withHiveSupport =
-      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive"
 
     val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
@@ -233,8 +231,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           bucketSpec = table.bucketSpec,
           className = table.provider.get,
           options = table.storage.properties ++ pathOption,
-          // TODO: improve `InMemoryCatalog` and remove this limitation.
-          catalogTable = if (withHiveSupport) Some(table) else None)
+          catalogTable = Some(table))
 
         LogicalRelation(
           dataSource.resolveRelation(checkFilesExist = false),
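With listPartitionsByFilter now implemented for both catalogs, FindDataSourceTable no longer needs to withhold the CatalogTable when running without Hive support: it always passes Some(table), so a catalog-backed file index can be built and partition pruning applies under either catalog implementation. A hedged way to observe the effect, assuming a partitioned data-source table t with partition column p in a Hive-less session:

    // spark.sql.catalogImplementation = in-memory (the default without Hive)
    spark.sql("SELECT * FROM t WHERE p = 1").explain()
    // The file scan's partition metadata (e.g. PartitionFilters) should now
    // reflect pruning instead of enumerating every partition; the exact
    // explain output varies across Spark versions.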
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 33b21be37203b..f0e35dff57f7b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -1039,37 +1039,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       defaultTimeZoneId: String): Seq[CatalogTablePartition] = withClient {
     val rawTable = getRawTable(db, table)
     val catalogTable = restoreTableMetadata(rawTable)
-    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-    val nonPartitionPruningPredicates = predicates.filterNot {
-      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-    }
-    if (nonPartitionPruningPredicates.nonEmpty) {
-      sys.error("Expected only partition pruning predicates: " +
-        predicates.reduceLeft(And))
-    }
+    val partColNameMap = buildLowerCasePartColNameMap(catalogTable)
 
-    val partitionSchema = catalogTable.partitionSchema
-    val partColNameMap = buildLowerCasePartColNameMap(getTable(db, table))
-
-    if (predicates.nonEmpty) {
-      val clientPrunedPartitions = client.getPartitionsByFilter(rawTable, predicates).map { part =>
+    val clientPrunedPartitions =
+      client.getPartitionsByFilter(rawTable, predicates).map { part =>
         part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
       }
-      val boundPredicate =
-        InterpretedPredicate.create(predicates.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
-      clientPrunedPartitions.filter { p =>
-        boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
-      }
-    } else {
-      client.getPartitions(catalogTable).map { part =>
-        part.copy(spec = restorePartitionSpec(part.spec, partColNameMap))
-      }
-    }
+
+    prunePartitionsByFilter(catalogTable, clientPrunedPartitions, predicates, defaultTimeZoneId)
   }
 
   // --------------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index 4349f1aa23be0..cafc544e68482 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -50,13 +50,6 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
 
   import utils._
 
-  test("list partitions by filter") {
-    val catalog = newBasicCatalog()
-    val selectedPartitions = catalog.listPartitionsByFilter("db2", "tbl2", Seq('a.int === 1), "GMT")
-    assert(selectedPartitions.length == 1)
-    assert(selectedPartitions.head.spec == part1.spec)
-  }
-
   test("SPARK-18647: do not put provider in table properties for Hive serde table") {
     val catalog = newBasicCatalog()
     val hiveTable = CatalogTable(
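The HiveExternalCatalog side becomes a two-phase pipeline: predicates are still pushed to the metastore through client.getPartitionsByFilter, and the shared helper then re-applies the full predicate set on the driver, catching anything the metastore could not evaluate (Hive pushdown is best-effort; see convertFilters in HiveShim). Schematically, a sketch rather than the literal code:

    // Phase 1: server-side, best-effort. May return a superset of the
    // matching partitions when some predicates cannot be pushed down.
    val fromMetastore = client.getPartitionsByFilter(rawTable, predicates)
    // Phase 2: client-side, exact. Re-evaluates all predicates, so any
    // false positives from phase 1 are filtered out.
    val exact = prunePartitionsByFilter(
      catalogTable, fromMetastore, predicates, defaultTimeZoneId)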
From 3ad032764c661a59e32695880e43edf6da658058 Mon Sep 17 00:00:00 2001
From: Adrian Ionescu
Date: Sun, 2 Apr 2017 20:41:26 +0200
Subject: [PATCH 2/3] Address review remarks: AnalysisException & negative test

---
 .../catalog/ExternalCatalogUtils.scala        |  3 +-
 .../catalog/ExternalCatalogSuite.scala        | 36 +++++++++++--------
 .../sql/hive/HiveExternalCatalogSuite.scala   |  1 -
 3 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 20139213ce4c5..254eedfe77517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -142,7 +142,8 @@ object ExternalCatalogUtils {
         _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
       }
       if (nonPartitionPruningPredicates.nonEmpty) {
-        sys.error("Expected only partition pruning predicates: " + nonPartitionPruningPredicates)
+        throw new AnalysisException("Expected only partition pruning predicates: " +
+          nonPartitionPruningPredicates)
       }
 
       val boundPredicate =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 5d696226479b6..3824b4b19a708 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -439,7 +440,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
   }
 
   test("list partitions by filter") {
-    val tz = TimeZone.getDefault().getID()
+    val tz = TimeZone.getDefault.getID
     val catalog = newBasicCatalog()
 
     def checkAnswer(table: CatalogTable, filters: Seq[Expression],
@@ -450,23 +451,28 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       }
     }
 
-    def pcol(table: CatalogTable, name: String): Expression = {
-      val col = table.partitionSchema(name)
-      AttributeReference(col.name, col.dataType, col.nullable)()
-    }
     val tbl2 = catalog.getTable("db2", "tbl2")
 
     checkAnswer(tbl2, Seq.empty, Set(part1, part2))
-    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(1))), Set(part1))
-    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(2))), Set.empty)
-    checkAnswer(tbl2, Seq(In(pcol(tbl2, "a"), Seq(Literal(3)))), Set(part2))
-    checkAnswer(tbl2, Seq(Not(In(pcol(tbl2, "a"), Seq(Literal(4))))), Set(part1, part2))
-    checkAnswer(tbl2, Seq(
-      EqualTo(pcol(tbl2, "a"), Literal(1)),
-      EqualTo(pcol(tbl2, "b"), Literal("2"))), Set(part1))
-    checkAnswer(tbl2, Seq(
-      EqualTo(pcol(tbl2, "a"), Literal(1)),
-      EqualTo(pcol(tbl2, "b"), Literal("x"))), Set.empty)
+    checkAnswer(tbl2, Seq('a.int <= 1), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 2), Set.empty)
+    checkAnswer(tbl2, Seq(In('a.int * 10, Seq(30))), Set(part2))
+    checkAnswer(tbl2, Seq(Not(In('a.int, Seq(4)))), Set(part1, part2))
+    checkAnswer(tbl2, Seq('a.int === 1, 'b.string === "2"), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 1 && 'b.string === "2"), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 1, 'b.string === "x"), Set.empty)
+    checkAnswer(tbl2, Seq('a.int === 1 || 'b.string === "x"), Set(part1))
+
+    intercept[AnalysisException] {
+      try {
+        checkAnswer(tbl2, Seq('a.int > 0 && 'col1.int > 0), Set.empty)
+      } catch {
+        // HiveExternalCatalog may be the first one to notice and throw an exception, which will
+        // then be caught and converted into a RuntimeException with a descriptive message.
+        case ex: RuntimeException if ex.getMessage.contains("MetaException") =>
+          throw new AnalysisException("")
+      }
+    }
   }
 
   test("drop partitions") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index cafc544e68482..bd54c043c6ec4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.types.StructType
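The new negative test pins down the contract that filters may reference only partition columns. A hypothetical violating call, assuming col1 is a data column of tbl2 rather than a partition column, with catalog, tz and the expression DSL in scope as in the suite:

    import org.apache.spark.sql.catalyst.dsl.expressions._

    // InMemoryCatalog: throws AnalysisException("Expected only partition
    // pruning predicates: ...") from prunePartitionsByFilter.
    // HiveExternalCatalog: the metastore client may fail first, surfacing as
    // a RuntimeException wrapping a MetaException, hence the try/catch in the
    // test above that normalizes it to AnalysisException.
    catalog.listPartitionsByFilter("db2", "tbl2", Seq('col1.int > 0), tz)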
From d6bdcf480cbd1a4e3dff51bdd299e8eea6f4306d Mon Sep 17 00:00:00 2001
From: Adrian Ionescu
Date: Mon, 3 Apr 2017 12:41:36 +0200
Subject: [PATCH 3/3] Address more small review remarks.

---
 .../sql/catalyst/catalog/ExternalCatalogSuite.scala | 10 ++++++----
 .../org/apache/spark/sql/hive/client/HiveShim.scala |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 3824b4b19a708..42db4398e5072 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -443,8 +443,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val tz = TimeZone.getDefault.getID
     val catalog = newBasicCatalog()
 
-    def checkAnswer(table: CatalogTable, filters: Seq[Expression],
-        expected: Set[CatalogTablePartition]): Unit = {
+    def checkAnswer(
+        table: CatalogTable, filters: Seq[Expression], expected: Set[CatalogTablePartition])
+      : Unit = {
+
       assertResult(expected.map(_.spec)) {
         catalog.listPartitionsByFilter(table.database, table.identifier.identifier, filters, tz)
           .map(_.spec).toSet
@@ -468,9 +470,9 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
         checkAnswer(tbl2, Seq('a.int > 0 && 'col1.int > 0), Set.empty)
       } catch {
         // HiveExternalCatalog may be the first one to notice and throw an exception, which will
-        // then be caught and converted into a RuntimeException with a descriptive message.
+        // then be caught and converted to a RuntimeException with a descriptive message.
         case ex: RuntimeException if ex.getMessage.contains("MetaException") =>
-          throw new AnalysisException("")
+          throw new AnalysisException(ex.getMessage)
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index d55c41e5c9f29..2e35f39839488 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -584,7 +584,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
    */
   def convertFilters(table: Table, filters: Seq[Expression]): String = {
     // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
-    val varcharKeys = table.getPartitionKeys.asScala
+    lazy val varcharKeys = table.getPartitionKeys.asScala
       .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
         col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
       .map(col => col.getName).toSet
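Making varcharKeys a lazy val means the partition-key scan runs only if the filter-conversion logic actually references it, i.e. when there is at least one filter to convert; with an empty filters sequence the method no longer touches table.getPartitionKeys at all. A generic illustration of the idiom in plain Scala (no Spark types; names are made up for the example):

    def convert(items: Seq[Int]): String = {
      // Expensive setup, skipped entirely when items is empty.
      lazy val lookup: Set[Int] = { println("building lookup"); Set(1, 2, 3) }
      items.filter(lookup).mkString(",")
    }

    convert(Nil)        // prints nothing: lookup is never built
    convert(Seq(1, 5))  // prints "building lookup" once, returns "1"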