From beaed19ed45386fdb3c87ef9098e5fd722235759 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Sun, 31 Mar 2024 15:52:58 -0700 Subject: [PATCH 1/4] Add operators to ignore list and update WindowExpr parser Signed-off-by: Niranjan Artal --- .../tool/planparser/SQLPlanParser.scala | 22 ++++++++++++------- .../spark/sql/rapids/tool/ToolUtils.scala | 10 ++++++++- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index daafe0ddc..2b8fc710f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -760,15 +760,21 @@ object SQLPlanParser extends Logging { trim.replaceAll("""^\[+""", "").replaceAll("""\]+$""", ""). replaceAll("cast\\(", "").split("windowspecdefinition").map(_.trim) - // Get functionname from each array element except the last one as it doesn't contain + // Get function name from each array element except the last one as it doesn't contain // any window function - for ( i <- 0 to windowExprs.size - 1 ) { - val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList - val expr = windowFunc.last - val functionName = getFunctionName(windowFunctionPattern, expr) - functionName match { - case Some(func) => parsedExpressions += func - case _ => // NO OP + if (windowExprs.nonEmpty) { + // Iterate through each expression in windowExprs + for (expr <- windowExprs) { + val windowFunc = windowFunctionPattern.findAllIn(expr).toList + // Check to ensure windowFunc is not empty before accessing last element + if (windowFunc.nonEmpty) { + val expr = windowFunc.last + val functionName = getFunctionName(windowFunctionPattern, expr) + functionName match { + case Some(func) => parsedExpressions += func + case _ => // NO OP + } + } } } parsedExpressions.distinct.toArray diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 7d571df64..3d1cde8e7 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -415,6 +415,8 @@ object ExecHelper { private val ExecuteRefreshTable = "Execute RefreshTable" private val ExecuteRepairTableCommand = "Execute RepairTableCommand" private val ExecuteShowPartitionsCommand = "Execute ShowPartitionsCommand" + private val ExecuteClearCacheCommand = "Execute ClearCacheCommand" + private val ExecuteOptimizeTableCommandEdge = "Execute OptimizeTableCommandEdge" // DeltaLakeOperations private val ExecUpdateCommandEdge = "Execute UpdateCommandEdge" private val ExecDeleteCommandEdge = "Execute DeleteCommandEdge" @@ -444,11 +446,17 @@ object ExecHelper { ExecuteRefreshTable, ExecuteRepairTableCommand, ExecuteShowPartitionsCommand, + ExecuteClearCacheCommand, + ExecuteOptimizeTableCommandEdge, SubqueryExecParser.execName ) def shouldIgnore(execName: String): Boolean = { - getAllIgnoreExecs.contains(execName) + // Normalize the execName by removing the trailing '$' character, if present. + // This is necessary because in Scala, the '$' character is often appended to the names of + // generated classes or objects, and we want to match the base name regardless of this suffix. + val normalizedExecName = execName.stripSuffix("$") + getAllIgnoreExecs.contains(normalizedExecName) } } From 7173ab9ddf6751d63f409640f84993af05c394d9 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 1 Apr 2024 10:31:14 -0700 Subject: [PATCH 2/4] addressed review comments Signed-off-by: Niranjan Artal --- .../tool/planparser/SQLPlanParser.scala | 28 +++++++++---------- .../spark/sql/rapids/tool/ToolUtils.scala | 6 +--- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 2b8fc710f..75085b24e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -451,18 +451,22 @@ object SQLPlanParser extends Logging { // Avoid counting duplicate nodes. We mark them as shouldRemove to neutralize their impact on // speedups. val isDupNode = reusedNodeIds.contains(node.id) + // Normalize the execName by removing the trailing '$' character, if present. + // This is necessary because in Scala, the '$' character is often appended to the names of + // generated classes or objects, and we want to match the base name regardless of this suffix. + val normalizedNodeName = node.name.stripSuffix("$") if (isDupNode) { // log that information. This should not cause significant increase in log size. - logDebug(s"Marking [sqlID = ${sqlID}, node = ${node.name}] as shouldRemove. " + + logDebug(s"Marking [sqlID = ${sqlID}, node = ${normalizedNodeName}] as shouldRemove. " + s"Reason: duplicate - ancestor of ReusedExchange") } - if (node.name.contains("WholeStageCodegen")) { + if (normalizedNodeName.contains("WholeStageCodegen")) { // this is special because it is a SparkPlanGraphCluster vs SparkPlanGraphNode WholeStageExecParser(node.asInstanceOf[SparkPlanGraphCluster], checker, sqlID, app, reusedNodeIds).parse } else { val execInfos = try { - node.name match { + normalizedNodeName match { case "AggregateInPandas" => AggregateInPandasExecParser(node, checker, sqlID).parse case "ArrowEvalPython" => @@ -484,7 +488,7 @@ object SQLPlanParser extends Logging { case c if c.contains("CreateDataSourceTableAsSelectCommand") => // create data source table doesn't show the format so we can't determine // if we support it - ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id, + ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, isSupported = false, None) case "CustomShuffleReader" | "AQEShuffleRead" => CustomShuffleReaderExecParser(node, checker, sqlID).parse @@ -546,8 +550,8 @@ object SQLPlanParser extends Logging { // Execs that are members of reuseExecs (i.e., ReusedExchange) should be marked as // supported but with shouldRemove flag set to True. // Setting the "shouldRemove" is handled at the end of the function. - ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id, - isSupported = reuseExecs.contains(node.name), None) + ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, + isSupported = reuseExecs.contains(normalizedNodeName), None) } } catch { // Error parsing expression could trigger an exception. If the exception is not handled, @@ -558,9 +562,9 @@ object SQLPlanParser extends Logging { // - No need to add the SQL to the failed SQLs, because this will cause the app to be // labeled as "Not Applicable" which is not preferred at this point. case NonFatal(e) => - logWarning(s"Unexpected error parsing plan node ${node.name}. " + + logWarning(s"Unexpected error parsing plan node ${normalizedNodeName}. " + s" sqlID = ${sqlID}", e) - ExecInfo(node, sqlID, node.name, expr = "", 1, duration = None, node.id, + ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, isSupported = false, None) } val stagesInNode = getStagesInSQLNode(node, app) @@ -763,11 +767,8 @@ object SQLPlanParser extends Logging { // Get function name from each array element except the last one as it doesn't contain // any window function if (windowExprs.nonEmpty) { - // Iterate through each expression in windowExprs - for (expr <- windowExprs) { - val windowFunc = windowFunctionPattern.findAllIn(expr).toList - // Check to ensure windowFunc is not empty before accessing last element - if (windowFunc.nonEmpty) { + for ( i <- 0 to windowExprs.size - 1 ) { + val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList val expr = windowFunc.last val functionName = getFunctionName(windowFunctionPattern, expr) functionName match { @@ -776,7 +777,6 @@ object SQLPlanParser extends Logging { } } } - } parsedExpressions.distinct.toArray } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 34a5cc31a..1b44f8468 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -452,11 +452,7 @@ object ExecHelper { ) def shouldIgnore(execName: String): Boolean = { - // Normalize the execName by removing the trailing '$' character, if present. - // This is necessary because in Scala, the '$' character is often appended to the names of - // generated classes or objects, and we want to match the base name regardless of this suffix. - val normalizedExecName = execName.stripSuffix("$") - getAllIgnoreExecs.contains(normalizedExecName) + getAllIgnoreExecs.contains(execName) } } From e445a937398d7a89e739c5f6f0238aeff998f450 Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 1 Apr 2024 14:52:50 -0700 Subject: [PATCH 3/4] addressed review comments --- .../spark/rapids/tool/planparser/SQLPlanParser.scala | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index 75085b24e..d1080e6a4 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -485,11 +485,6 @@ object SQLPlanParser extends Logging { CoalesceExecParser(node, checker, sqlID).parse case "CollectLimit" => CollectLimitExecParser(node, checker, sqlID).parse - case c if c.contains("CreateDataSourceTableAsSelectCommand") => - // create data source table doesn't show the format so we can't determine - // if we support it - ExecInfo(node, sqlID, normalizedNodeName, expr = "", 1, duration = None, node.id, - isSupported = false, None) case "CustomShuffleReader" | "AQEShuffleRead" => CustomShuffleReaderExecParser(node, checker, sqlID).parse case "Exchange" => @@ -769,7 +764,7 @@ object SQLPlanParser extends Logging { if (windowExprs.nonEmpty) { for ( i <- 0 to windowExprs.size - 1 ) { val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList - val expr = windowFunc.last + val expr = windowFunc.lastOption.getOrElse("") val functionName = getFunctionName(windowFunctionPattern, expr) functionName match { case Some(func) => parsedExpressions += func From 1f51b14af1806ab207bc85035d68b60e5e15b9ca Mon Sep 17 00:00:00 2001 From: Niranjan Artal Date: Mon, 1 Apr 2024 15:09:01 -0700 Subject: [PATCH 4/4] addressed review comments --- .../rapids/tool/planparser/SQLPlanParser.scala | 16 ++++++++-------- .../apache/spark/sql/rapids/tool/ToolUtils.scala | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala index d1080e6a4..82817559c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/planparser/SQLPlanParser.scala @@ -762,16 +762,16 @@ object SQLPlanParser extends Logging { // Get function name from each array element except the last one as it doesn't contain // any window function if (windowExprs.nonEmpty) { - for ( i <- 0 to windowExprs.size - 1 ) { - val windowFunc = windowFunctionPattern.findAllIn(windowExprs(i)).toList - val expr = windowFunc.lastOption.getOrElse("") - val functionName = getFunctionName(windowFunctionPattern, expr) - functionName match { - case Some(func) => parsedExpressions += func - case _ => // NO OP - } + windowExprs.dropRight(1).foreach { windowExprString => + val windowFunc = windowFunctionPattern.findAllIn(windowExprString).toList + val expr = windowFunc.lastOption.getOrElse("") + val functionName = getFunctionName(windowFunctionPattern, expr) + functionName match { + case Some(func) => parsedExpressions += func + case _ => // NO OP } } + } parsedExpressions.distinct.toArray } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 1b44f8468..279d7b04a 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -452,7 +452,7 @@ object ExecHelper { ) def shouldIgnore(execName: String): Boolean = { - getAllIgnoreExecs.contains(execName) + getAllIgnoreExecs.contains(execName) } }