Skip to content

Commit

Permalink
Introduce support for Uni return types in Micrometer annotations
Browse files Browse the repository at this point in the history
Fixes: #15601
  • Loading branch information
geoand committed Jan 5, 2022
1 parent d8604fd commit d27d218
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.micrometer.test.GuardedResult;
import io.quarkus.micrometer.test.TimedResource;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class MicrometerCounterInterceptorTest {

Expand Down Expand Up @@ -177,4 +178,83 @@ void testCountAsyncEmptyMetricName_Failure() {
Assertions.assertNotNull(counter);
Assertions.assertEquals(1, counter.count());
}

@Test
void testCountUniFailuresOnly_NoMetricsOnSuccess() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = counted.onlyCountUniFailures(guardedResult);
guardedResult.complete();
uni.subscribe().asCompletionStage().join();

Assertions.assertThrows(MeterNotFoundException.class, () -> registry.get("uni.none").counter());
}

@Test
void testCountUniAllMetrics_MetricsOnSuccess() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = counted.countAllUniInvocations(guardedResult);
guardedResult.complete();
uni.subscribe().asCompletionStage().join();

Counter counter = registry.get("uni.all")
.tag("method", "countAllUniInvocations")
.tag("class", "io.quarkus.micrometer.test.CountedResource")
.tag("extra", "tag")
.tag("exception", "none")
.tag("result", "success").counter();
Assertions.assertNotNull(counter);
Assertions.assertEquals(1, counter.count());
}

@Test
void testCountUniAllMetrics_MetricsOnFailure() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = counted.countAllUniInvocations(guardedResult);
guardedResult.complete(new NullPointerException());
Assertions.assertThrows(java.util.concurrent.CompletionException.class,
() -> uni.subscribe().asCompletionStage().join());

Counter counter = registry.get("uni.all")
.tag("method", "countAllUniInvocations")
.tag("class", "io.quarkus.micrometer.test.CountedResource")
.tag("extra", "tag")
.tag("exception", "NullPointerException")
.tag("result", "failure").counter();
Assertions.assertNotNull(counter);
Assertions.assertEquals(1, counter.count());
Assertions.assertNull(counter.getId().getDescription());
}

@Test
void testCountUniEmptyMetricName_Success() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = counted.emptyUniMetricName(guardedResult);
guardedResult.complete();
uni.subscribe().asCompletionStage().join();

Counter counter = registry.get("method.counted")
.tag("method", "emptyUniMetricName")
.tag("class", "io.quarkus.micrometer.test.CountedResource")
.tag("exception", "none")
.tag("result", "success").counter();
Assertions.assertNotNull(counter);
Assertions.assertEquals(1, counter.count());
}

@Test
void testCountUniEmptyMetricName_Failure() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = counted.emptyUniMetricName(guardedResult);
guardedResult.complete(new NullPointerException());
Assertions.assertThrows(java.util.concurrent.CompletionException.class,
() -> uni.subscribe().asCompletionStage().join());

Counter counter = registry.get("method.counted")
.tag("method", "emptyMetricName")
.tag("class", "io.quarkus.micrometer.test.CountedResource")
.tag("exception", "NullPointerException")
.tag("result", "failure").counter();
Assertions.assertNotNull(counter);
Assertions.assertEquals(1, counter.count());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.quarkus.micrometer.test.GuardedResult;
import io.quarkus.micrometer.test.TimedResource;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Uni;

public class MicrometerTimedInterceptorTest {
@RegisterExtension
Expand Down Expand Up @@ -99,6 +100,39 @@ void testTimeMethod_AsyncFailed() {
Assertions.assertEquals(1, timer.count());
}

@Test
void testTimeMethod_Uni() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = timed.uniCall(guardedResult);
guardedResult.complete();
uni.subscribe().asCompletionStage().join();

Timer timer = registry.get("uni.call")
.tag("method", "uniCall")
.tag("class", "io.quarkus.micrometer.test.TimedResource")
.tag("exception", "none")
.tag("extra", "tag").timer();
Assertions.assertNotNull(timer);
Assertions.assertEquals(1, timer.count());
}

@Test
void testTimeMethod_UniFailed() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = timed.uniCall(guardedResult);
guardedResult.complete(new NullPointerException());
Assertions.assertThrows(java.util.concurrent.CompletionException.class,
() -> uni.subscribe().asCompletionStage().join());

Timer timer = registry.get("uni.call")
.tag("method", "uniCall")
.tag("class", "io.quarkus.micrometer.test.TimedResource")
.tag("exception", "NullPointerException")
.tag("extra", "tag").timer();
Assertions.assertNotNull(timer);
Assertions.assertEquals(1, timer.count());
}

