Skip to content

Commit

Permalink
Initial support for metrics in limits.
Browse files Browse the repository at this point in the history
Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
  • Loading branch information
spericas committed Dec 13, 2024
1 parent a2a5bc1 commit f144e8b
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 1 deletion.
4 changes: 4 additions & 0 deletions common/concurrency/limits/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-config</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.metrics</groupId>
<artifactId>helidon-metrics-api</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.service</groupId>
<artifactId>helidon-service-registry</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,9 @@ public AimdLimitConfig prototype() {
public Limit copy() {
return config.build();
}

@Override
public void init(String socketName) {
aimdLimitImpl.initMetrics(socketName, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,13 @@ interface AimdLimitConfigBlueprint extends Prototype.Factory<AimdLimit> {
@Option.Configured
@Option.DefaultBoolean(false)
boolean fair();

/**
* Whether to collect metrics for the AIMD implementation.
*
* @return metrics flag
*/
@Option.Configured
@Option.DefaultBoolean(false)
boolean enableMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import io.helidon.common.config.ConfigException;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.MeterRegistry;
import io.helidon.metrics.api.Metrics;
import io.helidon.metrics.api.MetricsFactory;
import io.helidon.metrics.api.Timer;

import static io.helidon.metrics.api.Meter.Scope.VENDOR;

class AimdLimitImpl {
private final double backoffRatio;
Expand All @@ -41,6 +49,8 @@ class AimdLimitImpl {
private final AtomicInteger limit;
private final Lock limitLock = new ReentrantLock();

private Timer rttTimer;

AimdLimitImpl(AimdLimitConfig config) {
int initialLimit = config.initialLimit();
this.backoffRatio = config.backoffRatio();
Expand Down Expand Up @@ -119,6 +129,10 @@ <T> T invoke(Callable<T> callable) throws Exception {
void updateWithSample(long startTime, long endTime, int currentRequests, boolean success) {
long rtt = endTime - startTime;

if (rttTimer != null) {
rttTimer.record(rtt, TimeUnit.NANOSECONDS);
}

int currentLimit = limit.get();
if (rtt > timeoutInNanos || !success) {
currentLimit = (int) (currentLimit * backoffRatio);
Expand Down Expand Up @@ -154,6 +168,38 @@ private void setLimit(int newLimit) {
}
}

/**
* Initialize metrics for this limit.
*
* @param socketName name of socket for which this limit was created
* @param config this limit's config
*/
void initMetrics(String socketName, AimdLimitConfig config) {
if (config.enableMetrics()) {
MetricsFactory metricsFactory = MetricsFactory.getInstance();
MeterRegistry meterRegistry = Metrics.globalRegistry();
String namePrefix = (socketName.startsWith("@") ? socketName.substring(1) : socketName)
+ "_" + config.name();

Gauge.Builder<Integer> limitBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_limit", limit::get).scope(VENDOR);
meterRegistry.getOrCreate(limitBuilder);

Gauge.Builder<Integer> concurrentRequestsBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_concurrent_requests", concurrentRequests::get).scope(VENDOR);
meterRegistry.getOrCreate(concurrentRequestsBuilder);

Gauge.Builder<Integer> queueLengthBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR);
meterRegistry.getOrCreate(queueLengthBuilder);

Timer.Builder timerBuilder = metricsFactory.timerBuilder(namePrefix + "_rtt")
.scope(VENDOR)
.baseUnit(Timer.BaseUnits.MILLISECONDS);
rttTimer = meterRegistry.getOrCreate(timerBuilder);
}
}

private static final class AdjustableSemaphore extends Semaphore {
@Serial
private static final long serialVersionUID = 114L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

import io.helidon.builder.api.RuntimeType;
import io.helidon.common.config.Config;
import io.helidon.metrics.api.Gauge;
import io.helidon.metrics.api.MeterRegistry;
import io.helidon.metrics.api.Metrics;
import io.helidon.metrics.api.MetricsFactory;

import static io.helidon.metrics.api.Meter.Scope.VENDOR;

/**
* Semaphore based limit, that supports queuing for a permit, and timeout on the queue.
Expand Down Expand Up @@ -51,14 +57,16 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api<FixedL
private final FixedLimitConfig config;
private final LimitHandlers.LimiterHandler handler;
private final int initialPermits;
private final Semaphore semaphore;

private FixedLimit(FixedLimitConfig config) {
this.config = config;
if (config.permits() == 0 && config.semaphore().isEmpty()) {
this.handler = new LimitHandlers.NoOpSemaphoreHandler();
this.initialPermits = 0;
semaphore = null;
} else {
Semaphore semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair()));
semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair()));
this.initialPermits = semaphore.availablePermits();
if (config.queueLength() == 0) {
this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore);
Expand Down Expand Up @@ -183,4 +191,25 @@ public Limit copy() {
}
return config.build();
}

/**
* Initialize metrics for this limit.
*
* @param socketName name of socket for which this limit was created
*/
@Override
public void init(String socketName) {
if (config.enableMetrics()) {
MetricsFactory metricsFactory = MetricsFactory.getInstance();
MeterRegistry meterRegistry = Metrics.globalRegistry();
String namePrefix = (socketName.startsWith("@") ? socketName.substring(1) : socketName)
+ "_" + config.name();

if (semaphore != null) {
Gauge.Builder<Integer> queueLengthBuilder = metricsFactory.gaugeBuilder(
namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR);
meterRegistry.getOrCreate(queueLengthBuilder);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,12 @@ interface FixedLimitConfigBlueprint extends Prototype.Factory<FixedLimit> {
*/
Optional<Semaphore> semaphore();

/**
* Whether to collect metrics for the AIMD implementation.
*
* @return metrics flag
*/
@Option.Configured
@Option.DefaultBoolean(false)
boolean enableMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,13 @@ public interface Limit extends LimitAlgorithm, NamedService {
* @return a copy of this limit
*/
Limit copy();

/**
* Initialization method for this limit. This method can be used for any
* task, including metrics initialization.
*
* @param socketName socket name for this limit such as {@code "@default"}
*/
default void init(String socketName) {
}
}
1 change: 1 addition & 0 deletions common/concurrency/limits/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
requires io.helidon.builder.api;
requires io.helidon.common;
requires io.helidon.common.config;
requires io.helidon.metrics.api;

exports io.helidon.common.concurrency.limits;
exports io.helidon.common.concurrency.limits.spi;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ServerListener implements ListenerContext {
.permits(listenerConfig.maxConcurrentRequests())
.build();
}
this.requestLimit.init(socketName);

this.connectionProviders = ConnectionProviders.create(selectors);
this.socketName = socketName;
Expand Down

0 comments on commit f144e8b

Please sign in to comment.