Skip to content

Commit

Permalink
[METRICS] New thread for waiting list checking (#1836)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored Aug 17, 2023
1 parent 53aed87 commit d4c3d4b
Showing 1 changed file with 28 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -222,8 +221,8 @@ class MetricStoreImpl implements MetricStore {
private final Set<Integer> identities = new ConcurrentSkipListSet<>();

private volatile Map<MetricSensor, BiConsumer<Integer, Exception>> lastSensors = Map.of();
private final Map<CountDownLatch, Predicate<ClusterBean>> waitingList =
new ConcurrentHashMap<>();
// Monitor for detecting cluster bean changing.
private final Object beanUpdateMonitor = new Object();
// For mbean register. To distinguish mbeans of different metricStore.
private final String uid = Utils.randomString();
private final Sensor<Long> beanReceivedSensor =
Expand Down Expand Up @@ -292,7 +291,10 @@ private MetricStoreImpl(
if (!allBeans.isEmpty()) {
// generate new cluster bean
updateClusterBean();
checkWaitingList(this.waitingList, clusterBean());
// Tell waiting threads that cluster bean has been changed
synchronized (beanUpdateMonitor) {
beanUpdateMonitor.notifyAll();
}
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -349,23 +351,31 @@ public void close() {
receivers.forEach(Receiver::close);
}

/** User thread will "wait" until being awakened by the metric store or being timeout. */
/**
* User thread will wait until checker pass or timeout. When cluster bean has changed, the
* waiting threads will be notified.
*/
@Override
public void wait(Predicate<ClusterBean> checker, Duration timeout) {
var latch = new CountDownLatch(1);
try {
waitingList.put(latch, checker);
// Check the newly added checker immediately
checkWaitingList(Map.of(latch, checker), clusterBean());
// Wait until being awake or timeout
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Timeout waiting for the checker");
public void wait(Predicate<ClusterBean> checker, Duration duration) {
var endTime = System.currentTimeMillis() + duration.toMillis();
var timeout = duration.toMillis();
if (checker.test(clusterBean())) return;

while (timeout > 0) {
try {
synchronized (beanUpdateMonitor) {
// Release the lock and wait for clusterBean being updated
this.beanUpdateMonitor.wait(timeout);
}
if (checker.test(clusterBean())) return;
} catch (NoSufficientMetricsException e) {
// Check failed. Try again next time.
} catch (InterruptedException ie) {
throw new IllegalStateException("Interrupted while waiting for the checker");
}
} catch (InterruptedException ie) {
throw new IllegalStateException("Interrupted while waiting for the checker");
} finally {
waitingList.remove(latch);
timeout = endTime - System.currentTimeMillis();
}
throw new IllegalStateException("Timeout waiting for the checker");
}

private void updateClusterBean() {
Expand All @@ -377,20 +387,5 @@ private void updateClusterBean() {
Collectors.toUnmodifiableMap(
Map.Entry::getKey, e -> List.copyOf(e.getValue()))));
}

/**
* Check the checkers in the waiting list. If the checker returns true, count down the latch.
*/
private static void checkWaitingList(
Map<CountDownLatch, Predicate<ClusterBean>> waitingList, ClusterBean clusterBean) {
waitingList.forEach(
(latch, checker) -> {
try {
if (checker.test(clusterBean)) latch.countDown();
} catch (NoSufficientMetricsException e) {
// Check failed. Try again next time.
}
});
}
}
}

0 comments on commit d4c3d4b

Please sign in to comment.