From c5012d41081a3195455392a2f37600eb89636f5c Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Mon, 14 Nov 2022 22:25:47 +0800 Subject: [PATCH] Fix issues --- .../javaagent/build.gradle.kts | 6 +- .../v5_0/ConsumeServiceInstrumentation.java | 53 ++++++ ....java => ConsumerImplInstrumentation.java} | 4 +- .../v5_0/MessageListenerWrapper.java | 43 +++++ ....java => ProducerImplInstrumentation.java} | 4 +- ...PublishingMessageImplInstrumentation.java} | 8 +- .../v5_0/RocketMqInstrumentationModule.java | 4 +- .../v5_0/RocketMqInstrumenterFactory.java | 16 +- ...ocketMqMessageListenerInstrumentation.java | 94 ----------- ...RocketMqClientSuppressReceiveSpanTest.java | 21 +++ ...RocketMqClientSuppressReceiveSpanTest.java | 154 ++++++++++++++++++ .../v5_0/AbstractRocketMqClientTest.java | 138 +++------------- .../v5_0/RocketMqProxyContainer.java | 44 +++++ 13 files changed, 352 insertions(+), 237 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqConsumerInstrumentation.java => ConsumerImplInstrumentation.java} (93%) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqProducerInstrumentation.java => ProducerImplInstrumentation.java} (96%) rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqPublishingMessageImplInstrumentation.java => PublishingMessageImplInstrumentation.java} (91%) delete mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts index 2b35ece9bded..2f87b75b3076 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts @@ -20,14 +20,14 @@ dependencies { tasks { val testReceiveSpanDisabled by registering(Test::class) { filter { - includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessageWithReceiveSpanSuppressed") + includeTestsMatching("RocketMqClientSuppressReceiveSpanTest") } - include("**/RocketMqClientTest.*") + include("**/RocketMqClientSuppressReceiveSpanTest.*") } test { filter { - includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessage") + excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest") } jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java new file mode 100644 index 000000000000..fa5f139be111 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.consumer.MessageListener; + +/** Only for {@link org.apache.rocketmq.client.apis.consumer.PushConsumer}. */ +final class ConsumeServiceInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + isPublic() + .and(takesArguments(5)) + .and( + takesArgument( + 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))), + ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { + // Replace messageListener by wrapper. + if (!(messageListener instanceof MessageListenerWrapper)) { + messageListener = new MessageListenerWrapper(messageListener); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java similarity index 93% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java index 29b936143425..46d4c0a5645e 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java @@ -21,7 +21,7 @@ import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; -final class RocketMqConsumerInstrumentation implements TypeInstrumentation { +final class ConsumerImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl"); @@ -36,7 +36,7 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest"))) .and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl"))) .and(takesArgument(2, named("java.time.Duration"))), - RocketMqConsumerInstrumentation.class.getName() + "$ReceiveMessageAdvice"); + ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice"); } @SuppressWarnings("unused") diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java new file mode 100644 index 000000000000..4fd8a82c6433 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.message.MessageView; + +public final class MessageListenerWrapper implements MessageListener { + private final MessageListener delegator; + + public MessageListenerWrapper(MessageListener delegator) { + this.delegator = delegator; + } + + @Override + public ConsumeResult consume(MessageView messageView) { + Context parentContext = VirtualFieldStore.getContextByMessage(messageView); + if (parentContext == null) { + parentContext = Context.current(); + } + Instrumenter processInstrumenter = + RocketMqSingletons.consumerProcessInstrumenter(); + if (!processInstrumenter.shouldStart(parentContext, messageView)) { + return delegator.consume(messageView); + } + Context context = processInstrumenter.start(parentContext, messageView); + try (Scope ignored = context.makeCurrent()) { + ConsumeResult consumeResult = delegator.consume(messageView); + processInstrumenter.end(context, messageView, consumeResult, null); + return consumeResult; + } catch (Throwable t) { + processInstrumenter.end(context, messageView, null, t); + throw t; + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java similarity index 96% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java index bb73fb135be2..1972d98d29f4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture; -final class RocketMqProducerInstrumentation implements TypeInstrumentation { +final class ProducerImplInstrumentation implements TypeInstrumentation { /** Match the implementation of RocketMQ producer. */ @Override @@ -51,7 +51,7 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(3, List.class)) .and(takesArgument(4, List.class)) .and(takesArgument(5, int.class)), - RocketMqProducerInstrumentation.class.getName() + "$SendAdvice"); + ProducerImplInstrumentation.class.getName() + "$SendAdvice"); } @SuppressWarnings("unused") diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java similarity index 91% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java index 10abc8e1c5a7..eb2b114b586c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.client.java.message.MessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation { +final class PublishingMessageImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) { takesArgument( 1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings"))) .and(takesArgument(2, boolean.class)), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); transformer.applyAdviceToMethod( isMethod().and(named("getProperties")).and(isPublic()), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); } @SuppressWarnings("unused") @@ -56,7 +56,7 @@ public static class ConstructorAdvice { * The constructor of {@link PublishingMessageImpl} is always called in the same thread that * user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link * Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link - * RocketMqProducerInstrumentation}. + * ProducerImplInstrumentation}. */ @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.This PublishingMessageImpl message) { diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java index 89c93b3822b2..af1fa81b0e49 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java @@ -21,7 +21,7 @@ public RocketMqInstrumentationModule() { @Override public List typeInstrumentations() { return asList( - new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation(), - new RocketMqConsumerInstrumentation(), new RocketMqMessageListenerInstrumentation()); + new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(), + new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation()); } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index 6d6f44c1e7fe..b5149011af8c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -43,13 +43,7 @@ public static Instrumenter createProduce INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE) - .setSpanStatusExtractor( - (spanStatusBuilder, message, sendReceipt, error) -> { - if (error != null) { - spanStatusBuilder.setStatus(StatusCode.ERROR); - } - }); + .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE); return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE); } @@ -69,13 +63,7 @@ public static Instrumenter createProduce MessagingSpanNameExtractor.create(getter, operation)) .setEnabled(enabled) .addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE) - .setSpanStatusExtractor( - (spanStatusBuilder, messageView, unused, error) -> { - if (error != null) { - spanStatusBuilder.setStatus(StatusCode.ERROR); - } - }); + .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE); return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java deleted file mode 100644 index 27f46e46aff9..000000000000 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqMessageListenerInstrumentation.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; - -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.apis.consumer.ConsumeResult; -import org.apache.rocketmq.client.apis.consumer.MessageListener; -import org.apache.rocketmq.client.apis.message.MessageView; - -/** Only for {@link org.apache.rocketmq.client.apis.consumer.PushConsumer}. */ -final class RocketMqMessageListenerInstrumentation implements TypeInstrumentation { - @Override - public ElementMatcher typeMatcher() { - // Instrument ConsumerService instead of MessageListener because lambda could not be enhanced. - return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); - } - - @Override - public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isConstructor() - .and( - isPublic() - .and(takesArguments(5)) - .and(takesArgument(0, named("org.apache.rocketmq.client.java.misc.ClientId"))) - .and( - takesArgument( - 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener"))) - .and(takesArgument(2, ThreadPoolExecutor.class)) - .and( - takesArgument( - 3, named("org.apache.rocketmq.client.java.hook.MessageInterceptor"))) - .and(takesArgument(4, ScheduledExecutorService.class))), - RocketMqMessageListenerInstrumentation.class.getName() + "$ConstructorAdvice"); - } - - @SuppressWarnings("unused") - public static class ConstructorAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { - // Replace messageListener by wrapper. - messageListener = new MessageListenerWrapper(messageListener); - } - } - - public static class MessageListenerWrapper implements MessageListener { - private final MessageListener delegator; - - public MessageListenerWrapper(MessageListener delegator) { - this.delegator = delegator; - } - - @Override - public ConsumeResult consume(MessageView messageView) { - Context parentContext = VirtualFieldStore.getContextByMessage(messageView); - if (parentContext == null) { - parentContext = Context.current(); - } - Instrumenter processInstrumenter = - RocketMqSingletons.consumerProcessInstrumenter(); - if (!processInstrumenter.shouldStart(parentContext, messageView)) { - return delegator.consume(messageView); - } - Context context = processInstrumenter.start(parentContext, messageView); - try (Scope ignored = context.makeCurrent()) { - ConsumeResult consumeResult = delegator.consume(messageView); - processInstrumenter.end(context, messageView, consumeResult, null); - return consumeResult; - } catch (Throwable t) { - processInstrumenter.end(context, messageView, null, t); - throw t; - } - } - } -} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..be92fd182a39 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class RocketMqClientSuppressReceiveSpanTest + extends AbstractRocketMqClientSuppressReceiveSpanTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..36ae660b1fe7 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,154 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class AbstractRocketMqClientSuppressReceiveSpanTest { + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); + + protected abstract InstrumentationExtension testing(); + + @BeforeAll + static void setUp() { + container.start(); + } + + @AfterAll + static void tearDown() { + container.close(); + } + + @Test + void testSendAndConsumeMessage() throws Throwable { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-0"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-0"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) + .build()) { + try (Producer producer = + provider + .newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build()) { + + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic)), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // As the child of send span. + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index cd80d5ce280b..58f7e4354024 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -19,19 +19,17 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; -import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; @@ -43,36 +41,15 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.FixedHostPortGenericContainer; -import org.testcontainers.containers.GenericContainer; public abstract class AbstractRocketMqClientTest { - protected abstract InstrumentationExtension testing(); + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); - // TODO(aaron-ai): replace it by the official image. - private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; - private static GenericContainer container; - private static String endpoints; + protected abstract InstrumentationExtension testing(); - // We still need this container type to do fixed-port-mapping. - @SuppressWarnings({"resource", "deprecation", "rawtypes"}) @BeforeAll static void setUp() { - int proxyPort = PortUtils.findOpenPorts(4); - int brokerPort = proxyPort + 1; - int brokerHaPort = proxyPort + 2; - int namesrvPort = proxyPort + 3; - endpoints = "127.0.0.1:" + proxyPort; - container = - new FixedHostPortGenericContainer(IMAGE_NAME) - .withFixedExposedPort(proxyPort, proxyPort) - .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) - .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) - .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) - .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) - .withExposedPorts(proxyPort); - // Start the container. container.start(); } @@ -82,9 +59,9 @@ static void tearDown() { } @Test - public void testSendAndConsumeMessage() throws ClientException, IOException { + void testSendAndConsumeMessage() throws Throwable { ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); // Inner topic of the container. String topic = "normal-topic-0"; ClientServiceProvider provider = ClientServiceProvider.loadService(); @@ -97,7 +74,11 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) - .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) .build()) { try (Producer producer = provider @@ -117,16 +98,22 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { .setBody(body) .build(); - SendReceipt sendReceipt = producer.send(message); + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); AtomicReference sendSpanData = new AtomicReference<>(); testing() .waitAndAssertTraces( trace -> { trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> span.hasKind(SpanKind.PRODUCER) .hasName(topic + " send") .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), @@ -139,7 +126,7 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), equalTo(MESSAGING_DESTINATION, topic))); - sendSpanData.set(trace.getSpan(0)); + sendSpanData.set(trace.getSpan(1)); }, trace -> trace.hasSpansSatisfyingExactly( @@ -177,92 +164,11 @@ public void testSendAndConsumeMessage() throws ClientException, IOException { MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "process")))); - } - } - } - - @Test - public void testSendAndConsumeMessageWithReceiveSpanSuppressed() - throws ClientException, IOException { - ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); - // Inner topic of the container. - String topic = "normal-topic-1"; - ClientServiceProvider provider = ClientServiceProvider.loadService(); - String consumerGroup = "group-normal-topic-1"; - String tag = "tagA"; - FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); - try (PushConsumer ignored = - provider - .newPushConsumerBuilder() - .setClientConfiguration(clientConfiguration) - .setConsumerGroup(consumerGroup) - .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) - .setMessageListener(messageView -> ConsumeResult.SUCCESS) - .build()) { - try (Producer producer = - provider - .newProducerBuilder() - .setClientConfiguration(clientConfiguration) - .setTopics(topic) - .build()) { - - String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; - byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); - Message message = - provider - .newMessageBuilder() - .setTopic(topic) - .setTag(tag) - .setKeys(keys) - .setBody(body) - .build(); - - SendReceipt sendReceipt = producer.send(message); - testing() - .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasKind(SpanKind.PRODUCER) - .hasName(topic + " send") - .hasStatus(StatusData.unset()) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), - equalTo( - MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, - sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic)), + equalTo(MESSAGING_OPERATION, "process")), span -> - span.hasKind(SpanKind.CONSUMER) - .hasName(topic + " process") - .hasStatus(StatusData.unset()) - // As the child of send span. - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly( - equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), - equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), - equalTo( - MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), - equalTo(MESSAGING_SYSTEM, "rocketmq"), - equalTo( - MESSAGING_MESSAGE_ID, - sendReceipt.getMessageId().toString()), - equalTo( - MESSAGING_DESTINATION_KIND, - SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic), - equalTo(MESSAGING_OPERATION, "process")))); + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); } } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java new file mode 100644 index 000000000000..957bea744b4e --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; + +public class RocketMqProxyContainer { + // TODO(aaron-ai): replace it by the official image. + private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + + private final GenericContainer container; + final String endpoints; + + // We still need this container type to do fixed-port-mapping. + @SuppressWarnings({"resource", "deprecation", "rawtypes"}) + RocketMqProxyContainer() { + int proxyPort = PortUtils.findOpenPorts(4); + int brokerPort = proxyPort + 1; + int brokerHaPort = proxyPort + 2; + int namesrvPort = proxyPort + 3; + endpoints = "127.0.0.1:" + proxyPort; + container = + new FixedHostPortGenericContainer(IMAGE_NAME) + .withFixedExposedPort(proxyPort, proxyPort) + .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) + .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) + .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) + .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) + .withExposedPorts(proxyPort); + } + + void start() { + container.start(); + } + + void close() { + container.close(); + } +}