diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 616109f693..c323a6d1a9 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -222,8 +221,8 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); - private final Map> waitingList = - new ConcurrentHashMap<>(); + // Monitor for detecting cluster bean changing. + private final Object beanUpdateMonitor = new Object(); // For mbean register. To distinguish mbeans of different metricStore. private final String uid = Utils.randomString(); private final Sensor beanReceivedSensor = @@ -292,7 +291,10 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - checkWaitingList(this.waitingList, clusterBean()); + // Tell waiting threads that cluster bean has been changed + synchronized (beanUpdateMonitor) { + beanUpdateMonitor.notifyAll(); + } } }); } catch (Exception e) { @@ -349,23 +351,31 @@ public void close() { receivers.forEach(Receiver::close); } - /** User thread will "wait" until being awakened by the metric store or being timeout. */ + /** + * User thread will wait until checker pass or timeout. When cluster bean has changed, the + * waiting threads will be notified. + */ @Override - public void wait(Predicate checker, Duration timeout) { - var latch = new CountDownLatch(1); - try { - waitingList.put(latch, checker); - // Check the newly added checker immediately - checkWaitingList(Map.of(latch, checker), clusterBean()); - // Wait until being awake or timeout - if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - throw new IllegalStateException("Timeout waiting for the checker"); + public void wait(Predicate checker, Duration duration) { + var endTime = System.currentTimeMillis() + duration.toMillis(); + var timeout = duration.toMillis(); + if (checker.test(clusterBean())) return; + + while (timeout > 0) { + try { + synchronized (beanUpdateMonitor) { + // Release the lock and wait for clusterBean being updated + this.beanUpdateMonitor.wait(timeout); + } + if (checker.test(clusterBean())) return; + } catch (NoSufficientMetricsException e) { + // Check failed. Try again next time. + } catch (InterruptedException ie) { + throw new IllegalStateException("Interrupted while waiting for the checker"); } - } catch (InterruptedException ie) { - throw new IllegalStateException("Interrupted while waiting for the checker"); - } finally { - waitingList.remove(latch); + timeout = endTime - System.currentTimeMillis(); } + throw new IllegalStateException("Timeout waiting for the checker"); } private void updateClusterBean() { @@ -377,20 +387,5 @@ private void updateClusterBean() { Collectors.toUnmodifiableMap( Map.Entry::getKey, e -> List.copyOf(e.getValue())))); } - - /** - * Check the checkers in the waiting list. If the checker returns true, count down the latch. - */ - private static void checkWaitingList( - Map> waitingList, ClusterBean clusterBean) { - waitingList.forEach( - (latch, checker) -> { - try { - if (checker.test(clusterBean)) latch.countDown(); - } catch (NoSufficientMetricsException e) { - // Check failed. Try again next time. - } - }); - } } }