[SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework

### What changes were proposed in this pull request?

Move subquery expression CheckAnalysis error messages to use the new error framework.

This will help improve the usability of Apache Spark as a product and make it possible to link each error message to its documentation.

### Why are the changes needed?

Error messages related to SQL query analysis in Apache Spark need some work to make them more descriptive and actionable.

### Does this PR introduce _any_ user-facing change?

Yes, error messages change.
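
As a sketch of the before/after behavior for one migrated error (the query, session setup, and exact rendering of the error-class prefix are illustrative, not taken from this commit):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Hypothetical repro: a scalar subquery returning two columns is invalid.
object ScalarSubqueryErrorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    try {
      spark.sql("SELECT (SELECT 1 AS a, 2 AS b)").collect()
    } catch {
      case e: AnalysisException =>
        // Before: Scalar subquery must return only one column, but got 2
        // After (approximate rendering):
        //   [INVALID_SUBQUERY_EXPRESSION.SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN]
        //   Invalid subquery: Scalar subquery must return only one column, but got 2
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}
```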

### How was this patch tested?

Unit test and query test coverage shows the updated error messages.

Closes #37840 from dtenedor/subquery-error-classes.

Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
dtenedor authored and gengliangwang committed Sep 20, 2022
1 parent d86d646 commit f89b847
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 103 deletions.
79 changes: 79 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -343,6 +343,18 @@
],
"sqlState" : "42000"
},
"INVALID_SUBQUERY_EXPRESSION" : {
"message" : [
"Invalid subquery:"
],
"subClass" : {
"SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN" : {
"message" : [
"Scalar subquery must return only one column, but got <number>"
]
}
}
},
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [
"Unknown static partition column: <columnName>"
@@ -727,6 +739,73 @@
}
}
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
"message" : [
"Unsupported subquery expression:"
],
"subClass" : {
"ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
"message" : [
"Accessing outer query column is not allowed in this location<treeNode>"
]
},
"AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
"message" : [
"Found an aggregate function in a correlated predicate that has both outer and local references, which is not supported: <function>"
]
},
"CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
"message" : [
"Correlated column is not allowed in predicate: <treeNode>"
]
},
"CORRELATED_COLUMN_NOT_FOUND" : {
"message" : [
"A correlated outer name reference within a subquery expression body was not found in the enclosing query: <value>"
]
},
"LATERAL_JOIN_CONDITION_NON_DETERMINISTIC" : {
"message" : [
"Lateral join condition cannot be non-deterministic: <condition>"
]
},
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries in the GROUP BY clause must also be in the aggregate expressions<treeNode>"
]
},
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT" : {
"message" : [
"The output of a correlated scalar subquery must be aggregated"
]
},
"NON_CORRELATED_COLUMNS_IN_GROUP_BY" : {
"message" : [
"A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns: <value>"
]
},
"NON_DETERMINISTIC_LATERAL_SUBQUERIES" : {
"message" : [
"Non-deterministic lateral subqueries are not supported when joining with outer relations that produce more than one row<treeNode>"
]
},
"UNSUPPORTED_CORRELATED_REFERENCE" : {
"message" : [
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses<treeNode>"
]
},
"UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries can only be used in filters, aggregations, projections, and UPDATE/MERGE/DELETE commands<treeNode>"
]
},
"UNSUPPORTED_IN_EXISTS_SUBQUERY" : {
"message" : [
"IN/EXISTS predicate subqueries can only be used in filters, joins, aggregations, window functions, projections, and UPDATE/MERGE/DELETE commands<treeNode>"
]
}
}
},
"UNTYPED_SCALA_UDF" : {
"message" : [
"You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:",
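A sketch of how the new JSON entries above are consumed; the constructor shape mirrors the `catalyst/package.scala` change below, while the helper name and condition string are made up for illustration:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.trees.Origin

// The <condition> placeholder in the subclass template is filled from
// messageParameters; the parent message ("Unsupported subquery expression:")
// is prepended when the message is rendered. Origin() stands in for the real
// parse position a caller would supply.
def failNonDeterministicLateralJoin(conditionSql: String): Nothing =
  throw new AnalysisException(
    errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
    errorSubClass = "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
    messageParameters = Map("condition" -> conditionSql),
    origin = Origin())
```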
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -386,6 +386,16 @@ abstract class SparkFunSuite
checkError(exception, errorClass, Some(errorSubClass), sqlState, parameters,
false, Array(context))

protected def checkErrorMatchPVals(
exception: SparkThrowable,
errorClass: String,
errorSubClass: String,
sqlState: Option[String],
parameters: Map[String, String],
context: QueryContext): Unit =
checkError(exception, errorClass, Some(errorSubClass), sqlState, parameters,
matchPVals = true, Array(context))

protected def checkError(
exception: SparkThrowable,
errorClass: String,
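`checkErrorMatchPVals` forwards to `checkError` with `matchPVals = true`, which treats each expected parameter value as a regular expression rather than a literal string. A standalone sketch of that matching rule (illustrative, not Spark's implementation):

```scala
// Each expected parameter value is a regex the actual value must fully match;
// handy when a value embeds run-dependent details such as counts or paths.
def parametersMatch(expected: Map[String, String], actual: Map[String, String]): Boolean =
  expected.forall { case (key, regex) => actual.get(key).exists(_.matches(regex)) }

// Example: the expected pattern "\\d+" accepts any column count.
assert(parametersMatch(
  expected = Map("number" -> "\\d+"),
  actual = Map("number" -> "2")))
```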
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
* Throws user facing errors when passed invalid queries that fail to analyze.
@@ -220,7 +221,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case c: Cast if !c.resolved =>
failAnalysis(s"invalid cast from ${c.child.dataType.catalogString} to " +
c.dataType.catalogString)

case e: RuntimeReplaceable if !e.replacement.resolved =>
throw new IllegalStateException("Illegal RuntimeReplaceable: " + e +
"\nReplacement is unresolved: " + e.replacement)
@@ -726,6 +726,19 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
}
}

private def scrubOutIds(string: String): String =
string.replaceAll("#\\d+", "#x")
.replaceAll("operator id = \\d+", "operator id = #x")

private def planToString(plan: LogicalPlan): String = {
if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString
}

private def exprsToString(exprs: Seq[Expression]): String = {
val result = exprs.map(_.toString).mkString("\n")
if (Utils.isTesting) scrubOutIds(result) else result
}

/**
* Validates subquery expressions in the plan. Upon failure, returns a user-facing error.
*/
@@ -739,7 +752,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case a: AggregateExpression => a
})
if (aggregates.isEmpty) {
failAnalysis("The output of a correlated scalar subquery must be aggregated")
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT")
}

// SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
@@ -752,10 +767,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
val invalidCols = groupByCols -- correlatedCols
// GROUP BY columns must be a subset of columns in the predicates
if (invalidCols.nonEmpty) {
failAnalysis(
"A GROUP BY clause in a scalar correlated subquery " +
"cannot contain non-correlated columns: " +
invalidCols.mkString(","))
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "NON_CORRELATED_COLUMNS_IN_GROUP_BY",
messageParameters = Map("value" -> invalidCols.map(_.name).mkString(",")))
}
}

@@ -780,7 +795,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case o: OuterReference =>
p.children.foreach(e =>
if (!e.output.exists(_.exprId == o.exprId)) {
failAnalysis("outer attribute not found")
o.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "CORRELATED_COLUMN_NOT_FOUND",
messageParameters = Map("value" -> o.name))
})
case _ =>
})
@@ -798,16 +816,22 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case ScalarSubquery(query, outerAttrs, _, _) =>
// Scalar subquery must return one column as output.
if (query.output.size != 1) {
failAnalysis(
s"Scalar subquery must return only one column, but got ${query.output.size}")
expr.failAnalysis(
errorClass = "INVALID_SUBQUERY_EXPRESSION",
errorSubClass = "SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN",
messageParameters = Map("number" -> query.output.size.toString))
}

if (outerAttrs.nonEmpty) {
cleanQueryInScalarSubquery(query) match {
case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, query, a)
case Filter(_, a: Aggregate) => checkAggregateInScalarSubquery(outerAttrs, query, a)
case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok
case fail => failAnalysis(s"Correlated scalar subqueries must be aggregated: $fail")
case other =>
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(other)))
}

// Only certain operators are allowed to host subquery expression containing
@@ -819,12 +843,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// it must also be in the aggregate expressions to be rewritten in the optimization
// phase.
if (containsExpr(a.groupingExpressions) && !containsExpr(a.aggregateExpressions)) {
failAnalysis("Correlated scalar subqueries in the group by clause " +
s"must also be in the aggregate expressions:\n$a")
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(a)))
}
case other => failAnalysis(
"Correlated scalar sub-queries can only be used in a " +
s"Filter/Aggregate/Project and a few commands: $plan")
case other =>
other.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(other)))
}
}
// Validate to make sure the correlations appearing in the query are valid and
@@ -838,13 +866,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// cannot be decorrelated. Otherwise it may produce incorrect results.
if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
expr.failAnalysis(
s"Non-deterministic lateral subqueries are not supported when joining with " +
s"outer relations that produce more than one row\n${expr.plan}")
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "NON_DETERMINISTIC_LATERAL_SUBQUERIES",
messageParameters = Map("treeNode" -> planToString(plan)))
}
// Check if the lateral join's join condition is deterministic.
if (join.condition.exists(!_.deterministic)) {
join.failAnalysis(
s"Lateral join condition cannot be non-deterministic: ${join.condition.get.sql}")
join.condition.get.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
messageParameters = Map("condition" -> join.condition.get.sql))
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
@@ -855,8 +886,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case _: Filter | _: SupportsSubquery | _: Join |
_: Project | _: Aggregate | _: Window => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter/Join/Project/Aggregate/Window and a few commands: $plan")
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_IN_EXISTS_SUBQUERY",
messageParameters = Map("treeNode" -> planToString(plan)))
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
@@ -910,7 +943,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
if (a.references.nonEmpty) {
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql)
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
messageParameters = Map("function" -> a.sql))
}
case _ =>
}
@@ -919,7 +955,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
messageParameters = Map("treeNode" -> planToString(p)))
}
}

@@ -938,9 +977,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!canHostOuter(p) && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_CORRELATED_REFERENCE",
messageParameters = Map("treeNode" -> planToString(p)))
}
}

@@ -1003,8 +1043,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
if (predicates.nonEmpty) {
// Report a non-supported case as an exception
failAnalysis("Correlated column is not allowed in predicate " +
s"${predicates.map(_.sql).mkString}:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
messageParameters =
Map("treeNode" -> s"${exprsToString(predicates)}\n${planToString(p)}"))
}
}

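The `scrubOutIds`/`planToString` helpers added above keep test output deterministic: expression IDs such as `#42` vary from run to run, so under testing they are rewritten to `#x`. The same rewrites, restated standalone with example inputs:

```scala
// Identical regex rewrites to the helpers in CheckAnalysis, shown with
// concrete inputs and expected outputs.
def scrubOutIds(string: String): String =
  string.replaceAll("#\\d+", "#x")
    .replaceAll("operator id = \\d+", "operator id = #x")

assert(scrubOutIds("Project [a#42, b#107]") == "Project [a#x, b#x]")
assert(scrubOutIds("operator id = 7") == "operator id = #x")
```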
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.errors.QueryErrorsBase

@@ -45,18 +45,37 @@ package object analysis {
throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
}

/** Fails the analysis at the point where a specific tree node was parsed. */
/** Fails the analysis at the point where a specific tree node was parsed with a given cause. */
def failAnalysis(msg: String, cause: Throwable): Nothing = {
throw new AnalysisException(msg, t.origin.line, t.origin.startPosition, cause = Some(cause))
}

/**
* Fails the analysis at the point where a specific tree node was parsed using a provided
* error class and message parameters.
*/
def failAnalysis(errorClass: String, messageParameters: Map[String, String]): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
origin = t.origin)
}

/**
* Fails the analysis at the point where a specific tree node was parsed using a provided
* error class and subclass and message parameters.
*/
def failAnalysis(
errorClass: String,
errorSubClass: String,
messageParameters: Map[String, String] = Map.empty[String, String]): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
errorSubClass = errorSubClass,
messageParameters = messageParameters,
origin = t.origin)
}

def dataTypeMismatch(expr: Expression, mismatch: DataTypeMismatch): Nothing = {
throw new AnalysisException(
errorClass = "DATATYPE_MISMATCH",
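A sketch of calling the new overload from a check rule; the rule and plan are hypothetical, and the call shape mirrors the CheckAnalysis changes above (the implicit enrichment in this package object supplies `failAnalysis` on tree nodes, taking the origin from the node itself):

```scala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical check: reject a plan that hosts an outer-query column where
// none is allowed. The error position comes from plan.origin.
def rejectOuterColumnAccess(plan: LogicalPlan): Nothing =
  plan.failAnalysis(
    errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
    errorSubClass = "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
    messageParameters = Map("treeNode" -> plan.toString))
```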
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -206,7 +206,7 @@ object SubExprUtils extends PredicateHelper {
def collectOutRefs(input: Expression): Unit = input match {
case a: AggregateExpression if containsOuter(a) =>
if (a.references.nonEmpty) {
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql)
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
} else {
// Collect and update the sub-tree so that outer references inside this aggregate
// expression will not be collected. For example: min(outer(a)) -> min(a).
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1576,10 +1576,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
new AnalysisException(s"'$operation' does not support partitioning")
}

def mixedRefsInAggFunc(funcStr: String): Throwable = {
val msg = "Found an aggregate function in a correlated predicate that has both " +
"outer and local references, which is not supported: " + funcStr
new AnalysisException(msg)
def mixedRefsInAggFunc(funcStr: String, origin: Origin): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
origin = origin,
messageParameters = Map("function" -> funcStr))
}

def functionCannotProcessInputError(
Expand Down
(The remaining 8 changed files are not shown.)