@Test
void testTimeMethod_LongTaskTimer() {
timed.longCall(false);
Expand Down Expand Up @@ -152,6 +186,37 @@ void testTimeMethod_LongTaskTimer_AsyncFailed() {
Assertions.assertEquals(0, timer.activeTasks());
}

@Test
void testTimeMethod_LongTaskTimer_Uni() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = timed.longUniCall(guardedResult);
guardedResult.complete();
uni.subscribe().asCompletionStage().join();

LongTaskTimer timer = registry.get("uni.longCall")
.tag("method", "longUniCall")
.tag("class", "io.quarkus.micrometer.test.TimedResource")
.tag("extra", "tag").longTaskTimer();
Assertions.assertNotNull(timer);
Assertions.assertEquals(0, timer.activeTasks());
}

@Test
void testTimeMethod_LongTaskTimer_UniFailed() {
GuardedResult guardedResult = new GuardedResult();
Uni<?> uni = timed.longUniCall(guardedResult);
guardedResult.complete(new NullPointerException());
Assertions.assertThrows(java.util.concurrent.CompletionException.class,
() -> uni.subscribe().asCompletionStage().join());

LongTaskTimer timer = registry.get("uni.longCall")
.tag("method", "longUniCall")
.tag("class", "io.quarkus.micrometer.test.TimedResource")
.tag("extra", "tag").longTaskTimer();
Assertions.assertNotNull(timer);
Assertions.assertEquals(0, timer.activeTasks());
}

