Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-40416][SQL] Move subquery expression CheckAnalysis error messages to use the new error framework #37840

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,18 @@
],
"sqlState" : "42000"
},
"INVALID_SUBQUERY_EXPRESSION" : {
"message" : [
"Invalid subquery:"
],
"subClass" : {
"SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN" : {
dtenedor marked this conversation as resolved.
Show resolved Hide resolved
"message" : [
"Scalar subquery must return only one column, but got <number>"
]
}
}
},
"MISSING_STATIC_PARTITION_COLUMN" : {
"message" : [
"Unknown static partition column: <columnName>"
Expand Down Expand Up @@ -695,6 +707,73 @@
}
}
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
"message" : [
"Invalid subquery expression:"
dtenedor marked this conversation as resolved.
Show resolved Hide resolved
],
"subClass" : {
"ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
"message" : [
"Accessing outer query column is not allowed in this location<planString>"
]
},
"AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
"message" : [
"Found an aggregate function in a correlated predicate that has both outer and local references, which is not supported: <function>"
]
},
"CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
"message" : [
"Correlated column is not allowed in predicate<planString>"
]
},
"CORRELATED_COLUMN_NOT_FOUND" : {
"message" : [
"A correlated outer name reference within a subquery expression body was not found in the enclosing query: <value>"
]
},
"LATERAL_JOIN_CONDITION_NON_DETERMINISTIC" : {
"message" : [
"Lateral join condition cannot be non-deterministic: <condition>"
]
},
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries in the GROUP BY clause must also be in the aggregate expressions<planString>"
]
},
"MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT" : {
"message" : [
"The output of a correlated scalar subquery must be aggregated"
]
},
"NON_CORRELATED_COLUMNS_IN_GROUP_BY" : {
"message" : [
"A GROUP BY clause in a scalar correlated subquery cannot contain non-correlated columns: <value>"
]
},
"NON_DETERMINISTIC_LATERAL_SUBQUERIES" : {
"message" : [
"Non-deterministic lateral subqueries are not supported when joining with outer relations that produce more than one row<planString>"
]
},
"UNSUPPORTED_CORRELATED_REFERENCE" : {
"message" : [
"Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses<planString>"
]
},
"UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY" : {
"message" : [
"Correlated scalar subqueries can only be used in filters, aggregations, projections, and UPDATE/MERGE/DELETE commands<planString>"
]
},
"UNSUPPORTED_IN_EXISTS_SUBQUERY" : {
"message" : [
"IN/EXISTS predicate subqueries can only be used in filters, joins, aggregations, window functions, projections, and UPDATE/MERGE/DELETE commands<planString>"
]
}
}
},
"UNTYPED_SCALA_UDF" : {
"message" : [
"You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ private[spark] object SparkThrowableHelper {
if (!parameterNames.isEmpty) {
g.writeObjectFieldStart("messageParameters")
(parameterNames zip e.getMessageParameters).foreach { case (name, value) =>
g.writeStringField(name, value)
// Skip parameters with the name "planString" in the output to keep results
// deterministic since these strings usually contain expression IDs which change from
// one test run to another.
if (name != "planString") {
g.writeStringField(name, value)
}
}
g.writeEndObject()
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkFunSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ abstract class SparkFunSuite
checkError(exception, errorClass, Some(errorSubClass), sqlState, parameters,
false, Array(context))

protected def checkErrorMatchPVals(
exception: SparkThrowable,
errorClass: String,
errorSubClass: String,
sqlState: Option[String],
parameters: Map[String, String],
context: QueryContext): Unit =
checkError(exception, errorClass, Some(errorSubClass), sqlState, parameters,
matchPVals = true, Array(context))

protected def checkError(
exception: SparkThrowable,
errorClass: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case a: AggregateExpression => a
})
if (aggregates.isEmpty) {
failAnalysis("The output of a correlated scalar subquery must be aggregated")
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT")
}

// SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
Expand All @@ -756,10 +758,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
val invalidCols = groupByCols -- correlatedCols
// GROUP BY columns must be a subset of columns in the predicates
if (invalidCols.nonEmpty) {
failAnalysis(
"A GROUP BY clause in a scalar correlated subquery " +
"cannot contain non-correlated columns: " +
invalidCols.mkString(","))
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "NON_CORRELATED_COLUMNS_IN_GROUP_BY",
messageParameters = Map("value" -> invalidCols.map(_.name).mkString(",")))
}
}

