Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pulsar Consumer metrics #11891

Merged
merged 35 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b954474
support pulsar messaging.publish.duration semantic
crossoverJie Jun 14, 2024
d855f97
code style
crossoverJie Jun 14, 2024
b5d512b
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
3de71a8
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
6bb2c90
Update instrumentation-api-incubator/src/test/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
6f68ebd
Update instrumentation-api-incubator/src/test/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
9f42598
fix with cr
crossoverJie Jun 17, 2024
8b5e585
Add pulsar metrics test
crossoverJie Jun 18, 2024
53b2dee
Update doc url
crossoverJie Jun 18, 2024
53b4471
pulsar-receive-duration
crossoverJie Jun 18, 2024
df187f4
use array
crossoverJie Jun 18, 2024
e2d5e09
use array
crossoverJie Jun 18, 2024
8651722
Merge branch 'main' into pulsar-receive-duration
crossoverJie Jun 19, 2024
52405fd
pulsar-receive-duration
crossoverJie Jun 23, 2024
8bfc6fd
support messaging.receive.messages
crossoverJie Jun 24, 2024
61d402d
Merge remote-tracking branch 'otel-origin/main' into pulsar-receive-d…
crossoverJie Jul 24, 2024
fde08c4
support messaging.receive.messages
crossoverJie Jul 24, 2024
7991bef
Add a test for Partition Consumer
crossoverJie Jul 25, 2024
3ca21a9
fix with cr
crossoverJie Aug 2, 2024
e889e16
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
2c7b92e
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
9710b0d
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
84c9fbe
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
a0d0cd8
check recordMetadata is null
crossoverJie Aug 2, 2024
6780c4d
revert about kafka
crossoverJie Aug 2, 2024
dfc799c
Merge remote-tracking branch 'origin/pulsar-receive-duration' into pu…
crossoverJie Aug 2, 2024
fe04048
revert about kafka
crossoverJie Aug 2, 2024
c849a3b
revert about kafka
crossoverJie Aug 2, 2024
8573d0e
revert about kafka
crossoverJie Aug 2, 2024
8bb50f5
revert about kafka
crossoverJie Aug 2, 2024
99c8256
revert about kafka
crossoverJie Aug 2, 2024
807d60b
fix with cr
crossoverJie Aug 7, 2024
06e8b8e
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
laurit Aug 9, 2024
76b103d
fix with cr
crossoverJie Aug 11, 2024
cd83eed
fix with cr
crossoverJie Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static java.util.logging.Level.FINE;

import com.google.auto.value.AutoValue;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics;
import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* {@link OperationListener} which keeps track of <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#consumer-metrics">Consumer
* metrics</a>.
*/
public final class MessagingConsumerMetrics implements OperationListener {
private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1);

// copied from MessagingIncubatingAttributes
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");
private static final ContextKey<MessagingConsumerMetrics.State> MESSAGING_CONSUMER_METRICS_STATE =
ContextKey.named("messaging-consumer-metrics-state");
private static final Logger logger = Logger.getLogger(MessagingConsumerMetrics.class.getName());

private final DoubleHistogram receiveDurationHistogram;
private final LongCounter receiveMessageCount;

private MessagingConsumerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
meter
.histogramBuilder("messaging.receive.duration")
.setDescription("Measures the duration of receive operation.")
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
.setUnit("s");
MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder);
receiveDurationHistogram = durationBuilder.build();

LongCounterBuilder longCounterBuilder =
meter
.counterBuilder("messaging.receive.messages")
.setDescription("Measures the number of received messages.")
.setUnit("{message}");
Comment on lines +46 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we go ahead and update to the latest spec to avoid additional churn? https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-metrics.md

cc @open-telemetry/semconv-messaging-approvers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semConvVersion hasn't been upgraded to the latest version yet. Is it okay to only upgrade some semantics?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it's ok, if we don't have the new constants available yet, you can create static final constants in this class and reference those

MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder);
receiveMessageCount = longCounterBuilder.build();
}

