Skip to content

Commit

Permalink
Micrometer performance - vert.x binders
Browse files Browse the repository at this point in the history
  • Loading branch information
brunobat committed May 10, 2024
1 parent 096a23e commit 4fe0511
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ void testEventBusMetrics() {

Assertions.assertEquals(1, getMeter("eventBus.sent", "address").counter().count());
Assertions.assertEquals(1, getMeter("eventBus.sent", "rpc").counter().count());
Assertions.assertEquals(1, getMeter("eventBus.sent", "rpc").counter().getId().getTags().size());
Assertions.assertEquals(1, getMeter("eventBus.published", "address").counter().count());

Assertions.assertEquals(2, getMeter("eventBus.handlers", "address").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.handlers", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.handlers", "rpc").gauge().getId().getTags().size());

Assertions.assertEquals(0, getMeter("eventBus.discarded", "address").gauge().value());
Assertions.assertEquals(0, getMeter("eventBus.discarded", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.discarded", "rpc").gauge().getId().getTags().size());

Assertions.assertEquals(3, getMeter("eventBus.delivered", "address").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.delivered", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.delivered", "rpc").gauge().getId().getTags().size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import jakarta.ws.rs.client.ClientResponseFilter;
import jakarta.ws.rs.ext.Provider;

import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
Expand All @@ -27,6 +28,8 @@ public class RestClientMetricsFilter implements ClientRequestFilter, ClientRespo

private final HttpBinderConfiguration httpMetricsConfig;

private final Meter.MeterProvider<Timer> timer;

// RESTEasy requires no-arg constructor for CDI injection: https://issues.redhat.com/browse/RESTEASY-1538
// In the classic Rest Client this is the constructor called whereas in the Reactive one,
// the constructor using HttpBinderConfiguration is called.
Expand All @@ -37,6 +40,10 @@ public RestClientMetricsFilter() {
@Inject
public RestClientMetricsFilter(final HttpBinderConfiguration httpMetricsConfig) {
this.httpMetricsConfig = httpMetricsConfig;

timer = Timer.builder(httpMetricsConfig.getHttpClientRequestsName())
.withRegistry(registry);

}

@Override
Expand Down Expand Up @@ -69,15 +76,13 @@ public void filter(final ClientRequestContext requestContext, final ClientRespon
Timer.Sample sample = requestMetric.getSample();
int statusCode = responseContext.getStatus();

Timer.Builder builder = Timer.builder(httpMetricsConfig.getHttpClientRequestsName())
.tags(Tags.of(
sample.stop(timer
.withTags(Tags.of(
HttpCommonTags.method(requestContext.getMethod()),
HttpCommonTags.uri(requestPath, requestContext.getUri().getPath(), statusCode),
HttpCommonTags.outcome(statusCode),
HttpCommonTags.status(statusCode),
clientName(requestContext)));

sample.stop(builder.register(registry));
clientName(requestContext))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -13,7 +15,7 @@ public class NetworkMetrics implements TCPMetrics<LongTaskTimer.Sample> {
final MeterRegistry registry;
final DistributionSummary received;
final DistributionSummary sent;
final String exception;
final Meter.MeterProvider<Counter> exceptionCounter;

final Tags tags;
private final LongTaskTimer connDuration;
Expand All @@ -34,8 +36,8 @@ public NetworkMetrics(MeterRegistry registry, Tags tags, String prefix, String r
.description(connDurationDesc)
.tags(this.tags)
.register(registry);
// The exception has dynamic tags, so cannot be cached.
exception = prefix + ".errors";
exceptionCounter = Counter.builder(prefix + ".errors")
.withRegistry(registry);
}

/**
Expand Down Expand Up @@ -103,8 +105,9 @@ public void bytesWritten(LongTaskTimer.Sample sample, SocketAddress remoteAddres
*/
@Override
public void exceptionOccurred(LongTaskTimer.Sample sample, SocketAddress remoteAddress, Throwable t) {
Tags copy = this.tags.and(Tag.of("class", t.getClass().getName()));
registry.counter(exception, copy).increment();
exceptionCounter
.withTags(this.tags.and(Tag.of("class", t.getClass().getName())))
.increment();
}

public static String toString(SocketAddress remoteAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.vertx.core.eventbus.Message;
Expand All @@ -21,10 +22,32 @@ public class VertxEventBusMetrics implements EventBusMetrics<VertxEventBusMetric

private Map<String, Handler> handlers = new ConcurrentHashMap<>();

private final Meter.MeterProvider<Counter> published;
private final Meter.MeterProvider<Counter> sent;
private final Meter.MeterProvider<DistributionSummary> written;
private final Meter.MeterProvider<DistributionSummary> read;
private final Meter.MeterProvider<Counter> replyFailures;

VertxEventBusMetrics(MeterRegistry registry, Tags tags) {
this.registry = registry;
this.tags = tags;
this.ignored = new Handler(null);

published = Counter.builder("eventBus.published")
.description("Number of messages published to the event bus")
.withRegistry(registry);
sent = Counter.builder("eventBus.sent")
.description("Number of messages sent to the event bus")
.withRegistry(registry);
written = DistributionSummary.builder("eventBus.bytes.written")
.description("Track the number of bytes written to the distributed event bus")
.withRegistry(registry);
read = DistributionSummary.builder("eventBus.bytes.read")
.description("The number of bytes read from the distributed event bus")
.withRegistry(registry);
replyFailures = Counter.builder("eventBus.replyFailures")
.description("Count the number of reply failure")
.withRegistry(registry);
}

private static boolean isInternal(String address) {
Expand Down Expand Up @@ -73,50 +96,32 @@ public void discardMessage(Handler handler, boolean local, Message<?> msg) {
public void messageSent(String address, boolean publish, boolean local, boolean remote) {
if (!isInternal(address)) {
if (publish) {
Counter.builder("eventBus.published")
.description("Number of messages published to the event bus")
.tags(tags.and("address", address))
.register(registry)
.increment();
published.withTags(this.tags.and("address", address)).increment();
} else {
Counter.builder("eventBus.sent")
.description("Number of messages sent to the event bus")
.tags(tags.and("address", address))
.register(registry)
.increment();
sent.withTags(this.tags.and("address", address)).increment();
}
}
}

@Override
public void messageWritten(String address, int numberOfBytes) {
if (!isInternal(address)) {
DistributionSummary.builder("eventBus.bytes.written")
.description("Track the number of bytes written to the distributed event bus")
.tags(this.tags.and("address", address))
.register(registry)
.record(numberOfBytes);
written.withTags(this.tags.and("address", address)).record(numberOfBytes);
}
}

@Override
public void messageRead(String address, int numberOfBytes) {
if (!isInternal(address)) {
DistributionSummary.builder("eventBus.bytes.read")
.description("The number of bytes read from the distributed event bus")
.tags(this.tags.and("address", address))
.register(registry)
.record(numberOfBytes);
read.withTags(this.tags.and("address", address)).record(numberOfBytes);
}
}

@Override
public void replyFailure(String address, ReplyFailure failure) {
if (!isInternal(address)) {
Counter.builder("eventBus.replyFailures")
.description("Count the number of reply failure")
.tags(this.tags.and("address", address).and("failure", failure.name()))
.register(registry)
replyFailures
.withTags(this.tags.and("address", address, "failure", failure.name()))
.increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -36,6 +37,8 @@ class VertxHttpClientMetrics extends VertxTcpClientMetrics
private final Map<String, LongAdder> webSockets = new ConcurrentHashMap<>();
private final HttpBinderConfiguration config;

private final Meter.MeterProvider<Timer> responseTimes;

VertxHttpClientMetrics(MeterRegistry registry, String prefix, Tags tags, HttpBinderConfiguration httpBinderConfiguration) {
super(registry, prefix, tags);
this.config = httpBinderConfiguration;
Expand All @@ -61,6 +64,10 @@ public Number get() {
return pending.longValue();
}
}).description("Number of requests waiting for a response");

responseTimes = Timer.builder(config.getHttpClientRequestsName())
.description("Response times")
.withRegistry(registry);
}

@Override
Expand Down Expand Up @@ -133,10 +140,10 @@ public void responseEnd(RequestTracker tracker, long bytesRead) {
Tags list = tracker.tags
.and(HttpCommonTags.status(tracker.response.statusCode()))
.and(HttpCommonTags.outcome(tracker.response.statusCode()));
Timer.builder(config.getHttpClientRequestsName())
.description("Response times")
.tags(list)
.register(registry).record(duration, TimeUnit.NANOSECONDS);

responseTimes
.withTags(list)
.record(duration, TimeUnit.NANOSECONDS);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class VertxHttpServerMetrics extends VertxTcpServerMetrics
implements HttpServerMetrics<HttpRequestMetric, LongTaskTimer.Sample, LongTaskTimer.Sample> {
static final Logger log = Logger.getLogger(VertxHttpServerMetrics.class);

final HttpBinderConfiguration config;
HttpBinderConfiguration config;

final LongAdder activeRequests;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import java.util.Map;
import java.util.Objects;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -20,10 +22,11 @@ public class VertxNetworkMetrics implements NetworkMetrics<Map<String, Object>>
final MeterRegistry registry;
final DistributionSummary nameBytesRead;
final DistributionSummary nameBytesWritten;
final String nameExceptionOccurred;

final Tags tags;

private final Meter.MeterProvider<Counter> exceptions;

VertxNetworkMetrics(MeterRegistry registry, String prefix, Tags tags) {
this.registry = registry;
this.tags = tags;
Expand All @@ -35,7 +38,10 @@ public class VertxNetworkMetrics implements NetworkMetrics<Map<String, Object>>
}
nameBytesRead = nameBytesReadBuilder.register(registry);
nameBytesWritten = nameBytesWrittenBuilder.register(registry);
nameExceptionOccurred = prefix + ".errors";

exceptions = Counter.builder(prefix + ".errors")
.description("Number of exceptions")
.withRegistry(registry);
}

/**
Expand Down Expand Up @@ -72,8 +78,9 @@ public void bytesWritten(Map<String, Object> socketMetric, SocketAddress remoteA
*/
@Override
public void exceptionOccurred(Map<String, Object> socketMetric, SocketAddress remoteAddress, Throwable t) {
Tags copy = Objects.requireNonNullElseGet(tags, Tags::empty).and(Tag.of("class", t.getClass().getName()));
registry.counter(nameExceptionOccurred, copy).increment();
exceptions
.withTags(Objects.requireNonNullElseGet(tags, Tags::empty).and(Tag.of("class", t.getClass().getName())))
.increment();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -9,19 +11,25 @@

public class VertxUdpMetrics implements DatagramSocketMetrics {

private final MeterRegistry registry;
private volatile Tags tags;
private final String exception;
private final String read;
private final String sent;

private final Meter.MeterProvider<DistributionSummary> read;
private final Meter.MeterProvider<DistributionSummary> sent;
private final Meter.MeterProvider<Counter> exceptions;

public VertxUdpMetrics(MeterRegistry registry, String prefix, Tags tags) {
this.registry = registry;
this.tags = tags;

sent = prefix + ".bytes.written";
read = prefix + ".bytes.read";
exception = prefix + ".errors";
read = DistributionSummary.builder(prefix + ".bytes.read")
.description("Number of bytes read")
.withRegistry(registry);
sent = DistributionSummary.builder(prefix + ".bytes.written")
.description("Number of bytes written")
.withRegistry(registry);

exceptions = Counter.builder(prefix + ".errors")
.description("Number of exceptions")
.withRegistry(registry);
}

@Override
Expand All @@ -31,24 +39,19 @@ public void listening(String localName, SocketAddress localAddress) {

@Override
public void bytesRead(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
DistributionSummary.builder(read)
.description("Number of bytes read")
.tags(tags.and("remote-address", NetworkMetrics.toString(remoteAddress)))
.register(registry)
read.withTags(tags.and("remote-address", NetworkMetrics.toString(remoteAddress)))
.record(numberOfBytes);
}

@Override
public void bytesWritten(Void socketMetric, SocketAddress remoteAddress, long numberOfBytes) {
DistributionSummary.builder(sent)
.description("Number of bytes written")
.tags(tags.and("remote-address", NetworkMetrics.toString(remoteAddress)))
.register(registry);
sent.withTags(tags.and("remote-address", NetworkMetrics.toString(remoteAddress)))
.record(numberOfBytes);
}

@Override
public void exceptionOccurred(Void socketMetric, SocketAddress remoteAddress, Throwable t) {
Tags copy = this.tags.and(Tag.of("class", t.getClass().getName()));
registry.counter(exception, copy).increment();
exceptions.withTags(copy).increment();
}
}

0 comments on commit 4fe0511

Please sign in to comment.