Skip to content

Commit

Permalink
separate metrics of @ApplyFaultTolerance for each method
Browse files Browse the repository at this point in the history
This commit separates metrics of `@ApplyFaultTolerance` methods, so that
each method has its own metrics yet they all share the same state.
  • Loading branch information
Ladicek committed Oct 16, 2024
1 parent d5a2a57 commit dc377b7
Show file tree
Hide file tree
Showing 21 changed files with 575 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

/**
* A special interceptor binding annotation to apply preconfigured fault tolerance.
* If {@code @ApplyFaultTolerance("<identifier>")} is present on a business method,
* If {@code @ApplyFaultTolerance("<identifier>")} is present on a business method,
* then a bean of type {@link FaultTolerance} with qualifier
* {@link io.smallrye.common.annotation.Identifier @Identifier("&lt;identifier>")}
* must exist. Such bean serves as a preconfigured set of fault tolerance strategies
Expand Down Expand Up @@ -42,8 +42,8 @@
* <p>
* A single preconfigured fault tolerance can even be applied to multiple methods with different
* return types, as long as the constraint on method asynchrony described above is obeyed. In such
* case, it is customary to declare the fault tolerance instance as {@code FaultTolerance&lt;Object>}
* for synchronous methods, {@code FaultTolerance&lt;CompletionStage&lt;Object>>} for asynchronous
* case, it is customary to declare the fault tolerance instance as {@code FaultTolerance<Object>}
* for synchronous methods, {@code FaultTolerance<CompletionStage<Object>>} for asynchronous
* methods that return {@code CompletionStage}, and so on. Note that this effectively precludes
* defining a useful fallback, because fallback can only be defined when the value type is known.
*/
Expand Down
10 changes: 10 additions & 0 deletions doc/modules/ROOT/pages/reference/reusable.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,13 @@ Likewise, it is possible to do this for xref:reference/asynchronous.adoc#async-t
Note that you can't define a synchronous `FaultTolerance<T>` object and apply it to any asynchronous method.
Similarly, you can't define an asynchronous `FaultTolerance<CompletionStage<T>>` and apply it to a synchronous method or an asynchronous method with different asynchronous type.
This limitation will be lifted in the future.

== Metrics

Methods annotated `@ApplyFaultTolerance` gather metrics similarly to methods annotated with {microprofile-fault-tolerance} annotations.
That is, each method gets its own metrics, with the `method` tag being `<fully qualified class name>.<method name>`.

At the same time, state is still shared.
All methods annotated `@ApplyFaultTolerance` share the same bulkhead, circuit breaker and/or rate limit.

If the `FaultTolerance` object used for `@ApplyFaultTolerance` is also used xref:reference/programmatic-api.adoc[programmatically], that usage is coalesced in metrics under the description as the `method` tag.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import io.smallrye.faulttolerance.core.invocation.AsyncSupportRegistry;
import io.smallrye.faulttolerance.core.invocation.Invoker;
import io.smallrye.faulttolerance.core.invocation.StrategyInvoker;
import io.smallrye.faulttolerance.core.metrics.CompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.DelegatingCompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.DelegatingMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.rate.limit.CompletionStageRateLimit;
import io.smallrye.faulttolerance.core.rate.limit.RateLimit;
import io.smallrye.faulttolerance.core.retry.BackOff;
Expand Down Expand Up @@ -103,22 +105,32 @@ public final class FaultToleranceImpl<V, S, T> implements FaultTolerance<T> {
this.hasFallback = hasFallback;
}

