Skip to content

Commit

Permalink
Merge pull request quarkusio#40508 from brunobat/micrometer-perf
Browse files Browse the repository at this point in the history
Micrometer performance improvements on the vert.x package
  • Loading branch information
Sanne authored May 10, 2024
2 parents a25866d + 4fe0511 commit 77f440a
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ void testEventBusMetrics() {

Assertions.assertEquals(1, getMeter("eventBus.sent", "address").counter().count());
Assertions.assertEquals(1, getMeter("eventBus.sent", "rpc").counter().count());
Assertions.assertEquals(1, getMeter("eventBus.sent", "rpc").counter().getId().getTags().size());
Assertions.assertEquals(1, getMeter("eventBus.published", "address").counter().count());

Assertions.assertEquals(2, getMeter("eventBus.handlers", "address").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.handlers", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.handlers", "rpc").gauge().getId().getTags().size());

Assertions.assertEquals(0, getMeter("eventBus.discarded", "address").gauge().value());
Assertions.assertEquals(0, getMeter("eventBus.discarded", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.discarded", "rpc").gauge().getId().getTags().size());

Assertions.assertEquals(3, getMeter("eventBus.delivered", "address").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.delivered", "rpc").gauge().value());
Assertions.assertEquals(1, getMeter("eventBus.delivered", "rpc").gauge().getId().getTags().size());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import jakarta.ws.rs.client.ClientResponseFilter;
import jakarta.ws.rs.ext.Provider;

import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
Expand All @@ -27,6 +28,8 @@ public class RestClientMetricsFilter implements ClientRequestFilter, ClientRespo

private final HttpBinderConfiguration httpMetricsConfig;

private final Meter.MeterProvider<Timer> timer;

// RESTEasy requires no-arg constructor for CDI injection: https://issues.redhat.com/browse/RESTEASY-1538
// In the classic Rest Client this is the constructor called whereas in the Reactive one,
// the constructor using HttpBinderConfiguration is called.
Expand All @@ -37,6 +40,10 @@ public RestClientMetricsFilter() {
@Inject
public RestClientMetricsFilter(final HttpBinderConfiguration httpMetricsConfig) {
this.httpMetricsConfig = httpMetricsConfig;

timer = Timer.builder(httpMetricsConfig.getHttpClientRequestsName())
.withRegistry(registry);

}

@Override
Expand Down Expand Up @@ -69,15 +76,13 @@ public void filter(final ClientRequestContext requestContext, final ClientRespon
Timer.Sample sample = requestMetric.getSample();
int statusCode = responseContext.getStatus();

Timer.Builder builder = Timer.builder(httpMetricsConfig.getHttpClientRequestsName())
.tags(Tags.of(
sample.stop(timer
.withTags(Tags.of(
HttpCommonTags.method(requestContext.getMethod()),
HttpCommonTags.uri(requestPath, requestContext.getUri().getPath(), statusCode),
HttpCommonTags.outcome(statusCode),
HttpCommonTags.status(statusCode),
clientName(requestContext)));

sample.stop(builder.register(registry));
clientName(requestContext))));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -13,7 +15,7 @@ public class NetworkMetrics implements TCPMetrics<LongTaskTimer.Sample> {
final MeterRegistry registry;
final DistributionSummary received;
final DistributionSummary sent;
final String exception;
final Meter.MeterProvider<Counter> exceptionCounter;

final Tags tags;
private final LongTaskTimer connDuration;
Expand All @@ -34,8 +36,8 @@ public NetworkMetrics(MeterRegistry registry, Tags tags, String prefix, String r
.description(connDurationDesc)
.tags(this.tags)
.register(registry);
// The exception has dynamic tags, so cannot be cached.
exception = prefix + ".errors";
exceptionCounter = Counter.builder(prefix + ".errors")
.withRegistry(registry);
}

/**
Expand Down Expand Up @@ -103,8 +105,9 @@ public void bytesWritten(LongTaskTimer.Sample sample, SocketAddress remoteAddres
*/
@Override
public void exceptionOccurred(LongTaskTimer.Sample sample, SocketAddress remoteAddress, Throwable t) {
Tags copy = this.tags.and(Tag.of("class", t.getClass().getName()));
registry.counter(exception, copy).increment();
exceptionCounter
.withTags(this.tags.and(Tag.of("class", t.getClass().getName())))
.increment();
}

public static String toString(SocketAddress remoteAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.vertx.core.eventbus.Message;
Expand All @@ -21,10 +22,32 @@ public class VertxEventBusMetrics implements EventBusMetrics<VertxEventBusMetric

private Map<String, Handler> handlers = new ConcurrentHashMap<>();

private final Meter.MeterProvider<Counter> published;
private final Meter.MeterProvider<Counter> sent;
private final Meter.MeterProvider<DistributionSummary> written;
private final Meter.MeterProvider<DistributionSummary> read;
private final Meter.MeterProvider<Counter> replyFailures;

VertxEventBusMetrics(MeterRegistry registry, Tags tags) {
this.registry = registry;
this.tags = tags;
this.ignored = new Handler(null);

published = Counter.builder("eventBus.published")
.description("Number of messages published to the event bus")
.withRegistry(registry);
sent = Counter.builder("eventBus.sent")
.description("Number of messages sent to the event bus")
.withRegistry(registry);
written = DistributionSummary.builder("eventBus.bytes.written")
.description("Track the number of bytes written to the distributed event bus")
.withRegistry(registry);
read = DistributionSummary.builder("eventBus.bytes.read")
.description("The number of bytes read from the distributed event bus")
.withRegistry(registry);
replyFailures = Counter.builder("eventBus.replyFailures")
.description("Count the number of reply failure")
.withRegistry(registry);
}

private static boolean isInternal(String address) {
Expand Down Expand Up @@ -73,50 +96,32 @@ public void discardMessage(Handler handler, boolean local, Message<?> msg) {
public void messageSent(String address, boolean publish, boolean local, boolean remote) {
if (!isInternal(address)) {
if (publish) {
Counter.builder("eventBus.published")
.description("Number of messages published to the event bus")
.tags(tags.and("address", address))
.register(registry)
.increment();
published.withTags(this.tags.and("address", address)).increment();
} else {
Counter.builder("eventBus.sent")
.description("Number of messages sent to the event bus")
.tags(tags.and("address", address))
.register(registry)
.increment();
sent.withTags(this.tags.and("address", address)).increment();
}
}
}

@Override
public void messageWritten(String address, int numberOfBytes) {
if (!isInternal(address)) {
DistributionSummary.builder("eventBus.bytes.written")
.description("Track the number of bytes written to the distributed event bus")
.tags(this.tags.and("address", address))
.register(registry)
.record(numberOfBytes);
written.withTags(this.tags.and("address", address)).record(numberOfBytes);
}
}

@Override
public void messageRead(String address, int numberOfBytes) {
if (!isInternal(address)) {
DistributionSummary.builder("eventBus.bytes.read")
.description("The number of bytes read from the distributed event bus")
.tags(this.tags.and("address", address))
.register(registry)
.record(numberOfBytes);
read.withTags(this.tags.and("address", address)).record(numberOfBytes);
}
}

@Override
public void replyFailure(String address, ReplyFailure failure) {
if (!isInternal(address)) {
Counter.builder("eventBus.replyFailures")
.description("Count the number of reply failure")
.tags(this.tags.and("address", address).and("failure", failure.name()))
.register(registry)
replyFailures
.withTags(this.tags.and("address", address, "failure", failure.name()))
.increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
Expand All @@ -36,6 +37,8 @@ class VertxHttpClientMetrics extends VertxTcpClientMetrics
private final Map<String, LongAdder> webSockets = new ConcurrentHashMap<>();
private final HttpBinderConfiguration config;

private final Meter.MeterProvider<Timer> responseTimes;

VertxHttpClientMetrics(MeterRegistry registry, String prefix, Tags tags, HttpBinderConfiguration httpBinderConfiguration) {
super(registry, prefix, tags);
this.config = httpBinderConfiguration;
Expand All @@ -61,6 +64,10 @@ public Number get() {
return pending.longValue();
}
}).description("Number of requests waiting for a response");

responseTimes = Timer.builder(config.getHttpClientRequestsName())
.description("Response times")
.withRegistry(registry);
}

@Override
Expand Down Expand Up @@ -133,10 +140,10 @@ public void responseEnd(RequestTracker tracker, long bytesRead) {
Tags list = tracker.tags
.and(HttpCommonTags.status(tracker.response.statusCode()))
.and(HttpCommonTags.outcome(tracker.response.statusCode()));
Timer.builder(config.getHttpClientRequestsName())
.description("Response times")
.tags(list)
.register(registry).record(duration, TimeUnit.NANOSECONDS);

responseTimes
.withTags(list)
.record(duration, TimeUnit.NANOSECONDS);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import org.jboss.logging.Logger;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.Meter.MeterProvider;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
Expand Down Expand Up @@ -39,27 +41,37 @@ public class VertxHttpServerMetrics extends VertxTcpServerMetrics

HttpBinderConfiguration config;

final String nameWebsocketConnections;
final String nameHttpServerPush;
final String nameHttpServerRequests;
final LongAdder activeRequests;

final MeterProvider<Timer> requestsTimer;
final MeterProvider<LongTaskTimer> websocketConnectionTimer;
final MeterProvider<Counter> pushCounter;

private final List<HttpServerMetricsTagsContributor> httpServerMetricsTagsContributors;

VertxHttpServerMetrics(MeterRegistry registry, HttpBinderConfiguration config) {
super(registry, "http.server", null);
this.config = config;

// not dev-mode changeable
nameWebsocketConnections = config.getHttpServerWebSocketConnectionsName();
nameHttpServerPush = config.getHttpServerPushName();
nameHttpServerRequests = config.getHttpServerRequestsName();

activeRequests = new LongAdder();
Gauge.builder(config.getHttpServerActiveRequestsName(), activeRequests, LongAdder::doubleValue)
.register(registry);

httpServerMetricsTagsContributors = resolveHttpServerMetricsTagsContributors();

// not dev-mode changeable -----
requestsTimer = Timer.builder(config.getHttpServerRequestsName())
.description("HTTP server request processing time")
.withRegistry(registry);

websocketConnectionTimer = LongTaskTimer.builder(config.getHttpServerWebSocketConnectionsName())
.description("Server web socket connection time")
.withRegistry(registry);

pushCounter = Counter.builder(config.getHttpServerPushName())
.description("HTTP server response push counter")
.withRegistry(registry);
// not dev-mode changeable -----ˆ
}

private List<HttpServerMetricsTagsContributor> resolveHttpServerMetricsTagsContributors() {
Expand Down Expand Up @@ -98,11 +110,12 @@ public HttpRequestMetric responsePushed(LongTaskTimer.Sample socketMetric, HttpM
config.getServerMatchPatterns(),
config.getServerIgnorePatterns());
if (path != null) {
registry.counter(nameHttpServerPush, Tags.of(
HttpCommonTags.uri(path, requestMetric.initialPath, response.statusCode()),
VertxMetricsTags.method(method),
VertxMetricsTags.outcome(response),
HttpCommonTags.status(response.statusCode())))
pushCounter
.withTags(Tags.of(
HttpCommonTags.uri(path, requestMetric.initialPath, response.statusCode()),
VertxMetricsTags.method(method),
VertxMetricsTags.outcome(response),
HttpCommonTags.status(response.statusCode())))
.increment();
}
log.debugf("responsePushed %s, %s", socketMetric, requestMetric);
Expand Down Expand Up @@ -150,14 +163,13 @@ public void requestReset(HttpRequestMetric requestMetric) {
config.getServerIgnorePatterns());
if (path != null) {
Timer.Sample sample = requestMetric.getSample();
Timer.Builder builder = Timer.builder(nameHttpServerRequests)
.tags(Tags.of(

sample.stop(requestsTimer
.withTags(Tags.of(
VertxMetricsTags.method(requestMetric.request().method()),
HttpCommonTags.uri(path, requestMetric.initialPath, 0),
Outcome.CLIENT_ERROR.asTag(),
HttpCommonTags.STATUS_RESET));

sample.stop(builder.register(registry));
HttpCommonTags.STATUS_RESET)));
}
requestMetric.requestEnded();
}
Expand Down Expand Up @@ -194,17 +206,15 @@ public void responseEnd(HttpRequestMetric requestMetric, HttpResponse response,
}
}
}
Timer.Builder builder = Timer.builder(nameHttpServerRequests).tags(allTags);

sample.stop(builder.register(registry));
sample.stop(requestsTimer.withTags(allTags));
}
requestMetric.requestEnded();
}

/**
* Called when a server web socket connects.
*
* @param socketMetric a Map for socket metric context or null
* @param requestMetric a RequestMetricContext or null
* @param serverWebSocket the server web socket
* @return a LongTaskTimer.Sample or null
Expand All @@ -216,9 +226,8 @@ public LongTaskTimer.Sample connected(LongTaskTimer.Sample sample, HttpRequestMe
config.getServerMatchPatterns(),
config.getServerIgnorePatterns());
if (path != null) {
return LongTaskTimer.builder(nameWebsocketConnections)
.tags(Tags.of(HttpCommonTags.uri(path, requestMetric.initialPath, 0)))
.register(registry)
return websocketConnectionTimer
.withTags(Tags.of(HttpCommonTags.uri(path, requestMetric.initialPath, 0)))
.start();
}
return null;
Expand Down
Loading

0 comments on commit 77f440a

Please sign in to comment.