Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate metrics of @ApplyFaultTolerance for each method #1061

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

/**
* A special interceptor binding annotation to apply preconfigured fault tolerance.
* If {@code @ApplyFaultTolerance("<identifier>")} is present on a business method,
* If {@code @ApplyFaultTolerance("<identifier>")} is present on a business method,
* then a bean of type {@link FaultTolerance} with qualifier
* {@link io.smallrye.common.annotation.Identifier @Identifier("&lt;identifier>")}
* must exist. Such bean serves as a preconfigured set of fault tolerance strategies
Expand Down Expand Up @@ -42,8 +42,8 @@
* <p>
* A single preconfigured fault tolerance can even be applied to multiple methods with different
* return types, as long as the constraint on method asynchrony described above is obeyed. In such
* case, it is customary to declare the fault tolerance instance as {@code FaultTolerance&lt;Object>}
* for synchronous methods, {@code FaultTolerance&lt;CompletionStage&lt;Object>>} for asynchronous
* case, it is customary to declare the fault tolerance instance as {@code FaultTolerance<Object>}
* for synchronous methods, {@code FaultTolerance<CompletionStage<Object>>} for asynchronous
* methods that return {@code CompletionStage}, and so on. Note that this effectively precludes
* defining a useful fallback, because fallback can only be defined when the value type is known.
*/
Expand Down
10 changes: 10 additions & 0 deletions doc/modules/ROOT/pages/reference/reusable.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,13 @@ Likewise, it is possible to do this for xref:reference/asynchronous.adoc#async-t
Note that you can't define a synchronous `FaultTolerance<T>` object and apply it to any asynchronous method.
Similarly, you can't define an asynchronous `FaultTolerance<CompletionStage<T>>` and apply it to a synchronous method or an asynchronous method with different asynchronous type.
This limitation will be lifted in the future.

== Metrics

Methods annotated `@ApplyFaultTolerance` gather metrics similarly to methods annotated with {microprofile-fault-tolerance} annotations.
That is, each method gets its own metrics, with the `method` tag being `<fully qualified class name>.<method name>`.

At the same time, state is still shared.
All methods annotated `@ApplyFaultTolerance` share the same bulkhead, circuit breaker and/or rate limit.

If the `FaultTolerance` object used for `@ApplyFaultTolerance` is also used xref:reference/programmatic-api.adoc[programmatically], that usage is coalesced in metrics under the description as the `method` tag.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import io.smallrye.faulttolerance.core.invocation.AsyncSupportRegistry;
import io.smallrye.faulttolerance.core.invocation.Invoker;
import io.smallrye.faulttolerance.core.invocation.StrategyInvoker;
import io.smallrye.faulttolerance.core.metrics.CompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.DelegatingCompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.DelegatingMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.rate.limit.CompletionStageRateLimit;
import io.smallrye.faulttolerance.core.rate.limit.RateLimit;
import io.smallrye.faulttolerance.core.retry.BackOff;
Expand Down Expand Up @@ -103,22 +105,32 @@ public final class FaultToleranceImpl<V, S, T> implements FaultTolerance<T> {
this.hasFallback = hasFallback;
}

