From b9544744370fdfbfede7acbab9c9fe1e92af7065 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 14 Jun 2024 15:40:21 +0800 Subject: [PATCH 01/11] support pulsar messaging.publish.duration semantic --- .../build.gradle.kts | 1 + .../messaging/MessagingMetricsAdvice.java | 39 ++++++ .../messaging/MessagingProducerMetrics.java | 79 ++++++++++++ .../MessagingProducerMetricsTest.java | 119 ++++++++++++++++++ .../v2_8/telemetry/PulsarSingletons.java | 4 +- 5 files changed, 241 insertions(+), 1 deletion(-) create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java create mode 100644 instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java create mode 100644 instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index 6acb75e5f79d..f901051e5438 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -12,6 +12,7 @@ group = "io.opentelemetry.instrumentation" dependencies { api("io.opentelemetry.semconv:opentelemetry-semconv") + api("io.opentelemetry.semconv:opentelemetry-semconv-incubating") api(project(":instrumentation-api")) implementation("io.opentelemetry:opentelemetry-api-incubator") diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java new file mode 100644 index 000000000000..01d266d4670b --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -0,0 +1,39 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; + +import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.semconv.ErrorAttributes; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.List; + +public class MessagingMetricsAdvice { + static final List DURATION_SECONDS_BUCKETS = + unmodifiableList( + asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + + static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { + if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { + return; + } + ((ExtendedDoubleHistogramBuilder) builder) + .setAttributesAdvice( + asList( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + MessagingIncubatingAttributes.MESSAGING_OPERATION, + MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, + ErrorAttributes.ERROR_TYPE, + ServerAttributes.SERVER_PORT, + ServerAttributes.SERVER_ADDRESS)); + } + private MessagingMetricsAdvice() {} +} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java new file mode 100644 index 000000000000..8dda57a4203b --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.logging.Level.FINE; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics; +import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class MessagingProducerMetrics implements OperationListener { + private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + + private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = + ContextKey.named("pulsar-producer-metrics-state"); + private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); + + + private final DoubleHistogram publishDurationHistogram; + private MessagingProducerMetrics(Meter meter) { + DoubleHistogramBuilder durationBuilder = + meter + .histogramBuilder("messaging.publish.duration") + .setDescription("Measures the duration of publish operation.") + .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) + .setUnit("s"); + MessagingMetricsAdvice.applyPublishDurationAdvice(durationBuilder); + publishDurationHistogram = durationBuilder.build(); + } + + public static OperationMetrics get() { + return OperationMetricsUtil.create("messaging produce", MessagingProducerMetrics::new); + } + + @Override + @CanIgnoreReturnValue + public Context onStart(Context context, Attributes startAttributes, long startNanos) { + return context.with( + PULSAR_PUBLISH_METRICS_STATE, + new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); + } + + @Override + public void onEnd(Context context, Attributes endAttributes, long endNanos) { + MessagingProducerMetrics.State state = context.get(PULSAR_PUBLISH_METRICS_STATE); + if (state == null) { + logger.log( + FINE, + "No state present when ending context {0}. Cannot record pulsar publish metrics.", + context); + return; + } + + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); + + publishDurationHistogram.record((endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + } + + @AutoValue + abstract static class State { + + abstract Attributes startAttributes(); + + abstract long startTimeNanos(); + } +} diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java new file mode 100644 index 000000000000..ea41e02e0870 --- /dev/null +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -0,0 +1,119 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.semconv.ServerAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +public class MessagingProducerMetricsTest { + + static final double[] DURATION_BUCKETS = + MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + + @Test + void collectsMetrics() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + OperationListener listener = MessagingProducerMetrics.get().create(meterProvider.get("test")); + + Attributes requestAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") + .put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic") + .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") + .put(ServerAttributes.SERVER_PORT, 6650) + .put(ServerAttributes.SERVER_ADDRESS, "localhost") + .build(); + + Attributes responseAttributes = + Attributes.builder() + .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0") + .put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2) + .build(); + + Context parent = + Context.root() + .with( + Span.wrap( + SpanContext.create( + "ff01020304050600ff0a0b0c0d0e0f00", + "090a0b0c0d0e0f00", + TraceFlags.getSampled(), + TraceState.getDefault()))); + + Context context1 = listener.onStart(parent, requestAttributes, nanos(100)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + Context context2 = listener.onStart(Context.root(), requestAttributes, nanos(150)); + + assertThat(metricReader.collectAllMetrics()).isEmpty(); + + listener.onEnd(context1, responseAttributes, nanos(250)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSum(0.15 /* seconds */) + .hasAttributesSatisfying( + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, + "pulsar"), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "persistent://public/default/topic"), + equalTo(ServerAttributes.SERVER_PORT, 6650), + equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) + .hasExemplarsSatisfying( + exemplar -> + exemplar + .hasTraceId("ff01020304050600ff0a0b0c0d0e0f00") + .hasSpanId("090a0b0c0d0e0f00")) + .hasBucketBoundaries(DURATION_BUCKETS)))); + + listener.onEnd(context2, responseAttributes, nanos(300)); + + assertThat(metricReader.collectAllMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> point.hasSum(0.3 /* seconds */)))); + } + + private static long nanos(int millis) { + return TimeUnit.MILLISECONDS.toNanos(millis); + } +} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 498c075f68d6..0b15cf51f025 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -136,7 +137,8 @@ private static Instrumenter createProducerInstrumenter() { .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.PUBLISH)) .addAttributesExtractor( - ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); + ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) + .addOperationMetrics(MessagingProducerMetrics.get()); if (InstrumentationConfig.get() .getBoolean("otel.instrumentation.pulsar.experimental-span-attributes", false)) { From d855f97adfd4228a4c50e39ca04ff428c2d84108 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Fri, 14 Jun 2024 16:28:34 +0800 Subject: [PATCH 02/11] code style --- .../semconv/messaging/MessagingMetricsAdvice.java | 1 + .../semconv/messaging/MessagingProducerMetrics.java | 5 +++-- .../semconv/messaging/MessagingProducerMetricsTest.java | 9 ++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 01d266d4670b..730f82c7ded2 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -35,5 +35,6 @@ static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { ServerAttributes.SERVER_PORT, ServerAttributes.SERVER_ADDRESS)); } + private MessagingMetricsAdvice() {} } diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 8dda57a4203b..7dc4400a9738 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -28,8 +28,8 @@ public class MessagingProducerMetrics implements OperationListener { ContextKey.named("pulsar-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); - private final DoubleHistogram publishDurationHistogram; + private MessagingProducerMetrics(Meter meter) { DoubleHistogramBuilder durationBuilder = meter @@ -66,7 +66,8 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); - publishDurationHistogram.record((endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + publishDurationHistogram.record( + (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); } @AutoValue diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index ea41e02e0870..03d3505f1e8a 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -39,7 +39,8 @@ void collectsMetrics() { Attributes requestAttributes = Attributes.builder() .put(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar") - .put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + .put( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, "persistent://public/default/topic") .put(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish") .put(ServerAttributes.SERVER_PORT, 6650) @@ -86,10 +87,12 @@ void collectsMetrics() { point .hasSum(0.15 /* seconds */) .hasAttributesSatisfying( - equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, + equalTo( + MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"), equalTo( - MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + MessagingIncubatingAttributes + .MESSAGING_DESTINATION_NAME, "persistent://public/default/topic"), equalTo(ServerAttributes.SERVER_PORT, 6650), equalTo(ServerAttributes.SERVER_ADDRESS, "localhost")) From b5d512bdff50029ed4792a32d6bec4cd4fb4fc56 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:33:59 +0800 Subject: [PATCH 03/11] Update instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java Co-authored-by: Lauri Tulmin --- .../api/incubator/semconv/messaging/MessagingMetricsAdvice.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 730f82c7ded2..6c0ef92be758 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -15,7 +15,7 @@ import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import java.util.List; -public class MessagingMetricsAdvice { +final class MessagingMetricsAdvice { static final List DURATION_SECONDS_BUCKETS = unmodifiableList( asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); From 3de71a852f062fde81234fe2908cc3b6bff06148 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:34:45 +0800 Subject: [PATCH 04/11] Update instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java Co-authored-by: Lauri Tulmin --- .../incubator/semconv/messaging/MessagingProducerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 7dc4400a9738..4a11c53b804b 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; -public class MessagingProducerMetrics implements OperationListener { +public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = From 6bb2c90abcf705a57c4fdc0d9ab7838f6ed2319a Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:35:56 +0800 Subject: [PATCH 05/11] Update instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java Co-authored-by: Steve Rao --- .../semconv/messaging/MessagingProducerMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index 03d3505f1e8a..8b1ed642add4 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -25,7 +25,7 @@ public class MessagingProducerMetricsTest { - static final double[] DURATION_BUCKETS = + private static final double[] DURATION_BUCKETS = MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); @Test From 6f68ebd42dd63fed0a686a0edca900ba6e06b28b Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Mon, 17 Jun 2024 20:36:41 +0800 Subject: [PATCH 06/11] Update instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java Co-authored-by: Steve Rao --- .../semconv/messaging/MessagingProducerMetricsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index 8b1ed642add4..491387253ee9 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -public class MessagingProducerMetricsTest { +class MessagingProducerMetricsTest { private static final double[] DURATION_BUCKETS = MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); From 9f42598dc8d8b682d6ea7fcd7b5c263e3714d795 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 00:01:10 +0800 Subject: [PATCH 07/11] fix with cr --- .../build.gradle.kts | 2 +- .../messaging/MessagingMetricsAdvice.java | 20 ++++++++++++++----- .../messaging/MessagingProducerMetrics.java | 13 ++++++++---- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/instrumentation-api-incubator/build.gradle.kts b/instrumentation-api-incubator/build.gradle.kts index f901051e5438..00d887da1b25 100644 --- a/instrumentation-api-incubator/build.gradle.kts +++ b/instrumentation-api-incubator/build.gradle.kts @@ -12,7 +12,6 @@ group = "io.opentelemetry.instrumentation" dependencies { api("io.opentelemetry.semconv:opentelemetry-semconv") - api("io.opentelemetry.semconv:opentelemetry-semconv-incubating") api(project(":instrumentation-api")) implementation("io.opentelemetry:opentelemetry-api-incubator") @@ -22,6 +21,7 @@ dependencies { testImplementation(project(":testing-common")) testImplementation("io.opentelemetry:opentelemetry-sdk") testImplementation("io.opentelemetry:opentelemetry-sdk-testing") + testImplementation("io.opentelemetry.semconv:opentelemetry-semconv-incubating") } tasks { diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 6c0ef92be758..8ada61c1683f 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -8,11 +8,11 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; import io.opentelemetry.semconv.ErrorAttributes; import io.opentelemetry.semconv.ServerAttributes; -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import java.util.List; final class MessagingMetricsAdvice { @@ -20,6 +20,16 @@ final class MessagingMetricsAdvice { unmodifiableList( asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_SYSTEM = + AttributeKey.stringKey("messaging.system"); + private static final AttributeKey MESSAGING_DESTINATION_NAME = + AttributeKey.stringKey("messaging.destination.name"); + private static final AttributeKey MESSAGING_OPERATION = + AttributeKey.stringKey("messaging.operation"); + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); + static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { return; @@ -27,10 +37,10 @@ static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { ((ExtendedDoubleHistogramBuilder) builder) .setAttributesAdvice( asList( - MessagingIncubatingAttributes.MESSAGING_SYSTEM, - MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, - MessagingIncubatingAttributes.MESSAGING_OPERATION, - MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, + MESSAGING_SYSTEM, + MESSAGING_DESTINATION_NAME, + MESSAGING_OPERATION, + MESSAGING_BATCH_MESSAGE_COUNT, ErrorAttributes.ERROR_TYPE, ServerAttributes.SERVER_PORT, ServerAttributes.SERVER_ADDRESS)); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 4a11c53b804b..0800436f96d3 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -21,11 +21,16 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Logger; +/** + * {@link OperationListener} which keeps track of Producer + * metrics. + */ public final class MessagingProducerMetrics implements OperationListener { private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); - private static final ContextKey PULSAR_PUBLISH_METRICS_STATE = - ContextKey.named("pulsar-producer-metrics-state"); + private static final ContextKey MESSAGING_PRODUCER_METRICS_STATE = + ContextKey.named("messaging-producer-metrics-state"); private static final Logger logger = Logger.getLogger(MessagingProducerMetrics.class.getName()); private final DoubleHistogram publishDurationHistogram; @@ -49,13 +54,13 @@ public static OperationMetrics get() { @CanIgnoreReturnValue public Context onStart(Context context, Attributes startAttributes, long startNanos) { return context.with( - PULSAR_PUBLISH_METRICS_STATE, + MESSAGING_PRODUCER_METRICS_STATE, new AutoValue_MessagingProducerMetrics_State(startAttributes, startNanos)); } @Override public void onEnd(Context context, Attributes endAttributes, long endNanos) { - MessagingProducerMetrics.State state = context.get(PULSAR_PUBLISH_METRICS_STATE); + MessagingProducerMetrics.State state = context.get(MESSAGING_PRODUCER_METRICS_STATE); if (state == null) { logger.log( FINE, From 8b5e585843ef97356b32df97520f2c64d2e5031c Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 15:14:19 +0800 Subject: [PATCH 08/11] Add pulsar metrics test --- .../messaging/MessagingProducerMetrics.java | 2 +- .../pulsar/v2_8/AbstractPulsarClientTest.java | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 0800436f96d3..8a2824793bb8 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -64,7 +64,7 @@ public void onEnd(Context context, Attributes endAttributes, long endNanos) { if (state == null) { logger.log( FINE, - "No state present when ending context {0}. Cannot record pulsar publish metrics.", + "No state present when ending context {0}. Cannot record produce publish metrics.", context); return; } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index f32daa20e074..bebbffd773c2 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -15,6 +15,8 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; @@ -22,6 +24,7 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import java.time.Duration; @@ -77,6 +80,12 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); + private static final List DURATION_SECONDS_BUCKETS = + unmodifiableList( + asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); + private static final double[] DURATION_BUCKETS = + DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + @BeforeAll static void beforeAll() throws PulsarClientException { pulsar = @@ -163,6 +172,26 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( batchReceiveAttributes(topic, null, false)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test From 53b2dee0427c49617f701f271ac258bf8d3b651f Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 18:17:40 +0800 Subject: [PATCH 09/11] Update doc url --- .../incubator/semconv/messaging/MessagingProducerMetrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java index 8a2824793bb8..44d5b243744a 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetrics.java @@ -23,7 +23,7 @@ /** * {@link OperationListener} which keeps track of Producer + * href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#metric-messagingpublishduration">Producer * metrics. */ public final class MessagingProducerMetrics implements OperationListener { From df187f432dfc7ade4ab5f5d51102ca8edec5f849 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 23:18:10 +0800 Subject: [PATCH 10/11] use array --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index bebbffd773c2..18deed26be82 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -15,8 +15,6 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; -import static java.util.Arrays.asList; -import static java.util.Collections.unmodifiableList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; @@ -79,12 +77,9 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - - private static final List DURATION_SECONDS_BUCKETS = - unmodifiableList( - asList(0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0)); - private static final double[] DURATION_BUCKETS = - DURATION_SECONDS_BUCKETS.stream().mapToDouble(d -> d).toArray(); + private static final double[] DURATION_BUCKETS = new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; @BeforeAll static void beforeAll() throws PulsarClientException { From e2d5e0982cc41aa5aac82d89f24d0b8a7b7e5521 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 18 Jun 2024 23:33:31 +0800 Subject: [PATCH 11/11] use array --- .../pulsar/v2_8/AbstractPulsarClientTest.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 18deed26be82..7883d5082c4f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -77,9 +77,10 @@ abstract class AbstractPulsarClientTest { private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - private static final double[] DURATION_BUCKETS = new double[] { - 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 - }; + private static final double[] DURATION_BUCKETS = + new double[] { + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 + }; @BeforeAll static void beforeAll() throws PulsarClientException {