diff --git a/common/concurrency/limits/pom.xml b/common/concurrency/limits/pom.xml index 79f981da434..480d0a173c9 100644 --- a/common/concurrency/limits/pom.xml +++ b/common/concurrency/limits/pom.xml @@ -43,6 +43,10 @@ io.helidon.common helidon-common-config + + io.helidon.metrics + helidon-metrics-api + io.helidon.service helidon-service-registry diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java index 4482513ac87..008f42139ec 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimit.java @@ -146,4 +146,9 @@ public AimdLimitConfig prototype() { public Limit copy() { return config.build(); } + + @Override + public void init(String socketName) { + aimdLimitImpl.initMetrics(socketName, config); + } } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java index c837a4439bc..9ec9b213a3a 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitConfigBlueprint.java @@ -129,4 +129,13 @@ interface AimdLimitConfigBlueprint extends Prototype.Factory { @Option.Configured @Option.DefaultBoolean(false) boolean fair(); + + /** + * Whether to collect metrics for the AIMD implementation. + * + * @return metrics flag + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean enableMetrics(); } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java index 0ff523a3289..e0ddd8d9909 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/AimdLimitImpl.java @@ -20,12 +20,20 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import io.helidon.common.config.ConfigException; +import io.helidon.metrics.api.Gauge; +import io.helidon.metrics.api.MeterRegistry; +import io.helidon.metrics.api.Metrics; +import io.helidon.metrics.api.MetricsFactory; +import io.helidon.metrics.api.Timer; + +import static io.helidon.metrics.api.Meter.Scope.VENDOR; class AimdLimitImpl { private final double backoffRatio; @@ -41,6 +49,8 @@ class AimdLimitImpl { private final AtomicInteger limit; private final Lock limitLock = new ReentrantLock(); + private Timer rttTimer; + AimdLimitImpl(AimdLimitConfig config) { int initialLimit = config.initialLimit(); this.backoffRatio = config.backoffRatio(); @@ -119,6 +129,10 @@ T invoke(Callable callable) throws Exception { void updateWithSample(long startTime, long endTime, int currentRequests, boolean success) { long rtt = endTime - startTime; + if (rttTimer != null) { + rttTimer.record(rtt, TimeUnit.NANOSECONDS); + } + int currentLimit = limit.get(); if (rtt > timeoutInNanos || !success) { currentLimit = (int) (currentLimit * backoffRatio); @@ -154,6 +168,38 @@ private void setLimit(int newLimit) { } } + /** + * Initialize metrics for this limit. + * + * @param socketName name of socket for which this limit was created + * @param config this limit's config + */ + void initMetrics(String socketName, AimdLimitConfig config) { + if (config.enableMetrics()) { + MetricsFactory metricsFactory = MetricsFactory.getInstance(); + MeterRegistry meterRegistry = Metrics.globalRegistry(); + String namePrefix = (socketName.startsWith("@") ? socketName.substring(1) : socketName) + + "_" + config.name(); + + Gauge.Builder limitBuilder = metricsFactory.gaugeBuilder( + namePrefix + "_limit", limit::get).scope(VENDOR); + meterRegistry.getOrCreate(limitBuilder); + + Gauge.Builder concurrentRequestsBuilder = metricsFactory.gaugeBuilder( + namePrefix + "_concurrent_requests", concurrentRequests::get).scope(VENDOR); + meterRegistry.getOrCreate(concurrentRequestsBuilder); + + Gauge.Builder queueLengthBuilder = metricsFactory.gaugeBuilder( + namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR); + meterRegistry.getOrCreate(queueLengthBuilder); + + Timer.Builder timerBuilder = metricsFactory.timerBuilder(namePrefix + "_rtt") + .scope(VENDOR) + .baseUnit(Timer.BaseUnits.MILLISECONDS); + rttTimer = meterRegistry.getOrCreate(timerBuilder); + } + } + private static final class AdjustableSemaphore extends Semaphore { @Serial private static final long serialVersionUID = 114L; diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java index 533a6713923..566eafdaca9 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimit.java @@ -23,6 +23,12 @@ import io.helidon.builder.api.RuntimeType; import io.helidon.common.config.Config; +import io.helidon.metrics.api.Gauge; +import io.helidon.metrics.api.MeterRegistry; +import io.helidon.metrics.api.Metrics; +import io.helidon.metrics.api.MetricsFactory; + +import static io.helidon.metrics.api.Meter.Scope.VENDOR; /** * Semaphore based limit, that supports queuing for a permit, and timeout on the queue. @@ -51,14 +57,16 @@ public class FixedLimit implements Limit, SemaphoreLimit, RuntimeType.Api new Semaphore(config.permits(), config.fair())); + semaphore = config.semaphore().orElseGet(() -> new Semaphore(config.permits(), config.fair())); this.initialPermits = semaphore.availablePermits(); if (config.queueLength() == 0) { this.handler = new LimitHandlers.RealSemaphoreHandler(semaphore); @@ -183,4 +191,25 @@ public Limit copy() { } return config.build(); } + + /** + * Initialize metrics for this limit. + * + * @param socketName name of socket for which this limit was created + */ + @Override + public void init(String socketName) { + if (config.enableMetrics()) { + MetricsFactory metricsFactory = MetricsFactory.getInstance(); + MeterRegistry meterRegistry = Metrics.globalRegistry(); + String namePrefix = (socketName.startsWith("@") ? socketName.substring(1) : socketName) + + "_" + config.name(); + + if (semaphore != null) { + Gauge.Builder queueLengthBuilder = metricsFactory.gaugeBuilder( + namePrefix + "_queue_length", semaphore::getQueueLength).scope(VENDOR); + meterRegistry.getOrCreate(queueLengthBuilder); + } + } + } } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java index c5e672c70cd..1b9ef13309e 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/FixedLimitConfigBlueprint.java @@ -96,4 +96,12 @@ interface FixedLimitConfigBlueprint extends Prototype.Factory { */ Optional semaphore(); + /** + * Whether to collect metrics for the AIMD implementation. + * + * @return metrics flag + */ + @Option.Configured + @Option.DefaultBoolean(false) + boolean enableMetrics(); } diff --git a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java index fc375762006..7a165e9c4bd 100644 --- a/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java +++ b/common/concurrency/limits/src/main/java/io/helidon/common/concurrency/limits/Limit.java @@ -30,4 +30,13 @@ public interface Limit extends LimitAlgorithm, NamedService { * @return a copy of this limit */ Limit copy(); + + /** + * Initialization method for this limit. This method can be used for any + * task, including metrics initialization. + * + * @param socketName socket name for this limit such as {@code "@default"} + */ + default void init(String socketName) { + } } diff --git a/common/concurrency/limits/src/main/java/module-info.java b/common/concurrency/limits/src/main/java/module-info.java index f1b23377399..7ed0273b540 100644 --- a/common/concurrency/limits/src/main/java/module-info.java +++ b/common/concurrency/limits/src/main/java/module-info.java @@ -25,6 +25,7 @@ requires io.helidon.builder.api; requires io.helidon.common; requires io.helidon.common.config; + requires io.helidon.metrics.api; exports io.helidon.common.concurrency.limits; exports io.helidon.common.concurrency.limits.spi; diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ServerListener.java b/webserver/webserver/src/main/java/io/helidon/webserver/ServerListener.java index 9d05e20687c..37cb43a9167 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ServerListener.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ServerListener.java @@ -130,6 +130,7 @@ class ServerListener implements ListenerContext { .permits(listenerConfig.maxConcurrentRequests()) .build(); } + this.requestLimit.init(socketName); this.connectionProviders = ConnectionProviders.create(selectors); this.socketName = socketName;