@Override
public T call(Callable<T> action) throws Exception {
T call(Callable<T> action, MeteredOperationName meteredOperationName) throws Exception {
if (asyncSupport == null) {
InvocationContext<T> ctx = new InvocationContext<>(action);
if (meteredOperationName != null) {
ctx.set(MeteredOperationName.class, meteredOperationName);
}
eventHandlers.register(ctx);
return ((FaultToleranceStrategy<T>) strategy).apply(ctx);
}

Invoker<T> invoker = new CallableInvoker<>(action);
InvocationContext<CompletionStage<V>> ctx = new InvocationContext<>(() -> asyncSupport.toCompletionStage(invoker));
if (meteredOperationName != null) {
ctx.set(MeteredOperationName.class, meteredOperationName);
}
eventHandlers.register(ctx);
Invoker<CompletionStage<V>> wrapper = new StrategyInvoker<>(null,
(FaultToleranceStrategy<CompletionStage<V>>) strategy, ctx);
return asyncSupport.fromCompletionStage(wrapper);
}

@Override
public T call(Callable<T> action) throws Exception {
return call(action, null);
}

@Override
public void run(Runnable action) {
try {
Expand Down Expand Up @@ -259,7 +271,7 @@ private void eagerInitialization() {
}
}

private FaultTolerance<T> build(BuilderLazyDependencies lazyDependencies) {
private FaultToleranceImpl<?, ?, T> build(BuilderLazyDependencies lazyDependencies) {
Consumer<CircuitBreakerEvents.StateTransition> cbMaintenanceEventHandler = null;
if (circuitBreakerBuilder != null && circuitBreakerBuilder.name != null) {
cbMaintenanceEventHandler = eagerDependencies.cbMaintenance()
Expand All @@ -285,12 +297,13 @@ private FaultTolerance<T> build(BuilderLazyDependencies lazyDependencies) {
return isAsync ? buildAsync(lazyDependencies, eventHandlers) : buildSync(lazyDependencies, eventHandlers);
}

private FaultTolerance<T> buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
private FaultToleranceImpl<T, T, T> buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
FaultToleranceStrategy<T> strategy = buildSyncStrategy(lazyDependencies);
return new FaultToleranceImpl<>(strategy, (AsyncSupport<T, T>) null, eventHandlers, fallbackBuilder != null);
return new FaultToleranceImpl<>(strategy, null, eventHandlers, fallbackBuilder != null);
}

private <V> FaultTolerance<T> buildAsync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
private <V> FaultToleranceImpl<V, CompletionStage<V>, T> buildAsync(BuilderLazyDependencies lazyDependencies,
EventHandlers eventHandlers) {
FaultToleranceStrategy<CompletionStage<V>> strategy = buildAsyncStrategy(lazyDependencies);
AsyncSupport<V, T> asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType);
return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers, fallbackBuilder != null);
Expand Down Expand Up @@ -354,10 +367,10 @@ private FaultToleranceStrategy<T> buildSyncStrategy(BuilderLazyDependencies lazy
fallbackBuilder.whenPredicate));
}

if (lazyDependencies.metricsProvider().isEnabled()) {
MeteredOperation meteredOperation = buildMeteredOperation();
result = new MetricsCollector<>(result, lazyDependencies.metricsProvider().create(meteredOperation),
meteredOperation);
MetricsProvider metricsProvider = lazyDependencies.metricsProvider();
if (metricsProvider.isEnabled()) {
MeteredOperation defaultOperation = buildMeteredOperation();
result = new DelegatingMetricsCollector<>(result, metricsProvider, defaultOperation);
}

return result;
Expand Down Expand Up @@ -436,10 +449,10 @@ private <V> FaultToleranceStrategy<CompletionStage<V>> buildAsyncStrategy(Builde
fallbackBuilder.whenPredicate));
}

