Revert "fix_4186"
davidyuan1223 authored Sep 27, 2023
1 parent c83836b · commit 40e80d9
Showing 3 changed files with 11 additions and 36 deletions.
@@ -44,17 +44,15 @@ class SQLOperationListener(
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString

- private lazy val activeJobs = new ConcurrentHashMap[Int, JobInfo]()
- private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]()
+ private lazy val activeJobs = new java.util.HashSet[Int]()
+ private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
- activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
@@ -84,7 +82,6 @@ class SQLOperationListener(
override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
- val stageIds = jobStart.stageInfos.map(_.stageId).toList
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
@@ -97,10 +94,7 @@
}
}
withOperationLog {
- activeJobs.put(
- jobId,
- new JobInfo(stageSize, stageIds)
- )
+ activeJobs.add(jobId)
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
@@ -109,7 +103,7 @@

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
val jobId = jobEnd.jobId
- if (activeJobs.remove(jobId) != null ) {
+ if (activeJobs.remove(jobId)) {
val hint = jobEnd.jobResult match {
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
@@ -140,21 +134,14 @@

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
- val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
- activeJobs.forEach((_, jobInfo) => {
- if (jobInfo.stageIds.contains(stageInfo.stageId)) {
- jobInfo.numCompleteStages += 1
- }
- })
+ val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
}


override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
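As context for the first file's changes above: after this revert the listener goes back to tracking only job IDs in a plain java.util.HashSet guarded by synchronized blocks, instead of a ConcurrentHashMap of JobInfo. The following is a minimal, self-contained sketch of that pattern against Spark's listener API; the class name ActiveJobTracker and the println logging are this sketch's own stand-ins (Kyuubi writes to the operation log instead), so treat it as an illustration of the pattern, not the actual class.

import java.util.{HashSet => JHashSet}

import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

class ActiveJobTracker extends SparkListener {

  // java.util.HashSet is not thread-safe, so every access is wrapped in
  // activeJobs.synchronized, mirroring the reverted listener's locking style.
  private val activeJobs = new JHashSet[Int]()

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
    val stageSize = jobStart.stageInfos.size
    activeJobs.add(jobStart.jobId)
    println(s"Job ${jobStart.jobId} started with $stageSize stages, " +
      s"${activeJobs.size()} active jobs running")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
    // HashSet.remove returns a Boolean, while ConcurrentHashMap.remove returns
    // the mapped value or null; this is why the check in the diff changes
    // from `!= null` to a plain boolean test.
    if (activeJobs.remove(jobEnd.jobId)) {
      val hint = jobEnd.jobResult match {
        case JobSucceeded => "succeeded"
        case _ => "failed"
      }
      println(s"Job ${jobEnd.jobId} $hint, ${activeJobs.size()} active jobs left")
    }
  }
}

A listener like this would be attached with sparkContext.addSparkListener(new ActiveJobTracker).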
@@ -29,8 +29,7 @@ import org.apache.kyuubi.operation.Operation

class SparkConsoleProgressBar(
operation: Operation,
- liveJobs: ConcurrentHashMap[Int, JobInfo],
- liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
+ liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
extends Logging {
@@ -81,13 +80,6 @@ class SparkConsoleProgressBar(
private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
- // build job log info
- val jobId: Option[Int] = liveJobs.asScala.find {
- case (jobId, jobInfo) => jobInfo.stageIds.contains(s.stageId)
- }.map(_._1)
- val jobInfoHeader = s"[Job ${jobId} " +
- s"(${liveJobs.get(jobId).numCompleteStages} / ${liveJobs.get(jobId).numStages}) Stages] "
- // build stage log info
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
@@ -101,7 +93,7 @@
} else {
""
}
- jobInfoHeader + header + bar + tailer
+ header + bar + tailer
}.mkString("")

// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
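The second file drops the per-job prefix from the console progress line, so after the revert each stage is rendered as header + bar + tailer only. Below is a standalone sketch of that kind of string assembly, in the style of Spark's console progress bar; ProgressBarSketch and renderStage are hypothetical names used here for illustration, not Kyuubi's SparkConsoleProgressBar API.

object ProgressBarSketch {

  // Renders one stage as "[Stage <id>: ===>   (complete + active) / total]".
  // Assumes numTasks > 0 and that `width` is the column budget for this stage.
  def renderStage(
      stageId: Int,
      numTasks: Int,
      numCompleteTasks: Int,
      numActiveTasks: Int,
      width: Int): String = {
    val header = s"[Stage $stageId:"
    val tailer = s"($numCompleteTasks + $numActiveTasks) / $numTasks]"
    // Space left for the bar itself after the fixed header and tailer.
    val w = width - header.length - tailer.length
    val bar =
      if (w > 0) {
        val percent = w * numCompleteTasks / numTasks
        (0 until w).map { i =>
          if (i < percent) "=" else if (i == percent) ">" else " "
        }.mkString("")
      } else {
        ""
      }
    header + bar + tailer
  }

  def main(args: Array[String]): Unit = {
    // 40 of 100 tasks finished, 8 running, rendered into an 80-column slot.
    println(renderStage(stageId = 3, numTasks = 100, numCompleteTasks = 40, numActiveTasks = 8, width = 80))
  }
}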
@@ -23,11 +23,7 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}

- class JobInfo(val numStages: Int, val stageIds: Seq[Int]) {
- var numCompleteStages = 0
- }
-
- class StageInfo(val stageId: Int, val numTasks: Int) {
- var numActiveTasks = 0
- var numCompleteTasks = 0
+ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
+ var numActiveTasks = new AtomicInteger(0)
+ var numCompleteTasks = new AtomicInteger(0)
}
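The third file collapses the JobInfo/StageInfo pair back into a single per-stage record whose counters are AtomicIntegers, so concurrent listener callbacks can update task counts without extra locking. A small self-contained sketch of that data shape, under the hypothetical names StageKey and StageProgress rather than Kyuubi's SparkStageAttempt/SparkStageInfo:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Identifies a stage attempt, analogous to the (stageId, stageAttemptId) key in the diff.
final case class StageKey(stageId: Int, stageAttemptId: Int)

// Per-stage task counters; AtomicInteger makes increments safe across listener threads.
final class StageProgress(val stageId: Int, val numTasks: Int) {
  val numActiveTasks = new AtomicInteger(0)
  val numCompleteTasks = new AtomicInteger(0)
}

object StageProgressDemo {
  def main(args: Array[String]): Unit = {
    val activeStages = new ConcurrentHashMap[StageKey, StageProgress]()
    val key = StageKey(stageId = 0, stageAttemptId = 0)
    activeStages.put(key, new StageProgress(stageId = 0, numTasks = 10))

    // A task starts: bump the active counter.
    activeStages.get(key).numActiveTasks.incrementAndGet()
    // The task finishes: one fewer active, one more complete.
    activeStages.get(key).numActiveTasks.decrementAndGet()
    activeStages.get(key).numCompleteTasks.incrementAndGet()

    val s = activeStages.get(key)
    println(s"Stage ${s.stageId}: ${s.numCompleteTasks.get()} complete, " +
      s"${s.numActiveTasks.get()} active of ${s.numTasks} tasks")
  }
}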
