From 8b1bd07905022c198d677d13882a09ffb7eeafff Mon Sep 17 00:00:00 2001 From: onebox-li Date: Fri, 13 Oct 2023 11:18:03 +0800 Subject: [PATCH] [CELEBORN-1037] Incorrect output for metrics of Prometheus ### What changes were proposed in this pull request? The new added `deadlocks` metrics in `ThreadStatesGaugeSet` is a Set, which is invalid. So here add a filter at the `addGauge` extrance. ### Why are the changes needed? Ditto ### Does this PR introduce _any_ user-facing change? Remove the unused metrics. BTW the template use `metrics_jvm_thread_deadlock_count_Value` ### How was this patch tested? Manual test Closes #1981 from onebox-li/fix-1037. Authored-by: onebox-li Signed-off-by: mingji --- .../metrics/source/AbstractSource.scala | 8 +- .../worker/storage/DeviceMonitorSuite.scala | 420 ++++++++++-------- 2 files changed, 234 insertions(+), 194 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index 21fd68d3368..c4409c0501d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -73,7 +73,13 @@ abstract class AbstractSource(conf: CelebornConf, role: String) name: String, labels: Map[String, String], gauge: Gauge[T]): Unit = { - namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels)) + // filter out non-number type gauges + if (gauge.getValue.isInstanceOf[Number]) { + namedGauges.add(NamedGauge(name, gauge, labels ++ staticLabels)) + } else { + logWarning( + s"Add gauge $name failed, the value type ${gauge.getValue.getClass} is not a number") + } } def addGauge[T]( diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala index 57489481431..e4eb34281b2 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitorSuite.scala @@ -102,6 +102,17 @@ class DeviceMonitorSuite extends AnyFunSuite { |/dev/vdb 1932735283200 97710505984 1835024777216 6% /mnt/disk5 |""".stripMargin + val dfBOut1DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 7) + val dfBOut2DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) + val dfBOut3DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 7) + val dfBOut4DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) + val dfBOut5DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) + val dirs = new jArrayList[File]() val workingDir1 = ListBuffer[File](new File("/mnt/disk1/data1")) val workingDir2 = ListBuffer[File](new File("/mnt/disk1/data2")) @@ -163,169 +174,183 @@ class DeviceMonitorSuite extends AnyFunSuite { test("init") { withObjectMocked[org.apache.celeborn.common.util.Utils.type] { - when(Utils.runCommand(dfCmd)) thenReturn dfOut - when(Utils.runCommand(lsCmd)) thenReturn lsOut - - deviceMonitor.init() - - assertEquals(2, deviceMonitor.observedDevices.size()) - - assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo)) - assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo)) - - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1) - - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1")) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2")) - - assertEquals( - deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0), - new File("/mnt/disk1/data1")) - assertEquals( - deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1), - new File("/mnt/disk1/data2")) - assertEquals( - deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0), - new File("/mnt/disk2/data3")) - assertEquals( - deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1), - new File("/mnt/disk2/data4")) - - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 1) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 1) + withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type] { + when(Utils.runCommand(dfCmd)) thenReturn dfOut + when(Utils.runCommand(lsCmd)) thenReturn lsOut + + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn( + dfBOut1DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn( + dfBOut2DiskUsageInfo) + + deviceMonitor.init() + + assertEquals(2, deviceMonitor.observedDevices.size()) + + assert(deviceMonitor.observedDevices.containsKey(vdaDeviceInfo)) + assert(deviceMonitor.observedDevices.containsKey(vdbDeviceInfo)) + + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.size, 1) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.size, 1) + + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.containsKey("/mnt/disk1")) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.containsKey("/mnt/disk2")) + + assertEquals( + deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(0), + new File("/mnt/disk1/data1")) + assertEquals( + deviceMonitor.observedDevices.get(vdaDeviceInfo).diskInfos.get("/mnt/disk1").dirs(1), + new File("/mnt/disk1/data2")) + assertEquals( + deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(0), + new File("/mnt/disk2/data3")) + assertEquals( + deviceMonitor.observedDevices.get(vdbDeviceInfo).diskInfos.get("/mnt/disk2").dirs(1), + new File("/mnt/disk2/data4")) + + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 1) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 1) + } } } test("register/unregister/notify/report") { withObjectMocked[org.apache.celeborn.common.util.Utils.type] { - when(Utils.runCommand(dfCmd)) thenReturn dfOut - when(Utils.runCommand(lsCmd)) thenReturn lsOut - - deviceMonitor.init() - - val fw1 = mock[FileWriter] - val fw2 = mock[FileWriter] - val fw3 = mock[FileWriter] - val fw4 = mock[FileWriter] - - val f1 = new File("/mnt/disk1/data1/f1") - val f2 = new File("/mnt/disk1/data2/f2") - val f3 = new File("/mnt/disk2/data3/f3") - val f4 = new File("/mnt/disk2/data4/f4") - when(fw1.getFile).thenReturn(f1) - when(fw2.getFile).thenReturn(f2) - when(fw3.getFile).thenReturn(f3) - when(fw4.getFile).thenReturn(f4) - - deviceMonitor.registerFileWriter(fw1) - deviceMonitor.registerFileWriter(fw2) - deviceMonitor.registerFileWriter(fw3) - deviceMonitor.registerFileWriter(fw4) - - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2)) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4)) - - deviceMonitor.unregisterFileWriter(fw1) - deviceMonitor.unregisterFileWriter(fw3) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 2) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 2) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2)) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4)) - - val df1 = mock[LocalFlusher] - val df2 = mock[LocalFlusher] - val df3 = mock[LocalFlusher] - val df4 = mock[LocalFlusher] - - when(df1.stopFlag).thenReturn(new AtomicBoolean(false)) - when(df2.stopFlag).thenReturn(new AtomicBoolean(false)) - when(df3.stopFlag).thenReturn(new AtomicBoolean(false)) - when(df4.stopFlag).thenReturn(new AtomicBoolean(false)) - - when(df1.mountPoint).thenReturn("/mnt/disk1") - when(df2.mountPoint).thenReturn("/mnt/disk1") - when(df3.mountPoint).thenReturn("/mnt/disk2") - when(df4.mountPoint).thenReturn("/mnt/disk2") - - deviceMonitor.registerFlusher(df1) - deviceMonitor.registerFlusher(df2) - deviceMonitor.registerFlusher(df3) - deviceMonitor.registerFlusher(df4) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) - - deviceMonitor.unregisterFlusher(df1) - deviceMonitor.unregisterFlusher(df3) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) - - when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) - .thenAnswer((a: String, b: List[File]) => { - deviceMonitor.unregisterFileWriter(fw2) - }) - when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) - .thenAnswer((a: String, b: List[File]) => { - deviceMonitor.unregisterFileWriter(fw4) - }) - when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) - .thenAnswer((a: String, b: List[File]) => { - df2.stopFlag.set(true) - }) - when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) - .thenAnswer((a: String, b: List[File]) => { - df4.stopFlag.set(true) - }) - - deviceMonitor.observedDevices - .get(vdaDeviceInfo) - .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG) - deviceMonitor.observedDevices - .get(vdbDeviceInfo) - .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) - assert( - deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) - assert( - deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) - assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) - - deviceMonitor.registerFileWriter(fw1) - deviceMonitor.registerFileWriter(fw2) - deviceMonitor.registerFileWriter(fw3) - deviceMonitor.registerFileWriter(fw4) - assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4) - assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4) + withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type] { + when(Utils.runCommand(dfCmd)) thenReturn dfOut + when(Utils.runCommand(lsCmd)) thenReturn lsOut + + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn( + dfBOut1DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn( + dfBOut2DiskUsageInfo) + + deviceMonitor.init() + + val fw1 = mock[FileWriter] + val fw2 = mock[FileWriter] + val fw3 = mock[FileWriter] + val fw4 = mock[FileWriter] + + val f1 = new File("/mnt/disk1/data1/f1") + val f2 = new File("/mnt/disk1/data2/f2") + val f3 = new File("/mnt/disk2/data3/f3") + val f4 = new File("/mnt/disk2/data4/f4") + when(fw1.getFile).thenReturn(f1) + when(fw2.getFile).thenReturn(f2) + when(fw3.getFile).thenReturn(f3) + when(fw4.getFile).thenReturn(f4) + + deviceMonitor.registerFileWriter(fw1) + deviceMonitor.registerFileWriter(fw2) + deviceMonitor.registerFileWriter(fw3) + deviceMonitor.registerFileWriter(fw4) + + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw1)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2)) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw3)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4)) + + deviceMonitor.unregisterFileWriter(fw1) + deviceMonitor.unregisterFileWriter(fw3) + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 2) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 2) + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(fw2)) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(fw4)) + + val df1 = mock[LocalFlusher] + val df2 = mock[LocalFlusher] + val df3 = mock[LocalFlusher] + val df4 = mock[LocalFlusher] + + when(df1.stopFlag).thenReturn(new AtomicBoolean(false)) + when(df2.stopFlag).thenReturn(new AtomicBoolean(false)) + when(df3.stopFlag).thenReturn(new AtomicBoolean(false)) + when(df4.stopFlag).thenReturn(new AtomicBoolean(false)) + + when(df1.mountPoint).thenReturn("/mnt/disk1") + when(df2.mountPoint).thenReturn("/mnt/disk1") + when(df3.mountPoint).thenReturn("/mnt/disk2") + when(df4.mountPoint).thenReturn("/mnt/disk2") + + deviceMonitor.registerFlusher(df1) + deviceMonitor.registerFlusher(df2) + deviceMonitor.registerFlusher(df3) + deviceMonitor.registerFlusher(df4) + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4) + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df1)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df3)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) + + deviceMonitor.unregisterFlusher(df1) + deviceMonitor.unregisterFlusher(df3) + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) + + when(fw2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) + .thenAnswer((a: String, b: List[File]) => { + deviceMonitor.unregisterFileWriter(fw2) + }) + when(fw4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) + .thenAnswer((a: String, b: List[File]) => { + deviceMonitor.unregisterFileWriter(fw4) + }) + when(df2.notifyError("vda", DiskStatus.CRITICAL_ERROR)) + .thenAnswer((a: String, b: List[File]) => { + df2.stopFlag.set(true) + }) + when(df4.notifyError("vdb", DiskStatus.CRITICAL_ERROR)) + .thenAnswer((a: String, b: List[File]) => { + df4.stopFlag.set(true) + }) + + deviceMonitor.observedDevices + .get(vdaDeviceInfo) + .notifyObserversOnError(List("/mnt/disk1"), DiskStatus.IO_HANG) + deviceMonitor.observedDevices + .get(vdbDeviceInfo) + .notifyObserversOnError(List("/mnt/disk2"), DiskStatus.IO_HANG) + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 3) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 3) + assert( + deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.contains(df2)) + assert( + deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(storageManager)) + assert(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.contains(df4)) + + deviceMonitor.registerFileWriter(fw1) + deviceMonitor.registerFileWriter(fw2) + deviceMonitor.registerFileWriter(fw3) + deviceMonitor.registerFileWriter(fw4) + assertEquals(deviceMonitor.observedDevices.get(vdaDeviceInfo).observers.size(), 4) + assertEquals(deviceMonitor.observedDevices.get(vdbDeviceInfo).observers.size(), 4) + } } } @@ -350,45 +375,52 @@ class DeviceMonitorSuite extends AnyFunSuite { test("monitor non-critical error metrics") { withObjectMocked[org.apache.celeborn.common.util.Utils.type] { - when(Utils.runCommand(dfCmd)) thenReturn dfOut - when(Utils.runCommand(lsCmd)) thenReturn lsOut - - deviceMonitor.init() - - val device1 = deviceMonitor.observedDevices.values().asScala.head - val mountPoints1 = device1.diskInfos.keySet().asScala.toList - - device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE) - device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG) - val deviceMonitorMetrics = - workerSource.gauges().filter(_.name.startsWith("Device_" + device1.deviceInfo.name)) - .sortBy(_.name) - - assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name) - assertEquals("Device_vda_ReadOrWriteFailure_Count", deviceMonitorMetrics.last.name) - assertEquals(1, deviceMonitorMetrics.head.gauge.getValue) - assertEquals(1, deviceMonitorMetrics.last.gauge.getValue) - - device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE) - device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG) - assertEquals(2, deviceMonitorMetrics.head.gauge.getValue) - assertEquals(2, deviceMonitorMetrics.last.gauge.getValue) + withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type] { + when(Utils.runCommand(dfCmd)) thenReturn dfOut + when(Utils.runCommand(lsCmd)) thenReturn lsOut + + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk1"))).thenReturn( + dfBOut1DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos.get("/mnt/disk2"))).thenReturn( + dfBOut2DiskUsageInfo) + + deviceMonitor.init() + + val device1 = deviceMonitor.observedDevices.values().asScala.head + val mountPoints1 = device1.diskInfos.keySet().asScala.toList + + device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE) + device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG) + val deviceMonitorMetrics = + workerSource.gauges().filter(_.name.startsWith("Device_" + device1.deviceInfo.name)) + .sortBy(_.name) + + assertEquals("Device_vda_IoHang_Count", deviceMonitorMetrics.head.name) + assertEquals("Device_vda_ReadOrWriteFailure_Count", deviceMonitorMetrics.last.name) + assertEquals(1, deviceMonitorMetrics.head.gauge.getValue) + assertEquals(1, deviceMonitorMetrics.last.gauge.getValue) + + device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.READ_OR_WRITE_FAILURE) + device1.notifyObserversOnNonCriticalError(mountPoints1, DiskStatus.IO_HANG) + assertEquals(2, deviceMonitorMetrics.head.gauge.getValue) + assertEquals(2, deviceMonitorMetrics.last.gauge.getValue) + } } } test("monitor device usage metrics") { withObjectMocked[org.apache.celeborn.service.deploy.worker.storage.DeviceMonitor.type] { - val dfBOut1 = DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 7) - val dfBOut2 = DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) - val dfBOut3 = DeviceMonitor.DiskUsageInfo(1395864371200L, 1293858897920L, 102005473280L, 7) - val dfBOut4 = DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) - val dfBOut5 = DeviceMonitor.DiskUsageInfo(1932735283200L, 1835024777216L, 97710505984L, 6) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut1) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn(dfBOut2) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn(dfBOut3) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn(dfBOut4) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn(dfBOut5) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn( + dfBOut1DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk2"))).thenReturn( + dfBOut2DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk3"))).thenReturn( + dfBOut3DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk4"))).thenReturn( + dfBOut4DiskUsageInfo) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk5"))).thenReturn( + dfBOut5DiskUsageInfo) deviceMonitor2.init() @@ -423,8 +455,10 @@ class DeviceMonitorSuite extends AnyFunSuite { assertEquals("vdb", metrics4.last.labels("device")) assertEquals(1024L * 3, metrics4.last.gauge.getValue) - val dfBOut6 = DeviceMonitor.DiskUsageInfo(1395864371200L, 1264867868672L, 130996502528L, 9) - when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn(dfBOut6) + val dfBOut6DiskUsageInfo = + DeviceMonitor.DiskUsageInfo(1395864371200L, 1264867868672L, 130996502528L, 9) + when(DeviceMonitor.getDiskUsageInfos(diskInfos2.get("/mnt/disk1"))).thenReturn( + dfBOut6DiskUsageInfo) assertEquals(1264867868672L, metrics2.head.gauge.getValue) } }