Expand All @@ -784,7 +786,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case o: OuterReference =>
p.children.foreach(e =>
if (!e.output.exists(_.exprId == o.exprId)) {
failAnalysis("outer attribute not found")
o.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "CORRELATED_COLUMN_NOT_FOUND",
messageParameters = Map("value" -> o.name))
})
case _ =>
})
Expand All @@ -802,16 +807,22 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case ScalarSubquery(query, outerAttrs, _, _) =>
// Scalar subquery must return one column as output.
if (query.output.size != 1) {
failAnalysis(
s"Scalar subquery must return only one column, but got ${query.output.size}")
expr.failAnalysis(
errorClass = "INVALID_SUBQUERY_EXPRESSION",
errorSubClass = "SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN",
messageParameters = Map("number" -> query.output.size.toString))
}

if (outerAttrs.nonEmpty) {
cleanQueryInScalarSubquery(query) match {
case a: Aggregate => checkAggregateInScalarSubquery(outerAttrs, query, a)
case Filter(_, a: Aggregate) => checkAggregateInScalarSubquery(outerAttrs, query, a)
case p: LogicalPlan if p.maxRows.exists(_ <= 1) => // Ok
case fail => failAnalysis(s"Correlated scalar subqueries must be aggregated: $fail")
case other =>
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
planString = other.toString)
}

// Only certain operators are allowed to host subquery expression containing
Expand All @@ -823,12 +834,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// it must also be in the aggregate expressions to be rewritten in the optimization
// phase.
if (containsExpr(a.groupingExpressions) && !containsExpr(a.aggregateExpressions)) {
failAnalysis("Correlated scalar subqueries in the group by clause " +
s"must also be in the aggregate expressions:\n$a")
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
planString = a.toString)
}
case other => failAnalysis(
"Correlated scalar sub-queries can only be used in a " +
s"Filter/Aggregate/Project and a few commands: $plan")
case other =>
other.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_CORRELATED_SCALAR_SUBQUERY",
planString = other.toString)
}
}
// Validate to make sure the correlations appearing in the query are valid and
Expand All @@ -842,13 +857,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// cannot be decorrelated. Otherwise it may produce incorrect results.
if (!expr.deterministic && !join.left.maxRows.exists(_ <= 1)) {
expr.failAnalysis(
s"Non-deterministic lateral subqueries are not supported when joining with " +
s"outer relations that produce more than one row\n${expr.plan}")
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "NON_DETERMINISTIC_LATERAL_SUBQUERIES",
planString = plan.toString)
}
// Check if the lateral join's join condition is deterministic.
if (join.condition.exists(!_.deterministic)) {
join.failAnalysis(
s"Lateral join condition cannot be non-deterministic: ${join.condition.get.sql}")
join.condition.get.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC",
messageParameters = Map("condition" -> join.condition.get.sql))
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
Expand All @@ -859,8 +877,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case _: Filter | _: SupportsSubquery | _: Join |
_: Project | _: Aggregate | _: Window => // Ok
case _ =>
failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" +
s" Filter/Join/Project/Aggregate/Window and a few commands: $plan")
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_IN_EXISTS_SUBQUERY",
planString = plan.toString)
}
// Validate to make sure the correlations appearing in the query are valid and
// allowed by spark.
Expand Down Expand Up @@ -914,7 +934,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
expr.foreach {
case a: AggregateExpression if containsOuter(a) =>
if (a.references.nonEmpty) {
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql)
a.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
messageParameters = Map("function" -> a.sql))
}
case _ =>
}
Expand All @@ -923,7 +946,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
planString = p.toString)
}
}

