Fix implementation of processSQLPlanMetrics in Profiler #853

Merged 3 commits on Mar 14, 2024
Changes from 1 commit
@@ -29,7 +29,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {

def getDurations(tcs: ArrayBuffer[TaskCase]): (Long, Long, Long, Double) = {
val durations = tcs.map(_.duration)
- if (durations.size > 0 ) {
+ if (durations.nonEmpty) {
(durations.sum, durations.max, durations.min,
ToolUtils.calculateAverage(durations.sum, durations.size, 1))
} else {
@@ -49,22 +49,20 @@ class Analysis(apps: Seq[ApplicationInfo]) {
def jobAndStageMetricsAggregation(): Seq[JobStageAggTaskMetricsProfileResult] = {
val allJobRows = apps.flatMap { app =>
app.jobIdToInfo.map { case (id, jc) =>
- val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
- stageIdsInJob.contains(sid)
+ jc.stageIds.contains(sid)
Collaborator Author: The local val stageIdsInJob = jc.stageIds was removed to save memory; jc.stageIds is now accessed directly instead of being held in an extra local reference.

- }.keys.map(_._1).toSeq
+ }.keys.map(_._1).toSet
Collaborator Author: Use toSet instead of toSeq because the collection is used mainly for lookups.
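To illustrate the lookup argument, a minimal sketch with hypothetical sizes (not from the PR): Seq.contains scans linearly, while Set.contains is an effectively constant-time hash lookup.

    val stageIds: Seq[Int] = (1 to 100000).toSeq // hypothetical stage ids
    val stageIdSet: Set[Int] = stageIds.toSet

    stageIds.contains(99999)   // O(n): scans the sequence element by element
    stageIdSet.contains(99999) // O(1) expected: a single hash lookup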

if (stagesInJob.isEmpty) {
None
} else {
val tasksInJob = app.taskEnd.filter { tc =>
stagesInJob.contains(tc.stageId)
}
// count duplicate task attempts
- val numTaskAttempt = tasksInJob.size
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInJob)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"job_$id",
- numTaskAttempt,
+ tasksInJob.size,
jc.duration,
tasksInJob.map(_.diskBytesSpilled).sum,
durSum,
@@ -100,9 +98,8 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
val allJobStageRows = apps.flatMap { app =>
app.jobIdToInfo.flatMap { case (_, jc) =>
- val stageIdsInJob = jc.stageIds
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
- stageIdsInJob.contains(sid)
+ jc.stageIds.contains(sid)
}
if (stagesInJob.isEmpty) {
None
@@ -153,17 +150,16 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
// stages that are missing from a job, perhaps dropped events
val stagesWithoutJobs = apps.flatMap { app =>
- val allStageinJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
- val stageIdsInJob = jc.stageIds
+ val allStageInJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
app.stageIdToInfo.filterKeys { case (sid, _) =>
- stageIdsInJob.contains(sid)
+ jc.stageIds.contains(sid)
}
}
- val missing = app.stageIdToInfo.keys.toSeq.diff(allStageinJobs.keys.toSeq)
+ val missing = app.stageIdToInfo.keys.toSet.diff(allStageInJobs.keys.toSet)
Collaborator Author: Sets are more time-efficient than sequences here: diff on a Set does one constant-time membership test per element, while on a Seq each test is a linear scan.
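As a sketch of the diff being computed (the (stageId, attemptId) keys below are hypothetical, not from a real event log):

    val allStages = Set((1, 0), (2, 0), (3, 1)) // all known stage keys
    val stagesInJobs = Set((1, 0), (2, 0))      // stage keys referenced by jobs

    // Stages never referenced by any job; here Set((3, 1)).
    val missing = allStages.diff(stagesInJobs)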

if (missing.isEmpty) {
Seq.empty
} else {
- missing.map { case ((id, saId)) =>
+ missing.map { case (id, saId) =>
val scOpt = app.stageIdToInfo.get((id, saId))
scOpt match {
case None =>
@@ -215,10 +211,10 @@ class Analysis(apps: Seq[ApplicationInfo]) {

val allRows = allJobRows ++ allJobStageRows ++ stagesWithoutJobs
val filteredRows = allRows.filter(_.isDefined).map(_.get)
- if (filteredRows.size > 0) {
+ if (filteredRows.nonEmpty) {
val sortedRows = filteredRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
- (cols.appIndex, -(sortDur), cols.id)
+ (cols.appIndex, -sortDur, cols.id)
}
sortedRows
} else {
@@ -231,12 +227,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
- jc.sqlID.getOrElse(-1) == sqlId
+ jc.sqlID.isDefined && jc.sqlID.get == sqlId
Collaborator Author: With getOrElse, the VM allocates the default value even when jc.sqlID is not defined, which tends to be very expensive in large structures. Since this is a predicate filter, it is more memory-efficient to read the sqlID only when it is defined.
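For reference, a sketch of equivalent predicate forms on an Option[Long] field (values hypothetical); the standard library's Option.contains expresses the same check in one call:

    val jcSqlID: Option[Long] = Some(42L) // hypothetical job-to-SQL mapping
    val sqlId = 42L

    jcSqlID.isDefined && jcSqlID.get == sqlId // form adopted in this PR
    jcSqlID.contains(sqlId)                   // equivalent shorthand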

}
if (jcs.isEmpty) {
None
} else {
- val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
+ val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet
val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
}
@@ -314,12 +310,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
val allRows = apps.flatMap { app =>
app.sqlIdToInfo.map { case (sqlId, _) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
- jc.sqlID.getOrElse(-1) == sqlId
+ jc.sqlID.isDefined && jc.sqlID.get == sqlId
}
if (jcs.isEmpty) {
None
} else {
- val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
+ val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet

val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
@@ -359,12 +355,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
apps.map { app =>
val maxOfSqls = app.sqlIdToInfo.map { case (sqlId, _) =>
val jcs = app.jobIdToInfo.filter { case (_, jc) =>
- jc.sqlID.getOrElse(-1) == sqlId
+ jc.sqlID.isDefined && jc.sqlID.get == sqlId
}
if (jcs.isEmpty) {
0L
} else {
- val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSeq
+ val stageIdsForSQL = jcs.flatMap(_._2.stageIds).toSet
val tasksInSQL = app.taskEnd.filter { tc =>
stageIdsForSQL.contains(tc.stageId)
}
@@ -394,7 +390,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
sqlCase.sqlCpuTimePercent)
}
}
- if (allRows.size > 0) {
+ if (allRows.nonEmpty) {
val sortedRows = allRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
(cols.appIndex, cols.sqlID, sortDur)
@@ -444,7 +440,7 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}

val allNonEmptyRows = allRows.filter(_.isDefined).map(_.get)
- if (allNonEmptyRows.size > 0) {
+ if (allNonEmptyRows.nonEmpty) {
val sortedRows = allNonEmptyRows.sortBy { cols =>
(cols.appIndex, cols.stageId, cols.stageAttemptId, cols.taskId, cols.taskAttemptId)
}
@@ -856,8 +856,8 @@ class AutoTuner(

/**
* Recommendation for 'spark.sql.files.maxPartitionBytes' based on input size for each task.
- * Note that the logic can be disabled by adding the property to [[limitedLogicRecommendations]]
- * which is one of the arguments of [[getRecommendedProperties()]].
+ * Note that the logic can be disabled by adding the property to "limitedLogicRecommendations"
+ * which is one of the arguments of [[getRecommendedProperties]].
*/
private def recommendMaxPartitionBytes(): Unit = {
val maxPartitionProp =
@@ -873,8 +873,8 @@

/**
* Recommendations for 'spark.sql.shuffle.partitions' based on spills and skew in shuffle stages.
- * Note that the logic can be disabled by adding the property to [[limitedLogicRecommendations]]
- * which is one of the arguments of [[getRecommendedProperties()]].
+ * Note that the logic can be disabled by adding the property to "limitedLogicRecommendations"
+ * which is one of the arguments of [[getRecommendedProperties]].
*/
def recommendShufflePartitions(): Unit = {
val lookup = "spark.sql.shuffle.partitions"
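A sketch of the disable-by-listing mechanism these doc comments describe; the names and signature below are hypothetical, since the real getRecommendedProperties is not shown in this diff:

    // Hypothetical shape: a recommendation runs only when its property
    // is absent from limitedLogicRecommendations.
    def getRecommendedProperties(limitedLogicRecommendations: Set[String]): Unit = {
      if (!limitedLogicRecommendations.contains("spark.sql.files.maxPartitionBytes")) {
        // recommendMaxPartitionBytes() would run here
      }
      if (!limitedLogicRecommendations.contains("spark.sql.shuffle.partitions")) {
        // recommendShufflePartitions() would run here
      }
    }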
@@ -236,7 +236,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
CollectInformation.addNewProps(propsToKeep, props, numApps)
}
val allRows = props.map { case (k, v) => Seq(k) ++ v }.toSeq
- if (allRows.size > 0) {
+ if (allRows.nonEmpty) {
val resRows = allRows.map(r => RapidsPropertyProfileResult(r(0), outputHeaders, r))
resRows.sortBy(cols => cols.key)
} else {
@@ -259,7 +259,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
val allWholeStages = apps.flatMap { app =>
app.wholeStage
}
- if (allWholeStages.size > 0) {
+ if (allWholeStages.nonEmpty) {
allWholeStages.sortBy(cols => (cols.appIndex, cols.sqlID, cols.nodeID))
} else {
Seq.empty
@@ -269,7 +269,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
// Print SQL Plan Metrics
def getSQLPlanMetrics: Seq[SQLAccumProfileResults] = {
val sqlAccums = CollectInformation.generateSQLAccums(apps)
- if (sqlAccums.size > 0) {
+ if (sqlAccums.nonEmpty) {
sqlAccums.sortBy(cols => (cols.appIndex, cols.sqlID, cols.nodeID,
cols.nodeName, cols.accumulatorId, cols.metricType))
} else {
@@ -286,11 +286,11 @@ object CollectInformation extends Logging {
def generateSQLAccums(apps: Seq[ApplicationInfo]): Seq[SQLAccumProfileResults] = {
val allRows = apps.flatMap { app =>
app.allSQLMetrics.map { metric =>
- val sqlId = metric.sqlID
val jobsForSql = app.jobIdToInfo.filter { case (_, jc) =>
- jc.sqlID.getOrElse(-1) == sqlId
+ // Avoid getOrElse to reduce memory allocations
+ jc.sqlID.isDefined && jc.sqlID.get == metric.sqlID
}
- val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSeq
+ val stageIdsForSQL = jobsForSql.flatMap(_._2.stageIds).toSet
val accumsOpt = app.taskStageAccumMap.get(metric.accumulatorId)
val taskMax = accumsOpt match {
case Some(accums) =>
@@ -326,7 +326,7 @@ object CollectInformation extends Logging {
val driverMax = driverAccumsOpt match {
case Some(accums) =>
val filtered = accums.filter { a =>
- a.sqlID == sqlId
+ a.sqlID == metric.sqlID
}
val accumValues = filtered.map(_.value).sortWith(_ < _)
if (accumValues.isEmpty) {
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -144,7 +144,7 @@ case class QueryPlanWithMetrics(plan: SparkPlanInfoWithStage, metrics: Map[Long,
* Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the
* SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
*/
- case class SparkPlanGraph(
+ case class SparkPlanGraphForDot(
nodes: Seq[SparkPlanGraphNode],
edges: Seq[SparkPlanGraphEdge],
appId: String,
@@ -187,14 +187,14 @@ object SparkPlanGraph {
appId: String,
sqlId: String,
physicalPlan: String,
- stageIdToStageMetrics: Map[Int, StageMetrics]): SparkPlanGraph = {
+ stageIdToStageMetrics: Map[Int, StageMetrics]): SparkPlanGraphForDot = {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
val exchanges = mutable.HashMap[SparkPlanInfoWithStage, SparkPlanGraphNode]()
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null, null, exchanges,
stageIdToStageMetrics)
- new SparkPlanGraph(nodes, edges, appId, sqlId, physicalPlan)
+ SparkPlanGraphForDot(nodes, edges, appId, sqlId, physicalPlan)
}

@tailrec
@@ -176,7 +176,11 @@ class SQLExecutionInfoClass(
var duration: Option[Long],
var hasDatasetOrRDD: Boolean,
var problematic: String = "",
- var sqlCpuTimePercent: Double = -1)
+ var sqlCpuTimePercent: Double = -1) {
+   def setDsOrRdd(value: Boolean): Unit = {
+     hasDatasetOrRDD = value
+   }
+ }

case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
nodeName: String, accumulatorId: Long, name: String, min: Long, median:Long,
@@ -306,7 +306,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
val app = new ApplicationInfo(hadoopConf, path, index)
EventLogPathProcessor.logApplicationInfo(app)
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
logInfo(s"Took ${endTime - startTime}ms to create App for ${path.eventLog.toString}")
Some(app)
} catch {
case _: com.fasterxml.jackson.core.JsonParseException =>
@@ -327,9 +327,12 @@
* and returns the summary information. The summary information is much smaller than
* the ApplicationInfo because it has processed and combined many of the raw events.
*/
- private def processApps(apps: Seq[ApplicationInfo], printPlans: Boolean,
-     profileOutputWriter: ProfileOutputWriter): (ApplicationSummaryInfo,
-     Option[CompareSummaryInfo]) = {
+ private def processApps(
+     apps: Seq[ApplicationInfo],
+     printPlans: Boolean,
+     profileOutputWriter: ProfileOutputWriter)
+   : (ApplicationSummaryInfo, Option[CompareSummaryInfo]) = {
+   val startTime = System.currentTimeMillis()

val collect = new CollectInformation(apps)
val appInfo = collect.getAppInfo
@@ -403,7 +406,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
s"to $outputDir in $duration second(s)\n")
}
}
- (ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps,
+ val endTime = System.currentTimeMillis()
+ logInfo(s"Took ${endTime - startTime}ms to Process [${appInfo.head.appId}]")
+ (ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps,
rapidsJar, sqlMetrics, jsMetAgg, sqlTaskAggMetrics, durAndCpuMet, skewInfo,
failedTasks, failedStages, failedJobs, removedBMs, removedExecutors,
unsupportedOps, sparkProps, sqlStageInfo, wholeStage, maxTaskInputInfo,
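Both Profiler timing additions follow the same inline elapsed-time pattern; a self-contained sketch of that pattern (the timed helper below is hypothetical, not part of the PR):

    // Hypothetical helper capturing the System.currentTimeMillis() pattern.
    def timed[T](label: String)(body: => T): T = {
      val startTime = System.currentTimeMillis()
      val result = body
      val endTime = System.currentTimeMillis()
      println(s"Took ${endTime - startTime}ms to $label")
      result
    }

    val squares = timed("compute squares")((1 to 1000000).map(x => x.toLong * x))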