Skip to content

Commit

Permalink
[Feat] Update AppDiskUsage calculation logic, ApplicationDiskUsage in…
Browse files Browse the repository at this point in the history
… cluster should be multiplied by worker size.
  • Loading branch information
吴梓溢 committed Oct 30, 2024
1 parent 12f25d3 commit f1a7c63
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ class AppDiskUsageSnapShot(val topItemCount: Int) extends Logging with Serializa
val zoneId = ZoneId.systemDefault()
s"Snapshot " +
s"start ${LocalDateTime.ofInstant(Instant.ofEpochMilli(startSnapShotTime), zoneId)} " +
s"end ${LocalDateTime.ofInstant(Instant.ofEpochMilli(endSnapShotTime), zoneId)}" +
s" ${topNItems.filter(_ != null).mkString(", ")}"
s"end ${LocalDateTime.ofInstant(Instant.ofEpochMilli(endSnapShotTime), zoneId)}\n" +
s"${topNItems.filter(_ != null).mkString("\n")}"
}
}

Expand All @@ -124,12 +124,12 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging {
var currentSnapShot: AtomicReference[AppDiskUsageSnapShot] =
new AtomicReference[AppDiskUsageSnapShot]()

def update(appDiskUsage: java.util.Map[String, java.lang.Long]): Unit = {
def update(appDiskUsage: java.util.Map[String, java.lang.Long], workerCount: Int = 1): Unit = {
updateExecutor.submit(new Runnable {
override def run(): Unit = {
if (currentSnapShot.get() != null) {
appDiskUsage.asScala.foreach { case (key, usage) =>
currentSnapShot.get().updateAppDiskUsage(key, usage)
currentSnapShot.get().updateAppDiskUsage(key, usage * workerCount)
}
}
}
Expand All @@ -147,7 +147,7 @@ class AppDiskUsageMetric(conf: CelebornConf) extends Logging {
logInfo(s"App Disk Usage Top$usageCount Report: $summaryStr")
}
},
60,
0,
interval,
TimeUnit.SECONDS)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void updateWorkerHeartbeatMeta(
}
}

appDiskUsageMetric.update(estimatedAppDiskUsage);
appDiskUsageMetric.update(estimatedAppDiskUsage, workers.size());
// If using HDFSONLY mode, workers with empty disks should not be put into excluded worker list.
long unhealthyDiskNum =
disks.values().stream().filter(s -> !s.status().equals(DiskStatus.HEALTHY)).count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ public void testObjSerde() throws IOException, InterruptedException {
Map<String, Long> appDiskUsage = JavaUtils.newConcurrentHashMap();
appDiskUsage.put("app-1", 100L);
appDiskUsage.put("app-2", 200L);
masterStatusSystem.appDiskUsageMetric.update(appDiskUsage);
masterStatusSystem.appDiskUsageMetric.update(appDiskUsage, masterStatusSystem.workers.size());
appDiskUsage.put("app-3", 300L);
appDiskUsage.put("app-1", 200L);
masterStatusSystem.appDiskUsageMetric.update(appDiskUsage);
masterStatusSystem.appDiskUsageMetric.update(appDiskUsage, masterStatusSystem.workers.size());
// wait for snapshot updated
Thread.sleep(3000);

Expand Down

0 comments on commit f1a7c63

Please sign in to comment.