From ccce6d4d9d5259092657503022413a81b14a795d Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Mon, 16 Sep 2024 11:46:32 +0200 Subject: [PATCH 1/2] Implement MicroProfile Fault Tolerance 4.1 This includes support for OpenTelemetry Metrics. For testing, this commit uses SmallRye OpenTelemetry. --- doc/antora.yml | 4 +- .../ROOT/pages/integration/metrics.adoc | 63 ++-- .../pages/integration/programmatic-api.adoc | 7 +- .../ROOT/pages/reference/bulkhead.adoc | 35 +- .../ROOT/pages/reference/circuit-breaker.adoc | 25 +- doc/modules/ROOT/pages/reference/metrics.adoc | 103 +----- .../pages/reference/programmatic-api.adoc | 2 +- .../ROOT/pages/reference/rate-limit.adoc | 5 +- doc/modules/ROOT/pages/reference/retry.adoc | 10 +- doc/modules/ROOT/pages/reference/timeout.adoc | 15 +- implementation/core/pom.xml | 5 + .../core/metrics/OpenTelemetryRecorder.java | 328 ++++++++++++++++++ implementation/fault-tolerance/pom.xml | 5 + .../FaultToleranceExtension.java | 78 ++++- .../metrics/CompoundMetricsProvider.java | 225 ++++++++++++ .../metrics/MetricsIntegration.java | 5 +- .../metrics/MicroProfileMetricsProvider.java | 4 + .../metrics/MicrometerProvider.java | 4 + .../metrics/OpenTelemetryProvider.java | 63 ++++ implementation/standalone/pom.xml | 5 + .../standalone/LazyDependencies.java | 2 + .../standalone/OpenTelemetryAdapter.java | 43 +++ pom.xml | 34 +- testsuite/integration/pom.xml | 4 - testsuite/tck/pom.xml | 12 + .../tck/FaultToleranceExtension.java | 1 + .../faulttolerance/tck/ForceOtelBean.java | 18 + .../tck/OtelConfigExtension.java | 13 + .../jakarta.enterprise.inject.spi.Extension | 1 + 29 files changed, 945 insertions(+), 174 deletions(-) create mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/OpenTelemetryRecorder.java create mode 100644 implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/CompoundMetricsProvider.java create mode 100644 implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/OpenTelemetryProvider.java create mode 100644 implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/OpenTelemetryAdapter.java create mode 100644 testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/ForceOtelBean.java create mode 100644 testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/OtelConfigExtension.java create mode 100644 testsuite/tck/src/test/resources/META-INF/services/jakarta.enterprise.inject.spi.Extension diff --git a/doc/antora.yml b/doc/antora.yml index 1405309a3..a6a5c76e8 100644 --- a/doc/antora.yml +++ b/doc/antora.yml @@ -10,7 +10,7 @@ asciidoc: smallrye-fault-tolerance-version: '6.4.1' microprofile-fault-tolerance: MicroProfile Fault Tolerance - microprofile-fault-tolerance-version: '4.0.2' - microprofile-fault-tolerance-url: https://download.eclipse.org/microprofile/microprofile-fault-tolerance-4.0/microprofile-fault-tolerance-spec-4.0.html + microprofile-fault-tolerance-version: '4.1' + microprofile-fault-tolerance-url: https://download.eclipse.org/microprofile/microprofile-fault-tolerance-4.1/microprofile-fault-tolerance-spec-4.1.html vertx4-version: '4.5.8' diff --git a/doc/modules/ROOT/pages/integration/metrics.adoc b/doc/modules/ROOT/pages/integration/metrics.adoc index fd1f24839..07af188dc 100644 --- a/doc/modules/ROOT/pages/integration/metrics.adoc +++ b/doc/modules/ROOT/pages/integration/metrics.adoc @@ -1,64 +1,65 @@ = Metrics -{smallrye-fault-tolerance} provides support for MicroProfile Metrics and Micrometer. +{smallrye-fault-tolerance} provides support for MicroProfile Metrics, OpenTelemetry and Micrometer. Alternatively, metrics may be completely disabled at the integration level. As usual, this integration is based on CDI. -{smallrye-fault-tolerance} includes an internal interface `MetricsProvider` and 3 different implementations. -Exactly 1 bean of type `MetricsProvider` must exist. -An instance of that bean is used to interact with the metrics system. +{smallrye-fault-tolerance} includes an internal interface `MetricsProvider` and these implementations: -There are 2 ways to select which metrics provider bean exists: +* `io.smallrye.faulttolerance.metrics.MicroProfileMetricsProvider` +* `io.smallrye.faulttolerance.metrics.OpenTelemetryProvider` +* `io.smallrye.faulttolerance.metrics.MicrometerProvider` +* `io.smallrye.faulttolerance.metrics.NoopProvider` + +There are 2 possible ways how to integrate metrics: + +* exactly 1 class from the list above is a bean; +* more than 1 class from the list above is a bean, in which case, `io.smallrye.faulttolerance.metrics.CompoundMetricsProvider` must also be a bean. + +NOTE: Only the _names_ of the classes listed above are treated as public. +That is, the classes should be treated as opaque, no guarantees about their internals are made. + +== Default Integration -- using a constructor of the Portable Extension, -- altering the set of discovered types. +In case the integrator uses the CDI Portable Extension `FaultToleranceExtension` and lets the container create an instance, metrics presence is discovered automatically. +All present metrics systems are used. == Using a `FaultToleranceExtension` Constructor In case the integrator uses the CDI Portable Extension `FaultToleranceExtension` and creates its instance manually, they can use a constructor. -In addition to a zero-parameter constructor, there's a constructor that takes a `MetricsIntegration` parameter. +In addition to a zero-parameter constructor, which is used in the default integration as described above, there are constructors that take a parameter of `MetricsIntegration` or `Set`. + `MetricsIntegration` is an enum with these values: * `MICROPROFILE_METRICS`: use MicroProfile Metrics integration +* `OPENTELEMETRY`: use OpenTelemetry (MicroProfile Telemetry) integration * `MICROMETER`: use Micrometer integration * `NOOP`: no metrics -As mentioned above, this is only useful if the integrator creates an instance of the extension themselves. -If the integrator relies on the CDI container to discover and instantiate the extension, the zero-parameter constructor is used, which defaults to `MICROPROFILE_METRICS`. -Such integrator can use the 2nd approach of altering the set of discovered types. - -== Altering the Set of Discovered Types - -The integrator may select the metrics provider by making sure that the correct implementation is discovered during CDI type discovery. -The existing metrics providers are: - -* `io.smallrye.faulttolerance.metrics.MicroProfileMetricsProvider` -* `io.smallrye.faulttolerance.metrics.MicrometerProvider` -* `io.smallrye.faulttolerance.metrics.NoopProvider` - -NOTE: Only the _names_ of the classes listed above are treated as public. -That is, the classes should be treated as opaque, no guarantees about their internals are made. - -Exactly one of these classes must be discovered during CDI type discovery. - -NOTE: Integrators that rely on the CDI container to instantiate `FaultToleranceExtension` must be aware that in this case, the extension adds `MicroProfileMetricsProvider` to the set of discovered types. -If they want to use a different metrics provider, they need to veto the `MicroProfileMetricsProvider` type. - == Metrics Providers Metrics providers have additional requirements, as described below. === MicroProfile Metrics -If MicroProfile Metrics are used, the integrator must ensure that the following artifacts are present: +If MicroProfile Metrics should be used, the integrator must ensure that the following artifacts are present: * `org.eclipse.microprofile.metrics:microprofile-metrics-api`; * some implementation of MicroProfile Metrics. +=== OpenTelemetry + +If OpenTelemetry should be used, the integrator must ensure that the following artifact is present: + +* `io.opentelemetry:opentelemetry-api`. + +Further, a bean of type `io.opentelemetry.api.metrics.Meter` must exist. +This bean is used to emit the actual metrics. + === Micrometer -If Micrometer is used, the integrator must ensure that the following artifact is present: +If Micrometer should be used, the integrator must ensure that the following artifact is present: * `io.micrometer:micrometer-core`. diff --git a/doc/modules/ROOT/pages/integration/programmatic-api.adoc b/doc/modules/ROOT/pages/integration/programmatic-api.adoc index de448dc70..3770b59f1 100644 --- a/doc/modules/ROOT/pages/integration/programmatic-api.adoc +++ b/doc/modules/ROOT/pages/integration/programmatic-api.adoc @@ -40,7 +40,8 @@ After `StandaloneFaultTolerance.shutdown()`, it is not possible to reinitialize === Metrics In the standalone implementation, MicroProfile Metrics make no sense, as that is exclusively based on CDI. -It is however possible to integrate with Micrometer. +It is however possible to integrate with OpenTelemetry or Micrometer. -The `Configuration.metricsAdapter()` method must be implemented and return an instance of `io.smallrye.faulttolerance.standalone.MicrometerAdapter`. -The constructor of `MicrometerAdapter` accepts the Micrometer registry (`MeterRegistry`) to which metrics shall be emitted. +The `Configuration.metricsAdapter()` method must be implemented and return an instance of `io.smallrye.faulttolerance.standalone.OpenTelemetryAdapter` or `io.smallrye.faulttolerance.standalone.MicrometerAdapter`. +The constructor of `OpenTelemetryAdapter` accepts the `Meter` to which metrics shall be emitted. +The constructor of `MicrometerAdapter` accepts the `MeterRegistry` to which metrics shall be emitted. diff --git a/doc/modules/ROOT/pages/reference/bulkhead.adoc b/doc/modules/ROOT/pages/reference/bulkhead.adoc index 1e4cb8509..ffdc0321a 100644 --- a/doc/modules/ROOT/pages/reference/bulkhead.adoc +++ b/doc/modules/ROOT/pages/reference/bulkhead.adoc @@ -67,7 +67,10 @@ Bulkhead exposes the following metrics: [cols="1,5"] |=== | Name | `ft.bulkhead.calls.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the bulkhead logic was run. This is usually once per method call, but may be zero times if the circuit breaker or rate limit prevented execution or more than once if the method call was retried. | Tags @@ -78,7 +81,10 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.bulkhead.executionsRunning` -| Type | `Gauge` +| Type +a| * MP Metrics: `Gauge` +* OpenTelemetry: `LongUpDownCounter` +* Micrometer: `Gauge` | Unit | None | Description | Number of currently running executions. | Tags @@ -88,7 +94,10 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.bulkhead.executionsWaiting` -| Type | `Gauge` +| Type +a| * MP Metrics: `Gauge` +* OpenTelemetry: `LongUpDownCounter` +* Micrometer: `Gauge` | Unit | None | Description | Number of executions currently waiting in the queue. | Tags @@ -99,8 +108,14 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.bulkhead.runningDuration` -| Type | `Histogram` -| Unit | Nanoseconds +| Type +a| * MP Metrics: `Histogram` +* OpenTelemetry: `DoubleHistogram` with explicit bucket boundaries `[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]` +* Micrometer: `Timer` +| Unit +a| * MP Metrics: nanoseconds +* OpenTelemetry: seconds +* Micrometer: nanoseconds | Description | Histogram of the time that method executions spent running. | Tags a| * `method` - the fully qualified method name @@ -109,8 +124,14 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.bulkhead.waitingDuration` -| Type | `Histogram` -| Unit | Nanoseconds +| Type +a| * MP Metrics: `Histogram` +* OpenTelemetry: `DoubleHistogram` with explicit bucket boundaries `[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]` +* Micrometer: `Timer` +| Unit +a| * MP Metrics: nanoseconds +* OpenTelemetry: seconds +* Micrometer: nanoseconds | Description | Histogram of the time that method executions spent waiting in the queue. | Tags a| * `method` - the fully qualified method name diff --git a/doc/modules/ROOT/pages/reference/circuit-breaker.adoc b/doc/modules/ROOT/pages/reference/circuit-breaker.adoc index aa0905c7e..906101909 100644 --- a/doc/modules/ROOT/pages/reference/circuit-breaker.adoc +++ b/doc/modules/ROOT/pages/reference/circuit-breaker.adoc @@ -131,7 +131,10 @@ Circuit breaker exposes the following metrics: [cols="1,5"] |=== | Name | `ft.circuitbreaker.calls.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the circuit breaker logic was run. This is usually once per method call, but may be more than once if the method call is retried. | Tags @@ -145,8 +148,14 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.circuitbreaker.state.total` -| Type | `Gauge` -| Unit | Nanoseconds +| Type +a| * MP Metrics: `Gauge` +* OpenTelemetry: `LongCounter` +* Micrometer: `TimeGauge` +| Unit +a| * MP Metrics: nanoseconds +* OpenTelemetry: nanoseconds +* Micrometer: nanoseconds | Description | Amount of time the circuit breaker has spent in each state | Tags a| * `method` - the fully qualified method name @@ -157,7 +166,10 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.circuitbreaker.opened.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | Number of times the circuit breaker has moved from closed state to open state | Tags @@ -169,7 +181,10 @@ a| * `method` - the fully qualified method name | Name | `ft.circuitbreaker.state.current` 2+a| include::partial$srye-feature.adoc[] -| Type | `Gauge` (`0` or `1`) +| Type +a| * MP Metrics: `Gauge` +* OpenTelemetry: `LongUpDownCounter` +* Micrometer: `Gauge` | Unit | None | Description | Whether the circuit breaker is currently in given state (`1`) or not (`0`) | Tags diff --git a/doc/modules/ROOT/pages/reference/metrics.adoc b/doc/modules/ROOT/pages/reference/metrics.adoc index 3888cfdce..dea275cdb 100644 --- a/doc/modules/ROOT/pages/reference/metrics.adoc +++ b/doc/modules/ROOT/pages/reference/metrics.adoc @@ -1,6 +1,6 @@ = Metrics -{smallrye-fault-tolerance} exposes metrics to MicroProfile Metrics, as {microprofile-fault-tolerance-url}#_integration_with_microprofile_metrics[specified] by {microprofile-fault-tolerance}. +{smallrye-fault-tolerance} exposes metrics, as {microprofile-fault-tolerance-url}#_integration_with_microprofile_metrics_and_microprofile_telemetry[specified] by {microprofile-fault-tolerance}. [[general]] == General Metrics @@ -10,7 +10,10 @@ For all methods guarded with some fault tolerance strategy, the following metric [cols="1,5"] |=== | Name | `ft.invocations.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the method was called. | Tags @@ -42,7 +45,10 @@ The behavior of the timer thread can be observed through the following metrics: [cols="1,5"] |=== | Name | `ft.timer.scheduled` -| Type | `Gauge` +| Type +a| * MP Metrics: `Gauge` +* OpenTelemetry: `LongUpDownCounter` +* Micrometer: `Gauge` | Unit | None | Description | The number of tasks that are currently scheduled (for future execution) on the timer. | Tags @@ -51,95 +57,8 @@ a| * `id` - the ID of the timer, to distinguish multiple timers in a multi-appli == Micrometer Support -In addition to the MicroProfile Metrics support, {smallrye-fault-tolerance} also provides support for https://micrometer.io/[Micrometer]. -The set of metrics emitted to Micrometer is the same as the set of metrics emitted to MicroProfile Metrics, using the same metric names and tags. -Metric types are mapped as closely as possible: - -|=== -| Name | MicroProfile Metrics | Micrometer | Note - -| `ft.invocations.total` -| counter -| counter -| - -| `ft.retry.calls.total` -| counter -| counter -| - -| `ft.retry.retries.total` -| counter -| counter -| - -| `ft.timeout.calls.total` -| counter -| counter -| - -| `ft.timeout.executionDuration` -| histogram -| timer -| - -| `ft.circuitbreaker.calls.total` -| counter -| counter -| - -| `ft.circuitbreaker.state.total` -| gauge -| time gauge -| - -| `ft.circuitbreaker.state.current` -| gauge -| gauge -| * - -| `ft.circuitbreaker.opened.total` -| counter -| counter -| - -| `ft.bulkhead.calls.total` -| counter -| counter -| - -| `ft.bulkhead.executionsRunning` -| gauge -| gauge -| - -| `ft.bulkhead.executionsWaiting` -| gauge -| gauge -| - -| `ft.bulkhead.runningDuration` -| histogram -| timer -| - -| `ft.bulkhead.waitingDuration` -| histogram -| timer -| - -| `ft.ratelimit.calls.total` -| counter -| counter -| * - -| `ft.timer.scheduled` -| gauge -| gauge -| * -|=== - -{empty}* This is a {smallrye-fault-tolerance} feature, not specified by {microprofile-fault-tolerance}. +In addition to the MicroProfile Metrics and OpenTelemetry support (as specified by {microprofile-fault-tolerance}), {smallrye-fault-tolerance} also provides support for https://micrometer.io/[Micrometer]. +The set of metrics emitted to Micrometer is the same, using the same metric names and tags. Note that distribution summaries in Micrometer, including timers, do not emit quantiles by default. Micrometer recommends that libraries should not configure them out of the box, so if you need them, you should use a `MeterFilter`. diff --git a/doc/modules/ROOT/pages/reference/programmatic-api.adoc b/doc/modules/ROOT/pages/reference/programmatic-api.adoc index f06752e3f..5a08caf75 100644 --- a/doc/modules/ROOT/pages/reference/programmatic-api.adoc +++ b/doc/modules/ROOT/pages/reference/programmatic-api.adoc @@ -350,7 +350,7 @@ private static final FaultTolerance guarded = FaultTolerance.cre <1> A description of `hello` is set, it will be used as a value of the `method` tag in all metrics. It is possible to create multiple `FaultTolerance` objects with the same description. -In this case, it won't be possbile to distinguish the different `FaultTolerance` objects in metrics; their values will be aggregated. +In this case, it won't be possible to distinguish the different `FaultTolerance` objects in metrics; their values will be aggregated. If no description is provided, a random UUID is used. diff --git a/doc/modules/ROOT/pages/reference/rate-limit.adoc b/doc/modules/ROOT/pages/reference/rate-limit.adoc index 21975bf52..e90e6a8c9 100644 --- a/doc/modules/ROOT/pages/reference/rate-limit.adoc +++ b/doc/modules/ROOT/pages/reference/rate-limit.adoc @@ -133,7 +133,10 @@ Rate limit exposes the following metrics: [cols="1,5"] |=== | Name | `ft.ratelimit.calls.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the rate limit logic was run. This is usually once per method call, but may be zero times if the circuit breaker prevented execution or more than once if the method call was retried. | Tags diff --git a/doc/modules/ROOT/pages/reference/retry.adoc b/doc/modules/ROOT/pages/reference/retry.adoc index aa93ca256..830abcade 100644 --- a/doc/modules/ROOT/pages/reference/retry.adoc +++ b/doc/modules/ROOT/pages/reference/retry.adoc @@ -100,7 +100,10 @@ Retry exposes the following metrics: [cols="1,5"] |=== | Name | `ft.retry.calls.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the retry logic was run. This will always be once per method call. | Tags @@ -112,7 +115,10 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.retry.retries.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the method was retried | Tags diff --git a/doc/modules/ROOT/pages/reference/timeout.adoc b/doc/modules/ROOT/pages/reference/timeout.adoc index 2d884e115..5d9f7d389 100644 --- a/doc/modules/ROOT/pages/reference/timeout.adoc +++ b/doc/modules/ROOT/pages/reference/timeout.adoc @@ -49,7 +49,10 @@ Timeout exposes the following metrics: [cols="1,5"] |=== | Name | `ft.timeout.calls.total` -| Type | `Counter` +| Type +a| * MP Metrics: `Counter` +* OpenTelemetry: `LongCounter` +* Micrometer: `Counter` | Unit | None | Description | The number of times the timeout logic was run. This is usually once per method call, but may be zero times if the circuit breaker or rate limit prevents execution or more than once if the method is retried. | Tags @@ -60,8 +63,14 @@ a| * `method` - the fully qualified method name [cols="1,5"] |=== | Name | `ft.timeout.executionDuration` -| Type | `Histogram` -| Unit | Nanoseconds +| Type +a| * MP Metrics: `Histogram` +* OpenTelemetry: `DoubleHistogram` with explicit bucket boundaries `[0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10]` +* Micrometer: `Timer` +| Unit +a| * MP Metrics: nanoseconds +* OpenTelemetry: seconds +* Micrometer: nanoseconds | Description | Histogram of execution times for the method | Tags a| * `method` - the fully qualified method name diff --git a/implementation/core/pom.xml b/implementation/core/pom.xml index 1c7f74c2b..017c49794 100644 --- a/implementation/core/pom.xml +++ b/implementation/core/pom.xml @@ -55,6 +55,11 @@ microprofile-metrics-api provided + + io.opentelemetry + opentelemetry-api + provided + io.micrometer micrometer-core diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/OpenTelemetryRecorder.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/OpenTelemetryRecorder.java new file mode 100644 index 000000000..0e09ad84b --- /dev/null +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/metrics/OpenTelemetryRecorder.java @@ -0,0 +1,328 @@ +package io.smallrye.faulttolerance.core.metrics; + +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.BULKHEAD_CALLS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.BULKHEAD_EXECUTIONS_RUNNING; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.BULKHEAD_EXECUTIONS_WAITING; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.BULKHEAD_RUNNING_DURATION; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.BULKHEAD_WAITING_DURATION; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.CIRCUIT_BREAKER_CALLS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.CIRCUIT_BREAKER_OPENED_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.CIRCUIT_BREAKER_STATE_CURRENT; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.CIRCUIT_BREAKER_STATE_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.INVOCATIONS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.RATE_LIMIT_CALLS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.RETRY_CALLS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.RETRY_RETRIES_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.TIMEOUT_CALLS_TOTAL; +import static io.smallrye.faulttolerance.core.metrics.MetricsConstants.TIMEOUT_EXECUTION_DURATION; + +import java.util.List; +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.Meter; +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreakerEvents; + +public class OpenTelemetryRecorder implements MetricsRecorder { + private static final List BUCKET_BOUNDARIES = List.of( + 0.005, 0.01, + 0.025, 0.05, 0.075, 0.1, + 0.25, 0.5, 0.75, 1.0, + 2.5, 5.0, 7.5, 10.0); + private static final double NANOS_TO_SECONDS = 1.0 / 1_000_000_000.0; + + private static final AttributeKey METHOD = AttributeKey.stringKey("method"); + + private static final AttributeKey RESULT = AttributeKey.stringKey("result"); + private static final String RESULT_VALUE_RETURNED = "valueReturned"; + private static final String RESULT_EXCEPTION_THROWN = "exceptionThrown"; + + private static final AttributeKey FALLBACK = AttributeKey.stringKey("fallback"); + private static final String FALLBACK_APPLIED = "applied"; + private static final String FALLBACK_NOT_APPLIED = "notApplied"; + private static final String FALLBACK_NOT_DEFINED = "notDefined"; + + private static final AttributeKey RETRIED = AttributeKey.stringKey("retried"); + private static final String RETRIED_TRUE = "true"; + private static final String RETRIED_FALSE = "false"; + private static final AttributeKey RETRY_RESULT = AttributeKey.stringKey("retryResult"); + private static final String RETRY_RESULT_VALUE_RETURNED = "valueReturned"; + private static final String RETRY_RESULT_EXCEPTION_NOT_RETRYABLE = "exceptionNotRetryable"; + private static final String RETRY_RESULT_MAX_RETRIES_REACHED = "maxRetriesReached"; + private static final String RETRY_RESULT_MAX_DURATION_REACHED = "maxDurationReached"; + + private static final AttributeKey TIMED_OUT = AttributeKey.stringKey("timedOut"); + private static final String TIMED_OUT_TRUE = "true"; + private static final String TIMED_OUT_FALSE = "false"; + + private static final AttributeKey CIRCUIT_BREAKER_RESULT = AttributeKey.stringKey("circuitBreakerResult"); + private static final String CIRCUIT_BREAKER_RESULT_SUCCESS = "success"; + private static final String CIRCUIT_BREAKER_RESULT_FAILURE = "failure"; + private static final String CIRCUIT_BREAKER_RESULT_CB_OPEN = "circuitBreakerOpen"; + + private static final AttributeKey CIRCUIT_BREAKER_STATE = AttributeKey.stringKey("state"); + private static final String CIRCUIT_BREAKER_STATE_CLOSED = "closed"; + private static final String CIRCUIT_BREAKER_STATE_OPEN = "open"; + private static final String CIRCUIT_BREAKER_STATE_HALF_OPEN = "halfOpen"; + + private static final AttributeKey BULKHEAD_RESULT = AttributeKey.stringKey("bulkheadResult"); + private static final String BULKHEAD_RESULT_ACCEPTED = "accepted"; + private static final String BULKHEAD_RESULT_REJECTED = "rejected"; + + private static final AttributeKey RATE_LIMIT_RESULT = AttributeKey.stringKey("rateLimitResult"); + private static final String RATE_LIMIT_RESULT_PERMITTED = "permitted"; + private static final String RATE_LIMIT_RESULT_REJECTED = "rejected"; + + private final Meter meter; + private final String methodName; + + private final LongCounter invocationsTotal; + private final LongCounter retryCallsTotal; + private final LongCounter retryRetriesTotal; + private final LongCounter timeoutCallsTotal; + private final DoubleHistogram timeoutExecutionDuration; + private final LongCounter circuitBreakerCallsTotal; + private final LongCounter circuitBreakerOpenedTotal; + private final LongCounter bulkheadCallsTotal; + private final DoubleHistogram bulkheadRunningDuration; + private final DoubleHistogram bulkheadWaitingDuration; + private final LongCounter rateLimitCallsTotal; + + public OpenTelemetryRecorder(Meter meter, MeteredOperation operation) { + this.meter = meter; + this.methodName = operation.name(); + + // make sure all applicable metrics for given method are registered eagerly + // we only touch sync metrics, because async metrics are registered eagerly elsewhere + + this.invocationsTotal = meter.counterBuilder(INVOCATIONS_TOTAL).build(); + + if (operation.hasRetry()) { + this.retryCallsTotal = meter.counterBuilder(RETRY_CALLS_TOTAL).build(); + this.retryRetriesTotal = meter.counterBuilder(RETRY_RETRIES_TOTAL).build(); + } else { + this.retryCallsTotal = null; + this.retryRetriesTotal = null; + } + + if (operation.hasTimeout()) { + this.timeoutCallsTotal = meter.counterBuilder(TIMEOUT_CALLS_TOTAL).build(); + this.timeoutExecutionDuration = meter.histogramBuilder(TIMEOUT_EXECUTION_DURATION) + .setUnit("seconds") + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build(); + } else { + this.timeoutCallsTotal = null; + this.timeoutExecutionDuration = null; + } + + if (operation.hasCircuitBreaker()) { + this.circuitBreakerCallsTotal = meter.counterBuilder(CIRCUIT_BREAKER_CALLS_TOTAL).build(); + this.circuitBreakerOpenedTotal = meter.counterBuilder(CIRCUIT_BREAKER_OPENED_TOTAL).build(); + } else { + this.circuitBreakerCallsTotal = null; + this.circuitBreakerOpenedTotal = null; + } + + if (operation.hasBulkhead()) { + this.bulkheadCallsTotal = meter.counterBuilder(BULKHEAD_CALLS_TOTAL).build(); + this.bulkheadRunningDuration = meter.histogramBuilder(BULKHEAD_RUNNING_DURATION) + .setUnit("seconds") + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build(); + if (operation.isAsynchronous()) { + this.bulkheadWaitingDuration = meter.histogramBuilder(BULKHEAD_WAITING_DURATION) + .setUnit("seconds") + .setExplicitBucketBoundariesAdvice(BUCKET_BOUNDARIES) + .build(); + } else { + this.bulkheadWaitingDuration = null; + } + } else { + this.bulkheadCallsTotal = null; + this.bulkheadRunningDuration = null; + this.bulkheadWaitingDuration = null; + } + + if (operation.hasRateLimit()) { + this.rateLimitCallsTotal = meter.counterBuilder(RATE_LIMIT_CALLS_TOTAL).build(); + } else { + this.rateLimitCallsTotal = null; + } + } + + private void registerAsyncUpDownCounter(LongSupplier supplier, String name, Attributes attributes) { + meter.upDownCounterBuilder(name).buildWithCallback(m -> m.record(supplier.getAsLong(), attributes)); + } + + private void registerAsyncUpDownCounter(BooleanSupplier supplier, String name, Attributes attributes) { + meter.upDownCounterBuilder(name).buildWithCallback(m -> m.record(supplier.getAsBoolean() ? 1 : 0, attributes)); + } + + private void registerAsyncCounter(LongSupplier supplier, String name, String unit, Attributes attributes) { + meter.counterBuilder(name).setUnit(unit).buildWithCallback(m -> m.record(supplier.getAsLong(), attributes)); + } + + // --- + + @Override + public void executionFinished(boolean succeeded, boolean fallbackDefined, boolean fallbackApplied) { + String fallback = fallbackDefined + ? (fallbackApplied ? FALLBACK_APPLIED : FALLBACK_NOT_APPLIED) + : FALLBACK_NOT_DEFINED; + + invocationsTotal.add(1, Attributes.of( + METHOD, methodName, + RESULT, succeeded ? RESULT_VALUE_RETURNED : RESULT_EXCEPTION_THROWN, + FALLBACK, fallback)); + } + + @Override + public void retryAttempted() { + retryRetriesTotal.add(1, Attributes.of(METHOD, methodName)); + } + + @Override + public void retryValueReturned(boolean retried) { + retryCallsTotal.add(1, Attributes.of( + METHOD, methodName, + RETRIED, retried ? RETRIED_TRUE : RETRIED_FALSE, + RETRY_RESULT, RETRY_RESULT_VALUE_RETURNED)); + } + + @Override + public void retryExceptionNotRetryable(boolean retried) { + retryCallsTotal.add(1, Attributes.of( + METHOD, methodName, + RETRIED, retried ? RETRIED_TRUE : RETRIED_FALSE, + RETRY_RESULT, RETRY_RESULT_EXCEPTION_NOT_RETRYABLE)); + } + + @Override + public void retryMaxRetriesReached(boolean retried) { + retryCallsTotal.add(1, Attributes.of( + METHOD, methodName, + RETRIED, retried ? RETRIED_TRUE : RETRIED_FALSE, + RETRY_RESULT, RETRY_RESULT_MAX_RETRIES_REACHED)); + } + + @Override + public void retryMaxDurationReached(boolean retried) { + retryCallsTotal.add(1, Attributes.of( + METHOD, methodName, + RETRIED, retried ? RETRIED_TRUE : RETRIED_FALSE, + RETRY_RESULT, RETRY_RESULT_MAX_DURATION_REACHED)); + } + + @Override + public void timeoutFinished(boolean timedOut, long time) { + timeoutCallsTotal.add(1, Attributes.of( + METHOD, methodName, + TIMED_OUT, timedOut ? TIMED_OUT_TRUE : TIMED_OUT_FALSE)); + timeoutExecutionDuration.record(time * NANOS_TO_SECONDS, Attributes.of(METHOD, methodName)); + } + + @Override + public void circuitBreakerFinished(CircuitBreakerEvents.Result result) { + String circuitBreakerResult = null; + switch (result) { + case SUCCESS: + circuitBreakerResult = CIRCUIT_BREAKER_RESULT_SUCCESS; + break; + case FAILURE: + circuitBreakerResult = CIRCUIT_BREAKER_RESULT_FAILURE; + break; + case PREVENTED: + circuitBreakerResult = CIRCUIT_BREAKER_RESULT_CB_OPEN; + break; + } + circuitBreakerCallsTotal.add(1, Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_RESULT, circuitBreakerResult)); + } + + @Override + public void circuitBreakerMovedToOpen() { + circuitBreakerOpenedTotal.add(1, Attributes.of(METHOD, methodName)); + } + + @Override + public void registerCircuitBreakerIsClosed(BooleanSupplier supplier) { + registerAsyncUpDownCounter(supplier, CIRCUIT_BREAKER_STATE_CURRENT, Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_CLOSED)); + } + + @Override + public void registerCircuitBreakerIsOpen(BooleanSupplier supplier) { + registerAsyncUpDownCounter(supplier, CIRCUIT_BREAKER_STATE_CURRENT, Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_OPEN)); + } + + @Override + public void registerCircuitBreakerIsHalfOpen(BooleanSupplier supplier) { + registerAsyncUpDownCounter(supplier, CIRCUIT_BREAKER_STATE_CURRENT, Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_HALF_OPEN)); + } + + @Override + public void registerCircuitBreakerTimeSpentInClosed(LongSupplier supplier) { + registerAsyncCounter(supplier, CIRCUIT_BREAKER_STATE_TOTAL, "nanoseconds", Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_CLOSED)); + } + + @Override + public void registerCircuitBreakerTimeSpentInOpen(LongSupplier supplier) { + registerAsyncCounter(supplier, CIRCUIT_BREAKER_STATE_TOTAL, "nanoseconds", Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_OPEN)); + } + + @Override + public void registerCircuitBreakerTimeSpentInHalfOpen(LongSupplier supplier) { + registerAsyncCounter(supplier, CIRCUIT_BREAKER_STATE_TOTAL, "nanoseconds", Attributes.of( + METHOD, methodName, + CIRCUIT_BREAKER_STATE, CIRCUIT_BREAKER_STATE_HALF_OPEN)); + } + + @Override + public void bulkheadDecisionMade(boolean accepted) { + bulkheadCallsTotal.add(1, Attributes.of( + METHOD, methodName, + BULKHEAD_RESULT, accepted ? BULKHEAD_RESULT_ACCEPTED : BULKHEAD_RESULT_REJECTED)); + } + + @Override + public void registerBulkheadExecutionsRunning(LongSupplier supplier) { + registerAsyncUpDownCounter(supplier, BULKHEAD_EXECUTIONS_RUNNING, Attributes.of(METHOD, methodName)); + } + + @Override + public void registerBulkheadExecutionsWaiting(LongSupplier supplier) { + registerAsyncUpDownCounter(supplier, BULKHEAD_EXECUTIONS_WAITING, Attributes.of(METHOD, methodName)); + } + + @Override + public void updateBulkheadRunningDuration(long time) { + bulkheadRunningDuration.record(time * NANOS_TO_SECONDS, Attributes.of(METHOD, methodName)); + } + + @Override + public void updateBulkheadWaitingDuration(long time) { + bulkheadWaitingDuration.record(time * NANOS_TO_SECONDS, Attributes.of(METHOD, methodName)); + } + + @Override + public void rateLimitDecisionMade(boolean permitted) { + rateLimitCallsTotal.add(1, Attributes.of( + METHOD, methodName, + RATE_LIMIT_RESULT, permitted ? RATE_LIMIT_RESULT_PERMITTED : RATE_LIMIT_RESULT_REJECTED)); + } +} diff --git a/implementation/fault-tolerance/pom.xml b/implementation/fault-tolerance/pom.xml index e2e7c1f9b..0362f7de4 100644 --- a/implementation/fault-tolerance/pom.xml +++ b/implementation/fault-tolerance/pom.xml @@ -75,6 +75,11 @@ micrometer-core true + + io.opentelemetry + opentelemetry-api + true + io.opentracing opentracing-api diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceExtension.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceExtension.java index 31efb40d3..809fb7a9f 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceExtension.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/FaultToleranceExtension.java @@ -25,6 +25,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,10 +71,12 @@ import io.smallrye.faulttolerance.config.FaultToleranceMethods; import io.smallrye.faulttolerance.config.FaultToleranceOperation; import io.smallrye.faulttolerance.internal.StrategyCache; +import io.smallrye.faulttolerance.metrics.CompoundMetricsProvider; import io.smallrye.faulttolerance.metrics.MetricsIntegration; import io.smallrye.faulttolerance.metrics.MicroProfileMetricsProvider; import io.smallrye.faulttolerance.metrics.MicrometerProvider; import io.smallrye.faulttolerance.metrics.NoopProvider; +import io.smallrye.faulttolerance.metrics.OpenTelemetryProvider; public class FaultToleranceExtension implements Extension { @@ -88,14 +91,46 @@ public class FaultToleranceExtension implements Extension { private final ConcurrentMap faultToleranceOperations = new ConcurrentHashMap<>(); private final ConcurrentMap> existingCircuitBreakerNames = new ConcurrentHashMap<>(); - private final MetricsIntegration metricsIntegration; + + private final Set metricsIntegrations; + + private static boolean isPresent(String className) { + try { + Class.forName(className); + return true; + } catch (ClassNotFoundException ignored) { + // not present + return false; + } + } + + private static Set allPresentMetrics() { + Set result = EnumSet.noneOf(MetricsIntegration.class); + if (isPresent("org.eclipse.microprofile.metrics.MetricRegistry")) { + result.add(MetricsIntegration.MICROPROFILE_METRICS); + } + if (isPresent("io.opentelemetry.api.metrics.Meter")) { + result.add(MetricsIntegration.OPENTELEMETRY); + } + if (isPresent("io.micrometer.core.instrument.MeterRegistry")) { + result.add(MetricsIntegration.MICROMETER); + } + if (result.isEmpty()) { + result.add(MetricsIntegration.NOOP); + } + return result; + } public FaultToleranceExtension() { - this(MetricsIntegration.MICROPROFILE_METRICS); + this(allPresentMetrics()); } public FaultToleranceExtension(MetricsIntegration metricsIntegration) { - this.metricsIntegration = metricsIntegration; + this(EnumSet.of(metricsIntegration)); + } + + public FaultToleranceExtension(Set metricsIntegrations) { + this.metricsIntegrations = EnumSet.copyOf(metricsIntegrations); } void registerInterceptorBindings(@Observes BeforeBeanDiscovery bbd, BeanManager bm) { @@ -129,18 +164,6 @@ void registerInterceptorBindings(@Observes BeforeBeanDiscovery bbd, BeanManager DefaultFaultToleranceOperationProvider.class.getName()); bbd.addAnnotatedType(bm.createAnnotatedType(DefaultExistingCircuitBreakerNames.class), DefaultExistingCircuitBreakerNames.class.getName()); - switch (metricsIntegration) { - case MICROPROFILE_METRICS: - bbd.addAnnotatedType(bm.createAnnotatedType(MicroProfileMetricsProvider.class), - MicroProfileMetricsProvider.class.getName()); - break; - case MICROMETER: - bbd.addAnnotatedType(bm.createAnnotatedType(MicrometerProvider.class), MicrometerProvider.class.getName()); - break; - case NOOP: - bbd.addAnnotatedType(bm.createAnnotatedType(NoopProvider.class), NoopProvider.class.getName()); - break; - } bbd.addAnnotatedType(bm.createAnnotatedType(StrategyCache.class), StrategyCache.class.getName()); bbd.addAnnotatedType(bm.createAnnotatedType(CircuitBreakerMaintenanceImpl.class), CircuitBreakerMaintenanceImpl.class.getName()); @@ -151,6 +174,31 @@ void registerInterceptorBindings(@Observes BeforeBeanDiscovery bbd, BeanManager CdiFaultToleranceSpi.EagerDependencies.class.getName()); bbd.addAnnotatedType(bm.createAnnotatedType(CdiFaultToleranceSpi.LazyDependencies.class), CdiFaultToleranceSpi.LazyDependencies.class.getName()); + + if (metricsIntegrations.size() > 1) { + bbd.addAnnotatedType(bm.createAnnotatedType(CompoundMetricsProvider.class), + CompoundMetricsProvider.class.getName()); + } + for (MetricsIntegration metricsIntegration : metricsIntegrations) { + switch (metricsIntegration) { + case MICROPROFILE_METRICS: + bbd.addAnnotatedType(bm.createAnnotatedType(MicroProfileMetricsProvider.class), + MicroProfileMetricsProvider.class.getName()); + break; + case OPENTELEMETRY: + bbd.addAnnotatedType(bm.createAnnotatedType(OpenTelemetryProvider.class), + OpenTelemetryProvider.class.getName()); + break; + case MICROMETER: + bbd.addAnnotatedType(bm.createAnnotatedType(MicrometerProvider.class), + MicrometerProvider.class.getName()); + break; + case NOOP: + bbd.addAnnotatedType(bm.createAnnotatedType(NoopProvider.class), + NoopProvider.class.getName()); + break; + } + } } void changeInterceptorPriority(@Observes ProcessAnnotatedType event) { diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/CompoundMetricsProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/CompoundMetricsProvider.java new file mode 100644 index 000000000..19691dd67 --- /dev/null +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/CompoundMetricsProvider.java @@ -0,0 +1,225 @@ +package io.smallrye.faulttolerance.metrics; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BooleanSupplier; +import java.util.function.LongSupplier; + +import jakarta.annotation.Priority; +import jakarta.enterprise.inject.Alternative; +import jakarta.enterprise.inject.spi.CDI; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreakerEvents; +import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsProvider; +import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; + +@Singleton +@Alternative +@Priority(1) +public class CompoundMetricsProvider implements MetricsProvider { + @Inject + @ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true") + boolean metricsEnabled; + + private final Map cache = new ConcurrentHashMap<>(); + + private final MetricsProvider[] providers; + + CompoundMetricsProvider() { + CDI cdi = CDI.current(); + List> allProviders = List.of(MicroProfileMetricsProvider.class, + OpenTelemetryProvider.class, MicrometerProvider.class); + + List providers = new ArrayList<>(); + for (Class clazz : allProviders) { + try { + providers.add(cdi.select(clazz).get()); + } catch (Exception ignored) { + // either the bean does not exist, or some of its dependencies are not injectable + } + } + this.providers = providers.toArray(new MetricsProvider[0]); + } + + @Override + public boolean isEnabled() { + return metricsEnabled; + } + + @Override + public MetricsRecorder create(MeteredOperation operation) { + if (metricsEnabled) { + return cache.computeIfAbsent(operation.cacheKey(), ignored -> { + MetricsRecorder[] recorders = new MetricsRecorder[providers.length]; + for (int i = 0; i < providers.length; i++) { + recorders[i] = providers[i].create(operation); + } + return new CompoundMetricsRecorder(recorders); + }); + } else { + return MetricsRecorder.NOOP; + } + } + + private static class CompoundMetricsRecorder implements MetricsRecorder { + private final MetricsRecorder[] recorders; + + private CompoundMetricsRecorder(MetricsRecorder... recorders) { + this.recorders = recorders; + } + + @Override + public void executionFinished(boolean succeeded, boolean fallbackDefined, boolean fallbackApplied) { + for (MetricsRecorder recorder : recorders) { + recorder.executionFinished(succeeded, fallbackDefined, fallbackApplied); + } + } + + @Override + public void retryAttempted() { + for (MetricsRecorder recorder : recorders) { + recorder.retryAttempted(); + } + } + + @Override + public void retryValueReturned(boolean retried) { + for (MetricsRecorder recorder : recorders) { + recorder.retryValueReturned(retried); + } + } + + @Override + public void retryExceptionNotRetryable(boolean retried) { + for (MetricsRecorder recorder : recorders) { + recorder.retryExceptionNotRetryable(retried); + } + } + + @Override + public void retryMaxRetriesReached(boolean retried) { + for (MetricsRecorder recorder : recorders) { + recorder.retryMaxRetriesReached(retried); + } + } + + @Override + public void retryMaxDurationReached(boolean retried) { + for (MetricsRecorder recorder : recorders) { + recorder.retryMaxDurationReached(retried); + } + } + + @Override + public void timeoutFinished(boolean timedOut, long time) { + for (MetricsRecorder recorder : recorders) { + recorder.timeoutFinished(timedOut, time); + } + } + + @Override + public void circuitBreakerFinished(CircuitBreakerEvents.Result result) { + for (MetricsRecorder recorder : recorders) { + recorder.circuitBreakerFinished(result); + } + } + + @Override + public void circuitBreakerMovedToOpen() { + for (MetricsRecorder recorder : recorders) { + recorder.circuitBreakerMovedToOpen(); + } + } + + @Override + public void registerCircuitBreakerIsClosed(BooleanSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerIsClosed(supplier); + } + } + + @Override + public void registerCircuitBreakerIsOpen(BooleanSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerIsOpen(supplier); + } + } + + @Override + public void registerCircuitBreakerIsHalfOpen(BooleanSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerIsHalfOpen(supplier); + } + } + + @Override + public void registerCircuitBreakerTimeSpentInClosed(LongSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerTimeSpentInClosed(supplier); + } + } + + @Override + public void registerCircuitBreakerTimeSpentInOpen(LongSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerTimeSpentInOpen(supplier); + } + } + + @Override + public void registerCircuitBreakerTimeSpentInHalfOpen(LongSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerCircuitBreakerTimeSpentInHalfOpen(supplier); + } + } + + @Override + public void bulkheadDecisionMade(boolean accepted) { + for (MetricsRecorder recorder : recorders) { + recorder.bulkheadDecisionMade(accepted); + } + } + + @Override + public void registerBulkheadExecutionsRunning(LongSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerBulkheadExecutionsRunning(supplier); + } + } + + @Override + public void registerBulkheadExecutionsWaiting(LongSupplier supplier) { + for (MetricsRecorder recorder : recorders) { + recorder.registerBulkheadExecutionsWaiting(supplier); + } + } + + @Override + public void updateBulkheadRunningDuration(long time) { + for (MetricsRecorder recorder : recorders) { + recorder.updateBulkheadRunningDuration(time); + } + } + + @Override + public void updateBulkheadWaitingDuration(long time) { + for (MetricsRecorder recorder : recorders) { + recorder.updateBulkheadWaitingDuration(time); + } + } + + @Override + public void rateLimitDecisionMade(boolean permitted) { + for (MetricsRecorder recorder : recorders) { + recorder.rateLimitDecisionMade(permitted); + } + } + } +} diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MetricsIntegration.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MetricsIntegration.java index 1ac490c99..96578bbad 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MetricsIntegration.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MetricsIntegration.java @@ -1,11 +1,14 @@ package io.smallrye.faulttolerance.metrics; public enum MetricsIntegration { - /** * Metrics integration using {@link MicroProfileMetricsProvider}. */ MICROPROFILE_METRICS, + /** + * Metrics integration using {@link OpenTelemetryProvider}. + */ + OPENTELEMETRY, /** * Metrics integration using {@link MicrometerProvider}. */ diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java index 450e20e55..720faeb1a 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicroProfileMetricsProvider.java @@ -39,6 +39,10 @@ public class MicroProfileMetricsProvider implements MetricsProvider { @PostConstruct void init() { + if (!metricsEnabled) { + return; + } + Metadata metadata = Metadata.builder() .withName(MetricsConstants.TIMER_SCHEDULED) .withUnit(MetricUnits.NONE) diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java index 4371f10f4..45695d762 100644 --- a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/MicrometerProvider.java @@ -36,6 +36,10 @@ public class MicrometerProvider implements MetricsProvider { @PostConstruct void init() { + if (!metricsEnabled) { + return; + } + Timer timer = executorHolder.getTimer(); registry.gauge(MetricsConstants.TIMER_SCHEDULED, Collections.singletonList(Tag.of("id", "" + timer.getId())), timer, Timer::countScheduledTasks); diff --git a/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/OpenTelemetryProvider.java b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/OpenTelemetryProvider.java new file mode 100644 index 000000000..cb163834a --- /dev/null +++ b/implementation/fault-tolerance/src/main/java/io/smallrye/faulttolerance/metrics/OpenTelemetryProvider.java @@ -0,0 +1,63 @@ +package io.smallrye.faulttolerance.metrics; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import jakarta.annotation.PostConstruct; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.smallrye.faulttolerance.ExecutorHolder; +import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; +import io.smallrye.faulttolerance.core.metrics.MetricsProvider; +import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; +import io.smallrye.faulttolerance.core.metrics.OpenTelemetryRecorder; +import io.smallrye.faulttolerance.core.timer.Timer; + +@Singleton +public class OpenTelemetryProvider implements MetricsProvider { + @Inject + Meter meter; + + @Inject + @ConfigProperty(name = "MP_Fault_Tolerance_Metrics_Enabled", defaultValue = "true") + boolean metricsEnabled; + + @Inject + ExecutorHolder executorHolder; + + private final Map cache = new ConcurrentHashMap<>(); + + @PostConstruct + void init() { + if (!metricsEnabled) { + return; + } + + Timer timer = executorHolder.getTimer(); + Attributes attributes = Attributes.of(AttributeKey.stringKey("id"), "" + timer.getId()); + meter.upDownCounterBuilder(MetricsConstants.TIMER_SCHEDULED) + .buildWithCallback(m -> m.record(timer.countScheduledTasks(), attributes)); + } + + @Override + public boolean isEnabled() { + return metricsEnabled; + } + + @Override + public MetricsRecorder create(MeteredOperation operation) { + if (metricsEnabled) { + return cache.computeIfAbsent(operation.cacheKey(), + ignored -> new OpenTelemetryRecorder(meter, operation)); + } else { + return MetricsRecorder.NOOP; + } + } +} diff --git a/implementation/standalone/pom.xml b/implementation/standalone/pom.xml index a1a2438cb..991b68c27 100644 --- a/implementation/standalone/pom.xml +++ b/implementation/standalone/pom.xml @@ -32,6 +32,11 @@ smallrye-fault-tolerance-core + + io.opentelemetry + opentelemetry-api + provided + io.micrometer micrometer-core diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java index 21c854894..d54a2a7f0 100644 --- a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/LazyDependencies.java @@ -56,6 +56,8 @@ public MetricsProvider metricsProvider() { metricsProvider = ((NoopAdapter) metricsAdapter).createMetricsProvider(); } else if (metricsAdapter instanceof MicrometerAdapter) { metricsProvider = ((MicrometerAdapter) metricsAdapter).createMetricsProvider(timer); + } else if (metricsAdapter instanceof OpenTelemetryAdapter) { + metricsProvider = ((OpenTelemetryAdapter) metricsAdapter).createMetricsProvider(timer); } else { throw new IllegalStateException("Invalid metrics adapter: " + metricsAdapter); } diff --git a/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/OpenTelemetryAdapter.java b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/OpenTelemetryAdapter.java new file mode 100644 index 000000000..7c715e336 --- /dev/null +++ b/implementation/standalone/src/main/java/io/smallrye/faulttolerance/standalone/OpenTelemetryAdapter.java @@ -0,0 +1,43 @@ +package io.smallrye.faulttolerance.standalone; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; +import io.smallrye.faulttolerance.core.metrics.MeteredOperation; +import io.smallrye.faulttolerance.core.metrics.MetricsConstants; +import io.smallrye.faulttolerance.core.metrics.MetricsProvider; +import io.smallrye.faulttolerance.core.metrics.MetricsRecorder; +import io.smallrye.faulttolerance.core.metrics.OpenTelemetryRecorder; +import io.smallrye.faulttolerance.core.timer.Timer; + +public final class OpenTelemetryAdapter implements MetricsAdapter { + private final Meter meter; + + public OpenTelemetryAdapter(Meter meter) { + this.meter = meter; + } + + MetricsProvider createMetricsProvider(Timer timer) { + Attributes attributes = Attributes.of(AttributeKey.stringKey("id"), "" + timer.getId()); + meter.upDownCounterBuilder(MetricsConstants.TIMER_SCHEDULED) + .buildWithCallback(m -> m.record(timer.countScheduledTasks(), attributes)); + + return new MetricsProvider() { + private final Map cache = new ConcurrentHashMap<>(); + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public MetricsRecorder create(MeteredOperation operation) { + return cache.computeIfAbsent(operation.cacheKey(), + ignored -> new OpenTelemetryRecorder(meter, operation)); + } + }; + } +} diff --git a/pom.xml b/pom.xml index d02e30697..d0fe189e0 100644 --- a/pom.xml +++ b/pom.xml @@ -47,16 +47,19 @@ 1.9.20 1.8.1 1.12.5 - 4.0.2 + 4.1.1 3.1 4.0.1 1.3 + 1.39.0 3.8.2 4.0.0 2.1.0 + + 2.8.1 2.4.0 0.33.0 4.5.8 @@ -174,6 +177,11 @@ mutiny ${version.mutiny} + + io.opentelemetry + opentelemetry-api + ${version.opentelemetry} + io.opentracing opentracing-api @@ -234,6 +242,12 @@ ${version.smallrye-config} test + + io.smallrye + smallrye-context-propagation-cdi + ${version.smallrye-context-propagation} + test + io.smallrye smallrye-metrics @@ -241,15 +255,21 @@ test - io.smallrye - smallrye-context-propagation - ${version.smallrye-context-propagation} + io.smallrye.opentelemetry + smallrye-opentelemetry-cdi + ${version.smallrye-opentelemetry} test - io.smallrye - smallrye-context-propagation-cdi - ${version.smallrye-context-propagation} + io.smallrye.opentelemetry + smallrye-opentelemetry-config + ${version.smallrye-opentelemetry} + test + + + io.smallrye.opentelemetry + smallrye-opentelemetry-exporters + ${version.smallrye-opentelemetry} test diff --git a/testsuite/integration/pom.xml b/testsuite/integration/pom.xml index 692aff36e..d4e639628 100644 --- a/testsuite/integration/pom.xml +++ b/testsuite/integration/pom.xml @@ -42,10 +42,6 @@ io.smallrye smallrye-metrics - - io.smallrye - smallrye-context-propagation - io.smallrye smallrye-context-propagation-cdi diff --git a/testsuite/tck/pom.xml b/testsuite/tck/pom.xml index d78e1f588..b3bf8b058 100644 --- a/testsuite/tck/pom.xml +++ b/testsuite/tck/pom.xml @@ -40,6 +40,18 @@ io.smallrye.config smallrye-config + + io.smallrye.opentelemetry + smallrye-opentelemetry-cdi + + + io.smallrye.opentelemetry + smallrye-opentelemetry-config + + + io.smallrye.opentelemetry + smallrye-opentelemetry-exporters + io.smallrye smallrye-metrics diff --git a/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/FaultToleranceExtension.java b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/FaultToleranceExtension.java index 22807f83c..a859ae0f3 100644 --- a/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/FaultToleranceExtension.java +++ b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/FaultToleranceExtension.java @@ -31,6 +31,7 @@ public void register(ExtensionBuilder builder) { builder.service(ApplicationArchiveProcessor.class, RetryTckOnMac.class); builder.service(DeploymentExceptionTransformer.class, TckDeploymentExceptionTransformer.class); builder.observer(CleanupMetricRegistries.class); + builder.observer(ForceOtelBean.class); } } diff --git a/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/ForceOtelBean.java b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/ForceOtelBean.java new file mode 100644 index 000000000..4d1062133 --- /dev/null +++ b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/ForceOtelBean.java @@ -0,0 +1,18 @@ +package io.smallrye.faulttolerance.tck; + +import jakarta.enterprise.inject.spi.CDI; + +import org.jboss.arquillian.core.api.annotation.Observes; +import org.jboss.arquillian.test.spi.event.suite.Before; + +import io.opentelemetry.api.OpenTelemetry; + +public class ForceOtelBean { + public void beforeEachTest(@Observes Before event) { + if (event.getTestClass().getName().startsWith("org.eclipse.microprofile.fault.tolerance.tck.telemetryMetrics.")) { + // TODO make sure the `OpenTelemetry` bean is instantiated eagerly, which in turn triggers + // initialization of the MP FT TCK supporting infrastructure, which is required in the tests + CDI.current().select(OpenTelemetry.class).get(); + } + } +} diff --git a/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/OtelConfigExtension.java b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/OtelConfigExtension.java new file mode 100644 index 000000000..28428925c --- /dev/null +++ b/testsuite/tck/src/test/java/io/smallrye/faulttolerance/tck/OtelConfigExtension.java @@ -0,0 +1,13 @@ +package io.smallrye.faulttolerance.tck; + +import jakarta.enterprise.event.Observes; +import jakarta.enterprise.inject.spi.BeforeBeanDiscovery; +import jakarta.enterprise.inject.spi.Extension; + +import io.smallrye.opentelemetry.implementation.config.OpenTelemetryConfigProducer; + +public class OtelConfigExtension implements Extension { + public void beforeBeanDiscovery(@Observes BeforeBeanDiscovery bbd) { + bbd.addAnnotatedType(OpenTelemetryConfigProducer.class, OpenTelemetryConfigProducer.class.getName()); + } +} diff --git a/testsuite/tck/src/test/resources/META-INF/services/jakarta.enterprise.inject.spi.Extension b/testsuite/tck/src/test/resources/META-INF/services/jakarta.enterprise.inject.spi.Extension new file mode 100644 index 000000000..44992f582 --- /dev/null +++ b/testsuite/tck/src/test/resources/META-INF/services/jakarta.enterprise.inject.spi.Extension @@ -0,0 +1 @@ +io.smallrye.faulttolerance.tck.OtelConfigExtension From c0671fdabe3354a342c3f63a1a9a19b6e4bd2f5b Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Wed, 2 Oct 2024 16:48:25 +0200 Subject: [PATCH 2/2] Fix typos in the @AsynchronousNonBlocking javadoc --- .../smallrye/faulttolerance/api/AsynchronousNonBlocking.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java b/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java index bef0a9a83..e610015b8 100644 --- a/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java +++ b/api/src/main/java/io/smallrye/faulttolerance/api/AsynchronousNonBlocking.java @@ -21,7 +21,7 @@ * When a method marked with this annotation is called, the method call is allowed to proceed on the original thread. * It is assumed that the guarded method will, at some point, perform some non-blocking asynchronous operation(s), * such as non-blocking IO, and that it synchronously returns a {@code CompletionStage}. It is further assumed that - * the completion of the asynchronos non-blocking operation(s) executed by the guarded method is followed by + * the completion of the asynchronous non-blocking operation(s) executed by the guarded method is followed by * completion of the returned {@code CompletionStage}. *

* When the guarded method returns, a {@code CompletionStage} is returned to the caller and can be used to access @@ -34,8 +34,7 @@ *

* If a method marked with this annotation doesn't declare return type of {@code CompletionStage}, * {@link org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException - * FaultToleranceDefinitionException} - * occurs during deployment. + * FaultToleranceDefinitionException} occurs during deployment. *

* If a class is annotated with this annotation, all its methods are treated as if they were marked with this annotation. * If one of the methods doesn't return {@code CompletionStage}, {@code FaultToleranceDefinitionException}