diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
index 1efd2e0e2..e2c25a5f8 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSQLPlanAnalyzer.scala
@@ -16,39 +16,49 @@
 package com.nvidia.spark.rapids.tool.analysis
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{AbstractSet, ArrayBuffer, HashMap, LinkedHashSet}
 
 import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
-import com.nvidia.spark.rapids.tool.profiling.{SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
+import com.nvidia.spark.rapids.tool.profiling.{DataSourceCase, SQLAccumProfileResults, SQLMetricInfoCase, SQLStageInfoProfileResult, UnsupportedSQLPlan, WholeStageCodeGenResults}
+import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer
 
 import org.apache.spark.sql.execution.SparkPlanInfo
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
-import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SQLMetricsStats, SqlPlanInfoGraphBuffer, ToolUtils}
+import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode}
+import org.apache.spark.sql.rapids.tool.{AppBase, RDDCheckHelper, SQLMetricsStats, SqlPlanInfoGraphBuffer, SqlPlanInfoGraphEntry}
 import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
+import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo
 import org.apache.spark.sql.rapids.tool.util.ToolsPlanGraph
 
 /**
- * This class processes SQL plan to build some information such as: metrics, wholestage nodes, and
- * connecting operators to nodes. The implementation used to be directly under profiler's
+ * This class processes SQL plan to build some information such as: metrics, wholeStage nodes, and
+ * connecting operators to nodes. The implementation used to be directly under Profiler's
  * ApplicationInfo class. Moving the logic and the data structure to this new class is part of
 * refactor to have a SQLPlan processor that can produce the same analysis for both the Prof/Qual
 * tools.
- * TODO: 1- Make the processor accepts appBase instead of applicationInfo. The tricky part here
- *       that Qual has its own way of reporting Problematic SQLs and identifying RDDs.
- *       2- Restructure the implementation similar to AppSparkMetricsAnalysis to separate between
- *       analysis and views.
- * @param app the Application into objects that contains the SQL plans to be processed
+ * Calling processSQLPlanMetrics() has a side effect on the app object:
+ * 1- it updates dataSourceInfo with V2 and V1 data sources
+ * 2- it updates sqlIDtoProblematic, the map between SQL ID and potential problems
+ * 3- it updates sqlIdToInfo.DsOrRdd as a boolean to indicate whether a SQL is an RDD/DS or not
+ *
+ * @param app the Application info object that contains the SQL plans to be processed
 */
 class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(app) {
-  private val sqlPlanNodeIdToStageIds: mutable.HashMap[(Long, Long), Set[Int]] =
-    mutable.HashMap.empty[(Long, Long), Set[Int]]
+  // A map between (SQL ID, Node ID) and the set of stage IDs
+  // TODO: The Qualification should use this map instead of building a new set for each exec.
+  private val sqlPlanNodeIdToStageIds: HashMap[(Long, Long), Set[Int]] =
+    HashMap.empty[(Long, Long), Set[Int]]
   var wholeStage: ArrayBuffer[WholeStageCodeGenResults] = ArrayBuffer[WholeStageCodeGenResults]()
+  // A list of UnsupportedSQLPlan that contains the SQL ID, node ID, node name.
+  // TODO: unsupportedSQLPlan is kept here for now to match the legacy Profiler's
+  //       output but we need to revisit this in the future to see if we can move it to a
+  //       different place or fix any inconsistent issues between this implementation and
+  //       SQLPlanParser.
   var unsupportedSQLPlan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
   var allSQLMetrics: ArrayBuffer[SQLMetricInfoCase] = ArrayBuffer[SQLMetricInfoCase]()
 
   /**
-   * Connects Operators to Stages using AccumulatorIDs
+   * Connects Operators to Stages using AccumulatorIDs.
+   * TODO: This function can be fused into the visitNode function to avoid the extra iteration.
    * @param cb function that creates a SparkPlanGraph. This can be used as a cacheHolder for the
    *           object created to be used later.
    */
@@ -65,6 +75,135 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
     }
   }
 