if (lazyDependencies.metricsProvider().isEnabled()) {
MeteredOperation meteredOperation = buildMeteredOperation();
result = new CompletionStageMetricsCollector<>(result,
lazyDependencies.metricsProvider().create(meteredOperation), meteredOperation);
MetricsProvider metricsProvider = lazyDependencies.metricsProvider();
if (metricsProvider.isEnabled()) {
MeteredOperation defaultOperation = buildMeteredOperation();
result = new DelegatingCompletionStageMetricsCollector<>(result, metricsProvider, defaultOperation);
}

// thread offload is always enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import java.util.function.Supplier;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;

public final class LazyFaultTolerance<T> implements FaultTolerance<T> {
private final Supplier<FaultTolerance<T>> builder;
private final Supplier<FaultToleranceImpl<?, ?, T>> builder;
private final Class<?> asyncType;

private final ReentrantLock lock = new ReentrantLock();

private volatile FaultTolerance<T> instance;
private volatile FaultToleranceImpl<?, ?, T> instance;

LazyFaultTolerance(Supplier<FaultTolerance<T>> builder, Class<?> asyncType) {
LazyFaultTolerance(Supplier<FaultToleranceImpl<?, ?, T>> builder, Class<?> asyncType) {
this.builder = builder;
this.asyncType = asyncType;
}
Expand All @@ -23,6 +24,10 @@ public Class<?> internalGetAsyncType() {
return asyncType;
}

public T call(Callable<T> action, MeteredOperationName meteredOperationName) throws Exception {
return instance().call(action, meteredOperationName);
}

@Override
public T call(Callable<T> action) throws Exception {
return instance().call(action);
Expand Down Expand Up @@ -50,8 +55,8 @@ public <U> FaultTolerance<U> castAsync(Class<?> asyncType) {
return instance().castAsync(asyncType);
}

private FaultTolerance<T> instance() {
FaultTolerance<T> instance = this.instance;
private FaultToleranceImpl<?, ?, T> instance() {
FaultToleranceImpl<?, ?, T> instance = this.instance;
if (instance == null) {
lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.smallrye.faulttolerance.core.metrics;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

public class DelegatingCompletionStageMetricsCollector<V> implements FaultToleranceStrategy<CompletionStage<V>> {
private final FaultToleranceStrategy<CompletionStage<V>> delegate;
private final MetricsProvider provider;
private final MeteredOperation originalOperation;

private final ConcurrentMap<MeteredOperation, CompletionStageMetricsCollector<V>> cache = new ConcurrentHashMap<>();

public DelegatingCompletionStageMetricsCollector(FaultToleranceStrategy<CompletionStage<V>> delegate,
MetricsProvider provider, MeteredOperation originalOperation) {
this.delegate = delegate;
this.provider = provider;
this.originalOperation = originalOperation;
}

@Override
public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) throws Exception {
MeteredOperationName name = ctx.get(MeteredOperationName.class);
MeteredOperation operation = name != null
? new DelegatingMeteredOperation(originalOperation, name.get())
: originalOperation;
CompletionStageMetricsCollector<V> delegate = cache.computeIfAbsent(operation,
ignored -> new CompletionStageMetricsCollector<>(this.delegate, provider.create(operation), operation));
return delegate.apply(ctx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.smallrye.faulttolerance.core.metrics;

final class DelegatingMeteredOperation implements MeteredOperation {
private final MeteredOperation operation;
private final String name;

DelegatingMeteredOperation(MeteredOperation operation, String name) {
this.operation = operation;
this.name = name;
}

@Override
public boolean isAsynchronous() {
return operation.isAsynchronous();
}

@Override
public boolean hasBulkhead() {
return operation.hasBulkhead();
}

@Override
public boolean hasCircuitBreaker() {
return operation.hasCircuitBreaker();
}

@Override
public boolean hasFallback() {
return operation.hasFallback();
}

@Override
public boolean hasRateLimit() {
return operation.hasRateLimit();
}

@Override
public boolean hasRetry() {
return operation.hasRetry();
}

@Override
public boolean hasTimeout() {
return operation.hasTimeout();
}

@Override
public String name() {
return name;
}

@Override
public Object cacheKey() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.smallrye.faulttolerance.core.metrics;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

public class DelegatingMetricsCollector<V> implements FaultToleranceStrategy<V> {
private final FaultToleranceStrategy<V> delegate;
private final MetricsProvider provider;
private final MeteredOperation originalOperation;

private final ConcurrentMap<MeteredOperation, MetricsCollector<V>> cache = new ConcurrentHashMap<>();

public DelegatingMetricsCollector(FaultToleranceStrategy<V> delegate, MetricsProvider provider,
MeteredOperation originalOperation) {
this.delegate = delegate;
this.provider = provider;
this.originalOperation = originalOperation;
}

@Override
public V apply(InvocationContext<V> ctx) throws Exception {
MeteredOperationName name = ctx.get(MeteredOperationName.class);
MeteredOperation operation = name != null
? new DelegatingMeteredOperation(originalOperation, name.get())
: originalOperation;
MetricsCollector<V> delegate = cache.computeIfAbsent(operation,
ignored -> new MetricsCollector<>(this.delegate, provider.create(operation), operation));
return delegate.apply(ctx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.smallrye.faulttolerance.core.metrics;

public final class MeteredOperationName {
private final String name;

public MeteredOperationName(String name) {
this.name = name;
}

public String get() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.smallrye.faulttolerance.core.invocation.StrategyInvoker;
import io.smallrye.faulttolerance.core.metrics.CompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;
import io.smallrye.faulttolerance.core.metrics.MetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.rate.limit.CompletionStageRateLimit;
Expand Down Expand Up @@ -210,14 +211,16 @@ private Object preconfiguredFlow(FaultToleranceOperation operation, InvocationCo
throw new FaultToleranceException("Configured fault tolerance '" + identifier
+ "' is not created by the FaultTolerance API, this is not supported");
}
LazyFaultTolerance<Object> lazyFaultTolerance = (LazyFaultTolerance<Object>) faultTolerance;

Class<?> asyncType = ((LazyFaultTolerance<?>) faultTolerance).internalGetAsyncType();
Class<?> asyncType = lazyFaultTolerance.internalGetAsyncType();
MeteredOperationName meteredOperationName = new MeteredOperationName(operation.getName());

AsyncSupport<?, ?> forOperation = AsyncSupportRegistry.get(operation.getParameterTypes(), operation.getReturnType());
AsyncSupport<?, ?> fromConfigured = asyncType == null ? null : AsyncSupportRegistry.get(new Class[0], asyncType);

if (forOperation == null && fromConfigured == null) {
return faultTolerance.call(interceptionContext::proceed);
return lazyFaultTolerance.call(interceptionContext::proceed, meteredOperationName);
} else if (forOperation == null) {
throw new FaultToleranceException("Configured fault tolerance '" + identifier
+ "' expects the operation to " + fromConfigured.mustDescription()
Expand All @@ -231,7 +234,7 @@ private Object preconfiguredFlow(FaultToleranceOperation operation, InvocationCo
+ "' expects the operation to " + fromConfigured.mustDescription()
+ ", but it " + forOperation.doesDescription() + ": " + operation);
} else {
return faultTolerance.call(interceptionContext::proceed);
return lazyFaultTolerance.call(interceptionContext::proceed, meteredOperationName);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.smallrye.faulttolerance.reuse.async.completionstage.metrics;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.faulttolerance.api.FaultTolerance;

@ApplicationScoped
public class MyFaultTolerance {
@Produces
@Identifier("my-fault-tolerance")
public static final FaultTolerance<CompletionStage<String>> FT = FaultTolerance.<String> createAsync()
.withRetry().maxRetries(2).done()
.withFallback().handler(() -> completedFuture("fallback")).done()
.build();
}
Loading