Expand All @@ -942,9 +968,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
if (!canHostOuter(p) && p.expressions.exists(containsOuter)) {
failAnalysis(
"Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
s"clauses:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "UNSUPPORTED_CORRELATED_REFERENCE",
planString = p.toString)
}
}

Expand Down Expand Up @@ -1007,8 +1034,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
if (predicates.nonEmpty) {
// Report a non-supported case as an exception
failAnalysis("Correlated column is not allowed in predicate " +
s"${predicates.map(_.sql).mkString}:\n$p")
p.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
planString = s"${predicates.map(_.sql).mkString}:\n$p")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.errors.QueryErrorsBase
import org.apache.spark.sql.internal.SQLConf

/**
* Provides a logical query plan [[Analyzer]] and supporting classes for performing analysis.
Expand All @@ -45,18 +46,55 @@ package object analysis {
throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
}

/** Fails the analysis at the point where a specific tree node was parsed. */
/** Fails the analysis at the point where a specific tree node was parsed with a given cause. */
def failAnalysis(msg: String, cause: Throwable): Nothing = {
throw new AnalysisException(msg, t.origin.line, t.origin.startPosition, cause = Some(cause))
}

/**
* Fails the analysis at the point where a specific tree node was parsed using a provided
* error class and message parameters.
*/
def failAnalysis(errorClass: String, messageParameters: Map[String, String]): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
messageParameters = messageParameters,
origin = t.origin)
}

/**
* Fails the analysis at the point where a specific tree node was parsed using a provided
* error class and subclass and message parameters.
*/
def failAnalysis(
errorClass: String,
errorSubClass: String,
messageParameters: Map[String, String] = Map.empty[String, String]): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
errorSubClass = errorSubClass,
messageParameters = messageParameters,
origin = t.origin)
}

/**
* Fails the analysis at the point where a specific tree node was parsed using a provided
* error class and subclass and one message parameter comprising a plan string. The plan string
* will be printed in the error message if and only if the corresponding Spark configuration is
* enabled.
*/
def failAnalysis(
errorClass: String,
errorSubClass: String,
planString: String): Nothing = {
throw new AnalysisException(
errorClass = errorClass,
errorSubClass = errorSubClass,
messageParameters =
Map("planString" -> (if (SQLConf.get.includePlansInErrors) s": $planString" else "")),
origin = t.origin)
}

def dataTypeMismatch(expr: Expression, mismatch: DataTypeMismatch): Nothing = {
throw new AnalysisException(
errorClass = "DATATYPE_MISMATCH",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ object SubExprUtils extends PredicateHelper {
def collectOutRefs(input: Expression): Unit = input match {
case a: AggregateExpression if containsOuter(a) =>
if (a.references.nonEmpty) {
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql)
throw QueryCompilationErrors.mixedRefsInAggFunc(a.sql, a.origin)
} else {
// Collect and update the sub-tree so that outer references inside this aggregate
// expression will not be collected. For example: min(outer(a)) -> min(a).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,10 +1563,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
new AnalysisException(s"'$operation' does not support partitioning")
}

def mixedRefsInAggFunc(funcStr: String): Throwable = {
val msg = "Found an aggregate function in a correlated predicate that has both " +
"outer and local references, which is not supported: " + funcStr
new AnalysisException(msg)
def mixedRefsInAggFunc(funcStr: String, origin: Origin): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY",
errorSubClass = "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
origin = origin,
messageParameters = Map("function" -> funcStr))
dtenedor marked this conversation as resolved.
Show resolved Hide resolved
}

def functionCannotProcessInputError(
Expand Down
Loading