From c818108456cae9c49c2d33c9de8477ab2f3fce74 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Fri, 27 Jan 2023 22:23:20 -0800 Subject: [PATCH] cleaned metrics --- .../channel/ServiceUnitStateChannelImpl.java | 66 ++++++++++--------- .../channel/ServiceUnitStateChannelTest.java | 20 ++++-- 2 files changed, 49 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ab978afe340fb..38e8afa50f302 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -43,7 +43,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private long totalCleanupCnt = 0; private long totalBrokerCleanupTombstoneCnt = 0; private long totalServiceUnitCleanupTombstoneCnt = 0; - private long totalServiceUnitCleanupErrorCnt = 0; + private AtomicLong totalCleanupErrorCnt = new AtomicLong(); private long totalCleanupScheduledCnt = 0; private long totalCleanupIgnoredCnt = 0; private long totalCleanupCancelledCnt = 0; @@ -598,7 +598,16 @@ private void scheduleCleanup(String broker, long delayInSecs) { .delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor()); totalCleanupScheduledCnt++; return CompletableFuture - .runAsync(() -> doCleanup(broker), delayed); 
+ .runAsync(() -> { + try { + doCleanup(broker); + } catch (Throwable e) { + log.error("Failed to run the cleanup job for the broker {}, " + + "totalCleanupErrorCnt:{}.", + broker, totalCleanupErrorCnt.incrementAndGet(), e); + } + } + , delayed); }); log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.", @@ -609,8 +618,8 @@ private void scheduleCleanup(String broker, long delayInSecs) { private void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); for (Map.Entry etr : tableview.entrySet()) { ServiceUnitStateData stateData = etr.getValue(); String serviceUnit = etr.getKey(); @@ -619,12 +628,13 @@ private void doCleanup(String broker) { log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); tombstoneAsync(serviceUnit).whenComplete((__, e) -> { if (e != null) { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", - serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + + "cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart); } }); - serviceUnitTombstoneCnt.incrementAndGet(); + serviceUnitTombstoneCnt++; } } @@ -634,26 +644,22 @@ private void doCleanup(String broker) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt.get() > 0) { + if (serviceUnitTombstoneCnt > 0) { this.totalCleanupCnt++; - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; 
this.totalBrokerCleanupTombstoneCnt++; } - if (serviceUnitTombstoneErrorCnt.get() > 0) { - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); - } - double cleanupTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); // TODO: clean load data stores log.info("Completed a cleanup for the inactive broker:{} in {} ms. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate cleanupErrorCnt:{}, metrics:{} ", broker, cleanupTime, serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); cleanupJobs.remove(broker); } @@ -673,8 +679,8 @@ private void monitorOwnerships(List brokers) { long startTime = System.nanoTime(); Set inactiveBrokers = new HashSet<>(); Set activeBrokers = new HashSet<>(brokers); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalCleanupErrorCntStart = totalCleanupErrorCnt.get(); long now = System.currentTimeMillis(); for (Map.Entry etr : tableview.entrySet()) { String serviceUnit = etr.getKey(); @@ -689,12 +695,13 @@ private void monitorOwnerships(List brokers) { tombstoneAsync(serviceUnit).whenComplete((__, e) -> { if (e != null) { - log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", - serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, " + + "cleanupErrorCnt:{}.", + serviceUnit, stateData, + totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart); } }); - serviceUnitTombstoneCnt.incrementAndGet(); + serviceUnitTombstoneCnt++; } } @@ -708,22 +715,21 @@ private void monitorOwnerships(List brokers) { log.error("Failed to flush the in-flight messages.", e); } - if
(serviceUnitTombstoneCnt.get() > 0) { - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + if (serviceUnitTombstoneCnt > 0) { + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; } - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); double monitorTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); log.info("Completed the ownership monitor run in {} ms. " + "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate cleanupErrorCnt:{}, metrics:{} ", monitorTime, inactiveBrokers, inactiveBrokers.size(), serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); } @@ -731,13 +737,13 @@ private void monitorOwnerships(List brokers) { private String printCleanupMetrics() { return String.format( "{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, " - + "totalServiceUnitCleanupTombstoneCnt:%d, totalServiceUnitCleanupErrorCnt:%d, " + + "totalServiceUnitCleanupTombstoneCnt:%d, totalCleanupErrorCnt:%d, " + "totalCleanupScheduledCnt%d, totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, " + " activeCleanupJobs:%d}", totalCleanupCnt, totalBrokerCleanupTombstoneCnt, totalServiceUnitCleanupTombstoneCnt, - totalServiceUnitCleanupErrorCnt, + totalCleanupErrorCnt.get(), totalCleanupScheduledCnt, totalCleanupIgnoredCnt, totalCleanupCancelledCnt, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 84b53bd43a9bd..a16c2be6612bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -53,6 +53,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarServerException; @@ -569,7 +570,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -594,7 +595,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -610,7 +611,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); 
assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -628,7 +629,7 @@ public void handleBrokerDeletionEventTest() assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -646,7 +647,7 @@ public void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -671,7 +672,7 @@ public 
void handleBrokerDeletionEventTest() assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt")); assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt")); assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt")); - assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt")); + assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt")); assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt")); assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt")); @@ -842,6 +843,11 @@ private static void cleanTableView(ServiceUnitStateChannel channel, String servi private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric) throws IllegalAccessException { - return (long) FieldUtils.readDeclaredField(channel, metric, true); + Object var = FieldUtils.readDeclaredField(channel, metric, true); + if (var instanceof AtomicLong) { + return ((AtomicLong) var).get(); + } else { + return (long) var; + } } }