public static OperationMetrics get() {
return OperationMetricsUtil.create("messaging consumer", MessagingConsumerMetrics::new);
}

@Override
@CanIgnoreReturnValue
public Context onStart(Context context, Attributes startAttributes, long startNanos) {
return context.with(
MESSAGING_CONSUMER_METRICS_STATE,
new AutoValue_MessagingConsumerMetrics_State(startAttributes, startNanos));
}

@Override
public void onEnd(Context context, Attributes endAttributes, long endNanos) {
MessagingConsumerMetrics.State state = context.get(MESSAGING_CONSUMER_METRICS_STATE);
if (state == null) {
logger.log(
FINE,
"No state present when ending context {0}. Cannot record consumer receive metrics.",
context);
return;
}

Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();
receiveDurationHistogram.record(
(endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context);

Long receiveMessagesCount = getReceiveMessagesCount(state.startAttributes(), endAttributes);
if (receiveMessagesCount != null) {
receiveMessageCount.add(receiveMessagesCount, attributes, context);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't quite correct. We should record the count of all messages but currently we are recording only the count of batch messages. Perhaps getReceiveMessagesCount should return long instead of Long and when MESSAGING_BATCH_MESSAGE_COUNT attribute is missing then return 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is required for batch receive operations.
https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#metric-messagingreceivemessages

This change will cause the messaging.receive.messages metric to appear in regular consumption as well, which does not align with the description in the spec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo you are misinterpreting the spec. It says that this metric is not required when messaging system does not support batch receives because the receive count can be obtained from a different metric. The metric description says Measures the number of received messages. which in my opinion means all messages not only messages that were received as a batch. Another question is since this metric is opt-in should MessagingConsumerMetrics.get() take a boolean argument that could be used to opt-in to this metric when instrumentation supports batch receives? @trask do you have any suggestions for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your addition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's align with the latest semconv which I think answer this question?

}

private static Long getReceiveMessagesCount(Attributes... attributesList) {
for (Attributes attributes : attributesList) {
Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT);
if (value != null) {
return value;
}
}
return null;
}

@AutoValue
abstract static class State {

abstract Attributes startAttributes();

abstract long startTimeNanos();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder;
import io.opentelemetry.api.incubator.metrics.ExtendedLongCounterBuilder;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.semconv.ErrorAttributes;
import io.opentelemetry.semconv.ServerAttributes;
import java.util.List;
Expand Down Expand Up @@ -46,5 +48,35 @@ static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) {
ServerAttributes.SERVER_ADDRESS));
}

static void applyReceiveDurationAdvice(DoubleHistogramBuilder builder) {
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
return;
}
((ExtendedDoubleHistogramBuilder) builder)
.setAttributesAdvice(
laurit marked this conversation as resolved.
Show resolved Hide resolved
asList(
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
ErrorAttributes.ERROR_TYPE,
ServerAttributes.SERVER_PORT,
ServerAttributes.SERVER_ADDRESS));
}

static void applyReceiveMessagesAdvice(LongCounterBuilder builder) {
if (!(builder instanceof ExtendedLongCounterBuilder)) {
return;
}
((ExtendedLongCounterBuilder) builder)
.setAttributesAdvice(
asList(
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
ErrorAttributes.ERROR_TYPE,
ServerAttributes.SERVER_PORT,
ServerAttributes.SERVER_ADDRESS));
}

private MessagingMetricsAdvice() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ private static String getTopicName(Messages<?> messages) {
if (topicName == null) {
topicName = name;
} else if (!topicName.equals(name)) {
return null;
// this is a partitioned topic
// persistent://public/default/test-partition-0 persistent://public/default/test-partition-1
// return persistent://public/default/test
return topicName.substring(0, topicName.lastIndexOf("-partition-"));
laurit marked this conversation as resolved.
Show resolved Hide resolved
}
}
return topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingConsumerMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
Expand Down Expand Up @@ -76,6 +77,7 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addOperationMetrics(MessagingConsumerMetrics.get())
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));