+  /**
+   * Update the potential problems in the app object. This is a common function that is used by
+   * both the Qual/Prof analyses.
+   * For Qual apps, the app.sqlIDtoProblematic won't be set because it is done later during the
+   * aggregation phase.
+   * @param sqlId the SQL ID being analyzed
+   * @param potentialProblems a set of strings that represent the potential problems found in the
+   *                          SQL plan.
+   */
+  private def updateAppPotentialProblems(sqlId: Long,
+      potentialProblems: AbstractSet[String]): Unit = {
+    // Append problematic issues to the global variable for that SqlID
+    val existingIssues = app.sqlIDtoProblematic.getOrElse(sqlId, LinkedHashSet[String]())
+    app.sqlIDtoProblematic(sqlId) = existingIssues ++ potentialProblems
+  }
+
+  // A visitor context to hold the state of the SQL plan visitor.
+  // The fields are updated and modified by the visitNode function.
+  // sqlIsDsOrRDD is initialized to False, and it is set only once to True when a node is detected
+  // as RDD or DS.
+  protected case class SQLPlanVisitorContext(
+      sqlPIGEntry: SqlPlanInfoGraphEntry,
+      sqlDataSources: ArrayBuffer[DataSourceCase] = ArrayBuffer[DataSourceCase](),
+      potentialProblems: LinkedHashSet[String] = LinkedHashSet[String](),
+      var sqlIsDsOrRDD: Boolean = false)
+
+  /**
+   * The method is called for each node in the SparkGraph plan.
+   * It visits the node to extract the following information:
+   * 1- the metrics;
+   * 2- potential problems;
+   * 3- data sources.
+   *
+   * It updates the following fields defined in AppSQLPlanAnalyzer:
+   * 1- allSQLMetrics: a list of SQLMetricInfoCase
+   * 2- wholeStage: a list of WholeStageCodeGenResults
+   * 3- unsupportedSQLPlan: a list of UnsupportedSQLPlan that contains the SQL ID, node ID,
+   *    node name.
+   *    TODO: Consider handling the construction of this list in a different way for the
+   *          Qualification
+   * 4- sqlPlanNodeIdToStageIds: A map between (SQL ID, Node ID) and the set of stage IDs
+   *
+   * It has the following effect on the visitor object:
+   * 1- It updates the sqlIsDsOrRDD argument to True when the visited node is an RDD or Dataset.
+   * 2- If the SQLID is an RDD, the potentialProblems is cleared because once a SQL is marked as
+   *    RDD, all the other problems are ignored. Note that we need to set that flag only once to
+   *    True for the given SQLID.
+   * 3- It appends the current node's potential problems to the SQLID problems only if
+   *    visitor.sqlIsDsOrRDD is False. Otherwise, it would be redundant to keep checking for
+   *    potential problems for every node when they are going to be ignored.
+   *
+   * It has the following effect on the app object:
+   * 1- it updates dataSourceInfo with V2 and V1 data sources
+   * 2- it updates sqlIDtoProblematic, the map between SQL ID and potential problems
+   *
+   * @param visitor the visitor context defined per SQLPlan
+   * @param node the node being currently visited.
+   */
+  protected def visitNode(visitor: SQLPlanVisitorContext,
+      node: SparkPlanGraphNode): Unit = {
+    node match {
+      case cluster: SparkPlanGraphCluster =>
+        val ch = cluster.nodes
+        ch.foreach { c =>
+          wholeStage += WholeStageCodeGenResults(
+            appIndex, visitor.sqlPIGEntry.sqlID, node.id, node.name, c.name, c.id)
+        }
+      case _ =>
+    }
+    // get V2 dataSources for that node
+    val nodeV2Reads = app.checkGraphNodeForReads(visitor.sqlPIGEntry.sqlID, node)
+    if (nodeV2Reads.isDefined) {
+      visitor.sqlDataSources += nodeV2Reads.get
+    }
+
+    val nodeIsDsOrRDD = RDDCheckHelper.isDatasetOrRDDPlan(node.name, node.desc).isRDD
+    if (nodeIsDsOrRDD) {
+      // we want to report every node that is an RDD
+      val thisPlan = UnsupportedSQLPlan(visitor.sqlPIGEntry.sqlID, node.id, node.name, node.desc,
+        "Contains Dataset or RDD")
+      unsupportedSQLPlan += thisPlan
+      // If one node is RDD, the Sql should be set too
+      if (!visitor.sqlIsDsOrRDD) { // We need to set the flag only once for the given sqlID
+        visitor.sqlIsDsOrRDD = true
+        app.sqlIdToInfo.get(visitor.sqlPIGEntry.sqlID).foreach { sql =>
+          sql.setDsOrRdd(visitor.sqlIsDsOrRDD)
+          app.sqlIDToDataSetOrRDDCase += visitor.sqlPIGEntry.sqlID
+          // Clear the potential problems since it is an RDD to free memory
+          visitor.potentialProblems.clear()
+        }
+      }
+    }
+    if (!visitor.sqlIsDsOrRDD) {
+      // Append current node's potential problems to the Sql problems only if the SQL is not an
+      // RDD. This is an optimization since the potentialProblems won't be used any more.
+ visitor.potentialProblems ++= app.findPotentialIssues(node.desc) + } + // Then process SQL plan metric type + for (metric <- node.metrics) { + val stages = + sqlPlanNodeIdToStageIds.getOrElse((visitor.sqlPIGEntry.sqlID, node.id), Set.empty) + val allMetric = SQLMetricInfoCase(visitor.sqlPIGEntry.sqlID, metric.name, + metric.accumulatorId, metric.metricType, node.id, + node.name, node.desc, stages) + + allSQLMetrics += allMetric + if (app.sqlPlanMetricsAdaptive.nonEmpty) { + val adaptive = app.sqlPlanMetricsAdaptive.filter { adaptiveMetric => + adaptiveMetric.sqlID == visitor.sqlPIGEntry.sqlID && + adaptiveMetric.accumulatorId == metric.accumulatorId + } + adaptive.foreach { adaptiveMetric => + val allMetric = SQLMetricInfoCase(visitor.sqlPIGEntry.sqlID, adaptiveMetric.name, + adaptiveMetric.accumulatorId, adaptiveMetric.metricType, node.id, + node.name, node.desc, stages) + // could make this more efficient but seems ok for now + val exists = allSQLMetrics.filter { a => + ((a.accumulatorId == adaptiveMetric.accumulatorId) + && (a.sqlID == visitor.sqlPIGEntry.sqlID) + && (a.nodeID == node.id && adaptiveMetric.metricType == a.metricType)) + } + if (exists.isEmpty) { + allSQLMetrics += allMetric + } + } + } + } + } + /** * Function to process SQL Plan Metrics after all events are processed */ @@ -77,100 +216,28 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap } connectOperatorToStage(createGraphFunc) for (sqlPIGEntry <- sqlPlanInfoBuffer.sqlPlanInfoGraphs) { - var sqlIsDsOrRDD = false - val potentialProblems = collection.mutable.Set[String]() - // store all datasources of the given SQL in a variable so that we won't have to iterate + // store all dataSources of the given SQL in a variable so that we won't have to iterate // through the entire list // get V1 dataSources for that SQLId - val sqlDataSources = app.checkMetadataForReadSchema(sqlPIGEntry) + val visitorContext = SQLPlanVisitorContext(sqlPIGEntry, + app.checkMetadataForReadSchema(sqlPIGEntry)) for (node <- sqlPIGEntry.sparkPlanGraph.allNodes) { - var nodeIsDsOrRDD = false - if (node.isInstanceOf[org.apache.spark.sql.execution.ui.SparkPlanGraphCluster]) { - val ch = node.asInstanceOf[org.apache.spark.sql.execution.ui.SparkPlanGraphCluster].nodes - ch.foreach { c => - wholeStage += WholeStageCodeGenResults( - appIndex, sqlPIGEntry.sqlID, node.id, node.name, c.name, c.id) - } - } - // get V2 dataSources for that node - val nodeV2Reads = app.checkGraphNodeForReads(sqlPIGEntry.sqlID, node) - if (nodeV2Reads.isDefined) { - sqlDataSources += nodeV2Reads.get - } - nodeIsDsOrRDD = RDDCheckHelper.isDatasetOrRDDPlan(node.name, node.desc).isRDD - if (nodeIsDsOrRDD) { - if (app.gpuMode) { // we want to report every node that is an RDD - val thisPlan = UnsupportedSQLPlan(sqlPIGEntry.sqlID, node.id, node.name, node.desc, - "Contains Dataset or RDD") - unsupportedSQLPlan += thisPlan - } - // If one node is RDD, the Sql should be set too - if (!sqlIsDsOrRDD) { // We need to set the flag only once for the given sqlID - sqlIsDsOrRDD = true - app.sqlIdToInfo.get(sqlPIGEntry.sqlID).foreach { sql => - sql.setDsOrRdd(sqlIsDsOrRDD) - app.sqlIDToDataSetOrRDDCase += sqlPIGEntry.sqlID - // Clear the potential problems since it is an RDD to free memory - potentialProblems.clear() - } - } - } - if (!sqlIsDsOrRDD) { - // Append current node's potential problems to the Sql problems only if the SQL is not an - // RDD. This is an optimization since the potentialProblems won't be used any more. 
- potentialProblems ++= app.findPotentialIssues(node.desc) - } - // Then process SQL plan metric type - for (metric <- node.metrics) { - val stages = - sqlPlanNodeIdToStageIds.get((sqlPIGEntry.sqlID, node.id)).getOrElse(Set.empty) - val allMetric = SQLMetricInfoCase(sqlPIGEntry.sqlID, metric.name, - metric.accumulatorId, metric.metricType, node.id, - node.name, node.desc, stages) - - allSQLMetrics += allMetric - if (app.sqlPlanMetricsAdaptive.nonEmpty) { - val adaptive = app.sqlPlanMetricsAdaptive.filter { adaptiveMetric => - adaptiveMetric.sqlID == sqlPIGEntry.sqlID && - adaptiveMetric.accumulatorId == metric.accumulatorId - } - adaptive.foreach { adaptiveMetric => - val allMetric = SQLMetricInfoCase(sqlPIGEntry.sqlID, adaptiveMetric.name, - adaptiveMetric.accumulatorId, adaptiveMetric.metricType, node.id, - node.name, node.desc, stages) - // could make this more efficient but seems ok for now - val exists = allSQLMetrics.filter { a => - ((a.accumulatorId == adaptiveMetric.accumulatorId) && (a.sqlID == sqlPIGEntry.sqlID) - && (a.nodeID == node.id && adaptiveMetric.metricType == a.metricType)) - } - if (exists.isEmpty) { - allSQLMetrics += allMetric - } - } - } - } + visitNode(visitorContext, node) } - if (app.isInstanceOf[ApplicationInfo]) { - // TODO: We should clean this in better way so that sqlItoProblematic is handled similar - // way in both Qual/Prof tools. - // This is a hack to get the processSQLPlanMetrics() method to work for both Qual/Prof - // - we check if the app is AppInfo, then we add the potential problems. - // - If not, then we do not set the problematic issues because this will cause - // records to be duplicated in the Qualification tool. - // Check if readsSchema is complex for the given sql + if (visitorContext.sqlDataSources.nonEmpty) { val sqlNestedComplexTypes = - AppBase.parseReadSchemaForNestedTypes(sqlDataSources.map { ds => ds.schema }) + AppBase.parseReadSchemaForNestedTypes( + visitorContext.sqlDataSources.map { ds => ds.schema }) // Append problematic issues to the global variable for that SqlID if (sqlNestedComplexTypes._2.nonEmpty) { - potentialProblems += "NESTED COMPLEX TYPE" - } - // Finally, add the local potentialProblems to the global data structure if any. 
-        app.sqlIDtoProblematic(sqlPIGEntry.sqlID) = potentialProblems.toSet
-        // Convert the problematic issues to a string and update the SQLInfo
-        app.sqlIdToInfo.get(sqlPIGEntry.sqlID).foreach { sqlInfoClass =>
-          sqlInfoClass.problematic = ToolUtils.formatPotentialProblems(potentialProblems.toSeq)
+          visitorContext.potentialProblems += "NESTED COMPLEX TYPE"
         }
       }
