Refactor Stage info code between Q/P tools #971

Merged
Changes from 2 commits
@@ -432,8 +432,8 @@ object SQLPlanParser extends Logging {
def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
nodeAccums.flatMap { nodeAccumId =>
app.accumulatorToStages.get(nodeAccumId)
}.flatten.toSet
app.stageManager.getStagesIdsByAccumId(nodeAccumId)
}.toSet
}

// Set containing execs that refers to other expressions. We need this to be a list to allow
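For readability, here is a rough sketch of the stageManager surface this diff relies on, inferred from the call sites in the hunks below (getStagesIdsByAccumId, getAllStages, getAllStageIds, getFailedStages, and the StageModel fields sId, attemptId, sInfo, duration). It is not the PR's actual implementation; all signatures, field types, and internals are assumptions:

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.StageInfo

// Assumed shape of a stage wrapper exposing the fields used in this diff.
class StageModel(val sId: Int, val attemptId: Int, val sInfo: StageInfo) {
  // Filled in on stage completion; None while the stage is still running.
  var duration: Option[Long] = None
  def getDuration: Long = duration.getOrElse(0L)
  def getFailureReason: String = sInfo.failureReason.getOrElse("")
}

// Assumed manager keyed by (stageId, attemptId), plus an accumulator index.
class StageModelManager {
  private val stages = new mutable.LinkedHashMap[(Int, Int), StageModel]()
  private val accumIdToStages = new mutable.HashMap[Long, mutable.Set[Int]]()

  def getAllStages: Iterable[StageModel] = stages.values
  def getAllStageIds: Iterable[Int] = stages.keys.map(_._1).toSet
  def getFailedStages: Iterable[StageModel] =
    stages.values.filter(_.sInfo.failureReason.nonEmpty)
  def getStagesIdsByAccumId(accumId: Long): Set[Int] =
    accumIdToStages.get(accumId).map(_.toSet).getOrElse(Set.empty)
}
```

With such a surface, getStagesInSQLNode above reduces to a flatMap over getStagesIdsByAccumId, replacing the old accumulatorToStages lookup.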
@@ -16,6 +16,7 @@

package com.nvidia.spark.rapids.tool.profiling

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.rapids.tool.ToolUtils
@@ -37,180 +38,127 @@ class Analysis(apps: Seq[ApplicationInfo]) {
}
}

private def maxWithEmptyHandling(arr: ArrayBuffer[Long]): Long = {
private def maxWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
} else {
arr.max
}
}

private def minWithEmptyHandling(arr: Iterable[Long]): Long = {
if (arr.isEmpty) {
0L
} else {
arr.min
}
}

// Job + Stage Level TaskMetrics Aggregation
def jobAndStageMetricsAggregation(): Seq[JobStageAggTaskMetricsProfileResult] = {
val allJobRows = apps.flatMap { app =>
app.jobIdToInfo.map { case (id, jc) =>
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
jc.stageIds.contains(sid)
}.keys.map(_._1).toSet
if (stagesInJob.isEmpty) {
None
} else {
val tasksInJob = app.taskEnd.filter { tc =>
stagesInJob.contains(tc.stageId)
}
// count duplicate task attempts
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInJob)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"job_$id",
tasksInJob.size,
jc.duration,
tasksInJob.map(_.diskBytesSpilled).sum,
durSum,
durMax,
durMin,
durAvg,
tasksInJob.map(_.executorCPUTime).sum,
tasksInJob.map(_.executorDeserializeCPUTime).sum,
tasksInJob.map(_.executorDeserializeTime).sum,
tasksInJob.map(_.executorRunTime).sum,
tasksInJob.map(_.input_bytesRead).sum,
tasksInJob.map(_.input_recordsRead).sum,
tasksInJob.map(_.jvmGCTime).sum,
tasksInJob.map(_.memoryBytesSpilled).sum,
tasksInJob.map(_.output_bytesWritten).sum,
tasksInJob.map(_.output_recordsWritten).sum,
maxWithEmptyHandling(tasksInJob.map(_.peakExecutionMemory)),
tasksInJob.map(_.resultSerializationTime).sum,
maxWithEmptyHandling(tasksInJob.map(_.resultSize)),
tasksInJob.map(_.sr_fetchWaitTime).sum,
tasksInJob.map(_.sr_localBlocksFetched).sum,
tasksInJob.map(_.sr_localBytesRead).sum,
tasksInJob.map(_.sr_remoteBlocksFetched).sum,
tasksInJob.map(_.sr_remoteBytesRead).sum,
tasksInJob.map(_.sr_remoteBytesReadToDisk).sum,
tasksInJob.map(_.sr_totalBytesRead).sum,
tasksInJob.map(_.sw_bytesWritten).sum,
tasksInJob.map(_.sw_recordsWritten).sum,
tasksInJob.map(_.sw_writeTime).sum
))
// first get all stage aggregated levels
Collaborator Author:
The old code iterated over all the jobs to get their stages, then iterated over all the tasks within each stage to aggregate them, producing the job rows. It then repeated the same sequence to build the stage rows. This is clearly very time-consuming. The new code instead does the following (a simplified sketch follows the list):

  • Loop over all the stages and aggregate them.
  • Cache the results in a hashMap.
  • Loop over all the jobs and aggregate the values cached in the hashMap.
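A minimal, self-contained sketch of that cache-then-aggregate pattern, using simplified stand-in types (Task and StageAgg are illustrative, not the tool's real classes):

```scala
import scala.collection.mutable

// Illustrative stand-ins for the real profiling result types.
case class Task(stageId: Int, durationMs: Long, bytesSpilled: Long)
case class StageAgg(numTasks: Int, durationSum: Long, bytesSpilledSum: Long)

def aggregate(
    stageIds: Seq[Int],
    jobToStages: Map[Int, Seq[Int]],
    tasks: Seq[Task]): (Map[Int, StageAgg], Map[Int, StageAgg]) = {
  // 1) Aggregate each stage once and cache the result.
  val cachedStageRows = new mutable.LinkedHashMap[Int, StageAgg]()
  stageIds.foreach { sId =>
    val tasksInStage = tasks.filter(_.stageId == sId)
    cachedStageRows.put(sId, StageAgg(
      tasksInStage.size,
      tasksInStage.map(_.durationMs).sum,
      tasksInStage.map(_.bytesSpilled).sum))
  }
  // 2) Build each job row from the cached stage rows instead of re-scanning tasks.
  val jobRows = jobToStages.map { case (jobId, stages) =>
    val rowsInJob = cachedStageRows.filterKeys(stages.contains).values
    jobId -> StageAgg(
      rowsInJob.map(_.numTasks).sum,
      rowsInJob.map(_.durationSum).sum,
      rowsInJob.map(_.bytesSpilledSum).sum)
  }
  (cachedStageRows.toMap, jobRows)
}
```

The key point is that the task records are scanned once per stage; job rows are then derived from the cached stage aggregates rather than from another pass over the tasks.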

val allRows = apps.flatMap { app =>
// create a cache of stage rows to be used by the job aggregator
val cachedStageRows = new mutable.LinkedHashMap[Int, JobStageAggTaskMetricsProfileResult]()
app.stageManager.getAllStages.foreach { case sm =>
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == sm.sId && tc.stageAttemptId == sm.attemptId
}
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInStage)
val stageRow = JobStageAggTaskMetricsProfileResult(app.index,
s"stage_${sm.sId}",
numAttempts,
sm.duration,
tasksInStage.map(_.diskBytesSpilled).sum,
durSum,
durMax,
durMin,
durAvg,
tasksInStage.map(_.executorCPUTime).sum,
tasksInStage.map(_.executorDeserializeCPUTime).sum,
tasksInStage.map(_.executorDeserializeTime).sum,
tasksInStage.map(_.executorRunTime).sum,
tasksInStage.map(_.input_bytesRead).sum,
tasksInStage.map(_.input_recordsRead).sum,
tasksInStage.map(_.jvmGCTime).sum,
tasksInStage.map(_.memoryBytesSpilled).sum,
tasksInStage.map(_.output_bytesWritten).sum,
tasksInStage.map(_.output_recordsWritten).sum,
maxWithEmptyHandling(tasksInStage.map(_.peakExecutionMemory)),
tasksInStage.map(_.resultSerializationTime).sum,
maxWithEmptyHandling(tasksInStage.map(_.resultSize)),
tasksInStage.map(_.sr_fetchWaitTime).sum,
tasksInStage.map(_.sr_localBlocksFetched).sum,
tasksInStage.map(_.sr_localBytesRead).sum,
tasksInStage.map(_.sr_remoteBlocksFetched).sum,
tasksInStage.map(_.sr_remoteBytesRead).sum,
tasksInStage.map(_.sr_remoteBytesReadToDisk).sum,
tasksInStage.map(_.sr_totalBytesRead).sum,
tasksInStage.map(_.sw_bytesWritten).sum,
tasksInStage.map(_.sw_recordsWritten).sum,
tasksInStage.map(_.sw_writeTime).sum
)
// cache the stage row to be used later
cachedStageRows.put(sm.sId, stageRow)
}
}
val allJobStageRows = apps.flatMap { app =>
app.jobIdToInfo.flatMap { case (_, jc) =>
val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
jc.stageIds.contains(sid)
}
if (stagesInJob.isEmpty) {
// Aggregate all the stages by job
val jobRows = app.jobIdToInfo.map { case (id, jc) =>
if (jc.stageIds.isEmpty) {
None
} else {
stagesInJob.map { case ((id, _), sc) =>
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == id
}
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInStage)
val profResultsInJob = cachedStageRows.filterKeys(jc.stageIds.contains).values
if (profResultsInJob.isEmpty) {
None
} else {
// Recalculate the duration sum, max, min, avg for the job based on the cached
// stage Profiling results
val tasksInJob = profResultsInJob.map(_.numTasks).sum
val durSum = profResultsInJob.map(_.durationSum).sum
val durMax = maxWithEmptyHandling(profResultsInJob.map(_.durationMax))
val durMin = minWithEmptyHandling(profResultsInJob.map(_.durationMin))
val durAvg = ToolUtils.calculateAverage(durSum, tasksInJob, 1)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"stage_$id",
tasksInStage.size,
sc.duration,
tasksInStage.map(_.diskBytesSpilled).sum,
s"job_$id",
tasksInJob,
jc.duration,
profResultsInJob.map(_.diskBytesSpilledSum).sum,
durSum,
durMax,
durMin,
durAvg,
tasksInStage.map(_.executorCPUTime).sum,
tasksInStage.map(_.executorDeserializeCPUTime).sum,
tasksInStage.map(_.executorDeserializeTime).sum,
tasksInStage.map(_.executorRunTime).sum,
tasksInStage.map(_.input_bytesRead).sum,
tasksInStage.map(_.input_recordsRead).sum,
tasksInStage.map(_.jvmGCTime).sum,
tasksInStage.map(_.memoryBytesSpilled).sum,
tasksInStage.map(_.output_bytesWritten).sum,
tasksInStage.map(_.output_recordsWritten).sum,
maxWithEmptyHandling(tasksInStage.map(_.peakExecutionMemory)),
tasksInStage.map(_.resultSerializationTime).sum,
maxWithEmptyHandling(tasksInStage.map(_.resultSize)),
tasksInStage.map(_.sr_fetchWaitTime).sum,
tasksInStage.map(_.sr_localBlocksFetched).sum,
tasksInStage.map(_.sr_localBytesRead).sum,
tasksInStage.map(_.sr_remoteBlocksFetched).sum,
tasksInStage.map(_.sr_remoteBytesRead).sum,
tasksInStage.map(_.sr_remoteBytesReadToDisk).sum,
tasksInStage.map(_.sr_totalBytesRead).sum,
tasksInStage.map(_.sw_bytesWritten).sum,
tasksInStage.map(_.sw_recordsWritten).sum,
tasksInStage.map(_.sw_writeTime).sum
))
}
}
}
}
// stages that are missing from a job, perhaps dropped events
val stagesWithoutJobs = apps.flatMap { app =>
val allStageInJobs = app.jobIdToInfo.flatMap { case (_, jc) =>
app.stageIdToInfo.filterKeys { case (sid, _) =>
jc.stageIds.contains(sid)
}
}
val missing = app.stageIdToInfo.keys.toSet.diff(allStageInJobs.keys.toSet)
if (missing.isEmpty) {
Seq.empty
} else {
missing.map { case (id, saId) =>
val scOpt = app.stageIdToInfo.get((id, saId))
scOpt match {
case None =>
None
case Some(sc) =>
val tasksInStage = app.taskEnd.filter { tc =>
tc.stageId == id
}
// count duplicate task attempts
val numAttempts = tasksInStage.size
val (durSum, durMax, durMin, durAvg) = getDurations(tasksInStage)
Some(JobStageAggTaskMetricsProfileResult(app.index,
s"stage_$id",
numAttempts,
sc.duration,
tasksInStage.map(_.diskBytesSpilled).sum,
durSum,
durMax,
durMin,
durAvg,
tasksInStage.map(_.executorCPUTime).sum,
tasksInStage.map(_.executorDeserializeCPUTime).sum,
tasksInStage.map(_.executorDeserializeTime).sum,
tasksInStage.map(_.executorRunTime).sum,
tasksInStage.map(_.input_bytesRead).sum,
tasksInStage.map(_.input_recordsRead).sum,
tasksInStage.map(_.jvmGCTime).sum,
tasksInStage.map(_.memoryBytesSpilled).sum,
tasksInStage.map(_.output_bytesWritten).sum,
tasksInStage.map(_.output_recordsWritten).sum,
maxWithEmptyHandling(tasksInStage.map(_.peakExecutionMemory)),
tasksInStage.map(_.resultSerializationTime).sum,
maxWithEmptyHandling(tasksInStage.map(_.resultSize)),
tasksInStage.map(_.sr_fetchWaitTime).sum,
tasksInStage.map(_.sr_localBlocksFetched).sum,
tasksInStage.map(_.sr_localBytesRead).sum,
tasksInStage.map(_.sr_remoteBlocksFetched).sum,
tasksInStage.map(_.sr_remoteBytesRead).sum,
tasksInStage.map(_.sr_remoteBytesReadToDisk).sum,
tasksInStage.map(_.sr_totalBytesRead).sum,
tasksInStage.map(_.sw_bytesWritten).sum,
tasksInStage.map(_.sw_recordsWritten).sum,
tasksInStage.map(_.sw_writeTime).sum
))
profResultsInJob.map(_.executorCPUTimeSum).sum,
profResultsInJob.map(_.executorDeserializeCpuTimeSum).sum,
profResultsInJob.map(_.executorDeserializeTimeSum).sum,
profResultsInJob.map(_.executorRunTimeSum).sum,
profResultsInJob.map(_.inputBytesReadSum).sum,
profResultsInJob.map(_.inputRecordsReadSum).sum,
profResultsInJob.map(_.jvmGCTimeSum).sum,
profResultsInJob.map(_.memoryBytesSpilledSum).sum,
profResultsInJob.map(_.outputBytesWrittenSum).sum,
profResultsInJob.map(_.outputRecordsWrittenSum).sum,
maxWithEmptyHandling(profResultsInJob.map(_.peakExecutionMemoryMax)),
profResultsInJob.map(_.resultSerializationTimeSum).sum,
maxWithEmptyHandling(profResultsInJob.map(_.resultSizeMax)),
profResultsInJob.map(_.srFetchWaitTimeSum).sum,
profResultsInJob.map(_.srLocalBlocksFetchedSum).sum,
profResultsInJob.map(_.srcLocalBytesReadSum).sum,
profResultsInJob.map(_.srRemoteBlocksFetchSum).sum,
profResultsInJob.map(_.srRemoteBytesReadSum).sum,
profResultsInJob.map(_.srRemoteBytesReadToDiskSum).sum,
profResultsInJob.map(_.srTotalBytesReadSum).sum,
profResultsInJob.map(_.swBytesWrittenSum).sum,
profResultsInJob.map(_.swRecordsWrittenSum).sum,
profResultsInJob.map(_.swWriteTimeSum).sum))
}
}
}
cachedStageRows.values ++ jobRows.flatMap(row => row)
}

val allRows = allJobRows ++ allJobStageRows ++ stagesWithoutJobs
val filteredRows = allRows.flatMap(row => row)
if (filteredRows.nonEmpty) {
val sortedRows = filteredRows.sortBy { cols =>
if (allRows.nonEmpty) {
val sortedRows = allRows.sortBy { cols =>
val sortDur = cols.duration.getOrElse(0L)
(cols.appIndex, -sortDur, cols.id)
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,7 +32,8 @@ class CompareApplications(apps: Seq[ApplicationInfo]) extends Logging {
def findMatchingStages(): (Seq[CompareProfileResults], Seq[CompareProfileResults]) = {
val normalizedByAppId = apps.map { app =>
val normalized = app.sqlPlans.mapValues { plan =>
SparkPlanInfoWithStage(plan, app.accumIdToStageId).normalizeForStageComparison
SparkPlanInfoWithStage(plan,
app.stageManager.reduceAccumMapping()).normalizeForStageComparison
Collaborator Author:
reduceAccumMapping() is a hack to get GenerateDot to work with the 1-to-M map. GenerateDot is rarely used, and fixing its implementation to handle the 1-to-M mapping would bloat the PR. A sketch of the assumed reduction follows.
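A minimal sketch of what reduceAccumMapping() presumably does: collapse the 1-to-M accumulator-to-stages map into a 1-to-1 map. The tie-breaking rule (smallest stage ID here) and the exact signature are assumptions:

```scala
// Assumed input shape: accumulatorId -> stage IDs that updated it (1-to-M).
def reduceAccumMapping(accumIdToStages: Map[Long, Set[Int]]): Map[Long, Int] = {
  // Keep a single stage per accumulator so the older 1-to-1 GenerateDot code still works.
  accumIdToStages.collect {
    case (accumId, stageIds) if stageIds.nonEmpty => accumId -> stageIds.min
  }
}
```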

}
(app.appId, normalized)
}.toMap
@@ -88,7 +88,7 @@ object GenerateDot {
val accumSummary = accums.map { a =>
Seq(a.sqlID, a.accumulatorId, a.total)
}
val accumIdToStageId = app.accumIdToStageId
val accumIdToStageId = app.stageManager.reduceAccumMapping()
Collaborator Author:
This is a hack to get GenerateDot to work with the 1-to-M map. GenerateDot is rarely used, and fixing its implementation to handle the 1-to-M mapping would bloat the PR.

val formatter = java.text.NumberFormat.getIntegerInstance
val stageIdToStageMetrics = app.taskEnd.groupBy(task => task.stageId).mapValues { tasks =>
val durations = tasks.map(_.duration)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -325,8 +325,8 @@ object GenerateTimeline {
maxEndTime = Math.max(finishTime, maxEndTime)
}

val allStageIds = app.stageIdToInfo.keys.map(_._1).toList
allStageIds.distinct.sorted.foreach { stageId =>
val allStageIds = app.stageManager.getAllStageIds.toSeq
allStageIds.sorted.foreach { stageId =>
stageIdToColor.getOrElseUpdate(stageId, {
val color = COLORS(colorIndex % COLORS.length)
colorIndex += 1
@@ -345,11 +345,11 @@ object GenerateTimeline {
new TimelineStageInfo(stageId, start, end, end-start)
}

val stageInfo = app.stageIdToInfo.map { case ((_, _), sc) =>
val stageId = sc.info.stageId
val submissionTime = sc.info.submissionTime.get
val completionTime = sc.completionTime.get
val duration = sc.duration.get
val stageInfo = app.stageManager.getAllStages.map { case sm =>
val stageId = sm.sId
val submissionTime = sm.sInfo.submissionTime.get
val completionTime = sm.sInfo.completionTime.get
val duration = sm.getDuration
minStartTime = Math.min(minStartTime, submissionTime)
maxEndTime = Math.max(maxEndTime, completionTime)
new TimelineStageInfo(stageId, submissionTime, completionTime, duration)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,13 +43,11 @@ class HealthCheck(apps: Seq[ApplicationInfo]) {

def getFailedStages: Seq[FailedStagesProfileResults] = {
val failed = apps.flatMap { app =>
val stagesFailed = app.stageIdToInfo.filter { case (_, sc) =>
sc.failureReason.nonEmpty
}
stagesFailed.map { case ((id, attId), sc) =>
val failureStr = sc.failureReason.getOrElse("")
FailedStagesProfileResults(app.index, id, attId,
sc.info.name, sc.info.numTasks,
val stagesFailed = app.stageManager.getFailedStages
stagesFailed.map { case fsm =>
val failureStr = fsm.getFailureReason
FailedStagesProfileResults(app.index, fsm.sId, fsm.attemptId,
fsm.sInfo.name, fsm.sInfo.numTasks,
ProfileUtils.truncateFailureStr(failureStr))
}
}