[SPARK-46532][CONNECT] Pass message parameters in metadata of ErrorInfo #44468

Closed · wants to merge 8 commits
@@ -85,7 +85,13 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
|""".stripMargin)
.collect()
}
assert(ex.getErrorClass != null)
assert(
ex.getErrorClass ===
"INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER")
assert(
ex.getMessageParameters.asScala == Map(
@MaxGekk (Member, Author):
Before the changes, getMessageParameters() returned an empty Map.

@heyihong (Contributor), Dec 28, 2023:
This is only partially true: getMessageParameters() returned an empty Map only when error enrichment is disabled or doesn't work.

"datetime" -> "'02-29'",
"config" -> "\"spark.sql.legacy.timeParserPolicy\""))
if (enrichErrorEnabled) {
assert(ex.getCause.isInstanceOf[DateTimeException])
} else {
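For context, the `enrichErrorEnabled` branch above is driven by a client-side toggle. A minimal sketch, in the style of the surrounding suite, of what the non-enriched path now guarantees (the conf name and the exact query are assumptions based on this test, not verbatim from it):

    import scala.jdk.CollectionConverters._
    import org.apache.spark.SparkUpgradeException

    // Hypothetical toggle: with enrichment off, the client must rebuild the
    // exception purely from the errorClass/messageParameters metadata.
    spark.conf.set("spark.sql.connect.enrichError.enabled", "false")
    val ex = intercept[SparkUpgradeException] {
      spark.sql("select to_timestamp('02-29', 'MM-dd')").collect()
    }
    // Both assertions now hold even without server-side error enrichment.
    assert(ex.getErrorClass != null)
    assert(ex.getMessageParameters.asScala.nonEmpty)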
@@ -372,10 +372,14 @@ private[client] object GrpcExceptionConverter {
       .addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava)

     if (errorClass != null) {
+      val messageParameters = JsonMethods
+        .parse(info.getMetadataOrDefault("messageParameters", "{}"))
+        .extract[Map[String, String]]
       builder.setSparkThrowable(
         FetchErrorDetailsResponse.SparkThrowable
           .newBuilder()
           .setErrorClass(errorClass)
+          .putAllMessageParameters(messageParameters.asJava)
           .build())
     }

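For illustration, a minimal, self-contained sketch of the json4s round trip this parsing relies on: the server compacts the parameter map into a single metadata value and the client extracts it back (the local names here are assumptions, not the converter's actual code):

    import org.json4s.DefaultFormats
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods

    implicit val formats: DefaultFormats.type = DefaultFormats

    // Server side: render the parameters as one compact JSON string.
    val params = Map("datetime" -> "'02-29'", "config" -> "\"spark.sql.legacy.timeParserPolicy\"")
    val json = JsonMethods.compact(JsonMethods.render(map2jvalue(params)))

    // Client side: parse it back; "{}" is the default when the key is absent.
    val roundTripped = JsonMethods.parse(json).extract[Map[String, String]]
    assert(roundTripped == params)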
@@ -256,4 +256,13 @@ object Connect {
.version("4.0.0")
.booleanConf
.createWithDefault(true)

val CONNECT_GRPC_MAX_METADATA_SIZE =
buildStaticConf("spark.connect.grpc.maxMetadataSize")
.doc(
"Sets the maximum size of metadata fields. For instance, it restricts metadata fields " +
"in `ErrorInfo`.")
.version("4.0.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(1024)
}
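Because this is a static conf, it must be set before the Connect server starts. A minimal sketch (the 2048-byte value is an arbitrary assumption, not a recommendation):

    import org.apache.spark.SparkConf

    // Hypothetical server-side setup; static confs cannot be changed at runtime.
    val conf = new SparkConf()
      .set("spark.connect.grpc.maxMetadataSize", "2048") // cap metadata values at 2 KiB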
@@ -172,6 +172,7 @@ private[connect] object ErrorUtils extends Logging {
"classes",
JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))

val maxMetadataSize = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MAX_METADATA_SIZE)
// Add the SQL State and Error Class to the response metadata of the ErrorInfoObject.
st match {
case e: SparkThrowable =>
@@ -181,7 +182,12 @@
         }
         val errorClass = e.getErrorClass
         if (errorClass != null && errorClass.nonEmpty) {
-          errorInfo.putMetadata("errorClass", errorClass)
+          val messageParameters = JsonMethods.compact(
+            JsonMethods.render(map2jvalue(e.getMessageParameters.asScala.toMap)))
+          if (messageParameters.length <= maxMetadataSize) {
@MaxGekk (Member, Author):
I restricted the size because some errors include huge parameters, like the entire string of nested exceptions (especially in legacy error classes).

+            errorInfo.putMetadata("errorClass", errorClass)
+            errorInfo.putMetadata("messageParameters", messageParameters)
Contributor:
@HyukjinKwon please review this. I'm not sure if this is correct, as I don't see us passing the error context here (e.g., the SQL string index).

@MaxGekk (Member, Author), Dec 28, 2023:
Without the error context you can still restore an error, but without the message parameters it is impossible; passing just the error class is useless.

@heyihong (Contributor), Dec 28, 2023:
Constructing exception messages from the error class and message parameters only works well for the Scala client, if I understand correctly? The Python client recognizes only a very limited set of error classes: https://github.com/databricks/runtime/blob/master/python/pyspark/errors/error_classes.py#L1114

Contributor:

Another issue with constructing exception messages from the error class and message parameters on the client side is multi-version support: the same error class may require different message parameters in different client versions.

@MaxGekk (Member, Author):
> only works well for Scala client if I understand correctly? For Python client, it only recognizes a very limited set of error classes.

This only means that we should keep the Python client in sync. Nothing more.

> constructing error messages may require different message parameters

The issue might be solved by sending the old and new parameters together, but if you send nothing, you don't have any chance.

@MaxGekk (Member, Author):
In PR #44464, I require that all Spark exceptions have an error class (they cannot be built from just a text message). I found a few test failures in the Connect client while creating a SparkThrowable in GrpcExceptionConverter, for example:

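    // Client-side factory in GrpcExceptionConverter (excerpt): rebuilding the
    // typed exception requires the message parameters sent by the server.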
    errorConstructor(params =>
      new SparkNumberFormatException(
        params.errorClass.orNull,
        params.messageParameters,
        params.queryContext)),

SparkNumberFormatException's constructor fails because it cannot fill in the message parameters, since we didn't transfer them (the map is empty).

@MaxGekk (Member, Author), Dec 28, 2023:
For example, in https://github.com/MaxGekk/spark/runs/19914247939 the failed test is ClientE2ETestSuite "cause exception - false":

    sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: Expected exception org.apache.spark.SparkUpgradeException to be thrown, but org.apache.spark.SparkException was thrown
    ...
    Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: [INTERNAL_ERROR] Undefined error message parameter for error class: 'INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER'. Parameters: Map() SQLSTATE: XX000
        at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
        at org.apache.spark.SparkException$.internalError(SparkException.scala:96)

How should I create a SparkUpgradeException if its constructor requires an error class and correct message parameters (otherwise it fails with an internal error)?

@heyihong (Contributor), Dec 28, 2023:
> The issue might be solved by sending old and new parameters together but if you send nothing, you don't have any chance.

BTW, maintaining backward compatibility of a ~7k-LOC error-classes.json that is constantly changing can be a challenge...

Member:
Let me make a follow-up to match the Python side for now.

+          }
         }
       case _ =>
     }
@@ -200,8 +206,10 @@
     val withStackTrace =
       if (sessionHolderOpt.exists(
           _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) {
-        val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
-        errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
+        val maxSize = Math.min(
+          SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
+          maxMetadataSize)
+        errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt))
       } else {
         errorInfo
       }
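A small sketch of the resulting truncation behavior, using commons-lang3 (the sizes are restated as plain values for illustration; the jvmStacktrace default is an assumption):

    import org.apache.commons.lang3.StringUtils

    val jvmStackTraceMaxSize = 2048L // assumed spark.connect.jvmStacktrace.maxSize
    val maxMetadataSize = 1024L      // default of spark.connect.grpc.maxMetadataSize
    val maxSize = Math.min(jvmStackTraceMaxSize, maxMetadataSize)

    // abbreviate keeps the result within maxSize, replacing the tail with "..."
    val longTrace = "at org.example.Demo.run(Demo.scala:1)\n" * 100
    val capped = StringUtils.abbreviate(longTrace, maxSize.toInt)
    assert(capped.length <= maxSize)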
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -3452,7 +3452,7 @@ def test_error_stack_trace(self):
         self.spark.stop()
         spark = (
             PySparkSession.builder.config(conf=self.conf())
-            .config("spark.connect.jvmStacktrace.maxSize", 128)
+            .config("spark.connect.grpc.maxMetadataSize", 128)
@MaxGekk (Member, Author):
I had to restrict the amount of metadata much more; otherwise the test fails with a gRPC error because the client cannot receive a big message.

.remote("local[4]")
.getOrCreate()
)