From cfaba19f0c5a77e8d13bf2e5d1b108d69beca76d Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Tue, 1 Nov 2022 15:24:56 +0800 Subject: [PATCH 1/8] Implement consumer part of rocketmq new client instrumentation --- .../rocketmqclient/v5_0/MessageMapGetter.java | 25 +++ .../{MapSetter.java => MessageMapSetter.java} | 2 +- .../v5_0/RocketMqConsumerInstrumentation.java | 129 ++++++++++++++++ ...etMqConsumerProcessAttributeExtractor.java | 40 +++++ ...ocketMqConsumerProcessAttributeGetter.java | 93 ++++++++++++ ...ocketMqConsumerReceiveAttributeGetter.java | 83 ++++++++++ .../v5_0/RocketMqInstrumentationModule.java | 3 +- .../v5_0/RocketMqInstrumenterFactory.java | 62 +++++++- ...ocketMqMessageListenerInstrumentation.java | 94 ++++++++++++ ...qPublishingMessageImplInstrumentation.java | 2 +- .../v5_0/RocketMqSingletons.java | 30 +++- .../rocketmqclient/v5_0/Timer.java | 31 ++++ .../v5_0/VirtualFieldStore.java | 21 +++ .../v5_0/AbstractRocketMqClientTest.java | 143 +++++++++++++----- 14 files changed, 716 insertions(+), 42 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{MapSetter.java => MessageMapSetter.java} (92%) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java new file mode 100644 index 000000000000..b199f203744a --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.propagation.TextMapGetter; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum MessageMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(MessageView carrier) { + return carrier.getProperties().keySet(); + } + + @Nullable + @Override + public String get(@Nullable MessageView carrier, String key) { + return carrier == null ? null : carrier.getProperties().get(key); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java similarity index 92% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java index 1eeaefd08558..76997033873f 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java @@ -11,7 +11,7 @@ import javax.annotation.Nullable; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -enum MapSetter implements TextMapSetter { +enum MessageMapSetter implements TextMapSetter { INSTANCE; @Override diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java new file mode 100644 index 000000000000..8e02e7315a31 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java @@ -0,0 +1,129 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.List; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; +import org.apache.rocketmq.client.java.message.MessageViewImpl; +import org.apache.rocketmq.client.java.route.MessageQueueImpl; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; + +final class RocketMqConsumerInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(named("receiveMessage")) + .and(takesArguments(3)) + .and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest"))) + .and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl"))) + .and(takesArgument(2, named("java.time.Duration"))), + RocketMqConsumerInstrumentation.class.getName() + "$ReceiveMessageAdvice"); + } + + @SuppressWarnings("unused") + public static class ReceiveMessageAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Timer onStart() { + return Timer.start(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) ReceiveMessageRequest request, + @Advice.Argument(1) MessageQueueImpl messageQueue, + @Advice.Enter Timer timer, + @Advice.Return ListenableFuture future) { + String consumerGroup = request.getGroup().getName(); + SpanFinishingCallback spanFinishingCallback = + new SpanFinishingCallback(consumerGroup, messageQueue, timer); + Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor()); + } + } + + public static class SpanFinishingCallback implements FutureCallback { + + private final String consumerGroup; + private final MessageQueueImpl messageQueue; + private final Timer timer; + + public SpanFinishingCallback(String consumerGroup, MessageQueueImpl messageQueue, Timer timer) { + this.consumerGroup = consumerGroup; + this.messageQueue = messageQueue; + this.timer = timer; + } + + @Override + public void onSuccess(ReceiveMessageResult receiveMessageResult) { + List messageViews = receiveMessageResult.getMessageViewImpls(); + // Don't create spans when no messages were received. + if (messageViews.isEmpty()) { + return; + } + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup); + } + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, messageQueue)) { + Context context = + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + messageQueue, + null, + null, + timer.startTime(), + timer.now()); + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setContextByMessage(messageView, context); + } + } + } + + @Override + public void onFailure(Throwable throwable) { + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, messageQueue)) { + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + messageQueue, + null, + throwable, + timer.startTime(), + timer.now()); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java new file mode 100644 index 000000000000..df2dfec7ce93 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.ArrayList; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerProcessAttributeExtractor + implements AttributesExtractor { + INSTANCE; + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + MessageView messageView, + @Nullable ConsumeResult consumeResult, + @Nullable Throwable error) {} + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, MessageView messageView) { + messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys())); + attributes.put( + MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView)); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java new file mode 100644 index 000000000000..e742a9ee6818 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerProcessAttributeGetter + implements MessagingAttributesGetter { + INSTANCE; + + @Nullable + @Override + public String system(MessageView messageView) { + return "rocketmq"; + } + + @Nullable + @Override + public String destinationKind(MessageView messageView) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Nullable + @Override + public String destination(MessageView messageView) { + return messageView.getTopic(); + } + + @Override + public boolean temporaryDestination(MessageView messageView) { + return false; + } + + @Nullable + @Override + public String protocol(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String protocolVersion(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String url(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String conversationId(MessageView messageView) { + return null; + } + + @Nullable + @Override + public Long messagePayloadSize(MessageView messageView) { + return (long) messageView.getBody().remaining(); + } + + @Nullable + @Override + public Long messagePayloadCompressedSize(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String messageId(MessageView messageView, @Nullable ConsumeResult unused) { + return messageView.getMessageId().toString(); + } + + @Override + public List header(MessageView messageView, String name) { + String value = messageView.getProperties().get(name); + if (value != null) { + return Collections.singletonList(value); + } + return Collections.emptyList(); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java new file mode 100644 index 000000000000..ce6a3e17f28a --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.java.route.MessageQueueImpl; + +enum RocketMqConsumerReceiveAttributeGetter + implements MessagingAttributesGetter> { + INSTANCE; + + @Nullable + @Override + public String system(MessageQueueImpl messageQueue) { + return "rocketmq"; + } + + @Nullable + @Override + public String destinationKind(MessageQueueImpl messageQueue) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Nullable + @Override + public String destination(MessageQueueImpl messageQueue) { + return messageQueue.getTopic(); + } + + @Override + public boolean temporaryDestination(MessageQueueImpl messageQueue) { + return false; + } + + @Nullable + @Override + public String protocol(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public String protocolVersion(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public String url(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public String conversationId(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public Long messagePayloadSize(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public Long messagePayloadCompressedSize(MessageQueueImpl messageQueue) { + return null; + } + + @Nullable + @Override + public String messageId(MessageQueueImpl messageQueue, @Nullable List unused) { + return null; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java index 9c04ced7cad6..89c93b3822b2 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java @@ -21,6 +21,7 @@ public RocketMqInstrumentationModule() { @Override public List typeInstrumentations() { return asList( - new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation()); + new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation(), + new RocketMqConsumerInstrumentation(), new RocketMqMessageListenerInstrumentation()); } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index 5d6fd7efdb7e..debf6f61c07c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -10,13 +10,19 @@ import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import java.util.List; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; +import org.apache.rocketmq.client.java.route.MessageQueueImpl; final class RocketMqInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-5.0"; @@ -25,7 +31,6 @@ private RocketMqInstrumenterFactory() {} public static Instrumenter createProducerInstrumenter( OpenTelemetry openTelemetry, List capturedHeaders) { - RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE; MessageOperation operation = MessageOperation.SEND; @@ -41,12 +46,63 @@ public static Instrumenter createProduce .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE) .setSpanStatusExtractor( (spanStatusBuilder, message, sendReceipt, error) -> { - if (null != error) { + if (error != null) { + spanStatusBuilder.setStatus(StatusCode.ERROR); + } + }); + return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE); + } + + public static Instrumenter> createConsumerReceiveInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders) { + RocketMqConsumerReceiveAttributeGetter getter = RocketMqConsumerReceiveAttributeGetter.INSTANCE; + MessageOperation operation = MessageOperation.RECEIVE; + + MessagingAttributesExtractor> attributesExtractor = + buildMessagingAttributesExtractor(getter, operation, capturedHeaders); + + InstrumenterBuilder> instrumenterBuilder = + Instrumenter.>builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(attributesExtractor) + .setSpanStatusExtractor( + (spanStatusBuilder, messageView, unused, error) -> { + if (error != null) { spanStatusBuilder.setStatus(StatusCode.ERROR); } }); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + public static Instrumenter createConsumerProcessInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders) { + RocketMqConsumerProcessAttributeGetter getter = RocketMqConsumerProcessAttributeGetter.INSTANCE; + MessageOperation operation = MessageOperation.PROCESS; + + MessagingAttributesExtractor attributesExtractor = + buildMessagingAttributesExtractor(getter, operation, capturedHeaders); + + SpanLinksExtractor spanLinksExtractor = + new PropagatorBasedSpanLinksExtractor<>( + openTelemetry.getPropagators().getTextMapPropagator(), MessageMapGetter.INSTANCE); - return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE); + InstrumenterBuilder instrumenterBuilder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(RocketMqConsumerProcessAttributeExtractor.INSTANCE) + .setSpanStatusExtractor( + (spanStatusBuilder, messageView, consumeResult, error) -> { + if (error != null || ConsumeResult.FAILURE.equals(consumeResult)) { + spanStatusBuilder.setStatus(StatusCode.ERROR); + } + }) + .addSpanLinksExtractor(spanLinksExtractor); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } private static MessagingAttributesExtractor buildMessagingAttributesExtractor( diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java new file mode 100644 index 000000000000..27f46e46aff9 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java @@ -0,0 +1,94 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.message.MessageView; + +/** Only for {@link org.apache.rocketmq.client.apis.consumer.PushConsumer}. */ +final class RocketMqMessageListenerInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + isPublic() + .and(takesArguments(5)) + .and(takesArgument(0, named("org.apache.rocketmq.client.java.misc.ClientId"))) + .and( + takesArgument( + 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener"))) + .and(takesArgument(2, ThreadPoolExecutor.class)) + .and( + takesArgument( + 3, named("org.apache.rocketmq.client.java.hook.MessageInterceptor"))) + .and(takesArgument(4, ScheduledExecutorService.class))), + RocketMqMessageListenerInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { + // Replace messageListener by wrapper. + messageListener = new MessageListenerWrapper(messageListener); + } + } + + public static class MessageListenerWrapper implements MessageListener { + private final MessageListener delegator; + + public MessageListenerWrapper(MessageListener delegator) { + this.delegator = delegator; + } + + @Override + public ConsumeResult consume(MessageView messageView) { + Context parentContext = VirtualFieldStore.getContextByMessage(messageView); + if (parentContext == null) { + parentContext = Context.current(); + } + Instrumenter processInstrumenter = + RocketMqSingletons.consumerProcessInstrumenter(); + if (!processInstrumenter.shouldStart(parentContext, messageView)) { + return delegator.consume(messageView); + } + Context context = processInstrumenter.start(parentContext, messageView); + try (Scope ignored = context.makeCurrent()) { + ConsumeResult consumeResult = delegator.consume(messageView); + processInstrumenter.end(context, messageView, consumeResult, null); + return consumeResult; + } catch (Throwable t) { + processInstrumenter.end(context, messageView, null, t); + throw t; + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java index 7590366164ed..10abc8e1c5a7 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java @@ -66,7 +66,7 @@ public static void onExit(@Advice.This PublishingMessageImpl message) { @SuppressWarnings("unused") public static class GetPropertiesAdvice { - /** Update the message properties to propagate context recorded by {@link MapSetter}. */ + /** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */ @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( @Advice.This MessageImpl messageImpl, diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java index 01b6e6c8a7cc..704952d9635c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java @@ -6,24 +6,50 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import java.util.List; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; +import org.apache.rocketmq.client.java.route.MessageQueueImpl; public final class RocketMqSingletons { private static final Instrumenter PRODUCER_INSTRUMENTER; + private static final Instrumenter> + CONSUMER_RECEIVE_INSTRUMENTER; + private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; static { + OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); + List messagingHeaders = ExperimentalConfig.get().getMessagingHeaders(); + PRODUCER_INSTRUMENTER = - RocketMqInstrumenterFactory.createProducerInstrumenter( - GlobalOpenTelemetry.get(), ExperimentalConfig.get().getMessagingHeaders()); + RocketMqInstrumenterFactory.createProducerInstrumenter(openTelemetry, messagingHeaders); + + CONSUMER_RECEIVE_INSTRUMENTER = + RocketMqInstrumenterFactory.createConsumerReceiveInstrumenter( + openTelemetry, messagingHeaders); + + CONSUMER_PROCESS_INSTRUMENTER = + RocketMqInstrumenterFactory.createConsumerProcessInstrumenter( + openTelemetry, messagingHeaders); } public static Instrumenter producerInstrumenter() { return PRODUCER_INSTRUMENTER; } + public static Instrumenter> consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter consumerProcessInstrumenter() { + return CONSUMER_PROCESS_INSTRUMENTER; + } + private RocketMqSingletons() {} } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java new file mode 100644 index 000000000000..d9846b6b0572 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import java.time.Instant; + +public class Timer { + public static Timer start() { + return new Timer(Instant.now(), System.nanoTime()); + } + + private final Instant startTime; + private final long startNanoTime; + + private Timer(Instant startTime, long startNanoTime) { + this.startTime = startTime; + this.startNanoTime = startNanoTime; + } + + public Instant startTime() { + return startTime; + } + + public Instant now() { + long durationNanos = System.nanoTime() - startNanoTime; + return startTime().plusNanos(durationNanos); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java index cf84acd3fc7e..9f59aef86dd4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java @@ -8,13 +8,18 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; import java.util.Map; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; public class VirtualFieldStore { private static final VirtualField messageContextField = VirtualField.find(PublishingMessageImpl.class, Context.class); + private static final VirtualField messageViewContextField = + VirtualField.find(MessageView.class, Context.class); private static final VirtualField> messageExtraPropertiesField = VirtualField.find(PublishingMessageImpl.class, Map.class); + private static final VirtualField messageConsumerGroupField = + VirtualField.find(MessageView.class, String.class); private VirtualFieldStore() {} @@ -22,10 +27,18 @@ public static Context getContextByMessage(PublishingMessageImpl message) { return messageContextField.get(message); } + public static Context getContextByMessage(MessageView messageView) { + return messageViewContextField.get(messageView); + } + public static void setContextByMessage(PublishingMessageImpl message, Context context) { messageContextField.set(message, context); } + public static void setContextByMessage(MessageView message, Context context) { + messageViewContextField.set(message, context); + } + public static Map getExtraPropertiesByMessage(PublishingMessageImpl message) { return messageExtraPropertiesField.get(message); } @@ -34,4 +47,12 @@ public static void setExtraPropertiesByMessage( PublishingMessageImpl message, Map extraProperties) { messageExtraPropertiesField.set(message, extraProperties); } + + public static String getConsumerGroupByMessage(MessageView messageView) { + return messageConsumerGroupField.get(messageView); + } + + public static void setConsumerGroupByMessage(MessageView messageView, String consumerGroup) { + messageConsumerGroupField.set(messageView, consumerGroup); + } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 0999a8e823e0..bb84a845aaec 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -10,24 +10,38 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.FixedHostPortGenericContainer; import org.testcontainers.containers.GenericContainer; @@ -38,57 +52,79 @@ public abstract class AbstractRocketMqClientTest { // TODO(aaron-ai): replace it by the official image. private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + private static GenericContainer container; + private static String endpoints; // We still need this container type to do fixed-port-mapping. - @SuppressWarnings({"deprecation", "rawtypes", "resource"}) - @Test - public void testSendMessage() throws ClientException { + @SuppressWarnings({"resource", "deprecation", "rawtypes"}) + @BeforeAll + static void setUp() { int proxyPort = PortUtils.findOpenPorts(4); int brokerPort = proxyPort + 1; int brokerHaPort = proxyPort + 2; int namesrvPort = proxyPort + 3; - try (GenericContainer container = + endpoints = "127.0.0.1:" + proxyPort; + container = new FixedHostPortGenericContainer(IMAGE_NAME) .withFixedExposedPort(proxyPort, proxyPort) .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) - .withExposedPorts(proxyPort)) { - // Start the container. - container.start(); - String endpoints = "127.0.0.1:" + proxyPort; - ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); - // Inner topic of the container. - String topic = "normal-topic-0"; - ClientServiceProvider provider = ClientServiceProvider.loadService(); - Producer producer = + .withExposedPorts(proxyPort); + // Start the container. + container.start(); + } + + @AfterAll + static void tearDown() { + container.close(); + } + + @Test + public void testSendAndConsumeMessage() throws ClientException, IOException { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-0"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-0"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .build()) { + try (Producer producer = provider .newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) - .build(); + .build()) { - String tag = "tagA"; - String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; - byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); - Message message = - provider - .newMessageBuilder() - .setTopic(topic) - .setTag(tag) - .setKeys(keys) - .setBody(body) - .build(); + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); - SendReceipt sendReceipt = producer.send(message); - testing() - .waitAndAssertTraces( - traceAssert -> - traceAssert.hasSpansSatisfyingExactly( - spanDataAssert -> - spanDataAssert + SendReceipt sendReceipt = producer.send(message); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> + span.hasKind(SpanKind.PRODUCER) .hasName(topic + " send") .hasStatus(StatusData.unset()) .hasAttributesSatisfyingExactly( @@ -102,7 +138,46 @@ public void testSendMessage() throws ClientException { equalTo( MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic)))); + equalTo(MESSAGING_DESTINATION, topic))); + sendSpanData.set(trace.getSpan(0)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " receive") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "receive")), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(sendSpanData.get().getSpanContext())) + // As the child of receive span. + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")))); + } } } } From 29e744e1c27b27ffc9208cb65b9f1fcce30e3568 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Fri, 4 Nov 2022 11:17:59 +0800 Subject: [PATCH 2/8] Use RequestMessageRequest as the 'REQUEST' of receiving instrumantion --- .../v5_0/RocketMqConsumerInstrumentation.java | 26 ++++++-------- ...etMqConsumerReceiveAttributeExtractor.java | 36 +++++++++++++++++++ ...ocketMqConsumerReceiveAttributeGetter.java | 28 +++++++-------- .../v5_0/RocketMqInstrumenterFactory.java | 13 +++---- .../v5_0/RocketMqSingletons.java | 7 ++-- .../v5_0/AbstractRocketMqClientTest.java | 1 + 6 files changed, 73 insertions(+), 38 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java index 8e02e7315a31..b4f661924c4b 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java @@ -23,7 +23,6 @@ import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; import org.apache.rocketmq.client.java.message.MessageViewImpl; -import org.apache.rocketmq.client.java.route.MessageQueueImpl; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; @@ -58,25 +57,21 @@ public static Timer onStart() { @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( @Advice.Argument(0) ReceiveMessageRequest request, - @Advice.Argument(1) MessageQueueImpl messageQueue, @Advice.Enter Timer timer, @Advice.Return ListenableFuture future) { String consumerGroup = request.getGroup().getName(); - SpanFinishingCallback spanFinishingCallback = - new SpanFinishingCallback(consumerGroup, messageQueue, timer); + SpanFinishingCallback spanFinishingCallback = new SpanFinishingCallback(request, timer); Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor()); } } public static class SpanFinishingCallback implements FutureCallback { - private final String consumerGroup; - private final MessageQueueImpl messageQueue; + private final ReceiveMessageRequest request; private final Timer timer; - public SpanFinishingCallback(String consumerGroup, MessageQueueImpl messageQueue, Timer timer) { - this.consumerGroup = consumerGroup; - this.messageQueue = messageQueue; + public SpanFinishingCallback(ReceiveMessageRequest request, Timer timer) { + this.request = request; this.timer = timer; } @@ -87,18 +82,19 @@ public void onSuccess(ReceiveMessageResult receiveMessageResult) { if (messageViews.isEmpty()) { return; } + String consumerGroup = request.getGroup().getName(); for (MessageViewImpl messageView : messageViews) { VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup); } - Instrumenter> receiveInstrumenter = + Instrumenter> receiveInstrumenter = RocketMqSingletons.consumerReceiveInstrumenter(); Context parentContext = Context.current(); - if (receiveInstrumenter.shouldStart(parentContext, messageQueue)) { + if (receiveInstrumenter.shouldStart(parentContext, request)) { Context context = InstrumenterUtil.startAndEnd( receiveInstrumenter, parentContext, - messageQueue, + request, null, null, timer.startTime(), @@ -111,14 +107,14 @@ public void onSuccess(ReceiveMessageResult receiveMessageResult) { @Override public void onFailure(Throwable throwable) { - Instrumenter> receiveInstrumenter = + Instrumenter> receiveInstrumenter = RocketMqSingletons.consumerReceiveInstrumenter(); Context parentContext = Context.current(); - if (receiveInstrumenter.shouldStart(parentContext, messageQueue)) { + if (receiveInstrumenter.shouldStart(parentContext, request)) { InstrumenterUtil.startAndEnd( receiveInstrumenter, parentContext, - messageQueue, + request, null, throwable, timer.startTime(), diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java new file mode 100644 index 000000000000..379ea4992eb9 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerReceiveAttributeExtractor + implements AttributesExtractor> { + INSTANCE; + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, ReceiveMessageRequest request) {} + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + ReceiveMessageRequest request, + @Nullable List messageViews, + @Nullable Throwable error) { + String consumerGroup = request.getGroup().getName(); + attributes.put(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java index ce6a3e17f28a..564838fbb151 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java @@ -5,79 +5,79 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.List; import javax.annotation.Nullable; import org.apache.rocketmq.client.apis.message.MessageView; -import org.apache.rocketmq.client.java.route.MessageQueueImpl; enum RocketMqConsumerReceiveAttributeGetter - implements MessagingAttributesGetter> { + implements MessagingAttributesGetter> { INSTANCE; @Nullable @Override - public String system(MessageQueueImpl messageQueue) { + public String system(ReceiveMessageRequest request) { return "rocketmq"; } @Nullable @Override - public String destinationKind(MessageQueueImpl messageQueue) { + public String destinationKind(ReceiveMessageRequest request) { return SemanticAttributes.MessagingDestinationKindValues.TOPIC; } @Nullable @Override - public String destination(MessageQueueImpl messageQueue) { - return messageQueue.getTopic(); + public String destination(ReceiveMessageRequest request) { + return request.getMessageQueue().getTopic().getName(); } @Override - public boolean temporaryDestination(MessageQueueImpl messageQueue) { + public boolean temporaryDestination(ReceiveMessageRequest request) { return false; } @Nullable @Override - public String protocol(MessageQueueImpl messageQueue) { + public String protocol(ReceiveMessageRequest request) { return null; } @Nullable @Override - public String protocolVersion(MessageQueueImpl messageQueue) { + public String protocolVersion(ReceiveMessageRequest request) { return null; } @Nullable @Override - public String url(MessageQueueImpl messageQueue) { + public String url(ReceiveMessageRequest request) { return null; } @Nullable @Override - public String conversationId(MessageQueueImpl messageQueue) { + public String conversationId(ReceiveMessageRequest request) { return null; } @Nullable @Override - public Long messagePayloadSize(MessageQueueImpl messageQueue) { + public Long messagePayloadSize(ReceiveMessageRequest request) { return null; } @Nullable @Override - public Long messagePayloadCompressedSize(MessageQueueImpl messageQueue) { + public Long messagePayloadCompressedSize(ReceiveMessageRequest request) { return null; } @Nullable @Override - public String messageId(MessageQueueImpl messageQueue, @Nullable List unused) { + public String messageId(ReceiveMessageRequest request, @Nullable List unused) { return null; } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index debf6f61c07c..fcc023c78362 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; @@ -22,7 +23,6 @@ import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -import org.apache.rocketmq.client.java.route.MessageQueueImpl; final class RocketMqInstrumenterFactory { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-5.0"; @@ -53,20 +53,21 @@ public static Instrumenter createProduce return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE); } - public static Instrumenter> createConsumerReceiveInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + public static Instrumenter> + createConsumerReceiveInstrumenter(OpenTelemetry openTelemetry, List capturedHeaders) { RocketMqConsumerReceiveAttributeGetter getter = RocketMqConsumerReceiveAttributeGetter.INSTANCE; MessageOperation operation = MessageOperation.RECEIVE; - MessagingAttributesExtractor> attributesExtractor = + MessagingAttributesExtractor> attributesExtractor = buildMessagingAttributesExtractor(getter, operation, capturedHeaders); - InstrumenterBuilder> instrumenterBuilder = - Instrumenter.>builder( + InstrumenterBuilder> instrumenterBuilder = + Instrumenter.>builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE) .setSpanStatusExtractor( (spanStatusBuilder, messageView, unused, error) -> { if (error != null) { diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java index 704952d9635c..dcc3dc488959 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java @@ -5,6 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; @@ -14,12 +15,11 @@ import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -import org.apache.rocketmq.client.java.route.MessageQueueImpl; public final class RocketMqSingletons { private static final Instrumenter PRODUCER_INSTRUMENTER; - private static final Instrumenter> + private static final Instrumenter> CONSUMER_RECEIVE_INSTRUMENTER; private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; @@ -43,7 +43,8 @@ public static Instrumenter producerInstr return PRODUCER_INSTRUMENTER; } - public static Instrumenter> consumerReceiveInstrumenter() { + public static Instrumenter> + consumerReceiveInstrumenter() { return CONSUMER_RECEIVE_INSTRUMENTER; } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index bb84a845aaec..357490979683 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -148,6 +148,7 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { .hasName(topic + " receive") .hasStatus(StatusData.unset()) .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), equalTo(MESSAGING_SYSTEM, "rocketmq"), equalTo( MESSAGING_DESTINATION_KIND, From e8cfaeda1b308928699f4e5aeb29eb1ed3babd41 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Mon, 7 Nov 2022 12:58:26 +0800 Subject: [PATCH 3/8] Support receiveInstrumentationEnabled flag --- .../javaagent/build.gradle.kts | 20 +++++ .../v5_0/RocketMqInstrumenterFactory.java | 25 ++++-- .../v5_0/RocketMqSingletons.java | 8 +- .../v5_0/AbstractRocketMqClientTest.java | 85 +++++++++++++++++++ 4 files changed, 125 insertions(+), 13 deletions(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts index 4efaaaad3845..2b35ece9bded 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts @@ -16,3 +16,23 @@ dependencies { testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing")) } + +tasks { + val testReceiveSpanDisabled by registering(Test::class) { + filter { + includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessageWithReceiveSpanSuppressed") + } + include("**/RocketMqClientTest.*") + } + + test { + filter { + includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessage") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testReceiveSpanDisabled) + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index fcc023c78362..6d6f44c1e7fe 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -54,7 +54,8 @@ public static Instrumenter createProduce } public static Instrumenter> - createConsumerReceiveInstrumenter(OpenTelemetry openTelemetry, List capturedHeaders) { + createConsumerReceiveInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders, boolean enabled) { RocketMqConsumerReceiveAttributeGetter getter = RocketMqConsumerReceiveAttributeGetter.INSTANCE; MessageOperation operation = MessageOperation.RECEIVE; @@ -66,6 +67,7 @@ public static Instrumenter createProduce openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) + .setEnabled(enabled) .addAttributesExtractor(attributesExtractor) .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE) .setSpanStatusExtractor( @@ -78,17 +80,15 @@ public static Instrumenter createProduce } public static Instrumenter createConsumerProcessInstrumenter( - OpenTelemetry openTelemetry, List capturedHeaders) { + OpenTelemetry openTelemetry, + List capturedHeaders, + boolean receiveInstrumentationEnabled) { RocketMqConsumerProcessAttributeGetter getter = RocketMqConsumerProcessAttributeGetter.INSTANCE; MessageOperation operation = MessageOperation.PROCESS; MessagingAttributesExtractor attributesExtractor = buildMessagingAttributesExtractor(getter, operation, capturedHeaders); - SpanLinksExtractor spanLinksExtractor = - new PropagatorBasedSpanLinksExtractor<>( - openTelemetry.getPropagators().getTextMapPropagator(), MessageMapGetter.INSTANCE); - InstrumenterBuilder instrumenterBuilder = Instrumenter.builder( openTelemetry, @@ -101,9 +101,16 @@ public static Instrumenter createConsumerProcessInst if (error != null || ConsumeResult.FAILURE.equals(consumeResult)) { spanStatusBuilder.setStatus(StatusCode.ERROR); } - }) - .addSpanLinksExtractor(spanLinksExtractor); - return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + }); + + if (receiveInstrumentationEnabled) { + SpanLinksExtractor spanLinksExtractor = + new PropagatorBasedSpanLinksExtractor<>( + openTelemetry.getPropagators().getTextMapPropagator(), MessageMapGetter.INSTANCE); + instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + return instrumenterBuilder.buildConsumerInstrumenter(MessageMapGetter.INSTANCE); } private static MessagingAttributesExtractor buildMessagingAttributesExtractor( diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java index dcc3dc488959..605838c13c2a 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java @@ -26,17 +26,17 @@ public final class RocketMqSingletons { static { OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); List messagingHeaders = ExperimentalConfig.get().getMessagingHeaders(); + boolean receiveInstrumentationEnabled = + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled(); PRODUCER_INSTRUMENTER = RocketMqInstrumenterFactory.createProducerInstrumenter(openTelemetry, messagingHeaders); - CONSUMER_RECEIVE_INSTRUMENTER = RocketMqInstrumenterFactory.createConsumerReceiveInstrumenter( - openTelemetry, messagingHeaders); - + openTelemetry, messagingHeaders, receiveInstrumentationEnabled); CONSUMER_PROCESS_INSTRUMENTER = RocketMqInstrumenterFactory.createConsumerProcessInstrumenter( - openTelemetry, messagingHeaders); + openTelemetry, messagingHeaders, receiveInstrumentationEnabled); } public static Instrumenter producerInstrumenter() { diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 357490979683..cd80d5ce280b 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -181,4 +181,89 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { } } } + + @Test + public void testSendAndConsumeMessageWithReceiveSpanSuppressed() + throws ClientException, IOException { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-1"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-1"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .build()) { + try (Producer producer = + provider + .newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build()) { + + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = producer.send(message); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic)), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // As the child of send span. + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")))); + } + } + } } From c42f7ae5c6913bfd68cae4f55a2db3b534efecf9 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Wed, 9 Nov 2022 14:05:53 +0800 Subject: [PATCH 4/8] Remove unused code --- .../rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java index b4f661924c4b..cd91fd2681d4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java @@ -59,7 +59,6 @@ public static void onExit( @Advice.Argument(0) ReceiveMessageRequest request, @Advice.Enter Timer timer, @Advice.Return ListenableFuture future) { - String consumerGroup = request.getGroup().getName(); SpanFinishingCallback spanFinishingCallback = new SpanFinishingCallback(request, timer); Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor()); } From 86c6977a10796b7aa3acae7c939530cbb5545f6f Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Wed, 9 Nov 2022 14:57:18 +0800 Subject: [PATCH 5/8] Move SpanFinishingCallback to top-level class --- .../v5_0/ReceiveSpanFinishingCallback.java | 74 +++++++++++++++++++ .../v5_0/RocketMqConsumerInstrumentation.java | 68 +---------------- .../v5_0/RocketMqProducerInstrumentation.java | 28 +------ .../v5_0/SendSpanFinishingCallback.java | 37 ++++++++++ 4 files changed, 114 insertions(+), 93 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java new file mode 100644 index 000000000000..affcf68c2f70 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import java.util.List; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; +import org.apache.rocketmq.client.java.message.MessageViewImpl; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; + +public final class ReceiveSpanFinishingCallback implements FutureCallback { + + private final ReceiveMessageRequest request; + private final Timer timer; + + public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) { + this.request = request; + this.timer = timer; + } + + @Override + public void onSuccess(ReceiveMessageResult receiveMessageResult) { + List messageViews = receiveMessageResult.getMessageViewImpls(); + // Don't create spans when no messages were received. + if (messageViews.isEmpty()) { + return; + } + String consumerGroup = request.getGroup().getName(); + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup); + } + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, request)) { + Context context = + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + request, + null, + null, + timer.startTime(), + timer.now()); + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setContextByMessage(messageView, context); + } + } + } + + @Override + public void onFailure(Throwable throwable) { + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, request)) { + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + request, + null, + throwable, + timer.startTime(), + timer.now()); + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java index cd91fd2681d4..29b936143425 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java @@ -11,19 +11,12 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import apache.rocketmq.v2.ReceiveMessageRequest; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.List; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; -import org.apache.rocketmq.client.java.message.MessageViewImpl; -import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; @@ -59,66 +52,9 @@ public static void onExit( @Advice.Argument(0) ReceiveMessageRequest request, @Advice.Enter Timer timer, @Advice.Return ListenableFuture future) { - SpanFinishingCallback spanFinishingCallback = new SpanFinishingCallback(request, timer); + ReceiveSpanFinishingCallback spanFinishingCallback = + new ReceiveSpanFinishingCallback(request, timer); Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor()); } } - - public static class SpanFinishingCallback implements FutureCallback { - - private final ReceiveMessageRequest request; - private final Timer timer; - - public SpanFinishingCallback(ReceiveMessageRequest request, Timer timer) { - this.request = request; - this.timer = timer; - } - - @Override - public void onSuccess(ReceiveMessageResult receiveMessageResult) { - List messageViews = receiveMessageResult.getMessageViewImpls(); - // Don't create spans when no messages were received. - if (messageViews.isEmpty()) { - return; - } - String consumerGroup = request.getGroup().getName(); - for (MessageViewImpl messageView : messageViews) { - VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup); - } - Instrumenter> receiveInstrumenter = - RocketMqSingletons.consumerReceiveInstrumenter(); - Context parentContext = Context.current(); - if (receiveInstrumenter.shouldStart(parentContext, request)) { - Context context = - InstrumenterUtil.startAndEnd( - receiveInstrumenter, - parentContext, - request, - null, - null, - timer.startTime(), - timer.now()); - for (MessageViewImpl messageView : messageViews) { - VirtualFieldStore.setContextByMessage(messageView, context); - } - } - } - - @Override - public void onFailure(Throwable throwable) { - Instrumenter> receiveInstrumenter = - RocketMqSingletons.consumerReceiveInstrumenter(); - Context parentContext = Context.current(); - if (receiveInstrumenter.shouldStart(parentContext, request)) { - InstrumenterUtil.startAndEnd( - receiveInstrumenter, - parentContext, - request, - null, - throwable, - timer.startTime(), - timer.now()); - } - } - } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java index 40602994ce68..bb73fb135be2 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java @@ -22,7 +22,6 @@ import net.bytebuddy.matcher.ElementMatcher; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture; @@ -86,34 +85,9 @@ public static void onEnter( Context context = instrumenter.start(parentContext, message); Futures.addCallback( future, - new SpanFinishingCallback(instrumenter, context, message), + new SendSpanFinishingCallback(instrumenter, context, message), MoreExecutors.directExecutor()); } } } - - public static class SpanFinishingCallback implements FutureCallback { - private final Instrumenter instrumenter; - private final Context context; - private final PublishingMessageImpl message; - - public SpanFinishingCallback( - Instrumenter instrumenter, - Context context, - PublishingMessageImpl message) { - this.instrumenter = instrumenter; - this.context = context; - this.message = message; - } - - @Override - public void onSuccess(SendReceiptImpl sendReceipt) { - instrumenter.end(context, message, sendReceipt, null); - } - - @Override - public void onFailure(Throwable t) { - instrumenter.end(context, message, null, t); - } - } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java new file mode 100644 index 000000000000..3ccdb88a25ba --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; +import org.apache.rocketmq.client.java.message.PublishingMessageImpl; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; + +public final class SendSpanFinishingCallback implements FutureCallback { + private final Instrumenter instrumenter; + private final Context context; + private final PublishingMessageImpl message; + + public SendSpanFinishingCallback( + Instrumenter instrumenter, + Context context, + PublishingMessageImpl message) { + this.instrumenter = instrumenter; + this.context = context; + this.message = message; + } + + @Override + public void onSuccess(SendReceiptImpl sendReceipt) { + instrumenter.end(context, message, sendReceipt, null); + } + + @Override + public void onFailure(Throwable t) { + instrumenter.end(context, message, null, t); + } +} From 96b297b3a3f04df491da6f7830a885b66699d2ae Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Mon, 14 Nov 2022 22:25:47 +0800 Subject: [PATCH 6/8] Fix issues --- .../javaagent/build.gradle.kts | 6 +- .../v5_0/ConsumeServiceInstrumentation.java | 52 ++++++ ....java => ConsumerImplInstrumentation.java} | 4 +- .../v5_0/MessageListenerWrapper.java | 43 +++++ ....java => ProducerImplInstrumentation.java} | 4 +- ...PublishingMessageImplInstrumentation.java} | 8 +- .../v5_0/RocketMqInstrumentationModule.java | 4 +- .../v5_0/RocketMqInstrumenterFactory.java | 16 +- ...ocketMqMessageListenerInstrumentation.java | 94 ----------- ...RocketMqClientSuppressReceiveSpanTest.java | 21 +++ ...RocketMqClientSuppressReceiveSpanTest.java | 154 ++++++++++++++++++ .../v5_0/AbstractRocketMqClientTest.java | 138 +++------------- .../v5_0/RocketMqProxyContainer.java | 44 +++++ 13 files changed, 351 insertions(+), 237 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqConsumerInstrumentation.java => ConsumerImplInstrumentation.java} (93%) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqProducerInstrumentation.java => ProducerImplInstrumentation.java} (96%) rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqPublishingMessageImplInstrumentation.java => PublishingMessageImplInstrumentation.java} (91%) delete mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts index 2b35ece9bded..2f87b75b3076 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts @@ -20,14 +20,14 @@ dependencies { tasks { val testReceiveSpanDisabled by registering(Test::class) { filter { - includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessageWithReceiveSpanSuppressed") + includeTestsMatching("RocketMqClientSuppressReceiveSpanTest") } - include("**/RocketMqClientTest.*") + include("**/RocketMqClientSuppressReceiveSpanTest.*") } test { filter { - includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessage") + excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest") } jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java new file mode 100644 index 000000000000..f7f08dd498e5 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.consumer.MessageListener; + +final class ConsumeServiceInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + isPublic() + .and(takesArguments(5)) + .and( + takesArgument( + 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))), + ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { + // Replace messageListener by wrapper. + if (!(messageListener instanceof MessageListenerWrapper)) { + messageListener = new MessageListenerWrapper(messageListener); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java similarity index 93% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java index 29b936143425..46d4c0a5645e 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; -final class RocketMqConsumerInstrumentation implements TypeInstrumentation { +final class ConsumerImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl"); @@ -36,7 +36,7 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest"))) .and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl"))) .and(takesArgument(2, named("java.time.Duration"))), - RocketMqConsumerInstrumentation.class.getName() + "$ReceiveMessageAdvice"); + ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice"); } @SuppressWarnings("unused") diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java new file mode 100644 index 000000000000..4fd8a82c6433 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.message.MessageView; + +public final class MessageListenerWrapper implements MessageListener { + private final MessageListener delegator; + + public MessageListenerWrapper(MessageListener delegator) { + this.delegator = delegator; + } + + @Override + public ConsumeResult consume(MessageView messageView) { + Context parentContext = VirtualFieldStore.getContextByMessage(messageView); + if (parentContext == null) { + parentContext = Context.current(); + } + Instrumenter processInstrumenter = + RocketMqSingletons.consumerProcessInstrumenter(); + if (!processInstrumenter.shouldStart(parentContext, messageView)) { + return delegator.consume(messageView); + } + Context context = processInstrumenter.start(parentContext, messageView); + try (Scope ignored = context.makeCurrent()) { + ConsumeResult consumeResult = delegator.consume(messageView); + processInstrumenter.end(context, messageView, consumeResult, null); + return consumeResult; + } catch (Throwable t) { + processInstrumenter.end(context, messageView, null, t); + throw t; + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java similarity index 96% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java index bb73fb135be2..1972d98d29f4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture; -final class RocketMqProducerInstrumentation implements TypeInstrumentation { +final class ProducerImplInstrumentation implements TypeInstrumentation { /** Match the implementation of RocketMQ producer. */ @Override @@ -51,7 +51,7 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(3, List.class)) .and(takesArgument(4, List.class)) .and(takesArgument(5, int.class)), - RocketMqProducerInstrumentation.class.getName() + "$SendAdvice"); + ProducerImplInstrumentation.class.getName() + "$SendAdvice"); } @SuppressWarnings("unused") diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java similarity index 91% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java index 10abc8e1c5a7..eb2b114b586c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.client.java.message.MessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation { +final class PublishingMessageImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) { takesArgument( 1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings"))) .and(takesArgument(2, boolean.class)), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); transformer.applyAdviceToMethod( isMethod().and(named("getProperties")).and(isPublic()), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); } @SuppressWarnings("unused") @@ -56,7 +56,7 @@ public static class ConstructorAdvice { * The constructor of {@link PublishingMessageImpl} is always called in the same thread that * user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link * Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link - * RocketMqProducerInstrumentation}. + * ProducerImplInstrumentation}. */ @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.This PublishingMessageImpl message) { diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java index 89c93b3822b2..af1fa81b0e49 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java @@ -21,7 +21,7 @@ public RocketMqInstrumentationModule() { @Override public List typeInstrumentations() { return asList( - new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation(), - new RocketMqConsumerInstrumentation(), new RocketMqMessageListenerInstrumentation()); + new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(), + new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation()); } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index 6d6f44c1e7fe..b5149011af8c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -43,13 +43,7 @@ public static Instrumenter createProduce INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE) - .setSpanStatusExtractor( - (spanStatusBuilder, message, sendReceipt, error) -> { - if (error != null) { - spanStatusBuilder.setStatus(StatusCode.ERROR); - } - }); + .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE); return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE); } @@ -69,13 +63,7 @@ public static Instrumenter createProduce MessagingSpanNameExtractor.create(getter, operation)) .setEnabled(enabled) .addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE) - .setSpanStatusExtractor( - (spanStatusBuilder, messageView, unused, error) -> { - if (error != null) { - spanStatusBuilder.setStatus(StatusCode.ERROR); - } - }); + .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE); return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java deleted file mode 100644 index 27f46e46aff9..000000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; - -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; -import org.apache.rocketmq.client.apis.message.MessageView; - -/** Only for {@link org.apache.rocketmq.client.apis.consumer.PushConsumer}. */ -final class RocketMqMessageListenerInstrumentation implements TypeInstrumentation { - @Override - public ElementMatcher typeMatcher() { - // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. - return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); - } - - @Override - public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isConstructor() - .and( - isPublic() - .and(takesArguments(5)) - .and(takesArgument(0, named("org.apache.rocketmq.client.java.misc.ClientId"))) - .and( - takesArgument( - 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener"))) - .and(takesArgument(2, ThreadPoolExecutor.class)) - .and( - takesArgument( - 3, named("org.apache.rocketmq.client.java.hook.MessageInterceptor"))) - .and(takesArgument(4, ScheduledExecutorService.class))), - RocketMqMessageListenerInstrumentation.class.getName() + "$ConstructorAdvice"); - } - - @SuppressWarnings("unused") - public static class ConstructorAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { - // Replace messageListener by wrapper. - messageListener = new MessageListenerWrapper(messageListener); - } - } - - public static class MessageListenerWrapper implements MessageListener { - private final MessageListener delegator; - - public MessageListenerWrapper(MessageListener delegator) { - this.delegator = delegator; - } - - @Override - public ConsumeResult consume(MessageView messageView) { - Context parentContext = VirtualFieldStore.getContextByMessage(messageView); - if (parentContext == null) { - parentContext = Context.current(); - } - Instrumenter processInstrumenter = - RocketMqSingletons.consumerProcessInstrumenter(); - if (!processInstrumenter.shouldStart(parentContext, messageView)) { - return delegator.consume(messageView); - } - Context context = processInstrumenter.start(parentContext, messageView); - try (Scope ignored = context.makeCurrent()) { - ConsumeResult consumeResult = delegator.consume(messageView); - processInstrumenter.end(context, messageView, consumeResult, null); - return consumeResult; - } catch (Throwable t) { - processInstrumenter.end(context, messageView, null, t); - throw t; - } - } - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..be92fd182a39 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class RocketMqClientSuppressReceiveSpanTest + extends AbstractRocketMqClientSuppressReceiveSpanTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..36ae660b1fe7 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,154 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class AbstractRocketMqClientSuppressReceiveSpanTest { + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); + + protected abstract InstrumentationExtension testing(); + + @BeforeAll + static void setUp() { + container.start(); + } + + @AfterAll + static void tearDown() { + container.close(); + } + + @Test + void testSendAndConsumeMessage() throws Throwable { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-0"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-0"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) + .build()) { + try (Producer producer = + provider + .newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build()) { + + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic)), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // As the child of send span. + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index cd80d5ce280b..58f7e4354024 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -19,19 +19,17 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; -import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -43,36 +41,15 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.FixedHostPortGenericContainer; -import org.testcontainers.containers.GenericContainer; public abstract class AbstractRocketMqClientTest { - protected abstract InstrumentationExtension testing(); + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); - // TODO(aaron-ai): replace it by the official image. - private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; - private static GenericContainer container; - private static String endpoints; + protected abstract InstrumentationExtension testing(); - // We still need this container type to do fixed-port-mapping. - @SuppressWarnings({"resource", "deprecation", "rawtypes"}) @BeforeAll static void setUp() { - int proxyPort = PortUtils.findOpenPorts(4); - int brokerPort = proxyPort + 1; - int brokerHaPort = proxyPort + 2; - int namesrvPort = proxyPort + 3; - endpoints = "127.0.0.1:" + proxyPort; - container = - new FixedHostPortGenericContainer(IMAGE_NAME) - .withFixedExposedPort(proxyPort, proxyPort) - .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) - .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) - .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) - .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) - .withExposedPorts(proxyPort); - // Start the container. container.start(); } @@ -82,9 +59,9 @@ static void tearDown() { } @Test - public void testSendAndConsumeMessage() throws ClientException, IOException { + void testSendAndConsumeMessage() throws Throwable { ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); // Inner topic of the container. String topic = "normal-topic-0"; ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -97,7 +74,11 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) - .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) .build()) { try (Producer producer = provider @@ -117,16 +98,22 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { .setBody(body) .build(); - SendReceipt sendReceipt = producer.send(message); + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); AtomicReference sendSpanData = new AtomicReference<>(); testing() .waitAndAssertTraces( trace -> { trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> span.hasKind(SpanKind.PRODUCER) .hasName(topic + " send") .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), @@ -139,7 +126,7 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), equalTo(MESSAGING_DESTINATION, topic))); - sendSpanData.set(trace.getSpan(0)); + sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( @@ -177,92 +164,11 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "process")))); - } - } - } - - @Test - public void testSendAndConsumeMessageWithReceiveSpanSuppressed() - throws ClientException, IOException { - ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); - // Inner topic of the container. - String topic = "normal-topic-1"; - ClientServiceProvider provider = ClientServiceProvider.loadService(); - String consumerGroup = "group-normal-topic-1"; - String tag = "tagA"; - FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); - try (PushConsumer ignored = - provider - .newPushConsumerBuilder() - .setClientConfiguration(clientConfiguration) - .setConsumerGroup(consumerGroup) - .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) - .setMessageListener(messageView -> ConsumeResult.SUCCESS) - .build()) { - try (Producer producer = - provider - .newProducerBuilder() - .setClientConfiguration(clientConfiguration) - .setTopics(topic) - .build()) { - - String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; - byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); - Message message = - provider - .newMessageBuilder() - .setTopic(topic) - .setTag(tag) - .setKeys(keys) - .setBody(body) - .build(); - - SendReceipt sendReceipt = producer.send(message); - testing() - .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasKind(SpanKind.PRODUCER) - .hasName(topic + " send") - .hasStatus(StatusData.unset()) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), - equalTo( - MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, - sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic)), + equalTo(MESSAGING_OPERATION, "process")), span -> - span.hasKind(SpanKind.CONSUMER) - .hasName(topic + " process") - .hasStatus(StatusData.unset()) - // As the child of send span. - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo( - MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, - sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "process")))); + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); } } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java new file mode 100644 index 000000000000..957bea744b4e --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; + +public class RocketMqProxyContainer { + // TODO(aaron-ai): replace it by the official image. + private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + + private final GenericContainer container; + final String endpoints; + + // We still need this container type to do fixed-port-mapping. + @SuppressWarnings({"resource", "deprecation", "rawtypes"}) + RocketMqProxyContainer() { + int proxyPort = PortUtils.findOpenPorts(4); + int brokerPort = proxyPort + 1; + int brokerHaPort = proxyPort + 2; + int namesrvPort = proxyPort + 3; + endpoints = "127.0.0.1:" + proxyPort; + container = + new FixedHostPortGenericContainer(IMAGE_NAME) + .withFixedExposedPort(proxyPort, proxyPort) + .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) + .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) + .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) + .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) + .withExposedPorts(proxyPort); + } + + void start() { + container.start(); + } + + void close() { + container.close(); + } +} From 894ee80d760f31282df37e8af76cb30365a0a62c Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Tue, 15 Nov 2022 13:29:59 +0800 Subject: [PATCH 7/8] Flipping method order in RocketMqConsumerProcessAttributeExtractor --- ...ocketMqConsumerProcessAttributeExtractor.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java index df2dfec7ce93..35c4eb2e0b20 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java @@ -21,14 +21,6 @@ enum RocketMqConsumerProcessAttributeExtractor implements AttributesExtractor { INSTANCE; - @Override - public void onEnd( - AttributesBuilder attributes, - Context context, - MessageView messageView, - @Nullable ConsumeResult consumeResult, - @Nullable Throwable error) {} - @Override public void onStart( AttributesBuilder attributes, Context parentContext, MessageView messageView) { @@ -37,4 +29,12 @@ public void onStart( attributes.put( MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView)); } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + MessageView messageView, + @Nullable ConsumeResult consumeResult, + @Nullable Throwable error) {} } From 328188284ea104d36aa4143ee8e32abe2409f5a8 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Tue, 15 Nov 2022 13:55:36 +0800 Subject: [PATCH 8/8] Polish code --- .../v5_0/ConsumeServiceInstrumentation.java | 3 --- .../rocketmqclient/v5_0/MessageListenerWrapper.java | 9 ++++++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java index f7f08dd498e5..3c6fd08e18ff 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java @@ -9,7 +9,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; @@ -21,7 +20,6 @@ final class ConsumeServiceInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { - // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); } @@ -31,7 +29,6 @@ public void transform(TypeTransformer transformer) { isConstructor() .and( isPublic() - .and(takesArguments(5)) .and( takesArgument( 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))), diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java index 4fd8a82c6433..0d013bf2a4ef 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java @@ -31,13 +31,16 @@ public ConsumeResult consume(MessageView messageView) { return delegator.consume(messageView); } Context context = processInstrumenter.start(parentContext, messageView); + ConsumeResult consumeResult = null; + Throwable error = null; try (Scope ignored = context.makeCurrent()) { - ConsumeResult consumeResult = delegator.consume(messageView); - processInstrumenter.end(context, messageView, consumeResult, null); + consumeResult = delegator.consume(messageView); return consumeResult; } catch (Throwable t) { - processInstrumenter.end(context, messageView, null, t); + error = t; throw t; + } finally { + processInstrumenter.end(context, messageView, consumeResult, error); } } }