From a2a5bc13bbfe445f99d3e3c17eb76d221563a617 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Wed, 11 Dec 2024 17:10:26 -0500 Subject: [PATCH] Adds new test. Signed-off-by: Santiago Pericas-Geertsen --- .../concurrency/limits/AimdLimitImpl.java | 4 +- .../concurrency/limits/AimdLimitTest.java | 79 +++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java index dc3664f1154..0ff523a3289 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -105,10 +105,10 @@ T invoke(Callable callable) throws Exception { token.success(); return response; } catch (IgnoreTaskException e) { - token.dropped(); + token.ignore(); return e.handle(); } catch (Throwable e) { - token.ignore(); + token.dropped(); throw e; } } else { diff --git a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java index c118cce5565..0e4dfee17a2 100644 --- a/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java +++ b/common/concurrency/limits/src/test/java/io/helidon/common/concurrency/limits/AimdLimitTest.java @@ -17,13 +17,21 @@ package io.helidon.common.concurrency.limits; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.is; @@ -181,4 +189,75 @@ public void testSemaphoreReleasedWithToken() { token.get().success(); } } + + @RepeatedTest(5) + public void testLimitWithQueue() throws Exception { + AimdLimit limiter = AimdLimit.builder() + .minLimit(1) + .maxLimit(2) + .initialLimit(1) + .queueLength(5) + .queueTimeout(Duration.ofSeconds(5)) + .build(); + + int concurrency = 5; + Barrier barrier = new Barrier(); + + Lock lock = new ReentrantLock(); + List result = new ArrayList<>(concurrency); + AtomicInteger failures = new AtomicInteger(); + + Thread[] threads = new Thread[concurrency]; + for (int i = 0; i < concurrency; i++) { + int index = i; + threads[i] = new Thread(() -> { + try { + limiter.invoke(() -> { + barrier.waitOn(); + lock.lock(); + try { + result.add("result_" + index); + } finally { + lock.unlock(); + } + return null; + }); + } catch (LimitException e) { + failures.incrementAndGet(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + + for (Thread thread : threads) { + thread.start(); + } + // wait for the threads to reach their destination (either failed, or on cdl, or in queue) + TimeUnit.MILLISECONDS.sleep(100); + barrier.retract(); + for (Thread thread : threads) { + thread.join(Duration.ofSeconds(5)); + } + + // all tasks should be queued + assertThat(failures.get(), is(0)); + // and eventually run to completion + assertThat(result.size(), is(5)); + } + + /** + * A barrier is used to force a thread to wait (block) until it is retracted. + */ + private static class Barrier { + private final CompletableFuture future = new CompletableFuture<>(); + + void waitOn() throws ExecutionException, InterruptedException, TimeoutException { + future.get(10, TimeUnit.SECONDS); + } + + void retract() { + future.complete(null); + } + } }