@Override
public T call(Callable<T> action) throws Exception {
T call(Callable<T> action, MeteredOperationName meteredOperationName) throws Exception {
if (asyncSupport == null) {
InvocationContext<T> ctx = new InvocationContext<>(action);
if (meteredOperationName != null) {
ctx.set(MeteredOperationName.class, meteredOperationName);
}
eventHandlers.register(ctx);
return ((FaultToleranceStrategy<T>) strategy).apply(ctx);
}

Invoker<T> invoker = new CallableInvoker<>(action);
InvocationContext<CompletionStage<V>> ctx = new InvocationContext<>(() -> asyncSupport.toCompletionStage(invoker));
if (meteredOperationName != null) {
ctx.set(MeteredOperationName.class, meteredOperationName);
}
eventHandlers.register(ctx);
Invoker<CompletionStage<V>> wrapper = new StrategyInvoker<>(null,
(FaultToleranceStrategy<CompletionStage<V>>) strategy, ctx);
return asyncSupport.fromCompletionStage(wrapper);
}

@Override
public T call(Callable<T> action) throws Exception {
return call(action, null);
}

@Override
public void run(Runnable action) {
try {
Expand Down Expand Up @@ -259,7 +271,7 @@ private void eagerInitialization() {
}
}

private FaultTolerance<T> build(BuilderLazyDependencies lazyDependencies) {
private FaultToleranceImpl<?, ?, T> build(BuilderLazyDependencies lazyDependencies) {
Consumer<CircuitBreakerEvents.StateTransition> cbMaintenanceEventHandler = null;
if (circuitBreakerBuilder != null && circuitBreakerBuilder.name != null) {
cbMaintenanceEventHandler = eagerDependencies.cbMaintenance()
Expand All @@ -285,12 +297,13 @@ private FaultTolerance<T> build(BuilderLazyDependencies lazyDependencies) {
return isAsync ? buildAsync(lazyDependencies, eventHandlers) : buildSync(lazyDependencies, eventHandlers);
}

private FaultTolerance<T> buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
private FaultToleranceImpl<T, T, T> buildSync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
FaultToleranceStrategy<T> strategy = buildSyncStrategy(lazyDependencies);
return new FaultToleranceImpl<>(strategy, (AsyncSupport<T, T>) null, eventHandlers, fallbackBuilder != null);
return new FaultToleranceImpl<>(strategy, null, eventHandlers, fallbackBuilder != null);
}

private <V> FaultTolerance<T> buildAsync(BuilderLazyDependencies lazyDependencies, EventHandlers eventHandlers) {
private <V> FaultToleranceImpl<V, CompletionStage<V>, T> buildAsync(BuilderLazyDependencies lazyDependencies,
EventHandlers eventHandlers) {
FaultToleranceStrategy<CompletionStage<V>> strategy = buildAsyncStrategy(lazyDependencies);
AsyncSupport<V, T> asyncSupport = AsyncSupportRegistry.get(new Class[0], asyncType);
return new FaultToleranceImpl<>(strategy, asyncSupport, eventHandlers, fallbackBuilder != null);
Expand Down Expand Up @@ -354,10 +367,10 @@ private FaultToleranceStrategy<T> buildSyncStrategy(BuilderLazyDependencies lazy
fallbackBuilder.whenPredicate));
}

if (lazyDependencies.metricsProvider().isEnabled()) {
MeteredOperation meteredOperation = buildMeteredOperation();
result = new MetricsCollector<>(result, lazyDependencies.metricsProvider().create(meteredOperation),
meteredOperation);
MetricsProvider metricsProvider = lazyDependencies.metricsProvider();
if (metricsProvider.isEnabled()) {
MeteredOperation defaultOperation = buildMeteredOperation();
result = new DelegatingMetricsCollector<>(result, metricsProvider, defaultOperation);
}

return result;
Expand Down Expand Up @@ -436,10 +449,10 @@ private <V> FaultToleranceStrategy<CompletionStage<V>> buildAsyncStrategy(Builde
fallbackBuilder.whenPredicate));
}

if (lazyDependencies.metricsProvider().isEnabled()) {
MeteredOperation meteredOperation = buildMeteredOperation();
result = new CompletionStageMetricsCollector<>(result,
lazyDependencies.metricsProvider().create(meteredOperation), meteredOperation);
MetricsProvider metricsProvider = lazyDependencies.metricsProvider();
if (metricsProvider.isEnabled()) {
MeteredOperation defaultOperation = buildMeteredOperation();
result = new DelegatingCompletionStageMetricsCollector<>(result, metricsProvider, defaultOperation);
}

// thread offload is always enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import java.util.function.Supplier;

import io.smallrye.faulttolerance.api.FaultTolerance;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;

public final class LazyFaultTolerance<T> implements FaultTolerance<T> {
private final Supplier<FaultTolerance<T>> builder;
private final Supplier<FaultToleranceImpl<?, ?, T>> builder;
private final Class<?> asyncType;

private final ReentrantLock lock = new ReentrantLock();

private volatile FaultTolerance<T> instance;
private volatile FaultToleranceImpl<?, ?, T> instance;

LazyFaultTolerance(Supplier<FaultTolerance<T>> builder, Class<?> asyncType) {
LazyFaultTolerance(Supplier<FaultToleranceImpl<?, ?, T>> builder, Class<?> asyncType) {
this.builder = builder;
this.asyncType = asyncType;
}
Expand All @@ -23,6 +24,10 @@ public Class<?> internalGetAsyncType() {
return asyncType;
}

public T call(Callable<T> action, MeteredOperationName meteredOperationName) throws Exception {
return instance().call(action, meteredOperationName);
}

@Override
public T call(Callable<T> action) throws Exception {
return instance().call(action);
Expand Down Expand Up @@ -50,8 +55,8 @@ public <U> FaultTolerance<U> castAsync(Class<?> asyncType) {
return instance().castAsync(asyncType);
}

private FaultTolerance<T> instance() {
FaultTolerance<T> instance = this.instance;
private FaultToleranceImpl<?, ?, T> instance() {
FaultToleranceImpl<?, ?, T> instance = this.instance;
if (instance == null) {
lock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.smallrye.faulttolerance.core.metrics;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

public class DelegatingCompletionStageMetricsCollector<V> implements FaultToleranceStrategy<CompletionStage<V>> {
private final FaultToleranceStrategy<CompletionStage<V>> delegate;
private final MetricsProvider provider;
private final MeteredOperation originalOperation;

private final ConcurrentMap<MeteredOperation, CompletionStageMetricsCollector<V>> cache = new ConcurrentHashMap<>();

public DelegatingCompletionStageMetricsCollector(FaultToleranceStrategy<CompletionStage<V>> delegate,
MetricsProvider provider, MeteredOperation originalOperation) {
this.delegate = delegate;
this.provider = provider;
this.originalOperation = originalOperation;
}

@Override
public CompletionStage<V> apply(InvocationContext<CompletionStage<V>> ctx) throws Exception {
MeteredOperationName name = ctx.get(MeteredOperationName.class);
MeteredOperation operation = name != null
? new DelegatingMeteredOperation(originalOperation, name.get())
: originalOperation;
CompletionStageMetricsCollector<V> delegate = cache.computeIfAbsent(operation,
ignored -> new CompletionStageMetricsCollector<>(this.delegate, provider.create(operation), operation));
return delegate.apply(ctx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.smallrye.faulttolerance.core.metrics;

final class DelegatingMeteredOperation implements MeteredOperation {
private final MeteredOperation operation;
private final String name;

DelegatingMeteredOperation(MeteredOperation operation, String name) {
this.operation = operation;
this.name = name;
}

@Override
public boolean isAsynchronous() {
return operation.isAsynchronous();
}

@Override
public boolean hasBulkhead() {
return operation.hasBulkhead();
}

@Override
public boolean hasCircuitBreaker() {
return operation.hasCircuitBreaker();
}

@Override
public boolean hasFallback() {
return operation.hasFallback();
}

@Override
public boolean hasRateLimit() {
return operation.hasRateLimit();
}

@Override
public boolean hasRetry() {
return operation.hasRetry();
}

@Override
public boolean hasTimeout() {
return operation.hasTimeout();
}

@Override
public String name() {
return name;
}

@Override
public Object cacheKey() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.smallrye.faulttolerance.core.metrics;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import io.smallrye.faulttolerance.core.FaultToleranceStrategy;
import io.smallrye.faulttolerance.core.InvocationContext;

public class DelegatingMetricsCollector<V> implements FaultToleranceStrategy<V> {
private final FaultToleranceStrategy<V> delegate;
private final MetricsProvider provider;
private final MeteredOperation originalOperation;

private final ConcurrentMap<MeteredOperation, MetricsCollector<V>> cache = new ConcurrentHashMap<>();

public DelegatingMetricsCollector(FaultToleranceStrategy<V> delegate, MetricsProvider provider,
MeteredOperation originalOperation) {
this.delegate = delegate;
this.provider = provider;
this.originalOperation = originalOperation;
}

@Override
public V apply(InvocationContext<V> ctx) throws Exception {
MeteredOperationName name = ctx.get(MeteredOperationName.class);
MeteredOperation operation = name != null
? new DelegatingMeteredOperation(originalOperation, name.get())
: originalOperation;
MetricsCollector<V> delegate = cache.computeIfAbsent(operation,
ignored -> new MetricsCollector<>(this.delegate, provider.create(operation), operation));
return delegate.apply(ctx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.smallrye.faulttolerance.core.metrics;

public final class MeteredOperationName {
private final String name;

public MeteredOperationName(String name) {
this.name = name;
}

public String get() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.smallrye.faulttolerance.core.invocation.StrategyInvoker;
import io.smallrye.faulttolerance.core.metrics.CompletionStageMetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MeteredOperation;
import io.smallrye.faulttolerance.core.metrics.MeteredOperationName;
import io.smallrye.faulttolerance.core.metrics.MetricsCollector;
import io.smallrye.faulttolerance.core.metrics.MetricsProvider;
import io.smallrye.faulttolerance.core.rate.limit.CompletionStageRateLimit;
Expand Down Expand Up @@ -210,14 +211,16 @@ private Object preconfiguredFlow(FaultToleranceOperation operation, InvocationCo
throw new FaultToleranceException("Configured fault tolerance '" + identifier
+ "' is not created by the FaultTolerance API, this is not supported");
}
LazyFaultTolerance<Object> lazyFaultTolerance = (LazyFaultTolerance<Object>) faultTolerance;

Class<?> asyncType = ((LazyFaultTolerance<?>) faultTolerance).internalGetAsyncType();
Class<?> asyncType = lazyFaultTolerance.internalGetAsyncType();
MeteredOperationName meteredOperationName = new MeteredOperationName(operation.getName());

AsyncSupport<?, ?> forOperation = AsyncSupportRegistry.get(operation.getParameterTypes(), operation.getReturnType());
AsyncSupport<?, ?> fromConfigured = asyncType == null ? null : AsyncSupportRegistry.get(new Class[0], asyncType);

if (forOperation == null && fromConfigured == null) {
return faultTolerance.call(interceptionContext::proceed);
return lazyFaultTolerance.call(interceptionContext::proceed, meteredOperationName);
} else if (forOperation == null) {
throw new FaultToleranceException("Configured fault tolerance '" + identifier
+ "' expects the operation to " + fromConfigured.mustDescription()
Expand All @@ -231,7 +234,7 @@ private Object preconfiguredFlow(FaultToleranceOperation operation, InvocationCo
+ "' expects the operation to " + fromConfigured.mustDescription()
+ ", but it " + forOperation.doesDescription() + ": " + operation);
} else {
return faultTolerance.call(interceptionContext::proceed);
return lazyFaultTolerance.call(interceptionContext::proceed, meteredOperationName);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.smallrye.faulttolerance.reuse.async.completionstage.metrics;

import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.faulttolerance.api.FaultTolerance;

@ApplicationScoped
public class MyFaultTolerance {
@Produces
@Identifier("my-fault-tolerance")
public static final FaultTolerance<CompletionStage<String>> FT = FaultTolerance.<String> createAsync()
.withRetry().maxRetries(2).done()
.withFallback().handler(() -> completedFuture("fallback")).done()
.build();
}
Loading

0 comments on commit dc377b7

Please sign in to comment.