+      // Finally, update the potential problems in the app object
+      // Note that the implementation depends on the type of the AppBase
+      if (visitorContext.potentialProblems.nonEmpty) {
+        updateAppPotentialProblems(sqlPIGEntry.sqlID, visitorContext.potentialProblems)
+      }
     }
   }
 
@@ -200,7 +267,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
   }
 
   // Store (min, median, max, total) for a given metric
-  private case class statisticsMetrics(min: Long, med:Long, max:Long, total: Long)
+  private case class StatisticsMetrics(min: Long, med:Long, max:Long, total: Long)
 
   def generateSQLAccums(): Seq[SQLAccumProfileResults] = {
     allSQLMetrics.flatMap { metric =>
@@ -223,9 +290,9 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
             None
           } else if (accumValues.length <= 1) {
-            Some(statisticsMetrics(0L, 0L, 0L, accumValues.sum))
+            Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
           } else {
-            Some(statisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
+            Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
               accumValues(accumValues.size - 1), accumValues.sum))
           }
         } else {
@@ -233,7 +300,7 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
           if (accumValues.isEmpty) {
             None
           } else {
-            Some(statisticsMetrics(0L, 0L, 0L, accumValues.max))
+            Some(StatisticsMetrics(0L, 0L, 0L, accumValues.max))
           }
         }
       case None => None
