Deduplicate SQL duration wallclock time for databricks eventlog #810

Merged: 8 commits into NVIDIA:dev, Mar 12, 2024

Conversation

@nartal1 (Collaborator) commented Feb 27, 2024:

This fixes #780.

This PR adds support for detecting root execution IDs (rootExecutionId) when the event logs come from Spark 3.4+ or are Databricks event logs.
If a sub-execution's start time does not fall within the parent SQL execution, we add that sub-execution's duration to the SQL durations as well.
Added a unit test to detect the root execution ID.
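A minimal, self-contained sketch of this dedup rule; SQLInfo and its field names here are illustrative assumptions, not the tool's actual types:

  case class SQLInfo(sqlID: Long, rootExecutionID: Option[Long],
      startTime: Long, endTime: Long, duration: Option[Long])

  def totalSqlDuration(sqlIdToInfo: Map[Long, SQLInfo]): Long =
    sqlIdToInfo.values.map { info =>
      info.rootExecutionID match {
        case Some(rootId) if rootId != info.sqlID =>
          sqlIdToInfo.get(rootId) match {
            // Fully inside the root's window: already covered by the
            // root's own duration, so contribute nothing.
            case Some(root) if info.startTime >= root.startTime &&
                info.endTime <= root.endTime => 0L
            // Root unknown, or the child sticks out of the root's
            // window: count the child's own duration as well.
            case _ => info.duration.getOrElse(0L)
          }
        // No root, or this SQL is itself the root: count it once.
        case _ => info.duration.getOrElse(0L)
      }
    }.sum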

@nartal1 added the bug (Something isn't working) and core_tools (Scope the core module (scala)) labels Feb 27, 2024
@nartal1 self-assigned this Feb 27, 2024
@amahussein (Collaborator) left a comment:

Thanks @nartal1
Did we verify in exactly which Spark version rootExecutionId was added?

      0L
    }
  } else {
    0L
Collaborator:

When rootExecutionInfo is not defined, we should return info.duration.getOrElse(0L).
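A minimal sketch of the suggested fallback, using names from the surrounding diff (the enclosing match shape is an assumption):

  sqlIdToInfo.get(rootExecutionID) match {
    case Some(rootExecutionInfo) =>
      // compare against the root's window as in the diff above
      0L
    case None =>
      // root not (yet) known: fall back to this execution's own
      // duration instead of returning 0L and dropping it
      info.duration.getOrElse(0L)
  }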

var lastSQLEndTime: Option[Long] = None
var lastSQLEndTimeId: Option[Long] = None
Collaborator:

If we have the IDs of the job and the SQL, then we don't really need to store the endTime as well.
We can look it up from the JobInfo and the SQLInfo.
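For instance, a sketch of deriving the end time on demand; the endTime accessor on the stored info is an assumption:

  // Instead of caching lastSQLEndTime next to the ID, look it up when needed.
  def lastSQLEndTime(app: QualificationAppInfo): Option[Long] =
    app.lastSQLEndTimeId.flatMap(id => app.sqlIdToInfo.get(id)).flatMap(_.endTime)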

Comment on lines 133 to 137
val rootExecutionIdOpt = Try {
  val field = event.getClass.getDeclaredField("rootExecutionId")
  field.setAccessible(true)
  Option(field.get(event)).map(_.asInstanceOf[Option[Long]]).getOrElse(None)
}.toOption.flatten
Collaborator:

The code is hard to read. Let's move that to EventUtils; we can add a new method readRootIDFromSQLStartEvent(event).

On a side note, this is a little expensive to do for every sqlStartEvent.
Instead, we can evaluate that field using reflection once and use it directly when loading the rootID.
For example, EventUtils can store the rootExecutionId Field; if it is defined, readRootIDFromSQLStartEvent(event) uses it to read the rootID value.
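A hedged sketch of that suggestion; readRootIDFromSQLStartEvent follows the comment, while the class lookup and types are assumptions:

  object EventUtils {
    import java.lang.reflect.Field
    import scala.util.Try

    // Resolve the reflected Field once; None on Spark versions that
    // predate rootExecutionId (earlier than 3.4).
    private lazy val rootExecutionIdField: Option[Field] = Try {
      val f = Class.forName(
        "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart")
        .getDeclaredField("rootExecutionId")
      f.setAccessible(true)
      f
    }.toOption

    // Reuse the cached Field instead of reflecting on every start event.
    def readRootIDFromSQLStartEvent(event: AnyRef): Option[Long] =
      rootExecutionIdField.flatMap { f =>
        Option(f.get(event)).collect { case Some(id: Long) => id }
      }
  }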

Comment on lines 100 to 101
app.lastSQLEndTime = Some(event.time)
app.lastSQLEndTimeId = Some(event.executionId)
Collaborator:

I remember I had a question about that part.

  1. Why do we check if (!perSqlOnly) before updating the field? Regardless of whether we report at SQL granularity or not, the same result should be maintained, right? Or am I missing something?
  2. Should we check that the new event is actually later than the existing value, i.e. if (event.time > app.lastSQLEndTime)?
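
A minimal sketch of the guard in (2), with field names from the diff:

  if (app.lastSQLEndTime.forall(_ < event.time)) {
    app.lastSQLEndTime = Some(event.time)
    app.lastSQLEndTimeId = Some(event.executionId)
  }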

Collaborator Author:

I looked into the perSqlOnly flag, and the default value false is always used when constructing QualificationAppInfo (link). Per-SQL reporting is captured by the reportSqlLevel argument.
So it has no effect on the if condition, which is always true. I think this condition can be removed.

Collaborator:

RunningQualificationEventProcessor initializes qualApp with perSqlOnly set to true:

 class RunningQualificationEventProcessor(sparkConf: SparkConf) extends SparkListener with Logging {

  private val qualApp = new RunningQualificationApp(true)

QualificationAppInfo has two different flags: perSqlOnly and reportSqlLevel. The second one seems to be specific to the reporting. I wonder what perSqlOnly really does...

Collaborator Author:

perSqlOnly is only used in RunningQualificationEventProcessor, since we print per-SQL output to the output file (i.e., once the SQLExecutionEnd event is processed) and it doesn't track the entire application.
The code that matches the event is here - onOtherEvent:

      case e: SparkListenerSQLExecutionEnd =>
        writeSQLDetails(e.executionId)
      case _ =>

@@ -149,6 +150,7 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
    super.doSparkListenerJobEnd(app, event)
    if (!perSqlOnly) {
      app.lastJobEndTime = Some(event.time)
      app.lastJobEndTimeId = Some(event.jobId)
Collaborator:

Same comment as for lastSQLEndTime: why do it conditionally?

Collaborator Author:

This also applies to RunningQualificationEventProcessor, where per-SQL output is written, so lastJobEndTime is skipped in that case as well.

Comment on lines 219 to 231
test("test subexecutionId mapping to rootExecutionId") {
val eventlog = ToolTestUtils.getTestResourcePath("" +
"spark-events-qualification/db_subExecution_id.zstd")
val app = createAppFromEventlog(eventlog)
// In Spark 3.4.0+ and later, all the sub-executions will be grouped if they are part of the
// the same root execution.
if (ToolUtils.isSpark340OrLater()) {
assert(app.sqlIdToInfo.values.exists(_.rootExecutionID.isDefined))
} else {
assert(app.sqlIdToInfo.values.forall(_.rootExecutionID.isEmpty))
}
}

Collaborator:

The test should be executed conditionally for the correct target versions; we have some unit tests that execute conditionally according to the Spark version used in the unit tests.

I don't find the test doing anything valuable:
it is not validating that the values are correct, and it does not validate that the SQL durations are not duplicated.

Collaborator Author:

Updated the test. Now it checks the subExecutionIds for a given rootId and also verifies that we are not double-counting the durations.
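A hedged sketch of what such assertions can look like; the root ID and expected duration are illustrative placeholders, not values from the actual event log:

  if (ToolUtils.isSpark340OrLater()) {
    val rootId = 1L                // placeholder root execution ID
    val expectedWallclock = 1000L  // placeholder duration from the log
    val subIds = app.sqlIdToInfo.collect {
      case (sqlId, info) if info.rootExecutionID.contains(rootId) => sqlId
    }
    assert(subIds.nonEmpty)
    // The root's duration should already cover its contained
    // sub-executions, i.e. they are not counted a second time.
    assert(app.sqlIdToInfo(rootId).duration.contains(expectedWallclock))
  }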

  info.duration.getOrElse(0L)
case Some(rootExecutionID) if rootExecutionID != info.sqlID =>
  val rootExecutionInfo = sqlIdToInfo.get(rootExecutionID)
  if (rootExecutionInfo.nonEmpty) {
Collaborator:

Is this a typo? Usually nonEmpty is for iterable objects, but in this case a value retrieved from the hash map may not be defined.
This might cause an NPE if the SQL event is processed before the root SQL has been added to the hash map.

Collaborator Author:

Thanks! It was a typo. Updated it to get the rootExecutionInfo only if it is present.

Comment on lines 729 to 730
case Some(rootExecutionID) if rootExecutionID == info.sqlID =>
  info.duration.getOrElse(0L)
Collaborator:

This case is redundant because it is already handled by case _ => info.duration.getOrElse(0L).
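A minimal sketch of the collapsed match this implies (names come from the diff context; the overall shape is an assumption):

  info.rootExecutionID match {
    case Some(rootExecutionID) if rootExecutionID != info.sqlID =>
      // handle the sub-execution against its root's window
      ???
    case _ =>
      // covers both "no root" and "this SQL is its own root"
      info.duration.getOrElse(0L)
  }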

Comment on lines 738 to 739
if (sqlStartTime < rootExecutionStartTime || sqlEndTime > rootExecutionEndTime) {
  info.duration.getOrElse(0L)
Collaborator:

We can add a comment here saying why this if condition is enough to check for overlap; later we may forget why we are not applying stricter checks.
For example (see the sketch below):

  • The check will be true if the child is not completely inside the root; nevertheless we still account for its total duration, not just the overlap.
  • The condition will be true if the child starts after the root ends. Is that even a possible case?
  • The condition will be true if the child spans the root. Is that a possible case?
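
A hedged sketch of how that comment could sit on the condition (names from the diff context above):

  // The child contributes its own duration whenever it is not fully
  // contained in the root's [start, end] window. We deliberately add
  // its total duration rather than only the non-overlapping part,
  // trading some over-counting at the boundaries for a simpler check.
  if (sqlStartTime < rootExecutionStartTime || sqlEndTime > rootExecutionEndTime) {
    info.duration.getOrElse(0L)
  } else {
    0L // fully contained: already covered by the root's duration
  }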

@parthosa self-requested a review February 28, 2024
@parthosa (Collaborator) left a comment:

Thanks @nartal1.

@amahussein merged commit 6855297 into NVIDIA:dev Mar 12, 2024
13 checks passed
Successfully merging this pull request may close these issues.

[BUG] Qualification tool to deduplicate the SQL duration wallclock time for a Databricks event log