From a46c8a0c7c8695221c0daa5e3792f207504b7664 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 9 Jul 2024 23:50:14 +0800 Subject: [PATCH] Add Pulsar MessagingProducerMetrics (#11591) Co-authored-by: Lauri Tulmin Co-authored-by: Steve Rao --- .../build.gradle.kts | 1 + .../messaging/MessagingMetricsAdvice.java | 50 +++++++ .../messaging/MessagingProducerMetrics.java | 85 ++++++++++++ .../MessagingProducerMetricsTest.java | 122 ++++++++++++++++++ .../v2_8/telemetry/PulsarSingletons.java | 4 +- .../pulsar/v2_8/AbstractPulsarClientTest.java | 25 ++++ 6 files changed, 286 insertions(+), 1 deletion(-) create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java create mode 100644 instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index 6acb75e5f79d..00d887da1b25 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -21,6 +21,7 @@ dependencies { testImplementation(project(":testing-common")) testImplementation("io.opentelemetry:opentelemetry-sdk") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") + testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating") } tasks { diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java new file mode 100644 index 000000000000..8ada61c1683f --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.semconv.ErrorAttributes; +import io.opentelemetry.semconv.ServerAttributes; +import java.util.List; + +final class MessagingMetricsAdvice { + static final List DURATION_SECONDS_BUCKETS = + unmodifiableList( + asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_SYSTEM = + AttributeKey.stringKey("messaging.system"); + private static final AttributeKey MESSAGING_DESTINATION_NAME = + AttributeKey.stringKey("messaging.destination.name"); + private static final AttributeKey MESSAGING_OPERATION = + AttributeKey.stringKey("messaging.operation"); + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); + + static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { + if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { + return; + } + ((ExtendedDoubleHistogramBuilder) builder) + .setAttributesAdvice( + asList( + MESSAGING_SYSTEM, + MESSAGING_DESTINATION_NAME, + MESSAGING_OPERATION, + MESSAGING_BATCH_MESSAGE_COUNT, + ErrorAttributes.ERROR_TYPE, + ServerAttributes.SERVER_PORT, + ServerAttributes.SERVER_ADDRESS)); + } + + private MessagingMetricsAdvice() {} +} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java new file mode 100644 index 000000000000..44d5b243744a --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -0,0 +1,85 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.logging.Level.FINE; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics; +import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * {@link OperationListener} which keeps track of Producer + * metrics. + */ +public final class MessagingProducerMetrics implements OperationListener { + private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + + private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = + ContextKey.named("messaging-producer-metrics-state"); + private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); + + private final DoubleHistogram publishDurationHistogram; + + private MessagingProducerMetrics(Meter meter) { + DoubleHistogramBuilder durationBuilder = + meter + .histogramBuilder("messaging.publish.duration") + .setDescription("Measures the duration of publish operation.") + .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) + .setUnit("s"); + MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); + publishDurationHistogram = durationBuilder.build(); + } + + public static OperationMetrics get() { + return OperationMetricsUtil.create("messaging produce", MessagingProducerMetrics::new); + } + + @Override + @CanIgnoreReturnValue + public Context onStart(Context context, Attributes startAttributes, long startNanos) { + return context.with( + MESSAGING_PRODUCER_METRICS_STATE, + new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); + } + + @Override + public void onEnd(Context context, Attributes endAttributes, long endNanos) { + MessagingProducerMetrics.State state = context.get(MESSAGING_PRODUCER_METRICS_STATE); + if (state == null) { + logger.log( + FINE, + "No state present when ending context {0}. Cannot record produce publish metrics.", + context); + return; + } + + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); + + publishDurationHistogram.record( + (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + } + + @AutoValue + abstract static class State { + + abstract Attributes startAttributes(); + + abstract long startTimeNanos(); + } +} diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java new file mode 100644 index 000000000000..491387253ee9 --- /dev/null +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -0,0 +1,122 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +class MessagingProducerMetricsTest { + + private static final double[] DURATION_BUCKETS = + MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + + @Test + void collectsMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + OperationListener listener = MessagingProducerMetrics.get().create(meterProvider.get("test")); + + Attributes requestAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") + .put( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic") + .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") + .put(ServerAttributes.SERVER_PORT, 6650) + .put(ServerAttributes.SERVER_ADDRESS, "localhost") + .build(); + + Attributes responseAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0") + .put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2) + .build(); + + Context parent = + Context.root() + .with( + Span.wrap( + SpanContext.create( + "ff01020304050600ff0a0b0c0d0e0f00", + "090a0b0c0d0e0f00", + TraceFlags.getSampled(), + TraceState.getDefault()))); + + Context context1 = listener.onStart(parent, requestAttributes, nanos(100)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + Context context2 = listener.onStart(Context.root(), requestAttributes, nanos(150)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + listener.onEnd(context1, responseAttributes, nanos(250)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(0.15 /* seconds */) + .hasAttributesSatisfying( + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + "pulsar"), + equalTo( + MessagingIncubatingAttributes + .MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic"), + equalTo(ServerAttributes.SERVER_PORT, 6650), + equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")) + .hasBucketBoundaries(DURATION_BUCKETS)))); + + listener.onEnd(context2, responseAttributes, nanos(300)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> point.hasSum(0.3 /* seconds */)))); + } + + private static long nanos(int millis) { + return TimeUnit.MILLISECONDS.toNanos(millis); + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 1d05086ef401..1ab6aa3f48ac 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -136,7 +137,8 @@ private static Instrumenter createProducerInstrumenter() { .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addOperationMetrics(MessagingProducerMetrics.get()); if (AgentInstrumentationConfig.get() .getBoolean("otel.instrumentation.pulsar.experimental-span-attributes", false)) { diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index f32daa20e074..7883d5082c4f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -22,6 +22,7 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; @@ -76,6 +77,10 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); + private static final double[] DURATION_BUCKETS = + new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; @BeforeAll static void beforeAll() throws PulsarClientException { @@ -163,6 +168,26 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( batchReceiveAttributes(topic, null, false)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test