@@ -250,9 +317,9 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
           if (accumValues.isEmpty) {
            None
          } else if (accumValues.length <= 1) {
-            Some(statisticsMetrics(0L, 0L, 0L, accumValues.sum))
+            Some(StatisticsMetrics(0L, 0L, 0L, accumValues.sum))
          } else {
-            Some(statisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
+            Some(StatisticsMetrics(accumValues(0), accumValues(accumValues.size / 2),
              accumValues(accumValues.size - 1), accumValues.sum))
          }
        case None =>
@@ -260,14 +327,8 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
      }
      if (taskMax.isDefined || driverMax.isDefined) {
-        val taskInfo = taskMax match {
-          case Some(task) => task
-          case None => statisticsMetrics(0L, 0L, 0L, 0L)
-        }
-        val driverInfo = driverMax match {
-          case Some(driver) => driver
-          case None => statisticsMetrics(0L, 0L, 0L, 0L)
-        }
+        val taskInfo = taskMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
+        val driverInfo = driverMax.getOrElse(StatisticsMetrics(0L, 0L, 0L, 0L))
 
        val max = Math.max(taskInfo.max, driverInfo.max)
        val min = Math.max(taskInfo.min, driverInfo.min)
@@ -285,14 +346,14 @@ class AppSQLPlanAnalyzer(app: AppBase, appIndex: Int) extends AppAnalysisBase(ap
 }
 
 object AppSQLPlanAnalyzer {
-  def processSQLPlan(app: ApplicationInfo): AppSQLPlanAnalyzer = {
-    val sqlProcessor = new AppSQLPlanAnalyzer(app, app.index)
-    sqlProcessor.processSQLPlanMetrics()
-    sqlProcessor
-  }
-  def apply(app: AppBase, appIndex: Int): AppSQLPlanAnalyzer = {
-    val sqlProcessor = new AppSQLPlanAnalyzer(app, appIndex)
-    sqlProcessor.processSQLPlanMetrics()
-    sqlProcessor
+  def apply(app: AppBase, appIndex: Integer = 1): AppSQLPlanAnalyzer = {
+    val sqlAnalyzer = app match {
+      case qApp: 
QualificationAppInfo => + new QualSQLPlanAnalyzer(qApp, appIndex) + case pApp: ApplicationInfo => + new AppSQLPlanAnalyzer(pApp, pApp.index) + } + sqlAnalyzer.processSQLPlanMetrics() + sqlAnalyzer } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 9e3bf3134..342bb40a0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -302,10 +302,17 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { */ def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = { val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) => + // First, build the SQLIssues string by retrieving the potential issues from the + // app.sqlIDtoProblematic map. + val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) { + ToolUtils.formatPotentialProblems(app.sqlIDtoProblematic(sqlId).toSeq) + } else { + "" + } + // Then, build the SQLDurationExecutorTimeProfileResult SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID, sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD, - app.getAppDuration.orElse(Option(0L)), sqlCase.problematic, - sqlCase.sqlCpuTimePercent) + app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent) } sqlRows.toSeq } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala index 9d6486eb1..40a3cd94e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/CollectInformation.scala @@ -88,7 +88,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { // get read data schema information def getDataSourceInfo: Seq[DataSourceProfileResult] = { - val dataSourceApps = apps.filter(_.dataSourceInfo.size > 0) + val dataSourceApps = apps.filter(_.dataSourceInfo.nonEmpty) val sqlAccums = CollectInformation.generateSQLAccums(dataSourceApps) // Metrics to capture from event log to the result @@ -217,9 +217,6 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging { object CollectInformation extends Logging { - // Store (min, median, max, total) for a given metric - case class statisticsMetrics(min: Long, med:Long, max:Long, total: Long) - def generateSQLAccums(apps: Seq[ApplicationInfo]): Seq[SQLAccumProfileResults] = { apps.flatMap { app => app.planMetricProcessor.generateSQLAccums() diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index 320b51026..e5106f02b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -165,7 +165,6 @@ class SQLExecutionInfoClass( var endTime: Option[Long], var duration: Option[Long], var hasDatasetOrRDD: Boolean, - var problematic: String = "", var sqlCpuTimePercent: Double = -1) { def setDsOrRdd(value: Boolean): Unit = { hasDatasetOrRDD = value diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala 
b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala index aa672e9a4..e73426a68 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/PluginTypeChecker.scala @@ -357,7 +357,7 @@ class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance( writeFormats.map(x => x.trim).contains(format) } - def isWriteFormatSupported(writeFormat: ArrayBuffer[String]): ArrayBuffer[String] = { + def getUnsupportedWriteFormat(writeFormat: Iterable[String]): Iterable[String] = { writeFormat.map(x => x.toLowerCase.trim).filterNot( writeFormats.map(x => x.trim).contains(_)) } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualSQLPlanAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualSQLPlanAnalyzer.scala new file mode 100644 index 000000000..cbaff4443 --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualSQLPlanAnalyzer.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.qualification + +import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer +import com.nvidia.spark.rapids.tool.planparser.DataWritingCommandExecParser + +import org.apache.spark.sql.execution.ui.SparkPlanGraphNode +import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo + +/** + * A class that extends the AppSQLPlanAnalyzer to analyze the SQL plans of the applications. + * It needs to override the parent class to process the WriteDataFormat while visiting a + * SparkGraphNode. + * In addition to the side effects mentioned in AppSQLPlanAnalyzer, it updates the writeDataFormat + * field defined in the QualificationAppInfo object. + * + * @param app the Application info objects that contains the SQL plans to be processed + * @param appIndex the application index used in viewing multiple applications together. 
+ */
+class QualSQLPlanAnalyzer(
+    app: QualificationAppInfo, appIndex: Integer) extends AppSQLPlanAnalyzer(app, appIndex) {
+
+  override def visitNode(visitor: SQLPlanVisitorContext,
+      node: SparkPlanGraphNode): Unit = {
+    super.visitNode(visitor, node)
+    // Get the write data format
+    if (!app.perSqlOnly) {
+      DataWritingCommandExecParser.getWriteCMDWrapper(node).map { wWrapper =>
+        app.writeDataFormat += wWrapper.dataFormat
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
index 17208db0e..1c1e4e030 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/RunningQualificationApp.scala
@@ -16,6 +16,7 @@
 package com.nvidia.spark.rapids.tool.qualification
 
+import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer
 import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
 import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.SQL_DESC_STR
 
@@ -38,7 +39,7 @@ import org.apache.spark.sql.rapids.tool.qualification._
  * SQL query level without tracking all the Application information, but currently does
  * not cleanup. There is a cleanupSQL function that the user can force cleanup if required.
  *
- * Create the `RunningQualicationApp`:
+ * Create the `RunningQualificationApp`:
  * {{{
  *   val qualApp = new com.nvidia.spark.rapids.tool.qualification.RunningQualificationApp()
  * }}}
@@ -200,7 +201,7 @@ class RunningQualificationApp(
    */
  def getSummary(delimiter: String = "|", prettyPrint: Boolean = true): String = {
    if (!perSqlOnly) {
-      val appInfo = super.aggregateStats()
+      val appInfo = aggregateStats()
      appInfo match {
        case Some(info) =>
          val unSupExecMaxSize = QualOutputWriter.getunSupportedMaxSize(
@@ -257,7 +258,7 @@ class RunningQualificationApp(
  def getDetailed(delimiter: String = "|", prettyPrint: Boolean = true,
      reportReadSchema: Boolean = false): String = {
    if (!perSqlOnly) {
-      val appInfo = super.aggregateStats()
+      val appInfo = aggregateStats()
      appInfo match {
        case Some(info) =>
          val headersAndSizesToUse =
@@ -299,4 +300,15 @@ class RunningQualificationApp(
    }
    perSqlInfos
  }
+
+  /**
+   * Aggregate and process the application after reading the events.
+   * @return Option of QualificationSummaryInfo, Some if we were able to process the application,
+   *         otherwise None.
+   */
+  override def aggregateStats(): Option[QualificationSummaryInfo] = {
+    // make sure that the AppSQLPlanAnalyzer has processed the running application
+    AppSQLPlanAnalyzer(this)
+    super.aggregateStats()
+  }
 }
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
index f9e200377..071fee3ea 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala
@@ -68,8 +68,7 @@ object QualRawReportGenerator {
      appIndex: Int = 1): Unit = {
    val metricsDirectory = s"$rootDir/raw_metrics/${app.appId}"
    val pWriter =
-      new ProfileOutputWriter(metricsDirectory, "raw_information",
-        10000000, outputCSV = true)
+      new ProfileOutputWriter(metricsDirectory, "profile", 10000000, outputCSV = true)
    try {
      pWriter.writeText("### A. Information Collected ###")
      pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
index 7e6b3a1d3..d1dbc219a 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.rapids.tool
 import java.io.InputStream
 import java.util.zip.GZIPInputStream
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map, SortedMap}
 import scala.io.{Codec, Source}
 
 import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
@@ -68,12 +67,16 @@ abstract class AppBase(
 
   // SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
   val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()
-  val sqlIDtoProblematic: HashMap[Long, Set[String]] = HashMap[Long, Set[String]]()
+  // Map (sqlID <-> LinkedHashSet of problematic issues)
+  // Use LinkedHashSet of Strings to preserve the order of insertion.
+  val sqlIDtoProblematic: HashMap[Long, LinkedHashSet[String]] =
+    HashMap[Long, LinkedHashSet[String]]()
   // sqlId to sql info
   val sqlIdToInfo = new HashMap[Long, SQLExecutionInfoClass]()
   val sqlIdToStages = new HashMap[Long, ArrayBuffer[Int]]()
   // sqlPlans stores HashMap (sqlID <-> SparkPlanInfo)
-  var sqlPlans: HashMap[Long, SparkPlanInfo] = HashMap.empty[Long, SparkPlanInfo]
+  // SortedMap is used to keep the order of the sqlPlans since AQE can override the existing ones
+  var sqlPlans: Map[Long, SparkPlanInfo] = SortedMap[Long, SparkPlanInfo]()
 
   var sqlPlanMetricsAdaptive: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]()
 
   // accum id to task stage accum info
@@ -372,7 +375,7 @@ abstract class AppBase(
     }
   }
 
-  protected def probNotDataset: mutable.HashMap[Long, Set[String]] = {
+  protected def probNotDataset: HashMap[Long, LinkedHashSet[String]] = {
     sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetOrRDDCase.contains(sqlID) }
   }
 
@@ -380,27 +383,6 @@ abstract class AppBase(
     probNotDataset.values.flatten.toSet.toSeq
   }
 
-  // This is to append potential issues such as UDF, decimal type determined from
-  // SparkGraphPlan Node description and nested complex type determined from reading the
-  // event logs. If there are any complex nested types, then `NESTED COMPLEX TYPE` is mentioned
-  // in the `Potential Problems` section in the csv file. Section `Unsupported Nested Complex
-  // Types` has information on the exact nested complex types which are not supported for a
-  // particular application.
- protected def getAllPotentialProblems( - dFPotentialProb: Seq[String], nestedComplex: Seq[String]): Seq[String] = { - val nestedComplexType = if (nestedComplex.nonEmpty) Seq("NESTED COMPLEX TYPE") else Seq("") - val result = if (dFPotentialProb.nonEmpty) { - if (nestedComplex.nonEmpty) { - dFPotentialProb ++ nestedComplexType - } else { - dFPotentialProb - } - } else { - nestedComplexType - } - result - } - protected def postCompletion(): Unit = {} /** diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala index 230af18ab..36b534255 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala @@ -145,8 +145,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi event.time, None, None, - hasDatasetOrRDD = false, - "" + hasDatasetOrRDD = false ) app.sqlIdToInfo.put(event.executionId, sqlExecution) app.sqlPlans += (event.executionId -> event.sparkPlanInfo) diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala index 7e35c5e90..b5dc5c683 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala @@ -16,7 +16,7 @@ package org.apache.spark.sql.rapids.tool -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import scala.util.{Failure, Success, Try} import com.nvidia.spark.rapids.tool.planparser.SubqueryExecParser @@ -458,7 +458,8 @@ case class SqlPlanInfoGraphEntry( // A class used to cache the SQLPlanInfoGraphs class SqlPlanInfoGraphBuffer { - val sqlPlanInfoGraphs = ArrayBuffer[SqlPlanInfoGraphEntry]() + // A set to hold the SqlPlanInfoGraphEntry. LinkedHashSet to maintain the order of insertion. 
+ val sqlPlanInfoGraphs = mutable.LinkedHashSet[SqlPlanInfoGraphEntry]() def addSqlPlanInfoGraph(sqlID: Long, planInfo: SparkPlanInfo): SqlPlanInfoGraphEntry = { val newEntry = SqlPlanInfoGraphBuffer.createEntry(sqlID, planInfo) sqlPlanInfoGraphs += newEntry diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala index 1bc172795..f27a716d7 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/ApplicationInfo.scala @@ -196,7 +196,7 @@ class ApplicationInfo( processEvents() // Process SQL Plan Metrics after all events are processed - val planMetricProcessor: AppSQLPlanAnalyzer = AppSQLPlanAnalyzer.processSQLPlan(this) + val planMetricProcessor: AppSQLPlanAnalyzer = AppSQLPlanAnalyzer(this, index) // finally aggregate the Info aggregateAppInfo diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala index 13b681bbd..b14f45a33 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationAppInfo.scala @@ -17,17 +17,17 @@ package org.apache.spark.sql.rapids.tool.qualification import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable import com.nvidia.spark.rapids.tool.EventLogInfo -import com.nvidia.spark.rapids.tool.planparser.{DataWritingCommandExecParser, ExecInfo, PlanInfo, SQLPlanParser} +import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo, SQLPlanParser} import com.nvidia.spark.rapids.tool.qualification._ import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} -import org.apache.spark.sql.execution.SparkPlanInfo -import org.apache.spark.sql.rapids.tool.{AppBase, AppEventlogProcessException, ClusterInfo, ClusterSummary, GpuEventLogException, IncorrectAppStatusException, MlOps, MlOpsEventLogType, PhotonEventLogException, SqlPlanInfoGraphBuffer, SupportedMLFuncsName, ToolUtils} +import org.apache.spark.sql.rapids.tool.{AppBase, AppEventlogProcessException, ClusterInfo, ClusterSummary, GpuEventLogException, IncorrectAppStatusException, MlOps, MlOpsEventLogType, PhotonEventLogException, SupportedMLFuncsName, ToolUtils} import org.apache.spark.sql.rapids.tool.annotation.{Calculated, WallClock} import org.apache.spark.sql.rapids.tool.store.StageModel @@ -37,14 +37,16 @@ class QualificationAppInfo( hadoopConf: Option[Configuration] = None, pluginTypeChecker: PluginTypeChecker, reportSqlLevel: Boolean, - perSqlOnly: Boolean = false, + val perSqlOnly: Boolean = false, mlOpsEnabled: Boolean = false, penalizeTransitions: Boolean = true) extends AppBase(eventLogInfo, hadoopConf) with Logging { var lastJobEndTime: Option[Long] = None var lastSQLEndTime: Option[Long] = None - val writeDataFormat: ArrayBuffer[String] = ArrayBuffer[String]() + // Keeps track of the WriteDataFormats used in the WriteExecs + // Use LinkedHashSet to preserve Order of insertion and avoid duplicates + val writeDataFormat: mutable.AbstractSet[String] = mutable.LinkedHashSet[String]() val 
sqlIDToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] = HashMap.empty[Long, StageTaskQualificationSummary] @@ -201,7 +203,7 @@ class QualificationAppInfo( } protected def checkUnsupportedReadFormats(): Unit = { - if (dataSourceInfo.size > 0) { + if (dataSourceInfo.nonEmpty) { dataSourceInfo.map { ds => val (_, nsTypes) = pluginTypeChecker.scoreReadDataTypes(ds.format, ds.schema) if (nsTypes.nonEmpty) { @@ -502,7 +504,7 @@ class QualificationAppInfo( // if either job or stage failures then we mark as N/A // TODO - what about incomplete, do we want to change those? val sqlIdsWithFailures = sqlIDtoFailures.filter { case (_, v) => - v.size > 0 + v.nonEmpty }.keys.map(_.toString).toSeq // a bit odd but force filling in notSupportFormatAndTypes @@ -514,7 +516,7 @@ class QualificationAppInfo( }.toSeq val writeFormat = writeFormatNotSupported(writeDataFormat) val (allComplexTypes, nestedComplexTypes) = reportComplexTypes - val problems = getAllPotentialProblems(getPotentialProblemsForDf, nestedComplexTypes) + val problems = getPotentialProblemsForDf val origPlanInfos = sqlPlans.collect { case (id, plan) if sqlIdToInfo.contains(id) => @@ -838,29 +840,10 @@ class QualificationAppInfo( } } - private[qualification] def processSQLPlan(sqlID: Long, planInfo: SparkPlanInfo): Unit = { - val sqlPlanInfoGraphEntry = SqlPlanInfoGraphBuffer.createEntry(sqlID, planInfo) - checkMetadataForReadSchema(sqlPlanInfoGraphEntry) - for (node <- sqlPlanInfoGraphEntry.sparkPlanGraph.allNodes) { - checkGraphNodeForReads(sqlID, node) - val issues = findPotentialIssues(node.desc) - if (issues.nonEmpty) { - val existingIssues = sqlIDtoProblematic.getOrElse(sqlID, Set.empty[String]) - sqlIDtoProblematic(sqlID) = existingIssues ++ issues - } - // Get the write data format - if (!perSqlOnly) { - DataWritingCommandExecParser.getWriteCMDWrapper(node).map { wWrapper => - writeDataFormat += wWrapper.dataFormat - } - } - } - } - - private def writeFormatNotSupported(writeFormat: ArrayBuffer[String]): Seq[String] = { + private def writeFormatNotSupported(writeFormat: mutable.AbstractSet[String]): Seq[String] = { // Filter unsupported write data format - val unSupportedWriteFormat = pluginTypeChecker.isWriteFormatSupported(writeFormat) - unSupportedWriteFormat.distinct + val unSupportedWriteFormat = pluginTypeChecker.getUnsupportedWriteFormat(writeFormat) + unSupportedWriteFormat.toSeq } /** diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala index 04184952c..bdf1c0419 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/qualification/QualificationEventProcessor.scala @@ -67,13 +67,6 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean } } - override def doSparkListenerSQLExecutionStart( - app: QualificationAppInfo, - event: SparkListenerSQLExecutionStart): Unit = { - super.doSparkListenerSQLExecutionStart(app, event) - app.processSQLPlan(event.executionId, event.sparkPlanInfo) - } - override def doSparkListenerSQLExecutionEnd( app: QualificationAppInfo, event: SparkListenerSQLExecutionEnd): Unit = { @@ -163,13 +156,4 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean } } } - - override def doSparkListenerSQLAdaptiveExecutionUpdate( - app: QualificationAppInfo, - event: 
SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
-    logDebug("Processing event: " + event.getClass)
-    // AQE plan can override the ones got from SparkListenerSQLExecutionStart
-    app.processSQLPlan(event.executionId, event.sparkPlanInfo)
-    super.doSparkListenerSQLAdaptiveExecutionUpdate(app, event)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModelManager.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModelManager.scala
index d49a2043c..7f810a612 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModelManager.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/StageModelManager.scala
@@ -41,8 +41,10 @@ class StageModelManager extends Logging {
   // of composite key (i.e., Tuple).
   // Composite keys would cost more because it implicitly allocates a new object every time there
   // is a read operation from the map.
-  private val stageIdToInfo: mutable.HashMap[Int, mutable.HashMap[Int, StageModel]] =
-    new mutable.HashMap[Int, mutable.HashMap[Int, StageModel]]()
+  // Finally, use SortedMaps to keep the map sorted. That way iterating on the map will be ordered
+  // by IDs/AttemptIDs.
+  private val stageIdToInfo: mutable.SortedMap[Int, mutable.SortedMap[Int, StageModel]] =
+    mutable.SortedMap[Int, mutable.SortedMap[Int, StageModel]]()
 
   // Holds the mapping between AccumulatorIDs to Stages (1-to-N)
   // [Long: AccumId -> SortedSet[Int: StageId]]
@@ -69,7 +71,7 @@ class StageModelManager extends Logging {
   // Internal method used to create new instance of StageModel if it does not exist.
   private def getOrCreateStage(stageInfo: StageInfo): StageModel = {
     val currentAttempts =
-      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, new mutable.HashMap[Int, StageModel]())
+      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, mutable.SortedMap[Int, StageModel]())
     val sModel = StageModel(stageInfo, currentAttempts.get(stageInfo.attemptNumber()))
     currentAttempts.put(stageInfo.attemptNumber(), sModel)
     sModel
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/TaskModelManager.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/TaskModelManager.scala
index ae1f6bd3f..ce846a9be 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/TaskModelManager.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/TaskModelManager.scala
@@ -15,8 +15,7 @@
  */
 package org.apache.spark.sql.rapids.tool.store
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, SortedMap}
 
 import org.apache.spark.scheduler.SparkListenerTaskEnd
 import org.apache.spark.sql.rapids.tool.annotation.Since
@@ -45,15 +44,16 @@ class TaskModelManager {
   // composite key (i.e., Tuple).
   // Composite keys would cost more because it implicitly allocates a new object every time there
   // is a read operation from the map.
-  val stageAttemptToTasks: mutable.HashMap[Int, mutable.HashMap[Int, ArrayBuffer[TaskModel]]] =
-    new mutable.HashMap[Int, mutable.HashMap[Int, ArrayBuffer[TaskModel]]]()
+  // Finally, use SortedMaps to keep the map sorted. That way iterating on the map will be ordered
+  // by IDs/AttemptIDs.
+  val stageAttemptToTasks: SortedMap[Int, SortedMap[Int, ArrayBuffer[TaskModel]]] =
+    SortedMap[Int, SortedMap[Int, ArrayBuffer[TaskModel]]]()
 
   // Given a Spark taskEnd event, create a new Task and add it to the Map.
def addTaskFromEvent(event: SparkListenerTaskEnd): Unit = { val taskModel = TaskModel(event) val stageAttempts = - stageAttemptToTasks.getOrElseUpdate(event.stageId, - new mutable.HashMap[Int, ArrayBuffer[TaskModel]]()) + stageAttemptToTasks.getOrElseUpdate(event.stageId, SortedMap[Int, ArrayBuffer[TaskModel]]()) val attemptToTasks = stageAttempts.getOrElseUpdate(event.stageAttemptId, ArrayBuffer[TaskModel]()) attemptToTasks += taskModel diff --git a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv index abea9ec37..6dd826418 100644 --- a/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/jdbc_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569916.96,2050.03,2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,569025,859,19035,3.68,false,"Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand;CollectLimit","",30 +"Spark shell","app-20211019113801-0001","Not Recommended",1.0,569916.96,2050.03,2942,19894,571967,2814,28.41,"","JDBC[*]","","","","",1812,569025,859,19035,3.68,false,"CollectLimit;Scan JDBCRelation(TBLS) [numPartitions=1];Execute CreateViewCommand","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv index 2c6c693dd..42b047db5 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv1_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371544219","Not Recommended",1.0,174409.69,883.3,4575,20421,175293,1557,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,176916,13469,6952,2.31,false,"CollectLimit;Scan json;Execute InsertIntoHadoopFsRelationCommand json;Scan text","",30 +"Spark shell","local-1624371544219","Not Recommended",1.0,174409.69,883.3,4575,20421,175293,1557,72.15,"","JSON[string:double:date:int:bigint];Text[*]","JSON","","","",1859,176916,13469,6952,2.31,false,"CollectLimit;Scan text;Execute InsertIntoHadoopFsRelationCommand json;Scan json","",30 diff --git a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv 
b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv index 4edac7527..5f4de3eff 100644 --- a/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/read_dsv2_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1624371906627","Not Recommended",1.01,82690.75,1047.24,4917,21802,83738,1745,71.3,"","Text[*];json[double]","JSON","","","",1984,83336,14064,7738,2.5,false,"BatchScan json;Execute InsertIntoHadoopFsRelationCommand json;CollectLimit;Scan text","",30 +"Spark shell","local-1624371906627","Not Recommended",1.01,82690.75,1047.24,4917,21802,83738,1745,71.3,"","Text[*];json[double]","JSON","","","",1984,83336,14064,7738,2.5,false,"CollectLimit;Scan text;Execute InsertIntoHadoopFsRelationCommand json;BatchScan json","",30 diff --git a/core/src/test/resources/QualificationExpectations/write_format_expectation.csv b/core/src/test/resources/QualificationExpectations/write_format_expectation.csv index d68694991..fdad62b12 100644 --- a/core/src/test/resources/QualificationExpectations/write_format_expectation.csv +++ b/core/src/test/resources/QualificationExpectations/write_format_expectation.csv @@ -1,2 +1,2 @@ App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly) -"Spark shell","local-1629442299891","Not Recommended",1.02,19159.68,394.31,1151,920,19554,788,91.72,"","","CSV;JSON","","","",1235,18251,290,630,2.0,false,"Execute InsertIntoHadoopFsRelationCommand json;Execute InsertIntoHadoopFsRelationCommand csv","",30 +"Spark shell","local-1629442299891","Not Recommended",1.02,19159.68,394.31,1151,920,19554,788,91.72,"","","CSV;JSON","","","",1235,18251,290,630,2.0,false,"Execute InsertIntoHadoopFsRelationCommand csv;Execute InsertIntoHadoopFsRelationCommand json","",30 diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala index 345d60acc..aa2e3dcfa 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationSuite.scala @@ -1338,7 +1338,7 @@ class QualificationSuite extends BaseTestSuite { val outputActual = readExpectedFile(new File(outputResults)) assert(outputActual.collect().size == 1) assert(outputActual.select("Potential Problems").first.getString(0) == 
- "TIMEZONE to_timestamp():TIMEZONE hour():TIMEZONE current_timestamp():TIMEZONE second()") + "TIMEZONE hour():TIMEZONE current_timestamp():TIMEZONE to_timestamp():TIMEZONE second()") } } }