From e51774f1eb73ddcb820619addd8be18894bba5e6 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 15 Jun 2016 03:52:04 +0900 Subject: [PATCH] Use subqueries eliminated plan to compare. --- .../sql/catalyst/plans/logical/LocalRelation.scala | 11 +++++++---- .../org/apache/spark/sql/execution/ExistingRDD.scala | 10 ++++++---- .../sql/execution/datasources/LogicalRelation.scala | 10 ++++++---- .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 4 ++-- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index e3e7a11dba973..89981d601c2f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, analysis} import org.apache.spark.sql.types.{StructField, StructType} @@ -56,10 +57,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) override protected def stringArgs = Iterator(output) - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LocalRelation(otherOutput, otherData) => - otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + EliminateSubQueries(plan) match { + case LocalRelation(otherOutput, otherData) => + otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data + case _ => false + } } override lazy val statistics = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ea5a9afe03b00..dead63f2e599e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.sources.{HadoopFsRelation, BaseRelation} @@ -79,9 +79,11 @@ private[sql] case class LogicalRDD( override def newInstance(): LogicalRDD.this.type = LogicalRDD(output.map(_.newInstance()), rdd)(sqlContext).asInstanceOf[this.type] - override def sameResult(plan: LogicalPlan): Boolean = plan match { - case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id - case _ => false + override def sameResult(plan: LogicalPlan): Boolean = { + EliminateSubQueries(plan) match { + case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id + case _ => false + } } @transient override lazy val statistics: Statistics = Statistics( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 219dae88e515d..fd299a21011ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.execution.datasources -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, MultiInstanceRelation} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation @@ -57,9 +57,11 @@ case class LogicalRelation( com.google.common.base.Objects.hashCode(relation, output) } - override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match { - case LogicalRelation(otherRelation, _) => relation == otherRelation - case _ => false + override def sameResult(otherPlan: LogicalPlan): Boolean = { + EliminateSubQueries(otherPlan) match { + case LogicalRelation(otherRelation, _) => relation == otherRelation + case _ => false + } } // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c59e09f1b36d0..0562e3303de45 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata._ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} +import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical @@ -832,7 +832,7 @@ private[hive] case class MetastoreRelation /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { - plan match { + EliminateSubQueries(plan) match { case mr: MetastoreRelation => mr.databaseName == databaseName && mr.tableName == tableName case _ => false