Skip to content

Commit

Permalink
Add support for Vert.x pool metrics.
Browse files Browse the repository at this point in the history
Pool metrics cover Vert.x worker threads, the various connection pools (including the one for the Vert.x SQL clients), Redis...

The code is an adaptation of the Vert.x Micrometer pool metrics implementation to follow the Quarkus conventions.
  • Loading branch information
cescoffier committed Dec 12, 2022
1 parent 70d05af commit 31bfa0c
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.VertxMetricsFactory;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;

public class VertxMeterBinderAdapter extends MetricsOptions implements VertxMetricsFactory, VertxMetrics {
Expand Down Expand Up @@ -55,4 +56,10 @@ public MetricsOptions newOptions() {
}
return null;
}

@Override
public PoolMetrics<?> createPoolMetrics(String poolType, String poolName, int maxPoolSize) {
return new VertxPoolMetrics(Metrics.globalRegistry, poolType, poolName, maxPoolSize);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.vertx.core.spi.metrics.PoolMetrics;

/**
* Adaptation of the Vert.x Pool Metrics implementation for Quarkus Micrometer.
*/
public class VertxPoolMetrics implements PoolMetrics<VertxPoolMetrics.EventTiming> {

private final String poolType;
private final int maxPoolSize;

private final Timer usage;
private final AtomicReference<Double> ratio = new AtomicReference<>();
private final LongAdder current = new LongAdder();
private final LongAdder queue = new LongAdder();
private final Counter completed;
private final Timer queueDelay;

VertxPoolMetrics(MeterRegistry registry, String poolType, String poolName, int maxPoolSize) {
this.poolType = poolType;
this.maxPoolSize = maxPoolSize;

Tags tags = Tags.of(Tag.of("pool.type", poolType), Tag.of("pool.name", poolName));

queueDelay = Timer.builder(name("queue.delay"))
.description("Time spent in the waiting queue before being processed")
.tags(tags)
.register(registry);

usage = Timer.builder(name("usage"))
.description("Time spent using resources from the pool")
.tags(tags)
.register(registry);

Gauge.builder(name("queue.size"), new Supplier<Number>() {
@Override
public Number get() {
return queue.doubleValue();
}
})
.description("Number of pending elements in the waiting queue")
.tags(tags)
.strongReference(true)
.register(registry);

Gauge.builder(name("active"), new Supplier<Number>() {
@Override
public Number get() {
return current.doubleValue();
}
})
.description("The number of resources from the pool currently used")
.tags(tags)
.strongReference(true)
.register(registry);

if (maxPoolSize > 0) {
Gauge.builder(name("idle"), new Supplier<Number>() {
@Override
public Number get() {
return maxPoolSize - current.doubleValue();
}
})
.description("The number of resources from the pool currently used")
.tags(tags)
.strongReference(true)
.register(registry);

Gauge.builder(name("ratio"), ratio::get)
.description("Pool usage ratio")
.tags(tags)
.strongReference(true)
.register(registry);
}

completed = Counter.builder(name("completed"))
.description("Number of times resources from the pool have been acquired")
.tags(tags)
.register(registry);

}

private String name(String suffix) {
return poolType + ".pool." + suffix;
}

@Override
public EventTiming submitted() {
queue.increment();
return new EventTiming(queueDelay);
}

@Override
public void rejected(EventTiming submitted) {
queue.decrement();
submitted.end();
}

@Override
public EventTiming begin(EventTiming submitted) {
queue.decrement();
submitted.end();
current.increment();
computeRatio(current.longValue());
return new EventTiming(usage);
}

@Override
public void end(EventTiming timer, boolean succeeded) {
current.decrement();
computeRatio(current.longValue());
timer.end();

completed.increment();
}

@Override
public void close() {
}

private void computeRatio(long inUse) {
if (maxPoolSize > 0) {
ratio.set((double) inUse / maxPoolSize);
}
}

public static class EventTiming {
private final long nanoStart;
private final Timer timer;

private EventTiming(Timer timer) {
this.timer = timer;
this.nanoStart = System.nanoTime();
}

public void end() {
timer.record(System.nanoTime() - nanoStart, TimeUnit.NANOSECONDS);
}
}
}

0 comments on commit 31bfa0c

Please sign in to comment.