Skip to content

Commit

Permalink
[METRIC] Add wait in MetricStore interface (#1684)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored May 1, 2023
1 parent f86391d commit 0855d44
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,21 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.astraea.common.Utils;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.consumer.Deserializer;
import org.astraea.common.consumer.Record;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.ClusterBean;
Expand Down Expand Up @@ -67,6 +70,9 @@ static Builder builder() {
*/
Map<MetricSensor, BiConsumer<Integer, Exception>> sensors();

/** Wait for the checker to be true or timeout. */
void wait(Predicate<ClusterBean> checker, Duration timeout);

@Override
void close();

Expand Down Expand Up @@ -193,6 +199,8 @@ class MetricStoreImpl implements MetricStore {
private final Set<Integer> identities = new ConcurrentSkipListSet<>();

private volatile Map<MetricSensor, BiConsumer<Integer, Exception>> lastSensors = Map.of();
private final Map<CountDownLatch, Predicate<ClusterBean>> waitingList =
new ConcurrentHashMap<>();

private MetricStoreImpl(
Supplier<Map<MetricSensor, BiConsumer<Integer, Exception>>> sensorsSupplier,
Expand Down Expand Up @@ -243,8 +251,11 @@ private MetricStoreImpl(
}
});
});
// generate new cluster bean
if (!allBeans.isEmpty()) updateClusterBean();
if (!allBeans.isEmpty()) {
// generate new cluster bean
updateClusterBean();
checkWaitingList(this.waitingList, clusterBean());
}
} catch (Exception e) {
// TODO: it needs better error handling
e.printStackTrace();
Expand Down Expand Up @@ -278,6 +289,25 @@ public void close() {
receiver.close();
}

/** User thread will "wait" until being awakened by the metric store or being timeout. */
@Override
public void wait(Predicate<ClusterBean> checker, Duration timeout) {
var latch = new CountDownLatch(1);
try {
waitingList.put(latch, checker);
// Check the newly added checker immediately
checkWaitingList(Map.of(latch, checker), clusterBean());
// Wait until being awake or timeout
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Timeout waiting for the checker");
}
} catch (InterruptedException ie) {
throw new IllegalStateException("Interrupted while waiting for the checker");
} finally {
waitingList.remove(latch);
}
}

private void updateClusterBean() {
lastClusterBean =
ClusterBean.of(
Expand All @@ -287,5 +317,20 @@ private void updateClusterBean() {
Collectors.toUnmodifiableMap(
Map.Entry::getKey, e -> List.copyOf(e.getValue()))));
}

/**
* Check the checkers in the waiting list. If the checker returns true, count down the latch.
*/
private static void checkWaitingList(
Map<CountDownLatch, Predicate<ClusterBean>> waitingList, ClusterBean clusterBean) {
waitingList.forEach(
(latch, checker) -> {
try {
if (checker.test(clusterBean)) latch.countDown();
} catch (NoSufficientMetricsException e) {
// Check failed. Try again next time.
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,32 @@ void testBeanExpiration() {
Assertions.assertEquals(0, store.clusterBean().all().size());
}
}

@Test
void testWait() {
var queue = new LinkedBlockingQueue<Map<Integer, Collection<BeanObject>>>();

try (var store =
MetricStore.builder()
.receiver(timeout -> Utils.packException(queue::take))
.sensorsSupplier(
// Metric sensor provide fake hasBeanObject
() ->
Map.of(
(client, bean) ->
List.of(() -> new BeanObject(Utils.randomString(), Map.of(), Map.of())),
(id, exception) -> {}))
.build()) {
Assertions.assertThrows(
IllegalStateException.class, () -> store.wait((ignore) -> false, Duration.ofSeconds(1)));
Assertions.assertDoesNotThrow(() -> store.wait((ignore) -> true, Duration.ofSeconds(1)));

Assertions.assertThrows(
IllegalStateException.class,
() -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1)));
queue.add(Map.of(1000, List.of(new BeanObject(Utils.randomString(), Map.of(), Map.of()))));
Assertions.assertDoesNotThrow(
() -> store.wait((clusterBean) -> !clusterBean.all().isEmpty(), Duration.ofSeconds(1)));
}
}
}

0 comments on commit 0855d44

Please sign in to comment.