From b4a4fc77647031cc5edbbe880db12137f2d51610 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 23 Jul 2015 00:47:19 +0800 Subject: [PATCH] revert a refactor --- .../sql/catalyst/analysis/CheckAnalysis.scala | 179 +++++++++--------- 1 file changed, 90 insertions(+), 89 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 05fd0a246fe6b..a02183dd3dd7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2 import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -47,96 +46,98 @@ trait CheckAnalysis { def checkAnalysis(plan: LogicalPlan): Unit = { // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. - plan.foreachUp { operator => - operator transformExpressionsUp { - case a: Attribute if !a.resolved => - val from = operator.inputSet.map(_.name).mkString(", ") - a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") - - case e: Expression if e.checkInputDataTypes().isFailure => - e.checkInputDataTypes() match { - case TypeCheckResult.TypeCheckFailure(message) => - e.failAnalysis( - s"cannot resolve '${e.prettyString}' due to data type mismatch: $message") - } - - case c: Cast if !c.resolved => - failAnalysis( - s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}") - - case WindowExpression(UnresolvedWindowFunction(name, _), _) => - failAnalysis( - s"Could not resolve window function '$name'. " + - "Note that, using window functions currently requires a HiveContext") - - case w @ WindowExpression(windowFunction, windowSpec) if windowSpec.validate.nonEmpty => - // The window spec is not valid. - val reason = windowSpec.validate.get - failAnalysis(s"Window specification $windowSpec is not valid because $reason") - } - - operator match { - case f: Filter if f.condition.dataType != BooleanType => - failAnalysis( - s"filter expression '${f.condition.prettyString}' " + - s"of type ${f.condition.dataType.simpleString} is not a boolean.") - - case Aggregate(groupingExprs, aggregateExprs, child) => - def checkValidAggregateExpression(expr: Expression): Unit = expr match { - case _: AggregateExpression => // OK - case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) => - failAnalysis( - s"expression '${e.prettyString}' is neither present in the group by, " + - s"nor is it an aggregate function. " + - "Add to group by or wrap in first() if you don't care which value you get.") - case e if groupingExprs.exists(_.semanticEquals(e)) => // OK - case e if e.references.isEmpty => // OK - case e => e.children.foreach(checkValidAggregateExpression) - } - - aggregateExprs.foreach(checkValidAggregateExpression) - - case _ => // Fallbacks to the following checks - } - - operator match { - case o if o.children.nonEmpty && o.missingInput.nonEmpty => - val missingAttributes = o.missingInput.mkString(",") - val input = o.inputSet.mkString(",") - - failAnalysis( - s"resolved attribute(s) $missingAttributes missing from $input " + - s"in operator ${operator.simpleString}") - - case p @ Project(exprs, _) if containsMultipleGenerators(exprs) => - failAnalysis( - s"""Only a single table generating function is allowed in a SELECT clause, found: - | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin) - - // Special handling for cases when self-join introduce duplicate expression ids. - case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => - val conflictingAttributes = left.outputSet.intersect(right.outputSet) - failAnalysis( - s""" - |Failure when resolving conflicting references in Join: - |$plan - |Conflicting attributes: ${conflictingAttributes.mkString(",")} - |""".stripMargin) - - case o if !o.resolved => - failAnalysis( - s"unresolved operator ${operator.simpleString}") - - case o if o.expressions.exists(!_.deterministic) && - !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] => - failAnalysis( - s"""nondeterministic expressions are only allowed in Project or Filter, found: - | ${o.expressions.map(_.prettyString).mkString(",")} - |in operator ${operator.simpleString} + plan.foreachUp { + + case operator: LogicalPlan => + operator transformExpressionsUp { + case a: Attribute if !a.resolved => + val from = operator.inputSet.map(_.name).mkString(", ") + a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") + + case e: Expression if e.checkInputDataTypes().isFailure => + e.checkInputDataTypes() match { + case TypeCheckResult.TypeCheckFailure(message) => + e.failAnalysis( + s"cannot resolve '${e.prettyString}' due to data type mismatch: $message") + } + + case c: Cast if !c.resolved => + failAnalysis( + s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}") + + case WindowExpression(UnresolvedWindowFunction(name, _), _) => + failAnalysis( + s"Could not resolve window function '$name'. " + + "Note that, using window functions currently requires a HiveContext") + + case w @ WindowExpression(windowFunction, windowSpec) if windowSpec.validate.nonEmpty => + // The window spec is not valid. + val reason = windowSpec.validate.get + failAnalysis(s"Window specification $windowSpec is not valid because $reason") + } + + operator match { + case f: Filter if f.condition.dataType != BooleanType => + failAnalysis( + s"filter expression '${f.condition.prettyString}' " + + s"of type ${f.condition.dataType.simpleString} is not a boolean.") + + case Aggregate(groupingExprs, aggregateExprs, child) => + def checkValidAggregateExpression(expr: Expression): Unit = expr match { + case _: AggregateExpression => // OK + case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) => + failAnalysis( + s"expression '${e.prettyString}' is neither present in the group by, " + + s"nor is it an aggregate function. " + + "Add to group by or wrap in first() if you don't care which value you get.") + case e if groupingExprs.exists(_.semanticEquals(e)) => // OK + case e if e.references.isEmpty => // OK + case e => e.children.foreach(checkValidAggregateExpression) + } + + aggregateExprs.foreach(checkValidAggregateExpression) + + case _ => // Fallbacks to the following checks + } + + operator match { + case o if o.children.nonEmpty && o.missingInput.nonEmpty => + val missingAttributes = o.missingInput.mkString(",") + val input = o.inputSet.mkString(",") + + failAnalysis( + s"resolved attribute(s) $missingAttributes missing from $input " + + s"in operator ${operator.simpleString}") + + case p @ Project(exprs, _) if containsMultipleGenerators(exprs) => + failAnalysis( + s"""Only a single table generating function is allowed in a SELECT clause, found: + | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin) + + // Special handling for cases when self-join introduce duplicate expression ids. + case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty => + val conflictingAttributes = left.outputSet.intersect(right.outputSet) + failAnalysis( + s""" + |Failure when resolving conflicting references in Join: + |$plan + |Conflicting attributes: ${conflictingAttributes.mkString(",")} + |""".stripMargin) + + case o if !o.resolved => + failAnalysis( + s"unresolved operator ${operator.simpleString}") + + case o if o.expressions.exists(!_.deterministic) && + !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] => + failAnalysis( + s"""nondeterministic expressions are only allowed in Project or Filter, found: + | ${o.expressions.map(_.prettyString).mkString(",")} + |in operator ${operator.simpleString} """.stripMargin) - case _ => // Analysis successful! - } + case _ => // Analysis successful! + } } extendedCheckRules.foreach(_(plan)) }