Fix implementation of processSQLPlanMetrics in Profiler #853
Conversation
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>

Fixes NVIDIA#851

- Fixed the implementation of `processSQLPlanMetrics` to parameterize the information based on the SqlID
- Added a buffer `SqlPlanInfoGraphBuffer` to capture the construction of the SparkSqlPlan. Otherwise, the graph was constructed multiple times, which was not efficient
- Changed the Javadoc in AutoTuner to clean up a build warning
- Refactored the implementation of `jobAndStageMetricsAggregation`, reducing both memory and CPU by 20%
- Made a few changes in `jobAndStageMetricsAggregation` which reduced the total memory allocated by this method:
  - Changed the type of local variables from `Seq` to `Set`
  - Removed some local variables that were causing memory bloat
- Renamed `GenerateDot.SparkPlanGraph` to `SparkPlanGraphForDot` because it was conflicting with the Spark `SparkPlanGraph` class in the imports
- Fixed unit test
- Added a new Set to keep track of missing event classes, which reduces the noise in the log files
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
```diff
 val stagesInJob = app.stageIdToInfo.filterKeys { case (sid, _) =>
-  stageIdsInJob.contains(sid)
+  jc.stageIds.contains(sid)
 }.keys.map(_._1).toSeq
```
The local `val stageIdsInJob = jc.stageIds` was removed to save memory; `jc.stageIds` is now accessed directly instead of through an intermediate binding.
```diff
-  stageIdsInJob.contains(sid)
-}.keys.map(_._1).toSeq
+  jc.stageIds.contains(sid)
+}.keys.map(_._1).toSet
```
Use `toSet` instead of `toSeq` because the result is used mainly for lookups.
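As a minimal sketch of the point (the values here are made up, not the tool's data): a `Set` gives effectively constant-time membership tests, while `Seq.contains` scans linearly.

```scala
// Hypothetical sketch: keys of a (stageId, attemptId) -> info map,
// materialized as a Set because they are used mainly for lookups.
object LookupSketch {
  def main(args: Array[String]): Unit = {
    val stagePairs = Seq((1, 0), (2, 0), (3, 1), (3, 2))
    val stageIds: Set[Int] = stagePairs.map(_._1).toSet
    assert(stageIds.contains(2))   // hash lookup, not a linear scan
    assert(!stageIds.contains(9))
    println(stageIds.size)         // duplicates collapse: prints 3
  }
}
```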
```diff
   }
 }
-val missing = app.stageIdToInfo.keys.toSeq.diff(allStageinJobs.keys.toSeq)
+val missing = app.stageIdToInfo.keys.toSet.diff(allStageInJobs.keys.toSet)
```
Sets are more time-efficient than sequences for membership tests and difference operations.
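A small illustration of the `diff` change above, with hypothetical stage IDs:

```scala
// Hypothetical data: all known stage IDs vs. the stages attached to jobs.
object DiffSketch {
  def main(args: Array[String]): Unit = {
    val allStages: Set[Int] = Set(1, 2, 3, 4)
    val stagesInJobs: Set[Int] = Set(1, 3)
    // With Sets, membership checks during the difference use hashing
    // rather than repeated linear scans of a Seq.
    val missing = allStages.diff(stagesInJobs)
    assert(missing == Set(2, 4))
    println(missing.size)  // prints 2
  }
}
```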
```diff
@@ -231,12 +227,12 @@ class Analysis(apps: Seq[ApplicationInfo]) {
     val allRows = apps.flatMap { app =>
       app.sqlIdToInfo.map { case (sqlId, sqlCase) =>
         val jcs = app.jobIdToInfo.filter { case (_, jc) =>
-          jc.sqlID.getOrElse(-1) == sqlId
+          jc.sqlID.isDefined && jc.sqlID.get == sqlId
```
`getOrElse` means the VM allocates the default value even when `jc.sqlID` is not defined, which gets expensive in large structures. Since this is a predicate filter, it is more memory-efficient to read the sqlID only when it is defined.
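As a hedged sketch of the pattern (the case class and values are hypothetical stand-ins): `Option.contains` expresses the same predicate as the `isDefined && get ==` form in the diff, without materializing a default value.

```scala
// Hypothetical stand-in for the tool's job info case class.
final case class JobInfoCase(jobID: Int, sqlID: Option[Long])

object PredicateSketch {
  def main(args: Array[String]): Unit = {
    val jobs = Map(
      1 -> JobInfoCase(1, Some(7L)),
      2 -> JobInfoCase(2, None),
      3 -> JobInfoCase(3, Some(9L)))
    // Equivalent to jc.sqlID.isDefined && jc.sqlID.get == 7L,
    // but never allocates a default the way getOrElse(-1) would.
    val matched = jobs.filter { case (_, jc) => jc.sqlID.contains(7L) }
    println(matched.keys.toList)  // prints List(1)
  }
}
```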
```diff
-      val allMetaWithSchema = getPlanMetaWithSchema(planInfo)
-      val planGraph = ToolsPlanGraph(planInfo)
-      val allNodes = planGraph.allNodes
+      val allMetaWithSchema = getPlanMetaWithSchema(sqlPlanInfoGraph.planInfo)
```
Use the cached graph instead of reconstructing it again.
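The buffering idea behind this change can be sketched roughly as follows (class and method names here are assumptions, not the PR's exact API): build each plan graph once per sqlID and hand out the cached entry afterwards.

```scala
import scala.collection.mutable

// Hypothetical entry pairing a sqlID with its already-built graph nodes.
final case class PlanGraphEntry(sqlID: Long, nodes: Seq[String])

class PlanGraphBuffer {
  private val cache = mutable.Map[Long, PlanGraphEntry]()
  private var builds = 0
  def buildCount: Int = builds
  // `build` is by-name, so the graph is constructed only on a cache miss.
  def getOrBuild(sqlID: Long, build: => Seq[String]): PlanGraphEntry =
    cache.getOrElseUpdate(sqlID, { builds += 1; PlanGraphEntry(sqlID, build) })
}

object BufferSketch {
  def main(args: Array[String]): Unit = {
    val buf = new PlanGraphBuffer
    buf.getOrBuild(1L, Seq("Scan", "Filter"))
    buf.getOrBuild(1L, Seq("Scan", "Filter"))  // served from cache
    println(buf.buildCount)  // prints 1
  }
}
```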
```diff
-      val allNodes = planGraph.allNodes
+      val allMetaWithSchema = getPlanMetaWithSchema(sqlPlanInfoGraph.planInfo)
+      val allNodes = sqlPlanInfoGraph.sparkPlanGraph.allNodes
+      val results = ArrayBuffer[DataSourceCase]()
```
Cache the results to be returned to the caller. The original code re-read the global data structure to extract what had just been computed here.
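Sketch of the local-buffer pattern (names are hypothetical): accumulate the freshly computed cases and return them, rather than re-reading a shared structure afterwards.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the tool's DataSourceCase.
final case class DsCase(sqlID: Long, format: String)

object BufferResultsSketch {
  def collectReads(sqlID: Long, nodeNames: Seq[String]): Seq[DsCase] = {
    // The caller receives exactly what was computed in this pass.
    val results = ArrayBuffer[DsCase]()
    nodeNames.foreach { n =>
      if (n.startsWith("Scan ")) results += DsCase(sqlID, n.stripPrefix("Scan "))
    }
    results.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(collectReads(1L, Seq("Scan parquet", "Filter", "Scan csv")).size)  // prints 2
  }
}
```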
```diff
 }

 // This will find scans for DataSource V2, if the schema is very large it
 // will likely be incomplete and have ... at the end.
-protected def checkGraphNodeForReads(sqlID: Long, node: SparkPlanGraphNode): Unit = {
+protected def checkGraphNodeForReads(
+    sqlID: Long, node: SparkPlanGraphNode): Option[DataSourceCase] = {
```
Return the datasource to be used by the caller instead of querying the global data structure.
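The Option-returning shape can be sketched like this (node handling is simplified and the names are hypothetical): the callee reports what it found and the caller decides where it goes, instead of the callee mutating shared state.

```scala
// Hypothetical stand-in for the tool's data source record.
final case class ReadCase(sqlID: Long, format: String)

object OptionReturnSketch {
  // Simplified node check: return the read instead of writing to a global map.
  def checkNodeForReads(sqlID: Long, nodeName: String): Option[ReadCase] =
    if (nodeName.startsWith("Scan ")) Some(ReadCase(sqlID, nodeName.stripPrefix("Scan ")))
    else None  // nothing found, and no side effects either way

  def main(args: Array[String]): Unit = {
    // flatMap drops the Nones and keeps the discovered reads.
    val reads = Seq("Scan orc", "Project").flatMap(checkNodeForReads(2L, _))
    println(reads.map(_.format))  // prints List(orc)
  }
}
```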
```diff
 }
 connectOperatorToStage(createGraphFunc)
+for (sqlPIGEntry <- sqlPlanInfoBuffer.sqlPlanInfoGraphs) {
+  var sqlIsDsOrRDD = false
```
Used to avoid accessing the global `sqlIDToDataSetOrRDDCase`.
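A minimal sketch of the local-flag pattern (node names are illustrative): a mutable local replaces repeated probes of a shared sqlID-to-DataSet/RDD map while walking one plan's nodes.

```scala
// Hypothetical sketch: detect a DataSet/RDD plan with a local var
// instead of querying a shared map on every node.
object FlagSketch {
  def main(args: Array[String]): Unit = {
    val nodeNames = Seq("Scan parquet", "SerializeFromObject", "Filter")
    var sqlIsDsOrRDD = false
    nodeNames.foreach { n =>
      if (!sqlIsDsOrRDD) {  // stop checking once the flag is set
        sqlIsDsOrRDD = n.contains("SerializeFromObject")
      }
    }
    println(sqlIsDsOrRDD)  // prints true
  }
}
```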
Thanks @amahussein ! This helps in reducing the runtime of Profile tool. Agree that we can make some more improvements. Do you plan on creating followon issue for the TODO's mentioned in the description?
Thanks @nartal1!
Signed-off-by: Ahmed Hussein (amahussein) <[email protected]>
Fixes #851
There are still more opportunities to improve the performance, but I limited the scope of this fix to get the highest gains with minimum effort.

- Fixed the implementation of `processSQLPlanMetrics` to parameterize the information based on the SqlID
- Added a buffer `SqlPlanInfoGraphBuffer` to capture the construction of the SparkSqlPlan. Otherwise, the graph was constructed multiple times, which was not efficient
- Refactored the implementation of `jobAndStageMetricsAggregation`, reducing both memory and CPU by 20%
- Made a few changes in `jobAndStageMetricsAggregation` which reduced the total memory allocated by this method:
  - Changed the type of local variables from `Seq` to `Set`
- Renamed `GenerateDot.SparkPlanGraph` to `SparkPlanGraphForDot` because it was conflicting with the Spark `SparkPlanGraph` class in the imports

**Overall Impact**

After Changes: (profiler snapshot)

Before Changes: (profiler snapshot)

The below snapshot indicates the frequency of GC and the idleness of the CPU.

**Does this affect the end user?**

Yes. `sql_duration_and_executor_cpu_time_percent.csv` is different after fixing the bug.

**Does this break the nightly builds?**

Yes. Some of the generated output files have changed.

**Does this require additional followups?**

There is still room to improve the efficiency of the core. Some of the TODOs:

- `Analysis.scala`, where we keep extracting the Jobs/tasks in every function.
- `System.properties` in the profiler.