Skip to content

Commit

Permalink
Refactors concurrently-limits module.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Dec 11, 2024
1 parent 4591886 commit 5b4318a
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@
@SuppressWarnings("removal")
@RuntimeType.PrototypedBy(AimdLimitConfig.class)
public class AimdLimit implements Limit, SemaphoreLimit, RuntimeType.Api<AimdLimitConfig> {
/**
* Default length of the queue.
*/
public static final int DEFAULT_QUEUE_LENGTH = 0;
/**
* Timeout of a request that is enqueued.
*/
public static final String DEFAULT_QUEUE_TIMEOUT_DURATION = "PT1S";

static final String TYPE = "aimd";

private final AimdLimitConfig config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,39 @@ interface AimdLimitConfigBlueprint extends Prototype.Factory<AimdLimit> {
*/
@Option.Default(AimdLimit.TYPE)
String name();

/**
* How many requests can be enqueued waiting for a permit after
* the {@link #maxLimit()} is reached.
* Note that this may not be an exact behavior due to concurrent invocations.
* We use {@link java.util.concurrent.Semaphore#getQueueLength()} in the
* {@link io.helidon.common.concurrency.limits.AimdLimit} implementation.
* Default value is {@value AimdLimit#DEFAULT_QUEUE_LENGTH}.
* If set to {code 0}, there is no queueing.
*
* @return number of requests to enqueue
*/
@Option.Configured
@Option.DefaultInt(FixedLimit.DEFAULT_QUEUE_LENGTH)
int queueLength();

/**
* How long to wait for a permit when enqueued.
* Defaults to {@value AimdLimit#DEFAULT_QUEUE_TIMEOUT_DURATION}
*
* @return duration of the timeout
*/
@Option.Configured
@Option.Default(FixedLimit.DEFAULT_QUEUE_TIMEOUT_DURATION)
Duration queueTimeout();

/**
* Whether the {@link java.util.concurrent.Semaphore} should be {@link java.util.concurrent.Semaphore#isFair()}.
* Defaults to {@code false}.
*
* @return whether this should be a fair semaphore
*/
@Option.Configured
@Option.DefaultBoolean(false)
boolean fair();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AimdLimitImpl {
private final Supplier<Long> clock;
private final AtomicInteger concurrentRequests;
private final AdjustableSemaphore semaphore;
private final LimitHandlers.LimiterHandler handler;

private final AtomicInteger limit;
private final Lock limitLock = new ReentrantLock();
Expand All @@ -49,10 +50,19 @@ class AimdLimitImpl {
this.clock = config.clock().orElseGet(() -> System::nanoTime);

this.concurrentRequests = new AtomicInteger();
this.semaphore = new AdjustableSemaphore(initialLimit);

this.limit = new AtomicInteger(initialLimit);

this.semaphore = new AdjustableSemaphore(initialLimit, config.fair());
if (config.queueLength() == 0) {
this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore,
() -> new AimdToken(clock, concurrentRequests));
} else {
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
config.queueLength(),
config.queueTimeout(),
() -> new AimdToken(clock, concurrentRequests));
}

if (!(backoffRatio < 1.0 && backoffRatio >= 0.5)) {
throw new ConfigException("Backoff ratio must be within [0.5, 1.0)");
}
Expand All @@ -76,11 +86,7 @@ int currentLimit() {
}

Optional<Limit.Token> tryAcquire() {
if (!semaphore.tryAcquire()) {
return Optional.empty();
}

return Optional.of(new AimdToken(clock, concurrentRequests));
return handler.tryAcquire();
}

void invoke(Runnable runnable) throws Exception {
Expand All @@ -91,22 +97,19 @@ void invoke(Runnable runnable) throws Exception {
}

<T> T invoke(Callable<T> callable) throws Exception {
long startTime = clock.get();
int currentRequests = concurrentRequests.incrementAndGet();

if (semaphore.tryAcquire()) {
Optional<LimitAlgorithm.Token> optionalToken = handler.tryAcquire();
if (optionalToken.isPresent()) {
LimitAlgorithm.Token token = optionalToken.get();
try {
T response = callable.call();
updateWithSample(startTime, clock.get(), currentRequests, true);
token.success();
return response;
} catch (IgnoreTaskException e) {
token.dropped();
return e.handle();
} catch (Throwable e) {
updateWithSample(startTime, clock.get(), currentRequests, false);
token.ignore();
throw e;
} finally {
concurrentRequests.decrementAndGet();
semaphore.release();
}
} else {
throw new LimitException("No more permits available for the semaphore");
Expand Down Expand Up @@ -155,8 +158,8 @@ private static final class AdjustableSemaphore extends Semaphore {
@Serial
private static final long serialVersionUID = 114L;

private AdjustableSemaphore(int permits) {
super(permits);
private AdjustableSemaphore(int permits, boolean fair) {
super(permits, fair);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package io.helidon.common.concurrency.limits;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import io.helidon.builder.api.RuntimeType;
Expand Down Expand Up @@ -51,23 +49,23 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api<FixedL
static final String TYPE = "fixed";

private final FixedLimitConfig config;
private final LimiterHandler handler;
private final LimitHandlers.LimiterHandler handler;
private final int initialPermits;

private FixedLimit(FixedLimitConfig config) {
this.config = config;
if (config.permits() == 0 && config.semaphore().isEmpty()) {
this.handler = new NoOpSemaphoreHandler();
this.handler = new LimitHandlers.NoOpSemaphoreHandler();
this.initialPermits = 0;
} else {
Semaphore semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair()));
this.initialPermits = semaphore.availablePermits();
if (config.queueLength() == 0) {
this.handler = new RealSemaphoreHandler(semaphore);
this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore);
} else {
this.handler = new QueuedSemaphoreHandler(semaphore,
config.queueLength(),
config.queueTimeout());
this.handler = new LimitHandlers.QueuedSemaphoreHandler(semaphore,
config.queueLength(),
config.queueTimeout());
}
}
}
Expand Down Expand Up @@ -185,165 +183,4 @@ public Limit copy() {
}
return config.build();
}

@SuppressWarnings("removal")
private interface LimiterHandler extends SemaphoreLimit, LimitAlgorithm {
}

private static class NoOpSemaphoreHandler implements LimiterHandler {
private static final Token TOKEN = new Token() {
@Override
public void dropped() {
}

@Override
public void ignore() {
}

@Override
public void success() {
}
};

@Override
public <T> T invoke(Callable<T> callable) throws Exception {
try {
return callable.call();
} catch (IgnoreTaskException e) {
return e.handle();
}
}

@Override
public void invoke(Runnable runnable) {
runnable.run();
}

@Override
public Optional<Token> tryAcquire(boolean wait) {
return Optional.of(TOKEN);
}

@SuppressWarnings("removal")
@Override
public Semaphore semaphore() {
return NoopSemaphore.INSTANCE;
}
}

@SuppressWarnings("removal")
private static class RealSemaphoreHandler implements LimiterHandler {
private final Semaphore semaphore;

private RealSemaphoreHandler(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public <T> T invoke(Callable<T> callable) throws Exception {
if (semaphore.tryAcquire()) {
try {
return callable.call();
} catch (IgnoreTaskException e) {
return e.handle();
} finally {
semaphore.release();
}
} else {
throw new LimitException("No more permits available for the semaphore");
}
}

@Override
public void invoke(Runnable runnable) throws Exception {
if (semaphore.tryAcquire()) {
try {
runnable.run();
} catch (IgnoreTaskException e) {
e.handle();
} finally {
semaphore.release();
}
} else {
throw new LimitException("No more permits available for the semaphore");
}
}

@Override
public Optional<Token> tryAcquire(boolean wait) {
if (!semaphore.tryAcquire()) {
return Optional.empty();
}
return Optional.of(new SemaphoreToken(semaphore));
}

@Override
public Semaphore semaphore() {
return semaphore;
}
}

private static class QueuedSemaphoreHandler implements LimiterHandler {
private final Semaphore semaphore;
private final int queueLength;
private final long timeoutMillis;

private QueuedSemaphoreHandler(Semaphore semaphore, int queueLength, Duration queueTimeout) {
this.semaphore = semaphore;
this.queueLength = queueLength;
this.timeoutMillis = queueTimeout.toMillis();
}

@Override
public Optional<Token> tryAcquire(boolean wait) {
if (semaphore.getQueueLength() >= this.queueLength) {
// this is an estimate - we do not promise to be precise here
return Optional.empty();
}

try {
if (wait) {
if (!semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
return Optional.empty();
}
} else {
if (!semaphore.tryAcquire()) {
return Optional.empty();
}
}

} catch (InterruptedException e) {
return Optional.empty();
}
return Optional.of(new SemaphoreToken(semaphore));
}

@Override
public Semaphore semaphore() {
return semaphore;
}
}

private static class SemaphoreToken implements Token {
private final Semaphore semaphore;

private SemaphoreToken(Semaphore semaphore) {
this.semaphore = semaphore;
}

@Override
public void dropped() {
semaphore.release();
}

@Override
public void ignore() {
semaphore.release();
}

@Override
public void success() {
semaphore.release();
}
}
}
Loading

0 comments on commit 5b4318a

Please sign in to comment.