Fix potential problems and AQE updates in Qual tool #1021

Merged: 3 commits, May 20, 2024
@@ -302,10 +302,17 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
def aggregateDurationAndCPUTimeBySql(index: Int): Seq[SQLDurationExecutorTimeProfileResult] = {
val sqlRows = app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
// First, build the SQLIssues string by retrieving the potential issues from the
// app.sqlIDtoProblematic map.
val sqlIssues = if (app.sqlIDtoProblematic.contains(sqlId)) {
ToolUtils.formatPotentialProblems(app.sqlIDtoProblematic(sqlId).toSeq)
} else {
""
}
// Then, build the SQLDurationExecutorTimeProfileResult
SQLDurationExecutorTimeProfileResult(index, app.appId, sqlCase.rootExecutionID,
sqlId, sqlCase.duration, sqlCase.hasDatasetOrRDD,
app.getAppDuration.orElse(Option(0L)), sqlCase.problematic,
sqlCase.sqlCpuTimePercent)
app.getAppDuration.orElse(Option(0L)), sqlIssues, sqlCase.sqlCpuTimePercent)
}
sqlRows.toSeq
}
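For reference, a minimal self-contained sketch of the lookup performed above: fetch the recorded issues for a sqlID and fall back to an empty string when none exist. The ":" separator below merely stands in for whatever ToolUtils.formatPotentialProblems produces and is an assumption of this sketch.

import scala.collection.mutable

object IssueLookupSketch extends App {
  // Illustrative data; in the tool this map is populated while visiting plan nodes.
  val sqlIDtoProblematic = mutable.HashMap[Long, mutable.LinkedHashSet[String]](
    1L -> mutable.LinkedHashSet("UDF", "NESTED COMPLEX TYPE"))

  def formatIssues(sqlId: Long): String =
    sqlIDtoProblematic.get(sqlId).map(_.mkString(":")).getOrElse("")

  println(formatIssues(1L)) // UDF:NESTED COMPLEX TYPE
  println(formatIssues(2L)) // empty string: no recorded issues for this sqlID
}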
@@ -88,7 +88,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {

// get read data schema information
def getDataSourceInfo: Seq[DataSourceProfileResult] = {
val dataSourceApps = apps.filter(_.dataSourceInfo.size > 0)
val dataSourceApps = apps.filter(_.dataSourceInfo.nonEmpty)
val sqlAccums = CollectInformation.generateSQLAccums(dataSourceApps)

// Metrics to capture from event log to the result
@@ -217,9 +217,6 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {

object CollectInformation extends Logging {

// Store (min, median, max, total) for a given metric
case class statisticsMetrics(min: Long, med:Long, max:Long, total: Long)

def generateSQLAccums(apps: Seq[ApplicationInfo]): Seq[SQLAccumProfileResults] = {
apps.flatMap { app =>
app.planMetricProcessor.generateSQLAccums()
@@ -165,7 +165,6 @@ class SQLExecutionInfoClass(
var endTime: Option[Long],
var duration: Option[Long],
var hasDatasetOrRDD: Boolean,
var problematic: String = "",
var sqlCpuTimePercent: Double = -1) {
def setDsOrRdd(value: Boolean): Unit = {
hasDatasetOrRDD = value
@@ -357,7 +357,7 @@ class PluginTypeChecker(val platform: Platform = PlatformFactory.createInstance(
writeFormats.map(x => x.trim).contains(format)
}

def isWriteFormatSupported(writeFormat: ArrayBuffer[String]): ArrayBuffer[String] = {
def getUnsupportedWriteFormat(writeFormat: Iterable[String]): Iterable[String] = {
writeFormat.map(x => x.toLowerCase.trim).filterNot(
writeFormats.map(x => x.trim).contains(_))
}
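A self-contained sketch of what the renamed getUnsupportedWriteFormat returns: the input formats, lower-cased and trimmed, minus the supported ones. The supported-format list below is illustrative; the real list is loaded by PluginTypeChecker.

object WriteFormatSketch extends App {
  val writeFormats = Seq("parquet", "orc") // illustrative supported formats

  def getUnsupportedWriteFormat(writeFormat: Iterable[String]): Iterable[String] =
    writeFormat.map(_.toLowerCase.trim).filterNot(f => writeFormats.map(_.trim).contains(f))

  println(getUnsupportedWriteFormat(Seq(" Parquet", "JSON", "csv")).toList) // List(json, csv)
}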
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer
import com.nvidia.spark.rapids.tool.planparser.DataWritingCommandExecParser

import org.apache.spark.sql.execution.ui.SparkPlanGraphNode
import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo

/**
* A class that extends AppSQLPlanAnalyzer to analyze the SQL plans of the applications.
* It overrides the parent's visitNode to process the write data format while visiting a
* SparkPlanGraphNode.
* In addition to the side effects mentioned in AppSQLPlanAnalyzer, it updates the writeDataFormat
* field defined in the QualificationAppInfo object.
*
* @param app the Application info object that contains the SQL plans to be processed
* @param appIndex the application index used in viewing multiple applications together.
*/
class QualSQLPlanAnalyzer(
app: QualificationAppInfo, appIndex: Integer) extends AppSQLPlanAnalyzer(app, appIndex) {

override def visitNode(visitor: SQLPlanVisitorContext,
node: SparkPlanGraphNode): Unit = {
super.visitNode(visitor, node)
// Get the write data format
if (!app.perSqlOnly) {
DataWritingCommandExecParser.getWriteCMDWrapper(node).map { wWrapper =>
app.writeDataFormat += wWrapper.dataFormat
}
}
}
}
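The Option returned by getWriteCMDWrapper is mapped purely for its side effect; below is a self-contained sketch of the same behavior. The node description and format are made up for illustration, and foreach is equivalent to the map used above.

import scala.collection.mutable

object WriteFormatVisitSketch extends App {
  val writeDataFormat = mutable.LinkedHashSet[String]()

  // Stand-in for DataWritingCommandExecParser.getWriteCMDWrapper: Some(format) for a
  // write node, None otherwise.
  def getWriteCmdFormat(nodeDesc: String): Option[String] =
    if (nodeDesc.startsWith("Execute InsertIntoHadoopFsRelationCommand")) Some("parquet")
    else None

  getWriteCmdFormat("Execute InsertIntoHadoopFsRelationCommand ...").foreach { fmt =>
    writeDataFormat += fmt // only runs when a write command was matched
  }
  println(writeDataFormat) // LinkedHashSet(parquet)
}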
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.analysis.AppSQLPlanAnalyzer
import com.nvidia.spark.rapids.tool.planparser.SQLPlanParser
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.SQL_DESC_STR

@@ -38,7 +39,7 @@ import org.apache.spark.sql.rapids.tool.qualification._
* SQL query level without tracking all the Application information, but currently does
* not cleanup. There is a cleanupSQL function that the user can force cleanup if required.
*
* Create the `RunningQualicationApp`:
* Create the `RunningQualificationApp`:
* {{{
* val qualApp = new com.nvidia.spark.rapids.tool.qualification.RunningQualificationApp()
* }}}
@@ -200,7 +201,7 @@ class RunningQualificationApp(
*/
def getSummary(delimiter: String = "|", prettyPrint: Boolean = true): String = {
if (!perSqlOnly) {
val appInfo = super.aggregateStats()
val appInfo = aggregateStats()
appInfo match {
case Some(info) =>
val unSupExecMaxSize = QualOutputWriter.getunSupportedMaxSize(
@@ -257,7 +258,7 @@ class RunningQualificationApp(
def getDetailed(delimiter: String = "|", prettyPrint: Boolean = true,
reportReadSchema: Boolean = false): String = {
if (!perSqlOnly) {
val appInfo = super.aggregateStats()
val appInfo = aggregateStats()
appInfo match {
case Some(info) =>
val headersAndSizesToUse =
@@ -299,4 +300,15 @@ class RunningQualificationApp(
}
perSqlInfos
}

/**
* Aggregate and process the application after reading the events.
* @return Some(QualificationSummaryInfo) if the application could be processed,
* otherwise None.
*/
override def aggregateStats(): Option[QualificationSummaryInfo] = {
// make sure that the AppSQLPlanAnalyzer has processed the running application
AppSQLPlanAnalyzer(this)
super.aggregateStats()
}
}
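A hedged usage sketch of the running-qualification flow, assembled from the class scaladoc and the methods visible in this diff (getSummary, getDetailed, the aggregateStats override). getEventListener and the active SparkSession `spark` are assumptions; they are not shown in this diff.

// Assumes an active SparkSession named `spark`; getEventListener is assumed from the
// class scaladoc, which is not shown in full here.
val qualApp = new com.nvidia.spark.rapids.tool.qualification.RunningQualificationApp()
spark.sparkContext.addSparkListener(qualApp.getEventListener)
// ... run the Spark SQL workload ...
println(qualApp.getSummary())  // triggers aggregateStats(), which now runs AppSQLPlanAnalyzer first
println(qualApp.getDetailed())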
@@ -68,8 +68,7 @@ object QualRawReportGenerator {
appIndex: Int = 1): Unit = {
val metricsDirectory = s"$rootDir/raw_metrics/${app.appId}"
val pWriter =
new ProfileOutputWriter(metricsDirectory, "raw_information",
10000000, outputCSV = true)
new ProfileOutputWriter(metricsDirectory, "profile", 10000000, outputCSV = true)
try {
pWriter.writeText("### A. Information Collected ###")
pWriter.write(QualExecutorView.getLabel, QualExecutorView.getRawView(Seq(app)))
core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala (8 additions & 26 deletions)
@@ -19,8 +19,7 @@ package org.apache.spark.sql.rapids.tool
import java.io.InputStream
import java.util.zip.GZIPInputStream

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet, Map, SortedMap}
import scala.io.{Codec, Source}

import com.nvidia.spark.rapids.tool.{DatabricksEventLog, DatabricksRollingEventLogFilesFileReader, EventLogInfo}
@@ -68,12 +67,16 @@ abstract class AppBase(

// SQL containing any Dataset operation or RDD to DataSet/DataFrame operation
val sqlIDToDataSetOrRDDCase: HashSet[Long] = HashSet[Long]()
val sqlIDtoProblematic: HashMap[Long, Set[String]] = HashMap[Long, Set[String]]()
// Map of sqlID to its set of problematic issues.
// Use LinkedHashSet of Strings to preserve the order of insertion.
val sqlIDtoProblematic: HashMap[Long, LinkedHashSet[String]] =
HashMap[Long, LinkedHashSet[String]]()
// sqlId to sql info
val sqlIdToInfo = new HashMap[Long, SQLExecutionInfoClass]()
val sqlIdToStages = new HashMap[Long, ArrayBuffer[Int]]()
// sqlPlans stores HashMap (sqlID <-> SparkPlanInfo)
var sqlPlans: HashMap[Long, SparkPlanInfo] = HashMap.empty[Long, SparkPlanInfo]
// SortedMap keeps the sqlPlans ordered by sqlID, since AQE updates can override existing entries
var sqlPlans: Map[Long, SparkPlanInfo] = SortedMap[Long, SparkPlanInfo]()
var sqlPlanMetricsAdaptive: ArrayBuffer[SQLPlanMetricsCase] = ArrayBuffer[SQLPlanMetricsCase]()

// accum id to task stage accum info
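A self-contained sketch of why a mutable SortedMap fits sqlPlans: adding an entry for an existing sqlID replaces the stored plan (which is what a later AQE update needs to do) while iteration stays ordered by sqlID. String stands in for SparkPlanInfo to keep the sketch runnable.

import scala.collection.mutable.SortedMap

object SqlPlansSketch extends App {
  val sqlPlans = SortedMap[Long, String]()
  sqlPlans += (2L -> "initial plan for sqlID 2")
  sqlPlans += (1L -> "initial plan for sqlID 1")
  sqlPlans += (2L -> "AQE-updated plan for sqlID 2") // same key: replaces the earlier plan
  sqlPlans.foreach { case (id, plan) => println(s"$id -> $plan") }
  // 1 -> initial plan for sqlID 1
  // 2 -> AQE-updated plan for sqlID 2
}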
@@ -372,35 +375,14 @@ abstract class AppBase(
}
}

protected def probNotDataset: mutable.HashMap[Long, Set[String]] = {
protected def probNotDataset: HashMap[Long, LinkedHashSet[String]] = {
sqlIDtoProblematic.filterNot { case (sqlID, _) => sqlIDToDataSetOrRDDCase.contains(sqlID) }
}

protected def getPotentialProblemsForDf: Seq[String] = {
probNotDataset.values.flatten.toSet.toSeq
}

// This is to append potential issues such as UDF, decimal type determined from
// SparkGraphPlan Node description and nested complex type determined from reading the
// event logs. If there are any complex nested types, then `NESTED COMPLEX TYPE` is mentioned
// in the `Potential Problems` section in the csv file. Section `Unsupported Nested Complex
// Types` has information on the exact nested complex types which are not supported for a
// particular application.
protected def getAllPotentialProblems(
dFPotentialProb: Seq[String], nestedComplex: Seq[String]): Seq[String] = {
val nestedComplexType = if (nestedComplex.nonEmpty) Seq("NESTED COMPLEX TYPE") else Seq("")
val result = if (dFPotentialProb.nonEmpty) {
if (nestedComplex.nonEmpty) {
dFPotentialProb ++ nestedComplexType
} else {
dFPotentialProb
}
} else {
nestedComplexType
}
result
}

protected def postCompletion(): Unit = {}

/**
@@ -145,8 +145,7 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
event.time,
None,
None,
hasDatasetOrRDD = false,
""
hasDatasetOrRDD = false
)
app.sqlIdToInfo.put(event.executionId, sqlExecution)
app.sqlPlans += (event.executionId -> event.sparkPlanInfo)
@@ -16,7 +16,7 @@

package org.apache.spark.sql.rapids.tool

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

import com.nvidia.spark.rapids.tool.planparser.SubqueryExecParser
@@ -458,7 +458,8 @@ case class SqlPlanInfoGraphEntry(

// A class used to cache the SQLPlanInfoGraphs
class SqlPlanInfoGraphBuffer {
val sqlPlanInfoGraphs = ArrayBuffer[SqlPlanInfoGraphEntry]()
// A set to hold the SqlPlanInfoGraphEntry. LinkedHashSet to maintain the order of insertion.
val sqlPlanInfoGraphs = mutable.LinkedHashSet[SqlPlanInfoGraphEntry]()
def addSqlPlanInfoGraph(sqlID: Long, planInfo: SparkPlanInfo): SqlPlanInfoGraphEntry = {
val newEntry = SqlPlanInfoGraphBuffer.createEntry(sqlID, planInfo)
sqlPlanInfoGraphs += newEntry
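A self-contained sketch of the LinkedHashSet behavior relied on here: duplicates are dropped (which the previous ArrayBuffer did not do) while iteration still follows first-insertion order.

import scala.collection.mutable

object LinkedHashSetSketch extends App {
  val graphs = mutable.LinkedHashSet[String]()
  graphs += "graph for sqlID 3"
  graphs += "graph for sqlID 1"
  graphs += "graph for sqlID 3" // duplicate: ignored, original position kept
  println(graphs.toList) // List(graph for sqlID 3, graph for sqlID 1)
}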
@@ -196,7 +196,7 @@ class ApplicationInfo(
processEvents()

// Process SQL Plan Metrics after all events are processed
val planMetricProcessor: AppSQLPlanAnalyzer = AppSQLPlanAnalyzer.processSQLPlan(this)
val planMetricProcessor: AppSQLPlanAnalyzer = AppSQLPlanAnalyzer(this, index)
// finally aggregate the Info
aggregateAppInfo

@@ -17,17 +17,17 @@
package org.apache.spark.sql.rapids.tool.qualification

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.mutable

import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.planparser.{DataWritingCommandExecParser, ExecInfo, PlanInfo, SQLPlanParser}
import com.nvidia.spark.rapids.tool.planparser.{ExecInfo, PlanInfo, SQLPlanParser}
import com.nvidia.spark.rapids.tool.qualification._
import com.nvidia.spark.rapids.tool.qualification.QualOutputWriter.DEFAULT_JOB_FREQUENCY
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.rapids.tool.{AppBase, AppEventlogProcessException, ClusterInfo, ClusterSummary, GpuEventLogException, IncorrectAppStatusException, MlOps, MlOpsEventLogType, PhotonEventLogException, SqlPlanInfoGraphBuffer, SupportedMLFuncsName, ToolUtils}
import org.apache.spark.sql.rapids.tool.{AppBase, AppEventlogProcessException, ClusterInfo, ClusterSummary, GpuEventLogException, IncorrectAppStatusException, MlOps, MlOpsEventLogType, PhotonEventLogException, SupportedMLFuncsName, ToolUtils}
import org.apache.spark.sql.rapids.tool.annotation.{Calculated, WallClock}
import org.apache.spark.sql.rapids.tool.store.StageModel

@@ -37,14 +37,16 @@ class QualificationAppInfo(
hadoopConf: Option[Configuration] = None,
pluginTypeChecker: PluginTypeChecker,
reportSqlLevel: Boolean,
perSqlOnly: Boolean = false,
val perSqlOnly: Boolean = false,
mlOpsEnabled: Boolean = false,
penalizeTransitions: Boolean = true)
extends AppBase(eventLogInfo, hadoopConf) with Logging {

var lastJobEndTime: Option[Long] = None
var lastSQLEndTime: Option[Long] = None
val writeDataFormat: ArrayBuffer[String] = ArrayBuffer[String]()
// Keeps track of the WriteDataFormats used in the WriteExecs
// Use LinkedHashSet to preserve insertion order and avoid duplicates
val writeDataFormat: mutable.AbstractSet[String] = mutable.LinkedHashSet[String]()

val sqlIDToTaskEndSum: HashMap[Long, StageTaskQualificationSummary] =
HashMap.empty[Long, StageTaskQualificationSummary]
@@ -201,7 +203,7 @@ class QualificationAppInfo(
}

protected def checkUnsupportedReadFormats(): Unit = {
if (dataSourceInfo.size > 0) {
if (dataSourceInfo.nonEmpty) {
dataSourceInfo.map { ds =>
val (_, nsTypes) = pluginTypeChecker.scoreReadDataTypes(ds.format, ds.schema)
if (nsTypes.nonEmpty) {
@@ -502,7 +504,7 @@ class QualificationAppInfo(
// if either job or stage failures then we mark as N/A
// TODO - what about incomplete, do we want to change those?
val sqlIdsWithFailures = sqlIDtoFailures.filter { case (_, v) =>
v.size > 0
v.nonEmpty
}.keys.map(_.toString).toSeq

// a bit odd but force filling in notSupportFormatAndTypes
@@ -514,7 +516,7 @@
}.toSeq
val writeFormat = writeFormatNotSupported(writeDataFormat)
val (allComplexTypes, nestedComplexTypes) = reportComplexTypes
val problems = getAllPotentialProblems(getPotentialProblemsForDf, nestedComplexTypes)
val problems = getPotentialProblemsForDf

val origPlanInfos = sqlPlans.collect {
case (id, plan) if sqlIdToInfo.contains(id) =>
@@ -838,29 +840,10 @@ class QualificationAppInfo(
}
}

private[qualification] def processSQLPlan(sqlID: Long, planInfo: SparkPlanInfo): Unit = {
val sqlPlanInfoGraphEntry = SqlPlanInfoGraphBuffer.createEntry(sqlID, planInfo)
checkMetadataForReadSchema(sqlPlanInfoGraphEntry)
for (node <- sqlPlanInfoGraphEntry.sparkPlanGraph.allNodes) {
checkGraphNodeForReads(sqlID, node)
val issues = findPotentialIssues(node.desc)
if (issues.nonEmpty) {
val existingIssues = sqlIDtoProblematic.getOrElse(sqlID, Set.empty[String])
sqlIDtoProblematic(sqlID) = existingIssues ++ issues
}
// Get the write data format
if (!perSqlOnly) {
DataWritingCommandExecParser.getWriteCMDWrapper(node).map { wWrapper =>
writeDataFormat += wWrapper.dataFormat
}
}
}
}

private def writeFormatNotSupported(writeFormat: ArrayBuffer[String]): Seq[String] = {
private def writeFormatNotSupported(writeFormat: mutable.AbstractSet[String]): Seq[String] = {
// Filter unsupported write data format
val unSupportedWriteFormat = pluginTypeChecker.isWriteFormatSupported(writeFormat)
unSupportedWriteFormat.distinct
val unSupportedWriteFormat = pluginTypeChecker.getUnsupportedWriteFormat(writeFormat)
unSupportedWriteFormat.toSeq
}

/**
@@ -67,13 +67,6 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
}
}

override def doSparkListenerSQLExecutionStart(
app: QualificationAppInfo,
event: SparkListenerSQLExecutionStart): Unit = {
super.doSparkListenerSQLExecutionStart(app, event)
app.processSQLPlan(event.executionId, event.sparkPlanInfo)
}

override def doSparkListenerSQLExecutionEnd(
app: QualificationAppInfo,
event: SparkListenerSQLExecutionEnd): Unit = {
@@ -163,13 +156,4 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
}
}
}

override def doSparkListenerSQLAdaptiveExecutionUpdate(
app: QualificationAppInfo,
event: SparkListenerSQLAdaptiveExecutionUpdate): Unit = {
logDebug("Processing event: " + event.getClass)
// AQE plan can override the ones got from SparkListenerSQLExecutionStart
app.processSQLPlan(event.executionId, event.sparkPlanInfo)
super.doSparkListenerSQLAdaptiveExecutionUpdate(app, event)
}
}