Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Photon-specific SQL Metrics #1390

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.nvidia.spark.rapids.tool.analysis

import java.util.concurrent.TimeUnit

import scala.collection.mutable

import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult}

import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.store.TaskModel
import org.apache.spark.sql.rapids.tool.store.{AccumInfo, TaskModel}

/**
* Does analysis on the DataFrames from object of AppBase.
Expand Down Expand Up @@ -324,12 +327,62 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
*/
private def aggregateSparkMetricsByStageInternal(index: Int): Unit = {
// TODO: this has stage attempts. we should handle different attempts

// For Photon apps, peak memory and shuffle write time need to be calculated from accumulators
// instead of task metrics.
// Approach:
// 1. Collect accumulators for each metric type.
// 2. For each stage, retrieve the relevant accumulators and calculate aggregated values.
// Note:
// - A HashMap could be used instead of separate mutable.ArrayBuffer for each metric type,
// but avoiding it for readability.
val photonPeakMemoryAccumInfos = mutable.ArrayBuffer[AccumInfo]()
val photonShuffleWriteTimeAccumInfos = mutable.ArrayBuffer[AccumInfo]()

if (app.isPhoton) {
app.accumManager.applyToAccumInfoMap { accumInfo =>
accumInfo.infoRef.name.value match {
case name if name.contains(
DatabricksParseHelper.PHOTON_METRIC_PEAK_MEMORY_LABEL) =>
// Collect accumulators for peak memory
photonPeakMemoryAccumInfos += accumInfo
case name if name.contains(
DatabricksParseHelper.PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL) =>
// Collect accumulators for shuffle write time
photonShuffleWriteTimeAccumInfos += accumInfo
case _ => // Ignore other accumulators
}
}
}

app.stageManager.getAllStages.foreach { sm =>
// TODO: Should we only consider successful tasks?
val tasksInStage = app.taskManager.getTasks(sm.stageInfo.stageId,
sm.stageInfo.attemptNumber())
// count duplicate task attempts
val numAttempts = tasksInStage.size

val (peakMemoryMax, shuffleWriteTimeSum) = if (app.isPhoton) {
// For max peak memory, we need to look at the accumulators at the task level.
val peakMemoryValues = tasksInStage.flatMap { taskModel =>
photonPeakMemoryAccumInfos.flatMap { accumInfo =>
accumInfo.taskUpdatesMap.get(taskModel.taskId)
}
}
// For sum of shuffle write time, we need to look at the accumulators at the stage level.
val shuffleWriteValues = photonShuffleWriteTimeAccumInfos.flatMap { accumInfo =>
accumInfo.stageValuesMap.get(sm.stageInfo.stageId)
}
(AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
TimeUnit.NANOSECONDS.toMillis(shuffleWriteValues.sum))
} else {
// For non-Photon apps, use the task metrics directly.
val peakMemoryValues = tasksInStage.map(_.peakExecutionMemory)
val shuffleWriteTime = tasksInStage.map(_.sw_writeTime)
(AppSparkMetricsAnalyzer.maxWithEmptyHandling(peakMemoryValues),
shuffleWriteTime.sum)
}

val (durSum, durMax, durMin, durAvg) = AppSparkMetricsAnalyzer.getDurations(tasksInStage)
val stageRow = StageAggTaskMetricsProfileResult(index,
sm.stageInfo.stageId,
Expand All @@ -350,7 +403,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
tasksInStage.map(_.memoryBytesSpilled).sum,
tasksInStage.map(_.output_bytesWritten).sum,
tasksInStage.map(_.output_recordsWritten).sum,
AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.peakExecutionMemory)),
peakMemoryMax,
tasksInStage.map(_.resultSerializationTime).sum,
AppSparkMetricsAnalyzer.maxWithEmptyHandling(tasksInStage.map(_.resultSize)),
tasksInStage.map(_.sr_fetchWaitTime).sum,
Expand All @@ -362,7 +415,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
tasksInStage.map(_.sr_totalBytesRead).sum,
tasksInStage.map(_.sw_bytesWritten).sum,
tasksInStage.map(_.sw_recordsWritten).sum,
tasksInStage.map(_.sw_writeTime).sum
shuffleWriteTimeSum
)
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, stageRow)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
package com.nvidia.spark.rapids.tool.planparser

import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal
import scala.util.matching.Regex

import com.nvidia.spark.rapids.tool.profiling.SQLAccumProfileResults
import com.nvidia.spark.rapids.tool.views.IoMetrics
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.UnsupportedMetricNameException
import org.apache.spark.sql.rapids.tool.util.UTF8Source

