From 8d84b75a0bd5c67705da9ce1aae7d6e4cf13e1a6 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sun, 30 Apr 2023 22:18:05 +0800 Subject: [PATCH 1/4] Add wait in MetricStore interface --- .../common/metrics/collector/MetricStore.java | 55 ++++++++++++++++++- .../metrics/collector/MetricStoreTest.java | 28 ++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index e1ef233917..6fcbafefb0 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -27,11 +28,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import org.astraea.common.Utils; @@ -39,6 +42,7 @@ import org.astraea.common.consumer.ConsumerConfigs; import org.astraea.common.consumer.Deserializer; import org.astraea.common.consumer.Record; +import org.astraea.common.cost.NoSufficientMetricsException; import org.astraea.common.metrics.BeanObject; import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.ClusterBean; @@ -67,6 +71,21 @@ static Builder builder() { */ Map> sensors(); + /** Wait for the checker to be true or timeout. */ + default void wait(Predicate checker, Duration timeout) { + // Keep checking the checker until it is true or timeout. + var start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < timeout.toMillis()) { + try { + if (checker.test(clusterBean())) return; + } catch (NoSufficientMetricsException noe) { + // keep trying + } + Utils.sleep(Duration.ofSeconds(1)); + } + throw new IllegalStateException("Timeout waiting for the checker"); + } + @Override void close(); @@ -193,6 +212,7 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); + private final Map> waitingList = new HashMap<>(); private MetricStoreImpl( Supplier>> sensorsSupplier, @@ -242,8 +262,11 @@ private MetricStoreImpl( } }); }); - // generate new cluster bean - if (!allBeans.isEmpty()) updateClusterBean(); + if (!allBeans.isEmpty()) { + // generate new cluster bean + updateClusterBean(); + checkWaitingList(); + } } catch (Exception e) { // TODO: it needs better error handling e.printStackTrace(); @@ -277,6 +300,24 @@ public void close() { receiver.close(); } + /** User thread will "wait" until being awakened by the metric store or being timeout. */ + @Override + public void wait(Predicate checker, Duration timeout) { + if (checker.test(clusterBean())) return; + + var latch = new CountDownLatch(1); + try { + waitingList.put(latch, checker); + if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + throw new IllegalStateException("Timeout waiting for the checker"); + } + } catch (InterruptedException ie) { + throw new IllegalStateException("Interrupted while waiting for the checker"); + } finally { + waitingList.remove(latch); + } + } + private void updateClusterBean() { lastClusterBean = ClusterBean.of( @@ -286,5 +327,15 @@ private void updateClusterBean() { Collectors.toUnmodifiableMap( Map.Entry::getKey, e -> List.copyOf(e.getValue())))); } + + /** Check the checkers in the waiting list. If the checker returns true, awake the thread. */ + private void checkWaitingList() { + waitingList.forEach( + (latch, checker) -> { + if (checker.test(clusterBean())) { + latch.countDown(); + } + }); + } } } diff --git a/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java b/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java index 6136b7eba0..2052194af5 100644 --- a/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java +++ b/common/src/test/java/org/astraea/common/metrics/collector/MetricStoreTest.java @@ -80,4 +80,32 @@ void testBeanExpiration() { Assertions.assertEquals(0, store.clusterBean().all().size()); } } + + @Test + void testWait() { + var queue = new LinkedBlockingQueue>>(); + + try (var store = + MetricStore.builder() + .receiver(timeout -> Utils.packException(queue::take)) + .sensorsSupplier( + // Metric sensor provide fake hasBeanObject + () -> + Map.of( + (client, bean) -> + List.of(() -> new BeanObject(Utils.randomString(), Map.of(), Map.of())), + (id, exception) -> {})) + .build()) { + Assertions.assertThrows( + IllegalStateException.class, () -> store.wait((ignore) -> false, Duration.ofSeconds(1))); + Assertions.assertDoesNotThrow(() -> store.wait((ignore) -> true, Duration.ofSeconds(1))); + + Assertions.assertThrows( + IllegalStateException.class, + () -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1))); + queue.add(Map.of(1000, List.of(new BeanObject(Utils.randomString(), Map.of(), Map.of())))); + Assertions.assertDoesNotThrow( + () -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1))); + } + } } From 41b0fed90adc910c59265a6fbae3e870d156b06b Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Sun, 30 Apr 2023 23:39:25 +0800 Subject: [PATCH 2/4] Use concurrent hash map Remove default method. Check for no sufficient element exception --- .../common/metrics/collector/MetricStore.java | 31 ++++++++----------- 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index 6fcbafefb0..e8c5b4ac07 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -72,19 +71,7 @@ static Builder builder() { Map> sensors(); /** Wait for the checker to be true or timeout. */ - default void wait(Predicate checker, Duration timeout) { - // Keep checking the checker until it is true or timeout. - var start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout.toMillis()) { - try { - if (checker.test(clusterBean())) return; - } catch (NoSufficientMetricsException noe) { - // keep trying - } - Utils.sleep(Duration.ofSeconds(1)); - } - throw new IllegalStateException("Timeout waiting for the checker"); - } + void wait(Predicate checker, Duration timeout); @Override void close(); @@ -212,7 +199,8 @@ class MetricStoreImpl implements MetricStore { private final Set identities = new ConcurrentSkipListSet<>(); private volatile Map> lastSensors = Map.of(); - private final Map> waitingList = new HashMap<>(); + private final Map> waitingList = + new ConcurrentHashMap<>(); private MetricStoreImpl( Supplier>> sensorsSupplier, @@ -303,11 +291,16 @@ public void close() { /** User thread will "wait" until being awakened by the metric store or being timeout. */ @Override public void wait(Predicate checker, Duration timeout) { - if (checker.test(clusterBean())) return; + try { + if (checker.test(clusterBean())) return; + } catch (NoSufficientMetricsException e) { + // Check failed. Need to wait for more metrics. + } var latch = new CountDownLatch(1); try { waitingList.put(latch, checker); + // Wait until being awake or timeout if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { throw new IllegalStateException("Timeout waiting for the checker"); } @@ -332,8 +325,10 @@ private void updateClusterBean() { private void checkWaitingList() { waitingList.forEach( (latch, checker) -> { - if (checker.test(clusterBean())) { - latch.countDown(); + try { + if (checker.test(clusterBean())) latch.countDown(); + } catch (NoSufficientMetricsException e) { + // Check failed. Try again next time. } }); } From 462ee84e1c937c76d318ac6f3df31f667838e3d4 Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Mon, 1 May 2023 10:21:20 +0800 Subject: [PATCH 3/4] Reduce code --- .../common/metrics/collector/MetricStore.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index e8c5b4ac07..e68a1a2e93 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -253,7 +253,7 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - checkWaitingList(); + checkWaitingList(this.waitingList); } } catch (Exception e) { // TODO: it needs better error handling @@ -291,15 +291,11 @@ public void close() { /** User thread will "wait" until being awakened by the metric store or being timeout. */ @Override public void wait(Predicate checker, Duration timeout) { - try { - if (checker.test(clusterBean())) return; - } catch (NoSufficientMetricsException e) { - // Check failed. Need to wait for more metrics. - } - var latch = new CountDownLatch(1); try { waitingList.put(latch, checker); + // Check the newly added checker immediately + checkWaitingList(Map.of(latch, checker)); // Wait until being awake or timeout if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { throw new IllegalStateException("Timeout waiting for the checker"); @@ -321,8 +317,10 @@ private void updateClusterBean() { Map.Entry::getKey, e -> List.copyOf(e.getValue())))); } - /** Check the checkers in the waiting list. If the checker returns true, awake the thread. */ - private void checkWaitingList() { + /** + * Check the checkers in the waiting list. If the checker returns true, count down the latch. + */ + private void checkWaitingList(Map> waitingList) { waitingList.forEach( (latch, checker) -> { try { From 45a4378faf70a37a9e50e12fe15df0382552229a Mon Sep 17 00:00:00 2001 From: chinghongfang Date: Mon, 1 May 2023 12:59:53 +0800 Subject: [PATCH 4/4] Make method static --- .../astraea/common/metrics/collector/MetricStore.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java index e68a1a2e93..92ee5ba549 100644 --- a/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java +++ b/common/src/main/java/org/astraea/common/metrics/collector/MetricStore.java @@ -253,7 +253,7 @@ private MetricStoreImpl( if (!allBeans.isEmpty()) { // generate new cluster bean updateClusterBean(); - checkWaitingList(this.waitingList); + checkWaitingList(this.waitingList, clusterBean()); } } catch (Exception e) { // TODO: it needs better error handling @@ -295,7 +295,7 @@ public void wait(Predicate checker, Duration timeout) { try { waitingList.put(latch, checker); // Check the newly added checker immediately - checkWaitingList(Map.of(latch, checker)); + checkWaitingList(Map.of(latch, checker), clusterBean()); // Wait until being awake or timeout if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) { throw new IllegalStateException("Timeout waiting for the checker"); @@ -320,11 +320,12 @@ private void updateClusterBean() { /** * Check the checkers in the waiting list. If the checker returns true, count down the latch. */ - private void checkWaitingList(Map> waitingList) { + private static void checkWaitingList( + Map> waitingList, ClusterBean clusterBean) { waitingList.forEach( (latch, checker) -> { try { - if (checker.test(clusterBean())) latch.countDown(); + if (checker.test(clusterBean)) latch.countDown(); } catch (NoSufficientMetricsException e) { // Check failed. Try again next time. }