From 36184167eb941e7aa64d296a3a5d44badd6100c9 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Fri, 27 Jan 2023 22:23:20 -0800 Subject: [PATCH] cleaned metrics --- .../channel/ServiceUnitStateChannelImpl.java | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index ab978afe340fbf..016071875a7c94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -43,7 +43,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableInt; @@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel { private long totalCleanupCnt = 0; private long totalBrokerCleanupTombstoneCnt = 0; private long totalServiceUnitCleanupTombstoneCnt = 0; - private long totalServiceUnitCleanupErrorCnt = 0; + private AtomicLong totalServiceUnitCleanupErrorCnt = new AtomicLong(); private long totalCleanupScheduledCnt = 0; private long totalCleanupIgnoredCnt = 0; private long totalCleanupCancelledCnt = 0; @@ -609,8 +609,8 @@ private void scheduleCleanup(String broker, long delayInSecs) { private void doCleanup(String broker) { long startTime = System.nanoTime(); log.info("Started ownership cleanup for the inactive broker:{}", broker); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalServiceUnitCleanupErrorCntStart = totalServiceUnitCleanupErrorCnt.get(); for (Map.Entry etr : tableview.entrySet()) { ServiceUnitStateData stateData = etr.getValue(); String serviceUnit = etr.getKey(); @@ -621,10 +621,10 @@ private void doCleanup(String broker) { if (e != null) { log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + totalServiceUnitCleanupErrorCnt.incrementAndGet(); } }); - serviceUnitTombstoneCnt.incrementAndGet(); + serviceUnitTombstoneCnt++; } } @@ -634,26 +634,22 @@ private void doCleanup(String broker) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt.get() > 0) { + if (serviceUnitTombstoneCnt > 0) { this.totalCleanupCnt++; - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; this.totalBrokerCleanupTombstoneCnt++; } - if (serviceUnitTombstoneErrorCnt.get() > 0) { - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); - } - double cleanupTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); // TODO: clean load data stores log.info("Completed a cleanup for the inactive broker:{} in {} ms. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate serviceUnitTombstoneErrorCnt:{}, metrics:{} ", broker, cleanupTime, serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalServiceUnitCleanupErrorCntStart - totalServiceUnitCleanupErrorCnt.get(), printCleanupMetrics()); cleanupJobs.remove(broker); } @@ -673,8 +669,8 @@ private void monitorOwnerships(List brokers) { long startTime = System.nanoTime(); Set inactiveBrokers = new HashSet<>(); Set activeBrokers = new HashSet<>(brokers); - AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger(); - AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger(); + int serviceUnitTombstoneCnt = 0; + long totalServiceUnitCleanupErrorCntStart = totalServiceUnitCleanupErrorCnt.get(); long now = System.currentTimeMillis(); for (Map.Entry etr : tableview.entrySet()) { String serviceUnit = etr.getKey(); @@ -691,10 +687,10 @@ private void monitorOwnerships(List brokers) { if (e != null) { log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData); - serviceUnitTombstoneErrorCnt.incrementAndGet(); + totalServiceUnitCleanupErrorCnt.incrementAndGet(); } }); - serviceUnitTombstoneCnt.incrementAndGet(); + serviceUnitTombstoneCnt++; } } @@ -708,22 +704,21 @@ private void monitorOwnerships(List brokers) { log.error("Failed to flush the in-flight messages.", e); } - if (serviceUnitTombstoneCnt.get() > 0) { - this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get(); + if (serviceUnitTombstoneCnt > 0) { + this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt; } - this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get(); double monitorTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); log.info("Completed the ownership monitor run in {} ms. " + "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. " + "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, " - + "serviceUnitTombstoneErrorCnt:{}, metrics:{} ", + + "approximate serviceUnitTombstoneErrorCnt:{}, metrics:{} ", monitorTime, inactiveBrokers, inactiveBrokers.size(), serviceUnitTombstoneCnt, - serviceUnitTombstoneErrorCnt, + totalServiceUnitCleanupErrorCntStart - totalServiceUnitCleanupErrorCnt.get(), printCleanupMetrics()); }