diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java index f49d267f7989..bdf5ab879786 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java @@ -36,6 +36,8 @@ public final class MessagingAttributesExtractor AttributeKey.booleanKey("messaging.destination.anonymous"); private static final AttributeKey MESSAGING_DESTINATION_NAME = AttributeKey.stringKey("messaging.destination.name"); + private static final AttributeKey MESSAGING_DESTINATION_PARTITION_ID = + AttributeKey.stringKey("messaging.destination.partition.id"); private static final AttributeKey MESSAGING_DESTINATION_TEMPLATE = AttributeKey.stringKey("messaging.destination.template"); private static final AttributeKey MESSAGING_DESTINATION_TEMPORARY = @@ -98,6 +100,8 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST internalSet( attributes, MESSAGING_DESTINATION_TEMPLATE, getter.getDestinationTemplate(request)); } + internalSet( + attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request)); boolean isAnonymousDestination = getter.isAnonymousDestination(request); if (isAnonymousDestination) { internalSet(attributes, MESSAGING_DESTINATION_ANONYMOUS, true); diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java index 263bab6cc8e4..524e36b236c1 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java @@ -62,6 +62,11 @@ default Long getMessagePayloadCompressedSize(REQUEST request) { @Nullable Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response); + @Nullable + default String getDestinationPartitionId(REQUEST request) { + return null; + } + /** * Extracts all values of header named {@code name} from the request, or an empty list if there * were none. diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java new file mode 100644 index 000000000000..fec179dc8da8 --- /dev/null +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingConsumerMetrics.java @@ -0,0 +1,112 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.incubator.semconv.messaging; + +import static java.util.logging.Level.FINE; + +import com.google.auto.value.AutoValue; +import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.instrumentation.api.instrumenter.OperationListener; +import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics; +import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * {@link OperationListener} which keeps track of Consumer + * metrics. + */ +public final class MessagingConsumerMetrics implements OperationListener { + private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1); + + // copied from MessagingIncubatingAttributes + private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = + AttributeKey.longKey("messaging.batch.message_count"); + private static final ContextKey MESSAGING_CONSUMER_METRICS_STATE = + ContextKey.named("messaging-consumer-metrics-state"); + private static final Logger logger = Logger.getLogger(MessagingConsumerMetrics.class.getName()); + + private final DoubleHistogram receiveDurationHistogram; + private final LongCounter receiveMessageCount; + + private MessagingConsumerMetrics(Meter meter) { + DoubleHistogramBuilder durationBuilder = + meter + .histogramBuilder("messaging.receive.duration") + .setDescription("Measures the duration of receive operation.") + .setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS) + .setUnit("s"); + MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder); + receiveDurationHistogram = durationBuilder.build(); + + LongCounterBuilder longCounterBuilder = + meter + .counterBuilder("messaging.receive.messages") + .setDescription("Measures the number of received messages.") + .setUnit("{message}"); + MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder); + receiveMessageCount = longCounterBuilder.build(); + } + + public static OperationMetrics get() { + return OperationMetricsUtil.create("messaging consumer", MessagingConsumerMetrics::new); + } + + @Override + @CanIgnoreReturnValue + public Context onStart(Context context, Attributes startAttributes, long startNanos) { + return context.with( + MESSAGING_CONSUMER_METRICS_STATE, + new AutoValue_MessagingConsumerMetrics_State(startAttributes, startNanos)); + } + + @Override + public void onEnd(Context context, Attributes endAttributes, long endNanos) { + MessagingConsumerMetrics.State state = context.get(MESSAGING_CONSUMER_METRICS_STATE); + if (state == null) { + logger.log( + FINE, + "No state present when ending context {0}. Cannot record consumer receive metrics.", + context); + return; + } + + Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build(); + receiveDurationHistogram.record( + (endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context); + + long receiveMessagesCount = getReceiveMessagesCount(state.startAttributes(), endAttributes); + receiveMessageCount.add(receiveMessagesCount, attributes, context); + } + + private static long getReceiveMessagesCount(Attributes... attributesList) { + for (Attributes attributes : attributesList) { + Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT); + if (value != null) { + return value; + } + } + return 1; + } + + @AutoValue + abstract static class State { + + abstract Attributes startAttributes(); + + abstract long startTimeNanos(); + } +} diff --git a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java index 8ada61c1683f..f87c814d2a31 100644 --- a/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java +++ b/instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java @@ -10,7 +10,9 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder; +import io.opentelemetry.api.incubator.metrics.ExtendedLongCounterBuilder; import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.LongCounterBuilder; import io.opentelemetry.semconv.ErrorAttributes; import io.opentelemetry.semconv.ServerAttributes; import java.util.List; @@ -27,23 +29,41 @@ final class MessagingMetricsAdvice { AttributeKey.stringKey("messaging.destination.name"); private static final AttributeKey MESSAGING_OPERATION = AttributeKey.stringKey("messaging.operation"); - private static final AttributeKey MESSAGING_BATCH_MESSAGE_COUNT = - AttributeKey.longKey("messaging.batch.message_count"); + private static final AttributeKey MESSAGING_DESTINATION_PARTITION_ID = + AttributeKey.stringKey("messaging.destination.partition.id"); + private static final AttributeKey MESSAGING_DESTINATION_TEMPLATE = + AttributeKey.stringKey("messaging.destination.template"); + + private static final List> MESSAGING_ATTRIBUTES = + asList( + MESSAGING_SYSTEM, + MESSAGING_DESTINATION_NAME, + MESSAGING_OPERATION, + MESSAGING_DESTINATION_PARTITION_ID, + MESSAGING_DESTINATION_TEMPLATE, + ErrorAttributes.ERROR_TYPE, + ServerAttributes.SERVER_PORT, + ServerAttributes.SERVER_ADDRESS); static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) { if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { return; } - ((ExtendedDoubleHistogramBuilder) builder) - .setAttributesAdvice( - asList( - MESSAGING_SYSTEM, - MESSAGING_DESTINATION_NAME, - MESSAGING_OPERATION, - MESSAGING_BATCH_MESSAGE_COUNT, - ErrorAttributes.ERROR_TYPE, - ServerAttributes.SERVER_PORT, - ServerAttributes.SERVER_ADDRESS)); + ((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); + } + + static void applyReceiveDurationAdvice(DoubleHistogramBuilder builder) { + if (!(builder instanceof ExtendedDoubleHistogramBuilder)) { + return; + } + ((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); + } + + static void applyReceiveMessagesAdvice(LongCounterBuilder builder) { + if (!(builder instanceof ExtendedLongCounterBuilder)) { + return; + } + ((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES); } private MessagingMetricsAdvice() {} diff --git a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java index 491387253ee9..06521a9801e4 100644 --- a/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java +++ b/instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java @@ -50,7 +50,7 @@ void collectsMetrics() { Attributes responseAttributes = Attributes.builder() .put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0") - .put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2) + .put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, "1") .build(); Context parent = @@ -90,6 +90,10 @@ void collectsMetrics() { equalTo( MessagingIncubatingAttributes.MESSAGING_SYSTEM, "pulsar"), + equalTo( + MessagingIncubatingAttributes + .MESSAGING_DESTINATION_PARTITION_ID, + "1"), equalTo( MessagingIncubatingAttributes .MESSAGING_DESTINATION_NAME, diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchMessagingAttributesGetter.java index 5a77b09e17b7..7e934de2b666 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchMessagingAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchMessagingAttributesGetter.java @@ -11,6 +11,7 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; +import org.apache.pulsar.common.naming.TopicName; enum PulsarBatchMessagingAttributesGetter implements MessagingAttributesGetter { @@ -81,6 +82,16 @@ public Long getBatchMessageCount(PulsarBatchRequest request, @Nullable Void unus return (long) request.getMessages().size(); } + @Nullable + @Override + public String getDestinationPartitionId(PulsarBatchRequest request) { + int partitionIndex = TopicName.getPartitionIndex(request.getDestination()); + if (partitionIndex == -1) { + return null; + } + return String.valueOf(partitionIndex); + } + @Override public List getMessageHeader(PulsarBatchRequest request, String name) { return StreamSupport.stream(request.getMessages().spliterator(), false) diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java index 201c01ed3635..ae1c81cf188c 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java @@ -10,6 +10,7 @@ import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.common.naming.TopicName; public final class PulsarBatchRequest extends BasePulsarRequest { private final Messages messages; @@ -30,7 +31,10 @@ private static String getTopicName(Messages messages) { if (topicName == null) { topicName = name; } else if (!topicName.equals(name)) { - return null; + // this is a partitioned topic + // persistent://public/default/test-partition-0 persistent://public/default/test-partition-1 + // return persistent://public/default/test + return TopicName.get(topicName).getPartitionedTopicName(); } } return topicName; diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java index ab7491b6699f..08492480c143 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java @@ -12,6 +12,7 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.naming.TopicName; enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter { INSTANCE; @@ -83,6 +84,16 @@ public Long getBatchMessageCount(PulsarRequest request, @Nullable Void unused) { return null; } + @Nullable + @Override + public String getDestinationPartitionId(PulsarRequest request) { + int partitionIndex = TopicName.getPartitionIndex(request.getDestination()); + if (partitionIndex == -1) { + return null; + } + return String.valueOf(partitionIndex); + } + @Override public List getMessageHeader(PulsarRequest request, String name) { String value = request.getMessage().getProperty(name); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 1ab6aa3f48ac..51d534f8158b 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -13,6 +13,7 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter; +import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingConsumerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics; import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; @@ -76,6 +77,7 @@ private static Instrumenter createConsumerReceiveInstrument MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE)) .addAttributesExtractor( createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE)) + .addOperationMetrics(MessagingConsumerMetrics.get()) .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())); @@ -101,6 +103,7 @@ private static Instrumenter createConsumerBatchReceive .addAttributesExtractor( ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter())) .addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR)) + .addOperationMetrics(MessagingConsumerMetrics.get()) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -189,7 +192,7 @@ private static Context startAndEndConsumerReceive( Timer timer, Consumer consumer, Throwable throwable) { - if (messages == null) { + if (messages == null || messages.size() == 0) { return null; } String brokerUrl = VirtualFieldStore.extract(consumer); diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java index 7883d5082c4f..65ef82d94d9f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java @@ -11,6 +11,7 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; @@ -43,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.naming.TopicName; import org.assertj.core.api.AbstractLongAssert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -72,12 +74,12 @@ abstract class AbstractPulsarClientTest { static Consumer consumer; static Producer producer2; - private static String brokerHost; - private static int brokerPort; + static String brokerHost; + static int brokerPort; private static final AttributeKey MESSAGE_TYPE = AttributeKey.stringKey("messaging.pulsar.message.type"); - private static final double[] DURATION_BUCKETS = + static final double[] DURATION_BUCKETS = new double[] { 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0 }; @@ -171,6 +173,23 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { assertThat(testing.metrics()) .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.duration") + .hasUnit("s") + .hasDescription("Measures the duration of receive operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), metric -> OpenTelemetryAssertions.assertThat(metric) .hasName("messaging.publish.duration") @@ -187,7 +206,25 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { equalTo(MESSAGING_DESTINATION_NAME, topic), equalTo(SERVER_PORT, brokerPort), equalTo(SERVER_ADDRESS, brokerHost)) - .hasBucketBoundaries(DURATION_BUCKETS)))); + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + })); } @Test @@ -251,6 +288,61 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { span.hasName("callback") .hasKind(SpanKind.INTERNAL) .hasParent(trace.getSpan(1)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.duration") + .hasUnit("s") + .hasDescription("Measures the duration of receive operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + })); } static List sendAttributes( @@ -273,6 +365,10 @@ static List sendAttributes( AttributeKey.stringArrayKey("messaging.header.test_message_header"), Collections.singletonList("test"))); } + int partitionIndex = TopicName.getPartitionIndex(destination); + if (partitionIndex != -1) { + assertions.add(equalTo(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(partitionIndex))); + } return assertions; } @@ -307,6 +403,10 @@ static List receiveAttributes( if (isBatch) { assertions.add(satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)); } + int partitionIndex = TopicName.getPartitionIndex(destination); + if (partitionIndex != -1) { + assertions.add(equalTo(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(partitionIndex))); + } return assertions; } @@ -326,6 +426,10 @@ static List processAttributes( AttributeKey.stringArrayKey("messaging.header.test_message_header"), Collections.singletonList("test"))); } + int partitionIndex = TopicName.getPartitionIndex(destination); + if (partitionIndex != -1) { + assertions.add(equalTo(MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(partitionIndex))); + } return assertions; } diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java index 4ffb30469265..2166d552061f 100644 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -7,8 +7,15 @@ import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.concurrent.CompletableFuture; @@ -18,6 +25,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.junit.jupiter.api.Test; @@ -126,6 +134,61 @@ void testConsumeNonPartitionedTopicUsingReceive() throws Exception { .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasAttributesSatisfyingExactly( receiveAttributes(topic, msgId.toString(), false)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.duration") + .hasUnit("s") + .hasDescription("Measures the duration of receive operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + }), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test @@ -185,6 +248,61 @@ void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { span.hasName("callback") .hasKind(SpanKind.INTERNAL) .hasParent(trace.getSpan(0)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.duration") + .hasUnit("s") + .hasDescription("Measures the duration of receive operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + }), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test @@ -231,6 +349,61 @@ void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasAttributesSatisfyingExactly( receiveAttributes(topic, msgId.toString(), false)))); + + assertThat(testing.metrics()) + .satisfiesExactlyInAnyOrder( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.duration") + .hasUnit("s") + .hasDescription("Measures the duration of receive operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS))), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.hasPointsSatisfying( + point -> { + point + .hasValue(1) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)); + }); + }), + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.publish.duration") + .hasUnit("s") + .hasDescription("Measures the duration of publish operation.") + .hasHistogramSatisfying( + histogram -> + histogram.hasPointsSatisfying( + point -> + point + .hasSumGreaterThan(0.0) + .hasAttributesSatisfying( + equalTo(MESSAGING_SYSTEM, "pulsar"), + equalTo(MESSAGING_DESTINATION_NAME, topic), + equalTo(SERVER_PORT, brokerPort), + equalTo(SERVER_ADDRESS, brokerHost)) + .hasBucketBoundaries(DURATION_BUCKETS)))); } @Test @@ -444,4 +617,59 @@ void testConsumeMultiTopics() throws Exception { .hasAttributesSatisfyingExactly( processAttributes(topic2, msgId2.toString(), false)))); } + + @Test + void testConsumePartitionedTopicUsingBatchReceive() throws Exception { + String topic = "persistent://public/default/testConsumePartitionedTopicUsingBatchReceive"; + admin.topics().createPartitionedTopic(topic, 4); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + for (int i = 0; i < 10; i++) { + producer.send(msg); + } + + Messages receivedMsg = consumer.batchReceive(); + consumer.acknowledge(receivedMsg); + + assertThat(testing.metrics()) + .satisfiesOnlyOnce( + metric -> + OpenTelemetryAssertions.assertThat(metric) + .hasName("messaging.receive.messages") + .hasUnit("{message}") + .hasDescription("Measures the number of received messages.") + .hasLongSumSatisfying( + sum -> { + sum.satisfies( + pointData -> { + pointData + .getPoints() + .forEach( + p -> { + assertThat(p.getValue()).isPositive(); + if (p.getValue() == receivedMsg.size()) { + assertThat( + p.getAttributes() + .get(MESSAGING_DESTINATION_NAME)) + .isEqualTo(topic); + assertThat(p.getAttributes().get(MESSAGING_SYSTEM)) + .isEqualTo("pulsar"); + assertThat(p.getAttributes().get(SERVER_PORT)) + .isEqualTo(brokerPort); + assertThat(p.getAttributes().get(SERVER_ADDRESS)) + .isEqualTo(brokerHost); + } + }); + }); + })); + } }