From 5b4318a41f7df3d83cd2adc6cf6dc9cc1133b412 Mon Sep 17 00:00:00 2001 From: Santiago Pericas-Geertsen Date: Wed, 11 Dec 2024 15:59:45 -0500 Subject: [PATCH] Refactors concurrently-limits module. Signed-off-by: Santiago Pericas-Geertsen --- .../common/concurrency/limits/AimdLimit.java | 9 + .../limits/AimdLimitConfigBlueprint.java | 35 +++ .../concurrency/limits/AimdLimitImpl.java | 39 ++-- .../common/concurrency/limits/FixedLimit.java | 175 +-------------- .../concurrency/limits/LimitHandlers.java | 206 ++++++++++++++++++ 5 files changed, 277 insertions(+), 187 deletions(-) create mode 100644 common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java index 1c901e8b78c..4482513ac87 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java @@ -34,6 +34,15 @@ @SuppressWarnings("removal") @RuntimeType.PrototypedBy(AimdLimitConfig.class) public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api { + /** + * Default length of the queue. + */ + public static final int DEFAULT_QUEUE_LENGTH = 0; + /** + * Timeout of a request that is enqueued. + */ + public static final String DEFAULT_QUEUE_TIMEOUT_DURATION = "PT1S"; + static final String TYPE = "aimd"; private final AimdLimitConfig config; diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java index 400fbb99682..c837a4439bc 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java @@ -94,4 +94,39 @@ interface AimdLimitConfigBlueprint extends Prototype.Factory { */ @Option.Default(AimdLimit.TYPE) String name(); + + /** + * How many requests can be enqueued waiting for a permit after + * the {@link #maxLimit()} is reached. + * Note that this may not be an exact behavior due to concurrent invocations. + * We use {@link java.util.concurrent.Semaphore#getQueueLength()} in the + * {@link io.helidon.common.concurrency.limits.AimdLimit} implementation. + * Default value is {@value AimdLimit#DEFAULT_QUEUE_LENGTH}. + * If set to {code 0}, there is no queueing. + * + * @return number of requests to enqueue + */ + @Option.Configured + @Option.DefaultInt(FixedLimit.DEFAULT_QUEUE_LENGTH) + int queueLength(); + + /** + * How long to wait for a permit when enqueued. + * Defaults to {@value AimdLimit#DEFAULT_QUEUE_TIMEOUT_DURATION} + * + * @return duration of the timeout + */ + @Option.Configured + @Option.Default(FixedLimit.DEFAULT_QUEUE_TIMEOUT_DURATION) + Duration queueTimeout(); + + /** + * Whether the {@link java.util.concurrent.Semaphore} should be {@link java.util.concurrent.Semaphore#isFair()}. + * Defaults to {@code false}. + * + * @return whether this should be a fair semaphore + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean fair(); } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java index 2431ec6e24c..dc3664f1154 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -36,6 +36,7 @@ class AimdLimitImpl { private final Supplier clock; private final AtomicInteger concurrentRequests; private final AdjustableSemaphore semaphore; + private final LimitHandlers.LimiterHandler handler; private final AtomicInteger limit; private final Lock limitLock = new ReentrantLock(); @@ -49,10 +50,19 @@ class AimdLimitImpl { this.clock = config.clock().orElseGet(() -> System::nanoTime); this.concurrentRequests = new AtomicInteger(); - this.semaphore = new AdjustableSemaphore(initialLimit); - this.limit = new AtomicInteger(initialLimit); + this.semaphore = new AdjustableSemaphore(initialLimit, config.fair()); + if (config.queueLength() == 0) { + this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore, + () -> new AimdToken(clock, concurrentRequests)); + } else { + this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore, + config.queueLength(), + config.queueTimeout(), + () -> new AimdToken(clock, concurrentRequests)); + } + if (!(backoffRatio < 1.0 && backoffRatio >= 0.5)) { throw new ConfigException("Backoff ratio must be within [0.5, 1.0)"); } @@ -76,11 +86,7 @@ int currentLimit() { } Optional tryAcquire() { - if (!semaphore.tryAcquire()) { - return Optional.empty(); - } - - return Optional.of(new AimdToken(clock, concurrentRequests)); + return handler.tryAcquire(); } void invoke(Runnable runnable) throws Exception { @@ -91,22 +97,19 @@ void invoke(Runnable runnable) throws Exception { } T invoke(Callable callable) throws Exception { - long startTime = clock.get(); - int currentRequests = concurrentRequests.incrementAndGet(); - - if (semaphore.tryAcquire()) { + Optional optionalToken = handler.tryAcquire(); + if (optionalToken.isPresent()) { + LimitAlgorithm.Token token = optionalToken.get(); try { T response = callable.call(); - updateWithSample(startTime, clock.get(), currentRequests, true); + token.success(); return response; } catch (IgnoreTaskException e) { + token.dropped(); return e.handle(); } catch (Throwable e) { - updateWithSample(startTime, clock.get(), currentRequests, false); + token.ignore(); throw e; - } finally { - concurrentRequests.decrementAndGet(); - semaphore.release(); } } else { throw new LimitException("No more permits available for the semaphore"); @@ -155,8 +158,8 @@ private static final class AdjustableSemaphore extends Semaphore { @Serial private static final long serialVersionUID = 114L; - private AdjustableSemaphore(int permits) { - super(permits); + private AdjustableSemaphore(int permits, boolean fair) { + super(permits, fair); } @Override diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java index 97255156e4e..533a6713923 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java @@ -16,11 +16,9 @@ package io.helidon.common.concurrency.limits; -import java.time.Duration; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import io.helidon.builder.api.RuntimeType; @@ -51,23 +49,23 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api new Semaphore(config.permits(), config.fair())); this.initialPermits = semaphore.availablePermits(); if (config.queueLength() == 0) { - this.handler = new RealSemaphoreHandler(semaphore); + this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore); } else { - this.handler = new QueuedSemaphoreHandler(semaphore, - config.queueLength(), - config.queueTimeout()); + this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore, + config.queueLength(), + config.queueTimeout()); } } } @@ -185,165 +183,4 @@ public Limit copy() { } return config.build(); } - - @SuppressWarnings("removal") - private interface LimiterHandler extends SemaphoreLimit, LimitAlgorithm { - } - - private static class NoOpSemaphoreHandler implements LimiterHandler { - private static final Token TOKEN = new Token() { - @Override - public void dropped() { - } - - @Override - public void ignore() { - } - - @Override - public void success() { - } - }; - - @Override - public T invoke(Callable callable) throws Exception { - try { - return callable.call(); - } catch (IgnoreTaskException e) { - return e.handle(); - } - } - - @Override - public void invoke(Runnable runnable) { - runnable.run(); - } - - @Override - public Optional tryAcquire(boolean wait) { - return Optional.of(TOKEN); - } - - @SuppressWarnings("removal") - @Override - public Semaphore semaphore() { - return NoopSemaphore.INSTANCE; - } - } - - @SuppressWarnings("removal") - private static class RealSemaphoreHandler implements LimiterHandler { - private final Semaphore semaphore; - - private RealSemaphoreHandler(Semaphore semaphore) { - this.semaphore = semaphore; - } - - @Override - public T invoke(Callable callable) throws Exception { - if (semaphore.tryAcquire()) { - try { - return callable.call(); - } catch (IgnoreTaskException e) { - return e.handle(); - } finally { - semaphore.release(); - } - } else { - throw new LimitException("No more permits available for the semaphore"); - } - } - - @Override - public void invoke(Runnable runnable) throws Exception { - if (semaphore.tryAcquire()) { - try { - runnable.run(); - } catch (IgnoreTaskException e) { - e.handle(); - } finally { - semaphore.release(); - } - } else { - throw new LimitException("No more permits available for the semaphore"); - } - } - - @Override - public Optional tryAcquire(boolean wait) { - if (!semaphore.tryAcquire()) { - return Optional.empty(); - } - return Optional.of(new SemaphoreToken(semaphore)); - } - - @Override - public Semaphore semaphore() { - return semaphore; - } - } - - private static class QueuedSemaphoreHandler implements LimiterHandler { - private final Semaphore semaphore; - private final int queueLength; - private final long timeoutMillis; - - private QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) { - this.semaphore = semaphore; - this.queueLength = queueLength; - this.timeoutMillis = queueTimeout.toMillis(); - } - - @Override - public Optional tryAcquire(boolean wait) { - if (semaphore.getQueueLength() >= this.queueLength) { - // this is an estimate - we do not promise to be precise here - return Optional.empty(); - } - - try { - if (wait) { - if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) { - return Optional.empty(); - } - } else { - if (!semaphore.tryAcquire()) { - return Optional.empty(); - } - } - - } catch (InterruptedException e) { - return Optional.empty(); - } - return Optional.of(new SemaphoreToken(semaphore)); - } - - @Override - public Semaphore semaphore() { - return semaphore; - } - } - - private static class SemaphoreToken implements Token { - private final Semaphore semaphore; - - private SemaphoreToken(Semaphore semaphore) { - this.semaphore = semaphore; - } - - @Override - public void dropped() { - semaphore.release(); - } - - @Override - public void ignore() { - semaphore.release(); - } - - @Override - public void success() { - semaphore.release(); - } - } } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java new file mode 100644 index 00000000000..b73ec4ee519 --- /dev/null +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/LimitHandlers.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2024 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.helidon.common.concurrency.limits; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +class LimitHandlers { + + private LimitHandlers() { + } + + @SuppressWarnings("removal") + interface LimiterHandler extends SemaphoreLimit, LimitAlgorithm { + } + + static class NoOpSemaphoreHandler implements LimiterHandler { + private static final Token TOKEN = new Token() { + @Override + public void dropped() { + } + + @Override + public void ignore() { + } + + @Override + public void success() { + } + }; + + @Override + public T invoke(Callable callable) throws Exception { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } + } + + @Override + public void invoke(Runnable runnable) { + runnable.run(); + } + + @Override + public Optional tryAcquire(boolean wait) { + return Optional.of(TOKEN); + } + + @SuppressWarnings("removal") + @Override + public Semaphore semaphore() { + return NoopSemaphore.INSTANCE; + } + } + + @SuppressWarnings("removal") + static class RealSemaphoreHandler implements LimiterHandler { + private final Semaphore semaphore; + private final Supplier tokenSupplier; + + RealSemaphoreHandler(Semaphore semaphore) { + this.semaphore = semaphore; + this.tokenSupplier = () -> new SemaphoreToken(semaphore); + } + + RealSemaphoreHandler(Semaphore semaphore, Supplier tokenSupplier) { + this.semaphore = semaphore; + this.tokenSupplier = tokenSupplier; + } + + @Override + public T invoke(Callable callable) throws Exception { + if (semaphore.tryAcquire()) { + try { + return callable.call(); + } catch (IgnoreTaskException e) { + return e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public void invoke(Runnable runnable) throws Exception { + if (semaphore.tryAcquire()) { + try { + runnable.run(); + } catch (IgnoreTaskException e) { + e.handle(); + } finally { + semaphore.release(); + } + } else { + throw new LimitException("No more permits available for the semaphore"); + } + } + + @Override + public Optional tryAcquire(boolean wait) { + if (!semaphore.tryAcquire()) { + return Optional.empty(); + } + return Optional.of(tokenSupplier.get()); + } + + @Override + public Semaphore semaphore() { + return semaphore; + } + } + + static class QueuedSemaphoreHandler implements LimiterHandler { + private final Semaphore semaphore; + private final int queueLength; + private final long timeoutMillis; + private final Supplier tokenSupplier; + + QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) { + this.semaphore = semaphore; + this.queueLength = queueLength; + this.timeoutMillis = queueTimeout.toMillis(); + this.tokenSupplier = () -> new SemaphoreToken(semaphore); + } + + QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout, Supplier tokenSupplier) { + this.semaphore = semaphore; + this.queueLength = queueLength; + this.timeoutMillis = queueTimeout.toMillis(); + this.tokenSupplier = tokenSupplier; + } + + @Override + public Optional tryAcquire(boolean wait) { + if (semaphore.getQueueLength() >= this.queueLength) { + // this is an estimate - we do not promise to be precise here + return Optional.empty(); + } + + try { + if (wait) { + if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) { + return Optional.empty(); + } + } else { + if (!semaphore.tryAcquire()) { + return Optional.empty(); + } + } + + } catch (InterruptedException e) { + return Optional.empty(); + } + return Optional.of(tokenSupplier.get()); + } + + @Override + public Semaphore semaphore() { + return semaphore; + } + } + + static class SemaphoreToken implements LimitAlgorithm.Token { + private final Semaphore semaphore; + + SemaphoreToken(Semaphore semaphore) { + this.semaphore = semaphore; + } + + @Override + public void dropped() { + semaphore.release(); + } + + @Override + public void ignore() { + semaphore.release(); + } + + @Override + public void success() { + semaphore.release(); + } + } +}