@Test
void testTimeMethod_repeatable() {
timed.repeatableCall(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import javax.enterprise.context.ApplicationScoped;

import io.micrometer.core.annotation.Counted;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class CountedResource {
Expand Down Expand Up @@ -43,4 +44,19 @@ public CompletableFuture<?> emptyAsyncMetricName(GuardedResult guardedResult) {
return supplyAsync(guardedResult::get);
}

@Counted(value = "uni.none", recordFailuresOnly = true)
public Uni<?> onlyCountUniFailures(GuardedResult guardedResult) {
return Uni.createFrom().item(guardedResult::get);
}

@Counted(value = "uni.all", extraTags = { "extra", "tag" })
public Uni<?> countAllUniInvocations(GuardedResult guardedResult) {
return Uni.createFrom().item(guardedResult::get);
}

@Counted
public Uni<?> emptyUniMetricName(GuardedResult guardedResult) {
return Uni.createFrom().item(guardedResult::get);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.CompletableFuture;

import javax.enterprise.context.ApplicationScoped;

import io.micrometer.core.annotation.Timed;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class TimedResource {
Expand Down Expand Up @@ -34,6 +37,11 @@ public CompletableFuture<?> asyncCall(GuardedResult guardedResult) {
return supplyAsync(guardedResult::get);
}

@Timed(value = "uni.call", extraTags = { "extra", "tag" })
public Uni<?> uniCall(GuardedResult guardedResult) {
return Uni.createFrom().item(guardedResult::get);
}

@Timed(value = "async.longCall", extraTags = { "extra", "tag" }, longTask = true)
public CompletableFuture<?> longAsyncCall(GuardedResult guardedResult) {
try {
Expand All @@ -43,6 +51,11 @@ public CompletableFuture<?> longAsyncCall(GuardedResult guardedResult) {
return supplyAsync(guardedResult::get);
}

@Timed(value = "uni.longCall", extraTags = { "extra", "tag" }, longTask = true)
public Uni<?> longUniCall(GuardedResult guardedResult) {
return Uni.createFrom().item(guardedResult::get).onItem().delayIt().by(Duration.of(3, ChronoUnit.MILLIS));
}

@Timed(value = "alpha", extraTags = { "extra", "tag" })
@Timed(value = "bravo", extraTags = { "extra", "tag" })
public void repeatableCall(boolean fail) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.lang.reflect.Method;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;

import javax.annotation.Priority;
import javax.interceptor.AroundInvoke;
Expand All @@ -12,6 +13,8 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import io.quarkus.arc.ArcInvocationContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Functions;

/**
* Quarkus declared interceptor responsible for intercepting all methods
Expand Down Expand Up @@ -51,6 +54,7 @@ public MicrometerCountedInterceptor(MeterRegistry meterRegistry) {
* @throws Throwable When the intercepted method throws one.
*/
@AroundInvoke
@SuppressWarnings("unchecked")
Object countedMethod(ArcInvocationContext context) throws Exception {
MicrometerCounted counted = context.findIterceptorBinding(MicrometerCounted.class);
if (counted == null) {
Expand All @@ -59,16 +63,30 @@ Object countedMethod(ArcInvocationContext context) throws Exception {
Method method = context.getMethod();
Tags commonTags = getCommonTags(method.getDeclaringClass().getName(), method.getName());

// If we're working with a CompletionStage
final boolean stopWhenCompleted = CompletionStage.class.isAssignableFrom(method.getReturnType());
if (stopWhenCompleted) {
Class<?> returnType = method.getReturnType();
if (TypesUtil.isCompletionStage(returnType)) {
try {
return ((CompletionStage<?>) context.proceed()).whenComplete((result, throwable) -> {
recordCompletionResult(counted, commonTags, throwable);
return ((CompletionStage<?>) context.proceed()).whenComplete(new BiConsumer<Object, Throwable>() {
@Override
public void accept(Object o, Throwable throwable) {
recordCompletionResult(counted, commonTags, throwable);
}
});
} catch (Throwable e) {
record(counted, commonTags, e);
}
} else if (TypesUtil.isUni(returnType)) {
try {
return ((Uni<Object>) context.proceed()).onTermination().invoke(
new Functions.TriConsumer<>() {
@Override
public void accept(Object o, Throwable throwable, Boolean cancelled) {
recordCompletionResult(counted, commonTags, throwable);
}
});
} catch (Throwable e) {
record(counted, commonTags, e);
}
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.quarkus.arc.ArcInvocationContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Functions;

/**
* Quarkus defined interceptor for types or methods annotated with {@link Timed @Timed}.
Expand All @@ -37,26 +39,36 @@ public MicrometerTimedInterceptor(MeterRegistry meterRegistry) {
}

@AroundInvoke
@SuppressWarnings("unchecked")
Object timedMethod(ArcInvocationContext context) throws Exception {
final boolean stopWhenCompleted = CompletionStage.class.isAssignableFrom(context.getMethod().getReturnType());
final List<Sample> samples = getSamples(context);

if (samples.isEmpty()) {
// This should never happen - at least one @Timed binding must be present
return context.proceed();
}

if (stopWhenCompleted) {
Class<?> returnType = context.getMethod().getReturnType();
if (TypesUtil.isCompletionStage(returnType)) {
try {
return ((CompletionStage<?>) context.proceed()).whenComplete((result, throwable) -> {
for (Sample sample : samples) {
sample.stop(MicrometerRecorder.getExceptionTag(throwable));
}
stop(samples, MicrometerRecorder.getExceptionTag(throwable));
});
} catch (Exception ex) {
for (Sample sample : samples) {
sample.stop(MicrometerRecorder.getExceptionTag(ex));
}
stop(samples, MicrometerRecorder.getExceptionTag(ex));
throw ex;
}
} else if (TypesUtil.isUni(returnType)) {
try {
return ((Uni<Object>) context.proceed()).onTermination().invoke(
new Functions.TriConsumer<>() {
@Override
public void accept(Object o, Throwable throwable, Boolean cancelled) {
stop(samples, MicrometerRecorder.getExceptionTag(throwable));
}
});
} catch (Exception ex) {
stop(samples, MicrometerRecorder.getExceptionTag(ex));
throw ex;
}
}
Expand All @@ -68,9 +80,7 @@ Object timedMethod(ArcInvocationContext context) throws Exception {
exceptionClass = MicrometerRecorder.getExceptionTag(ex);
throw ex;
} finally {
for (Sample sample : samples) {
sample.stop(exceptionClass);
}
stop(samples, exceptionClass);
}
}

Expand All @@ -92,6 +102,12 @@ private List<Sample> getSamples(ArcInvocationContext context) {
return samples;
}

private void stop(List<Sample> samples, String throwableClassName) {
for (int i = 0; i < samples.size(); i++) {
samples.get(i).stop(throwableClassName);
}
}

private void record(Timed timed, Timer.Sample sample, String exceptionClass, Tags timerTags) {
final String metricName = timed.value().isEmpty() ? DEFAULT_METRIC_NAME : timed.value();
try {
Expand Down Expand Up @@ -141,7 +157,7 @@ private Tags getCommonTags(String className, String methodName) {
return Tags.of("class", className, "method", methodName);
}

abstract class Sample {
abstract static class Sample {

protected final Timed timed;
protected final Tags commonTags;
Expand Down
Loading

0 comments on commit d27d218

Please sign in to comment.