Skip to content

Commit

Permalink
Qualification tool - Handle cancelled jobs and stages better and don'…
Browse files Browse the repository at this point in the history
…t skip the app (#1033)

* Handle cancelled jobs and stages better and don't skip the app

Signed-off-by: Thomas Graves <[email protected]>

* add error for unknown job type

---------

Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
tgravescs authored May 24, 2024
1 parent a6fdc86 commit 89fbf83
Showing 1 changed file with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,26 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
app.lastJobEndTime = Some(event.time)
}
if (event.jobResult != JobSucceeded) {
app.jobIdToSqlID.get(event.jobId) match {
case Some(sqlID) =>
// zero out the cpu and run times since failed
app.sqlIDToTaskEndSum.get(sqlID).foreach { sum =>
sum.executorRunTime = 0
sum.executorCPUTime = 0
if (event.jobResult.isInstanceOf[JobFailed]) {
val failedJob = event.jobResult.asInstanceOf[JobFailed]
if (!failedJob.exception.getMessage.toLowerCase().contains("cancelled")) {
app.jobIdToSqlID.get(event.jobId) match {
case Some(sqlID) =>
// zero out the cpu and run times since failed
app.sqlIDToTaskEndSum.get(sqlID).foreach { sum =>
sum.executorRunTime = 0
sum.executorCPUTime = 0
}
val failures = app.sqlIDtoFailures.getOrElseUpdate(sqlID, ArrayBuffer.empty[String])
val jobStr = s"Job${event.jobId}"
failures += jobStr
case None =>
}
val failures = app.sqlIDtoFailures.getOrElseUpdate(sqlID, ArrayBuffer.empty[String])
val jobStr = s"Job${event.jobId}"
failures += jobStr
case None =>
} else {
logDebug(s"Job was cancelled so not skipping, failure reason: ${failedJob.exception}")
}
} else {
logError("Unknown JobResult type, not checking for failure!")
}
}
}
Expand All @@ -147,12 +156,18 @@ class QualificationEventProcessor(app: QualificationAppInfo, perSqlOnly: Boolean
event: SparkListenerStageCompleted): Unit = {
super.doSparkListenerStageCompleted(app, event)
if (event.stageInfo.failureReason.nonEmpty) {
app.stageIdToSqlID.get(event.stageInfo.stageId) match {
case Some(sqlID) =>
val failures = app.sqlIDtoFailures.getOrElseUpdate(sqlID, ArrayBuffer.empty[String])
val stageStr = s"Stage${event.stageInfo.stageId}"
failures += stageStr
case None =>
// don't count cancelled stages as failures
if (!event.stageInfo.failureReason.get.toLowerCase().contains("cancelled")) {
app.stageIdToSqlID.get(event.stageInfo.stageId) match {
case Some(sqlID) =>
val failures = app.sqlIDtoFailures.getOrElseUpdate(sqlID, ArrayBuffer.empty[String])
val stageStr = s"Stage${event.stageInfo.stageId}"
failures += stageStr
case None =>
}
} else {
logDebug("Stage was cancelled so not skipping, " +
s"failure reason: ${event.stageInfo.failureReason}")
}
}
}
Expand Down

0 comments on commit 89fbf83

Please sign in to comment.