Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark wholestageCodeGen as shouldRemove when child nodes are removed #1142

Merged
merged 1 commit into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ case class ExecInfo(
stages = stageIDs
}

def appendToStages(stageIDs: Set[Int]): Unit = {
stages ++= stageIDs
}

def setShouldRemove(value: Boolean): Unit = {
shouldRemove ||= value
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,26 @@ case class WholeStageExecParser(
val childNodes = node.nodes.flatMap { c =>
SQLPlanParser.parsePlanNode(c, sqlID, checker, app, reusedNodeIds)
}
// if any of the execs in WholeStageCodegen supported mark this entire thing
// as supported
// For the childNodes, we need to append the stages. Otherwise, nodes without metrics won't be
// assigned to stage
childNodes.foreach(_.appendToStages(stagesInNode))
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
// if any of the execs in WholeStageCodegen supported mark this entire thing as supported
val anySupported = childNodes.exists(_.isSupported == true)
val unSupportedExprsArray = childNodes.filter(_.unsupportedExprs.length > 0 ).map(
x => x.unsupportedExprs).flatten.toArray
val unSupportedExprsArray =
childNodes.filter(_.unsupportedExprs.nonEmpty).flatMap(x => x.unsupportedExprs).toArray
// average speedup across the execs in the WholeStageCodegen for now
val supportedChildren = childNodes.filterNot(_.shouldRemove)
val avSpeedupFactor = SQLPlanParser.averageSpeedup(supportedChildren.map(_.speedupFactor))
// TODO: revisit the logic behind adding the stages of child nodes to the current node.
Copy link
Collaborator Author

@amahussein amahussein Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The below allStagesIncludingChildren is there since the early days. I don't quite understand the reason childNodes stages are appended to the wholeStageCodeGen stageIDs.
@tgravescs do you know why we this was needed? If not, I suggest we remove it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking at the comment its saying that the child node might be associated to a stage but the whole stage codegen doesn't match it. So we basically add all the child node ones in. I don't remember when that can happen but would not remove it unless we are positive its not needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I don't think this comment was addressed before merged? unless you know this doesn't happen I would prefer to see this comment removed or to file an issue to investigate it. This to me could just lead to confusion from someone reading the code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad! haven't got my cup of coffee. For some reason I misread that I got 2 approvals.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Posted a followup #1146

// can't rely on the wholeStagecodeGen having a stage if children do so aggregate them together
// for now
val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
// Finally, the node should be marked as shouldRemove when all the children of the
// wholeStageCodeGen are marked as shouldRemove.
val removeNode = isDupNode || childNodes.forall(_.shouldRemove)
val execInfo = ExecInfo(node, sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
node.id, anySupported, Some(childNodes), allStagesIncludingChildren,
shouldRemove = isDupNode, unsupportedExprs = unSupportedExprsArray)
shouldRemove = removeNode, unsupportedExprs = unSupportedExprsArray)
Seq(execInfo)
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
App Name,App ID,Recommendation,Estimated GPU Speedup,Estimated GPU Duration,Estimated GPU Time Saved,SQL DF Duration,SQL Dataframe Task Duration,App Duration,GPU Opportunity,Executor CPU Time Percent,SQL Ids with Failures,Unsupported Read File Formats and Types,Unsupported Write Data Format,Complex Types,Nested Complex Types,Potential Problems,Longest SQL Duration,SQL Stage Durations Sum,NONSQL Task Duration Plus Overhead,Unsupported Task Duration,Supported SQL DF Task Duration,Task Speedup Factor,App Duration Estimated,Unsupported Execs,Unsupported Expressions,Estimated Job Frequency (monthly)
"Spark shell","local-1629442299891","Not Recommended",1.02,19159.68,394.31,1151,920,19554,788,91.72,"","","CSV;JSON","","","",1235,1049,18251,290,630,2.0,false,"Execute InsertIntoHadoopFsRelationCommand csv;Execute InsertIntoHadoopFsRelationCommand json","",30
"Spark shell","local-1629442299891","Not Recommended",1.02,19080.81,473.18,1151,920,19554,788,91.72,"","","CSV;JSON","","","",1235,1049,18251,290,630,2.5,false,"Execute InsertIntoHadoopFsRelationCommand csv;Execute InsertIntoHadoopFsRelationCommand json","",30