Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operators to ignore list and update WindowExpr parser #890

Merged
merged 5 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -451,18 +451,22 @@ object SQLPlanParser extends Logging {
// Avoid counting duplicate nodes. We mark them as shouldRemove to neutralize their impact on
// speedups.
val isDupNode = reusedNodeIds.contains(node.id)
// Normalize the execName by removing the trailing '$' character, if present.
// This is necessary because in Scala, the '$' character is often appended to the names of
// generated classes or objects, and we want to match the base name regardless of this suffix.
val normalizedNodeName = node.name.stripSuffix("$")
if (isDupNode) {
// Log that information. This should not cause a significant increase in log size.
logDebug(s"Marking [sqlID = ${sqlID}, node = ${node.name}] as shouldRemove. " +
logDebug(s"Marking [sqlID = ${sqlID}, node = ${normalizedNodeName}] as shouldRemove. " +
s"Reason: duplicate - ancestor of ReusedExchange")
}
if (node.name.contains("WholeStageCodegen")) {
if (normalizedNodeName.contains("WholeStageCodegen")) {
// this is special because it is a SparkPlanGraphCluster vs SparkPlanGraphNode
WholeStageExecParser(node.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app,
reusedNodeIds).parse
} else {
val execInfos = try {
node.name match {
normalizedNodeName match {
case "AggregateInPandas" =>
AggregateInPandasExecParser(node, checker, sqlID).parse
case "ArrowEvalPython" =>
Expand All @@ -481,11 +485,6 @@ object SQLPlanParser extends Logging {
CoalesceExecParser(node, checker, sqlID).parse
case "CollectLimit" =>
CollectLimitExecParser(node, checker, sqlID).parse
case c if c.contains("CreateDataSourceTableAsSelectCommand") =>
// create data source table doesn't show the format so we can't determine
// if we support it
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
isSupported = false, None)
case "CustomShuffleReader" | "AQEShuffleRead" =>
CustomShuffleReaderExecParser(node, checker, sqlID).parse
case "Exchange" =>
Expand Down Expand Up @@ -546,8 +545,8 @@ object SQLPlanParser extends Logging {
// Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as
// supported, but with the shouldRemove flag set to true.
// Setting the shouldRemove flag is handled at the end of the function.
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
isSupported = reuseExecs.contains(node.name), None)
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
isSupported = reuseExecs.contains(normalizedNodeName), None)
}
} catch {
// Error parsing expression could trigger an exception. If the exception is not handled,
Expand All @@ -558,9 +557,9 @@ object SQLPlanParser extends Logging {
// - No need to add the SQL to the failed SQLs, because this will cause the app to be
// labeled as "Not Applicable" which is not preferred at this point.
case NonFatal(e) =>
logWarning(s"Unexpected error parsing plan node ${node.name}. " +
logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " +
s" sqlID = ${sqlID}", e)
ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id,
ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id,
isSupported = false, None)
}
val stagesInNode = getStagesInSQLNode(node, app)
Expand Down Expand Up @@ -760,15 +759,17 @@ object SQLPlanParser extends Logging {
trim.replaceAll("""^\[+""", "").replaceAll("""\]+$""", "").
replaceAll("cast\\(", "").split("windowspecdefinition").map(_.trim)

// Get functionname from each array element except the last one as it doesn't contain
// Get function name from each array element except the last one as it doesn't contain
// any window function
for ( i <- 0 to windowExprs.size - 1 ) {
val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList
val expr = windowFunc.last
val functionName = getFunctionName(windowFunctionPattern, expr)
functionName match {
case Some(func) => parsedExpressions += func
case _ => // NO OP
if (windowExprs.nonEmpty) {
windowExprs.dropRight(1).foreach { windowExprString =>
val windowFunc = windowFunctionPattern.findAllIn(windowExprString).toList
val expr = windowFunc.lastOption.getOrElse("")
val functionName = getFunctionName(windowFunctionPattern, expr)
functionName match {
case Some(func) => parsedExpressions += func
case _ => // NO OP
}
}
}
parsedExpressions.distinct.toArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ object ExecHelper {
// Node names of command-style execs, each of the form "Execute <CommandName>".
// The five constants below are also listed as entries of a collection defined later
// in this object.
// NOTE(review): presumably that collection is the list of operators to ignore — confirm.
private val ExecuteRefreshTable = "Execute RefreshTable"
private val ExecuteRepairTableCommand = "Execute RepairTableCommand"
private val ExecuteShowPartitionsCommand = "Execute ShowPartitionsCommand"
private val ExecuteClearCacheCommand = "Execute ClearCacheCommand"
private val ExecuteOptimizeTableCommandEdge = "Execute OptimizeTableCommandEdge"
// DeltaLakeOperations: node names of Delta Lake command execs.
private val ExecUpdateCommandEdge = "Execute UpdateCommandEdge"
private val ExecDeleteCommandEdge = "Execute DeleteCommandEdge"
Expand Down Expand Up @@ -444,6 +446,8 @@ object ExecHelper {
ExecuteRefreshTable,
ExecuteRepairTableCommand,
ExecuteShowPartitionsCommand,
ExecuteClearCacheCommand,
ExecuteOptimizeTableCommandEdge,
SubqueryExecParser.execName
)

Expand Down