Skip to content

Commit

Permalink
cleaned metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Jan 28, 2023
1 parent bf665d3 commit c818108
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableInt;
Expand Down Expand Up @@ -101,7 +101,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private long totalCleanupCnt = 0;
private long totalBrokerCleanupTombstoneCnt = 0;
private long totalServiceUnitCleanupTombstoneCnt = 0;
private long totalServiceUnitCleanupErrorCnt = 0;
private AtomicLong totalCleanupErrorCnt = new AtomicLong();
private long totalCleanupScheduledCnt = 0;
private long totalCleanupIgnoredCnt = 0;
private long totalCleanupCancelledCnt = 0;
Expand Down Expand Up @@ -598,7 +598,16 @@ private void scheduleCleanup(String broker, long delayInSecs) {
.delayedExecutor(delayInSecs, TimeUnit.SECONDS, pulsar.getLoadManagerExecutor());
totalCleanupScheduledCnt++;
return CompletableFuture
.runAsync(() -> doCleanup(broker), delayed);
.runAsync(() -> {
try {
doCleanup(broker);
} catch (Throwable e) {
log.error("Failed to run the cleanup job for the broker {}, "
+ "totalCleanupErrorCnt:{}.",
broker, totalCleanupErrorCnt.incrementAndGet(), e);
}
}
, delayed);
});

log.info("Scheduled ownership cleanup for broker:{} with delay:{} secs. Pending clean jobs:{}.",
Expand All @@ -609,8 +618,8 @@ private void scheduleCleanup(String broker, long delayInSecs) {
private void doCleanup(String broker) {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
int serviceUnitTombstoneCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
ServiceUnitStateData stateData = etr.getValue();
String serviceUnit = etr.getKey();
Expand All @@ -619,12 +628,13 @@ private void doCleanup(String broker) {
log.info("Cleaning ownership serviceUnit:{}, stateData:{}.", serviceUnit, stateData);
tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
serviceUnitTombstoneErrorCnt.incrementAndGet();
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
+ "cleanupErrorCnt:{}.",
serviceUnit, stateData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
}
});
serviceUnitTombstoneCnt.incrementAndGet();
serviceUnitTombstoneCnt++;
}
}

Expand All @@ -634,26 +644,22 @@ private void doCleanup(String broker) {
log.error("Failed to flush the in-flight messages.", e);
}

if (serviceUnitTombstoneCnt.get() > 0) {
if (serviceUnitTombstoneCnt > 0) {
this.totalCleanupCnt++;
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get();
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
this.totalBrokerCleanupTombstoneCnt++;
}

if (serviceUnitTombstoneErrorCnt.get() > 0) {
this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get();
}

double cleanupTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
// TODO: clean load data stores
log.info("Completed a cleanup for the inactive broker:{} in {} ms. "
+ "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
+ "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+ "approximate cleanupErrorCnt:{}, metrics:{} ",
broker,
cleanupTime,
serviceUnitTombstoneCnt,
serviceUnitTombstoneErrorCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
printCleanupMetrics());
cleanupJobs.remove(broker);
}
Expand All @@ -673,8 +679,8 @@ private void monitorOwnerships(List<String> brokers) {
long startTime = System.nanoTime();
Set<String> inactiveBrokers = new HashSet<>();
Set<String> activeBrokers = new HashSet<>(brokers);
AtomicInteger serviceUnitTombstoneCnt = new AtomicInteger();
AtomicInteger serviceUnitTombstoneErrorCnt = new AtomicInteger();
int serviceUnitTombstoneCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
long now = System.currentTimeMillis();
for (Map.Entry<String, ServiceUnitStateData> etr : tableview.entrySet()) {
String serviceUnit = etr.getKey();
Expand All @@ -689,12 +695,13 @@ private void monitorOwnerships(List<String> brokers) {

tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
serviceUnitTombstoneErrorCnt.incrementAndGet();
log.error("Failed cleaning the ownership serviceUnit:{}, stateData:{}, "
+ "cleanupErrorCnt:{}.",
serviceUnit, stateData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
}
});
serviceUnitTombstoneCnt.incrementAndGet();
serviceUnitTombstoneCnt++;
}
}

Expand All @@ -708,36 +715,35 @@ private void monitorOwnerships(List<String> brokers) {
log.error("Failed to flush the in-flight messages.", e);
}

if (serviceUnitTombstoneCnt.get() > 0) {
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt.get();
if (serviceUnitTombstoneCnt > 0) {
this.totalServiceUnitCleanupTombstoneCnt += serviceUnitTombstoneCnt;
}
this.totalServiceUnitCleanupErrorCnt += serviceUnitTombstoneErrorCnt.get();

double monitorTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
log.info("Completed the ownership monitor run in {} ms. "
+ "Scheduled cleanups for inactiveBrokers:{}. inactiveBrokerCount:{}. "
+ "Published tombstone for orphan service units: serviceUnitTombstoneCnt:{}, "
+ "serviceUnitTombstoneErrorCnt:{}, metrics:{} ",
+ "approximate cleanupErrorCnt:{}, metrics:{} ",
monitorTime,
inactiveBrokers,
inactiveBrokers.size(),
serviceUnitTombstoneCnt,
serviceUnitTombstoneErrorCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
printCleanupMetrics());

}

private String printCleanupMetrics() {
return String.format(
"{totalCleanupCnt:%d, totalBrokerCleanupTombstoneCnt:%d, "
+ "totalServiceUnitCleanupTombstoneCnt:%d, totalServiceUnitCleanupErrorCnt:%d, "
+ "totalServiceUnitCleanupTombstoneCnt:%d, totalCleanupErrorCnt:%d, "
+ "totalCleanupScheduledCnt%d, totalCleanupIgnoredCnt:%d, totalCleanupCancelledCnt:%d, "
+ " activeCleanupJobs:%d}",
totalCleanupCnt,
totalBrokerCleanupTombstoneCnt,
totalServiceUnitCleanupTombstoneCnt,
totalServiceUnitCleanupErrorCnt,
totalCleanupErrorCnt.get(),
totalCleanupScheduledCnt,
totalCleanupIgnoredCnt,
totalCleanupCancelledCnt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -569,7 +570,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand All @@ -594,7 +595,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand All @@ -610,7 +611,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand All @@ -628,7 +629,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand All @@ -646,7 +647,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand All @@ -671,7 +672,7 @@ public void handleBrokerDeletionEventTest()
assertEquals(2, getCleanupMetric(leaderChannel, "totalCleanupCnt"));
assertEquals(2, getCleanupMetric(leaderChannel, "totalBrokerCleanupTombstoneCnt"));
assertEquals(4, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupTombstoneCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalServiceUnitCleanupErrorCnt"));
assertEquals(0, getCleanupMetric(leaderChannel, "totalCleanupErrorCnt"));
assertEquals(3, getCleanupMetric(leaderChannel, "totalCleanupScheduledCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupIgnoredCnt"));
assertEquals(1, getCleanupMetric(leaderChannel, "totalCleanupCancelledCnt"));
Expand Down Expand Up @@ -842,6 +843,11 @@ private static void cleanTableView(ServiceUnitStateChannel channel, String servi

private static long getCleanupMetric(ServiceUnitStateChannel channel, String metric)
throws IllegalAccessException {
return (long) FieldUtils.readDeclaredField(channel, metric, true);
Object var = FieldUtils.readDeclaredField(channel, metric, true);
if (var instanceof AtomicLong) {
return ((AtomicLong) var).get();
} else {
return (long) var;
}
}
}

0 comments on commit c818108

Please sign in to comment.