Expand All @@ -101,6 +103,7 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
.addOperationMetrics(MessagingConsumerMetrics.get())
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand Down Expand Up @@ -189,7 +192,7 @@ private static Context startAndEndConsumerReceive(
Timer timer,
Consumer<?> consumer,
Throwable throwable) {
if (messages == null) {
if (messages == null || messages.size() == 0) {
return null;
}
String brokerUrl = VirtualFieldStore.extract(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -72,12 +73,12 @@ abstract class AbstractPulsarClientTest {
static Consumer<String> consumer;
static Producer<String> producer2;

private static String brokerHost;
private static int brokerPort;
static String brokerHost;
static int brokerPort;

private static final AttributeKey<String> MESSAGE_TYPE =
AttributeKey.stringKey("messaging.pulsar.message.type");
private static final double[] DURATION_BUCKETS =
static final double[] DURATION_BUCKETS =
new double[] {
0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0
};
Expand Down Expand Up @@ -138,10 +139,12 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));

AtomicInteger batchSize = new AtomicInteger();
testing.runWithSpan(
"receive-parent",
() -> {
Messages<String> receivedMsg = consumer.batchReceive();
batchSize.set(receivedMsg.size());
consumer.acknowledge(receivedMsg);
});
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
Expand Down Expand Up @@ -171,6 +174,23 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {

assertThat(testing.metrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.receive.duration")
.hasUnit("s")
.hasDescription("Measures the duration of receive operation.")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSumGreaterThan(0.0)
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost))
.hasBucketBoundaries(DURATION_BUCKETS))),
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.publish.duration")
Expand All @@ -187,7 +207,25 @@ void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost))
.hasBucketBoundaries(DURATION_BUCKETS))));
.hasBucketBoundaries(DURATION_BUCKETS))),
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.receive.messages")
.hasUnit("{message}")
.hasDescription("Measures the number of received messages.")
.hasLongSumSatisfying(
sum -> {
sum.hasPointsSatisfying(
point -> {
point
.hasValue(batchSize.get())
crossoverJie marked this conversation as resolved.
Show resolved Hide resolved
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost));
});
}));
}

@Test
Expand All @@ -208,6 +246,7 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));

AtomicInteger batchSize = new AtomicInteger();
CompletableFuture<Messages<String>> result =
testing.runWithSpan(
"receive-parent",
Expand All @@ -217,6 +256,7 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
.whenComplete(
(messages, throwable) -> {
if (messages != null) {
batchSize.set(messages.size());
testing.runWithSpan(
"callback", () -> acknowledgeMessages(consumer, messages));
}
Expand Down Expand Up @@ -251,6 +291,61 @@ void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));

assertThat(testing.metrics())
.satisfiesExactlyInAnyOrder(
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.receive.duration")
.hasUnit("s")
.hasDescription("Measures the duration of receive operation.")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSumGreaterThan(0.0)
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost))
.hasBucketBoundaries(DURATION_BUCKETS))),
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.publish.duration")
.hasUnit("s")
.hasDescription("Measures the duration of publish operation.")
.hasHistogramSatisfying(
histogram ->
histogram.hasPointsSatisfying(
point ->
point
.hasSumGreaterThan(0.0)
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost))
.hasBucketBoundaries(DURATION_BUCKETS))),
metric ->
OpenTelemetryAssertions.assertThat(metric)
.hasName("messaging.receive.messages")
.hasUnit("{message}")
.hasDescription("Measures the number of received messages.")
.hasLongSumSatisfying(
sum -> {
sum.hasPointsSatisfying(
point -> {
point
.hasValue(batchSize.get())
.hasAttributesSatisfying(
equalTo(MESSAGING_SYSTEM, "pulsar"),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(SERVER_PORT, brokerPort),
equalTo(SERVER_ADDRESS, brokerHost));
});
}));
}

static List<AttributeAssertion> sendAttributes(
Expand Down
Loading
Loading