// Utilities used to handle Databricks and Photon Ops
Expand All @@ -48,6 +52,13 @@ object DatabricksParseHelper extends Logging {
val SUB_PROP_JOB_ID = "JobId"
val SUB_PROP_RUN_NAME = "RunName"

// scalastyle:off
// Photon metric labels that are used as alternatives to Spark metric labels
val PHOTON_METRIC_CUMULATIVE_TIME_LABEL = "cumulative time" // Alternative for "scan time"
val PHOTON_METRIC_PEAK_MEMORY_LABEL = "peak memory usage" // Alternative for "peak execution memory"
val PHOTON_METRIC_SHUFFLE_WRITE_TIME_LABEL = "part of shuffle file write" // Alternative for "shuffle write time"
// scalastyle:on

private val PHOTON_PATTERN: Regex = "Photon[a-zA-Z]*".r
private val PHOTON_OPS_MAPPING_DIR = "photonOperatorMappings"
// TODO: Create separate mapping file for different Photon/Databricks versions
Expand Down Expand Up @@ -147,4 +158,21 @@ object DatabricksParseHelper extends Logging {
def mapPhotonToSpark(inputStr: String): String = {
PHOTON_PATTERN.replaceAllIn(inputStr, m => photonToSparkMapping.getOrElse(m.matched, m.matched))
}

/**
 * Returns true when the given accumulator is a Photon I/O metric, i.e. a
 * "cumulative time" accumulator that is attached to a Scan node. Photon uses
 * this label in place of Spark's "scan time" metric.
 */
def isPhotonIoMetric(accum: SQLAccumProfileResults): Boolean = {
  val isScanNode = accum.nodeName.contains("Scan")
  isScanNode && accum.name == PHOTON_METRIC_CUMULATIVE_TIME_LABEL
}

/**
 * Updates `ioMetrics` in place from a Photon accumulator value.
 *
 * Currently only the Photon "cumulative time" metric on Scan nodes is
 * supported; it is converted from nanoseconds to milliseconds and stored as
 * the scan time.
 *
 * Note: delegates the recognition check to [[isPhotonIoMetric]] so the
 * predicate used here and by callers (e.g. filtering in the data-source view)
 * cannot drift apart.
 *
 * @param accum     the SQL accumulator to read from
 * @param ioMetrics the mutable I/O metrics holder to update
 * @throws UnsupportedMetricNameException if `accum` is not a recognized
 *         Photon I/O metric
 */
def updatePhotonIoMetric(accum: SQLAccumProfileResults, ioMetrics: IoMetrics): Unit = {
  if (isPhotonIoMetric(accum)) {
    // Photon reports cumulative scan time in nanoseconds; IoMetrics stores milliseconds.
    ioMetrics.scanTime = TimeUnit.NANOSECONDS.toMillis(accum.total)
  } else {
    throw UnsupportedMetricNameException(accum.name)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.nvidia.spark.rapids.tool.views
import scala.collection.Seq

import com.nvidia.spark.rapids.tool.analysis.{ProfAppIndexMapperTrait, QualAppIndexMapperTrait}
import com.nvidia.spark.rapids.tool.planparser.DatabricksParseHelper
import com.nvidia.spark.rapids.tool.profiling.{DataSourceProfileResult, SQLAccumProfileResults}
import com.nvidia.spark.rapids.tool.qualification.QualSQLPlanAnalyzer

Expand All @@ -40,6 +41,12 @@ object IoMetrics {
val DECODE_TIME_LABEL = "GPU decode time"

val EMPTY_IO_METRICS: IoMetrics = IoMetrics(0, 0, 0, 0)

/**
 * All accumulator labels that map directly onto IoMetrics fields
 * (buffer time, scan time, data size, and GPU decode time).
 */
def getAllLabels: Seq[String] =
  Seq(BUFFER_TIME_LABEL, SCAN_TIME_LABEL, DATA_SIZE_LABEL, DECODE_TIME_LABEL)
}

trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] {
Expand All @@ -48,11 +55,13 @@ trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] {
private def getIoMetrics(sqlAccums: Seq[SQLAccumProfileResults]): IoMetrics = {
val finalRes = IoMetrics(0, 0, 0, 0)
try {
sqlAccums.map(accum => accum.name match {
sqlAccums.foreach(accum => accum.name match {
case IoMetrics.BUFFER_TIME_LABEL => finalRes.bufferTime = accum.total
case IoMetrics.SCAN_TIME_LABEL => finalRes.scanTime = accum.total
case IoMetrics.DATA_SIZE_LABEL => finalRes.dataSize = accum.total
case IoMetrics.DECODE_TIME_LABEL => finalRes.decodeTime = accum.total
case _ if DatabricksParseHelper.isPhotonIoMetric(accum) =>
DatabricksParseHelper.updatePhotonIoMetric(accum, finalRes)
case _ => throw UnsupportedMetricNameException(accum.name)
})
} catch {
Expand Down Expand Up @@ -98,11 +107,9 @@ trait AppDataSourceViewTrait extends ViewableTrait[DataSourceProfileResult] {
index: Int,
appSqlAccums: Seq[SQLAccumProfileResults]): Seq[DataSourceProfileResult] = {
// Filter appSqlAccums to get only required metrics
val dataSourceMetrics = appSqlAccums.filter(
sqlAccum => sqlAccum.name.contains(IoMetrics.BUFFER_TIME_LABEL)
|| sqlAccum.name.contains(IoMetrics.SCAN_TIME_LABEL)
|| sqlAccum.name.contains(IoMetrics.DECODE_TIME_LABEL)
|| sqlAccum.name.equals(IoMetrics.DATA_SIZE_LABEL))
val dataSourceMetrics = appSqlAccums.filter(sqlAccum =>
IoMetrics.getAllLabels.contains(sqlAccum.name) ||
app.isPhoton && DatabricksParseHelper.isPhotonIoMetric(sqlAccum))

val dsFromLastPlan = app.dataSourceInfo.map { ds =>
val sqlIdtoDs = dataSourceMetrics.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ class AccumManager {
def getMaxStageValue(id: Long): Option[Long] = {
accumInfoMap.get(id).map(_.getMaxStageValue.get)
}

/**
 * Invokes the side-effecting function `f` on every AccumInfo tracked by
 * this manager.
 */
def applyToAccumInfoMap(f: AccumInfo => Unit): Unit = {
  for (accumInfo <- accumInfoMap.values) {
    f(accumInfo)
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
appIndex,jobId,numTasks,Duration,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum
1,48,431,237976,0,371230,1032,333,861.3,343288,1261,1286,367785,190131859,8639936081,160,0,0,0,169667947,21,15767,8,3,15657,3,15657,0,31314,18964,431,16
1,47,432,214657,0,376777,1133,499,872.2,346230,1272,1310,373271,14007280,8639936081,144,0,0,0,159530057,11,15577,4,3,15657,3,15657,0,31314,19008,432,20
1,46,433,191440,0,457364,3323,391,1056.3,352763,1508,2509,451358,5242628040,8639936081,1912,0,0,0,250840493,9,16203,551,3,15657,3,15657,0,31314,19052,433,16
1,49,1,186241,0,266,266,266,266.0,86,1,1,261,0,0,0,0,0,0,138414192,0,5344,10,209,9196,222,9768,0,18964,44,1,0
1,45,433,166081,0,415849,1448,339,960.4,349795,1302,1375,412139,2276478144,8639936081,568,0,0,0,195992906,2,15780,7,3,15657,3,15657,0,31314,19052,433,34
1,44,431,139667,0,398973,1403,365,925.7,354119,1281,1327,395265,1075691986,8639936081,328,0,0,0,188587155,0,15767,10,3,15657,3,15657,0,31314,18964,431,17
1,50,1,122711,0,267,267,267,267.0,71,1,1,262,0,0,0,0,0,0,138414192,0,5343,58,219,9636,213,9372,0,19008,44,1,0
1,43,432,114755,0,403652,1369,329,934.4,353325,1290,1326,399766,1395949742,8639936081,624,0,0,0,201771890,13,15767,14,3,15657,3,15657,0,31314,19008,432,16
1,51,1,97958,0,386,386,386,386.0,60,1,1,381,0,0,0,0,0,0,138414192,0,5343,154,221,9724,210,9240,0,18964,44,1,0
1,42,431,89634,0,616500,1899,589,1430.4,378074,1330,1515,612098,16461920726,8639936081,4132,0,0,0,216740322,23,15805,10,3,15657,3,15657,0,31314,18964,431,16
1,52,1,71718,0,384,384,384,384.0,54,1,1,379,0,0,0,0,0,0,138414192,0,5343,170,223,9812,210,9240,0,19052,44,1,0
1,41,431,51085,0,759623,2321,918,1762.5,394996,1460,2027,754015,26337468742,8639936081,7772,0,0,0,250648581,87,16157,170,3,15657,3,15657,0,31314,18964,431,19
1,53,1,46297,0,136,136,136,136.0,57,1,1,131,0,0,0,0,0,0,138414192,0,5344,0,214,9416,219,9636,0,19052,44,1,0
1,54,1,23051,0,340,340,340,340.0,36,1,1,334,0,0,0,0,0,0,138414192,0,5343,223,215,9460,217,9548,0,19008,44,1,0
1,31,1,6979,0,6738,6738,6738,6738.0,5104,128,688,6035,349526,86400,53,0,0,0,155563380,1,10759,0,0,0,0,0,0,0,7239,1800,0
1,34,1,6953,0,6725,6725,6725,6725.0,479,185,677,6036,349526,86400,53,0,0,0,155563380,0,9814,0,0,0,0,0,0,0,7239,1800,0
1,33,1,6940,0,6729,6729,6729,6729.0,206,216,679,6035,349526,86400,53,0,0,0,155563380,1,9896,0,0,0,0,0,0,0,7239,1800,0
1,35,1,6925,0,6729,6729,6729,6729.0,157,136,681,6035,12261,1350,53,0,0,0,155199546,1,9839,0,0,0,0,0,0,0,699,165,0
1,38,1,6855,0,6743,6743,6743,6743.0,187,256,688,6035,349526,86400,53,0,0,0,155563380,1,9927,0,0,0,0,0,0,0,7239,1800,0
1,0,1,6033,0,5699,5699,5699,5699.0,422,948,1114,4382,0,0,37,0,0,0,0,8,2794,0,0,0,0,0,0,0,0,0,0
1,13,200,5707,0,87661,966,349,438.3,9821,427,951,84265,0,0,144,0,0,0,0,9,6258,0,0,0,0,0,0,0,0,0,0
1,23,200,5479,0,84240,490,355,421.2,5290,200,214,82784,0,0,136,0,0,0,0,0,6214,0,0,0,0,0,0,0,0,0,0
1,21,200,5271,0,80904,485,353,404.5,6004,203,220,79384,0,0,136,0,0,0,0,1,6302,0,0,0,0,0,0,0,0,0,0
1,27,200,4728,0,70760,442,309,353.8,4042,200,209,69494,0,0,152,0,0,0,0,10,5788,0,0,0,0,0,0,0,0,0,0
1,3,1,4708,0,4693,4693,4693,4693.0,280,701,804,3796,0,0,26,0,0,0,0,7,2834,0,0,0,0,0,0,0,0,0,0
1,25,200,4603,0,70379,569,314,351.9,4106,200,216,69040,0,0,168,0,0,0,0,14,5708,0,0,0,0,0,0,0,0,0,0
1,36,1,4556,0,4332,4332,4332,4332.0,3359,95,401,3907,30328,7200,39,0,0,0,155245068,1,10552,0,0,0,0,0,0,0,7719,1920,0
1,29,200,4555,0,69682,423,310,348.4,3730,200,218,68521,0,0,168,0,0,0,0,9,5748,0,0,0,0,0,0,0,0,0,0
1,32,1,4515,0,4334,4334,4334,4334.0,260,130,404,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
1,39,1,4488,0,4322,4322,4322,4322.0,112,124,392,3907,349526,86400,39,0,0,0,155563380,1,9926,0,0,0,0,0,0,0,7239,1800,0
1,37,1,4481,0,4334,4334,4334,4334.0,136,144,405,3907,349526,86400,39,0,0,0,155563380,1,9851,0,0,0,0,0,0,0,7239,1800,0
1,40,1,4476,0,4327,4327,4327,4327.0,98,147,394,3907,349526,86400,39,0,0,0,155563380,1,9895,0,0,0,0,0,0,0,7239,1800,0
1,56,1,1055,0,1022,1022,1022,1022.0,758,77,95,901,0,0,0,0,0,0,134218344,6,10091,5,218,9592,220,9680,0,19272,0,0,0
1,19,200,803,0,11895,145,38,59.5,943,209,252,10017,0,0,56,0,0,0,0,22,2739,0,0,0,0,0,0,0,0,0,0
1,26,1,316,0,312,312,312,312.0,2,1,1,306,0,0,0,0,0,0,0,0,3777,0,0,0,0,0,0,0,0,0,0
1,2,1,280,0,267,267,267,267.0,6,4,4,124,0,0,0,0,0,0,0,1,3342,0,0,0,0,0,0,0,0,0,0
1,11,1,264,0,254,254,254,254.0,5,3,3,241,0,0,0,0,0,0,0,0,2913,0,0,0,0,0,0,0,0,0,0
1,7,1,240,0,227,227,227,227.0,5,4,4,213,0,0,114,0,0,0,0,0,2206,0,0,0,0,0,0,0,0,0,0
1,1,1,209,0,173,173,173,173.0,28,5,6,152,0,0,0,0,0,0,0,0,2506,0,0,0,0,0,0,0,0,0,0
1,5,1,179,0,165,165,165,165.0,4,4,4,151,0,0,0,0,0,0,0,0,2475,0,0,0,0,0,0,0,0,0,0
1,14,1,151,0,143,143,143,143.0,3,1,1,132,0,0,0,0,0,0,0,1,3120,0,0,0,0,0,0,0,0,0,0
1,4,1,147,0,139,139,139,139.0,22,5,6,121,0,0,0,0,0,0,0,0,2334,0,0,0,0,0,0,0,0,0,0
1,20,1,141,0,137,137,137,137.0,1,1,1,130,0,0,0,0,0,0,0,0,2170,0,0,0,0,0,0,0,0,0,0
1,28,1,140,0,136,136,136,136.0,2,1,1,130,0,0,0,0,0,0,0,0,3784,0,0,0,0,0,0,0,0,0,0
1,18,1,129,0,124,124,124,124.0,2,1,1,116,0,0,0,0,0,0,0,0,2501,0,0,0,0,0,0,0,0,0,0
1,16,1,125,0,117,117,117,117.0,2,1,1,108,0,0,0,0,0,0,0,0,2758,0,0,0,0,0,0,0,0,0,0
1,6,1,123,0,113,113,113,113.0,4,3,3,100,0,0,0,0,0,0,0,0,2208,0,0,0,0,0,0,0,0,0,0
1,10,1,120,0,110,110,110,110.0,6,3,3,98,0,0,0,0,0,0,0,0,3565,0,0,0,0,0,0,0,0,0,0
1,9,1,114,0,104,104,104,104.0,5,3,3,90,0,0,0,0,0,0,0,1,3514,0,0,0,0,0,0,0,0,0,0
1,12,1,105,0,85,85,85,85.0,4,3,3,72,0,0,0,0,0,0,0,0,3369,0,0,0,0,0,0,0,0,0,0
1,17,1,103,0,97,97,97,97.0,2,2,2,89,0,0,0,0,0,0,0,0,3003,0,0,0,0,0,0,0,0,0,0
1,8,1,102,0,95,95,95,95.0,4,3,3,82,0,0,0,0,0,0,0,0,3142,0,0,0,0,0,0,0,0,0,0
1,30,1,73,0,67,67,67,67.0,2,1,1,62,0,0,0,0,0,0,0,0,3199,0,0,0,0,0,0,0,0,0,0
1,24,1,72,0,59,59,59,59.0,2,1,1,51,0,0,0,0,0,0,0,0,3288,0,0,0,0,0,0,0,0,0,0
1,22,1,70,0,65,65,65,65.0,2,1,1,59,0,0,0,0,0,0,0,0,3436,0,0,0,0,0,0,0,0,0,0
1,55,1,65,0,54,54,54,54.0,27,1,1,49,0,0,0,0,0,0,138414192,0,5343,0,216,9504,215,9460,0,18964,44,1,0
1,15,1,64,0,58,58,58,58.0,2,1,1,50,0,0,0,0,0,0,0,0,2306,0,0,0,0,0,0,0,0,0,0
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
appIndex,appID,sqlID,description,numTasks,Duration,executorCPUTime,executorRunTime,executorCPURatio,diskBytesSpilled_sum,duration_sum,duration_max,duration_min,duration_avg,executorCPUTime_sum,executorDeserializeCPUTime_sum,executorDeserializeTime_sum,executorRunTime_sum,input_bytesRead_sum,input_recordsRead_sum,jvmGCTime_sum,memoryBytesSpilled_sum,output_bytesWritten_sum,output_recordsWritten_sum,peakExecutionMemory_max,resultSerializationTime_sum,resultSize_max,sr_fetchWaitTime_sum,sr_localBlocksFetched_sum,sr_localBytesRead_sum,sr_remoteBlocksFetched_sum,sr_remoteBytesRead_sum,sr_remoteBytesReadToDisk_sum,sr_totalBytesRead_sum,sw_bytesWritten_sum,sw_recordsWritten_sum,sw_writeTime_sum
1,"app-20240919162642-0000",26,"query88",3472,250542,2883837,3818106,75.53,0,3858136,6743,54,1111.2,2883837,12349,18186,3818106,52997115316,69120188398,16100,0,0,0,250840493,181,16203,1394,1759,201596,1750,201200,0,402796,218614,19946,154
Loading