From b3cd45685dc908e71300e9af257fc743545e8fc5 Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Tue, 15 Nov 2022 20:21:14 +0800 Subject: [PATCH] Implement consumer part of rocketmq new client instrumentation (#7019) Fixes #6764 , This PR is about the consumer part. Co-authored-by: Trask Stalnaker --- .../javaagent/build.gradle.kts | 20 +++ .../v5_0/ConsumeServiceInstrumentation.java | 49 +++++ .../v5_0/ConsumerImplInstrumentation.java | 60 +++++++ .../v5_0/MessageListenerWrapper.java | 46 +++++ .../rocketmqclient/v5_0/MessageMapGetter.java | 25 +++ .../{MapSetter.java => MessageMapSetter.java} | 2 +- ....java => ProducerImplInstrumentation.java} | 32 +--- ...PublishingMessageImplInstrumentation.java} | 10 +- .../v5_0/ReceiveSpanFinishingCallback.java | 74 ++++++++ ...etMqConsumerProcessAttributeExtractor.java | 40 +++++ ...ocketMqConsumerProcessAttributeGetter.java | 93 ++++++++++ ...etMqConsumerReceiveAttributeExtractor.java | 36 ++++ ...ocketMqConsumerReceiveAttributeGetter.java | 83 +++++++++ .../v5_0/RocketMqInstrumentationModule.java | 3 +- .../v5_0/RocketMqInstrumenterFactory.java | 62 ++++++- .../v5_0/RocketMqSingletons.java | 31 +++- .../v5_0/SendSpanFinishingCallback.java | 37 ++++ .../rocketmqclient/v5_0/Timer.java | 31 ++++ .../v5_0/VirtualFieldStore.java | 21 +++ ...RocketMqClientSuppressReceiveSpanTest.java | 21 +++ ...RocketMqClientSuppressReceiveSpanTest.java | 154 ++++++++++++++++ .../v5_0/AbstractRocketMqClientTest.java | 167 ++++++++++++------ .../v5_0/RocketMqProxyContainer.java | 44 +++++ 23 files changed, 1048 insertions(+), 93 deletions(-) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{MapSetter.java => MessageMapSetter.java} (92%) rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqProducerInstrumentation.java => ProducerImplInstrumentation.java} (76%) rename instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/{RocketMqPublishingMessageImplInstrumentation.java => PublishingMessageImplInstrumentation.java} (90%) create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java create mode 100644 instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts index 4efaaaad3845..2f87b75b3076 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/build.gradle.kts @@ -16,3 +16,23 @@ dependencies { testImplementation(project(":instrumentation:rocketmq:rocketmq-client:rocketmq-client-5.0:testing")) } + +tasks { + val testReceiveSpanDisabled by registering(Test::class) { + filter { + includeTestsMatching("RocketMqClientSuppressReceiveSpanTest") + } + include("**/RocketMqClientSuppressReceiveSpanTest.*") + } + + test { + filter { + excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + } + + check { + dependsOn(testReceiveSpanDisabled) + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java new file mode 100644 index 000000000000..3c6fd08e18ff --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumeServiceInstrumentation.java @@ -0,0 +1,49 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.apis.consumer.MessageListener; + +final class ConsumeServiceInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isConstructor() + .and( + isPublic() + .and( + takesArgument( + 1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))), + ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice"); + } + + @SuppressWarnings("unused") + public static class ConstructorAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) { + // Replace messageListener by wrapper. + if (!(messageListener instanceof MessageListenerWrapper)) { + messageListener = new MessageListenerWrapper(messageListener); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java new file mode 100644 index 000000000000..46d4c0a5645e --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ConsumerImplInstrumentation.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; + +final class ConsumerImplInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(named("receiveMessage")) + .and(takesArguments(3)) + .and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest"))) + .and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl"))) + .and(takesArgument(2, named("java.time.Duration"))), + ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice"); + } + + @SuppressWarnings("unused") + public static class ReceiveMessageAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static Timer onStart() { + return Timer.start(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(0) ReceiveMessageRequest request, + @Advice.Enter Timer timer, + @Advice.Return ListenableFuture future) { + ReceiveSpanFinishingCallback spanFinishingCallback = + new ReceiveSpanFinishingCallback(request, timer); + Futures.addCallback(future, spanFinishingCallback, MoreExecutors.directExecutor()); + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java new file mode 100644 index 000000000000..0d013bf2a4ef --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageListenerWrapper.java @@ -0,0 +1,46 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.MessageListener; +import org.apache.rocketmq.client.apis.message.MessageView; + +public final class MessageListenerWrapper implements MessageListener { + private final MessageListener delegator; + + public MessageListenerWrapper(MessageListener delegator) { + this.delegator = delegator; + } + + @Override + public ConsumeResult consume(MessageView messageView) { + Context parentContext = VirtualFieldStore.getContextByMessage(messageView); + if (parentContext == null) { + parentContext = Context.current(); + } + Instrumenter processInstrumenter = + RocketMqSingletons.consumerProcessInstrumenter(); + if (!processInstrumenter.shouldStart(parentContext, messageView)) { + return delegator.consume(messageView); + } + Context context = processInstrumenter.start(parentContext, messageView); + ConsumeResult consumeResult = null; + Throwable error = null; + try (Scope ignored = context.makeCurrent()) { + consumeResult = delegator.consume(messageView); + return consumeResult; + } catch (Throwable t) { + error = t; + throw t; + } finally { + processInstrumenter.end(context, messageView, consumeResult, error); + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java new file mode 100644 index 000000000000..b199f203744a --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapGetter.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.propagation.TextMapGetter; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum MessageMapGetter implements TextMapGetter { + INSTANCE; + + @Override + public Iterable keys(MessageView carrier) { + return carrier.getProperties().keySet(); + } + + @Nullable + @Override + public String get(@Nullable MessageView carrier, String key) { + return carrier == null ? null : carrier.getProperties().get(key); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java similarity index 92% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java index 1eeaefd08558..76997033873f 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MapSetter.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/MessageMapSetter.java @@ -11,7 +11,7 @@ import javax.annotation.Nullable; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -enum MapSetter implements TextMapSetter { +enum MessageMapSetter implements TextMapSetter { INSTANCE; @Override diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java similarity index 76% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java index 40602994ce68..1972d98d29f4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqProducerInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ProducerImplInstrumentation.java @@ -22,12 +22,11 @@ import net.bytebuddy.matcher.ElementMatcher; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.Futures; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors; import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture; -final class RocketMqProducerInstrumentation implements TypeInstrumentation { +final class ProducerImplInstrumentation implements TypeInstrumentation { /** Match the implementation of RocketMQ producer. */ @Override @@ -52,7 +51,7 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(3, List.class)) .and(takesArgument(4, List.class)) .and(takesArgument(5, int.class)), - RocketMqProducerInstrumentation.class.getName() + "$SendAdvice"); + ProducerImplInstrumentation.class.getName() + "$SendAdvice"); } @SuppressWarnings("unused") @@ -86,34 +85,9 @@ public static void onEnter( Context context = instrumenter.start(parentContext, message); Futures.addCallback( future, - new SpanFinishingCallback(instrumenter, context, message), + new SendSpanFinishingCallback(instrumenter, context, message), MoreExecutors.directExecutor()); } } } - - public static class SpanFinishingCallback implements FutureCallback { - private final Instrumenter instrumenter; - private final Context context; - private final PublishingMessageImpl message; - - public SpanFinishingCallback( - Instrumenter instrumenter, - Context context, - PublishingMessageImpl message) { - this.instrumenter = instrumenter; - this.context = context; - this.message = message; - } - - @Override - public void onSuccess(SendReceiptImpl sendReceipt) { - instrumenter.end(context, message, sendReceipt, null); - } - - @Override - public void onFailure(Throwable t) { - instrumenter.end(context, message, null, t); - } - } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java similarity index 90% rename from instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java rename to instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java index 7590366164ed..eb2b114b586c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqPublishingMessageImplInstrumentation.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/PublishingMessageImplInstrumentation.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.client.java.message.MessageImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; -final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation { +final class PublishingMessageImplInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) { takesArgument( 1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings"))) .and(takesArgument(2, boolean.class)), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice"); transformer.applyAdviceToMethod( isMethod().and(named("getProperties")).and(isPublic()), - RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); + PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice"); } @SuppressWarnings("unused") @@ -56,7 +56,7 @@ public static class ConstructorAdvice { * The constructor of {@link PublishingMessageImpl} is always called in the same thread that * user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link * Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link - * RocketMqProducerInstrumentation}. + * ProducerImplInstrumentation}. */ @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.This PublishingMessageImpl message) { @@ -66,7 +66,7 @@ public static void onExit(@Advice.This PublishingMessageImpl message) { @SuppressWarnings("unused") public static class GetPropertiesAdvice { - /** Update the message properties to propagate context recorded by {@link MapSetter}. */ + /** Update the message properties to propagate context recorded by {@link MessageMapSetter}. */ @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( @Advice.This MessageImpl messageImpl, diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java new file mode 100644 index 000000000000..affcf68c2f70 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/ReceiveSpanFinishingCallback.java @@ -0,0 +1,74 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; +import java.util.List; +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.java.impl.consumer.ReceiveMessageResult; +import org.apache.rocketmq.client.java.message.MessageViewImpl; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; + +public final class ReceiveSpanFinishingCallback implements FutureCallback { + + private final ReceiveMessageRequest request; + private final Timer timer; + + public ReceiveSpanFinishingCallback(ReceiveMessageRequest request, Timer timer) { + this.request = request; + this.timer = timer; + } + + @Override + public void onSuccess(ReceiveMessageResult receiveMessageResult) { + List messageViews = receiveMessageResult.getMessageViewImpls(); + // Don't create spans when no messages were received. + if (messageViews.isEmpty()) { + return; + } + String consumerGroup = request.getGroup().getName(); + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setConsumerGroupByMessage(messageView, consumerGroup); + } + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, request)) { + Context context = + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + request, + null, + null, + timer.startTime(), + timer.now()); + for (MessageViewImpl messageView : messageViews) { + VirtualFieldStore.setContextByMessage(messageView, context); + } + } + } + + @Override + public void onFailure(Throwable throwable) { + Instrumenter> receiveInstrumenter = + RocketMqSingletons.consumerReceiveInstrumenter(); + Context parentContext = Context.current(); + if (receiveInstrumenter.shouldStart(parentContext, request)) { + InstrumenterUtil.startAndEnd( + receiveInstrumenter, + parentContext, + request, + null, + throwable, + timer.startTime(), + timer.now()); + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java new file mode 100644 index 000000000000..35c4eb2e0b20 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeExtractor.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; + +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.ArrayList; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerProcessAttributeExtractor + implements AttributesExtractor { + INSTANCE; + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, MessageView messageView) { + messageView.getTag().ifPresent(s -> attributes.put(MESSAGING_ROCKETMQ_MESSAGE_TAG, s)); + attributes.put(MESSAGING_ROCKETMQ_MESSAGE_KEYS, new ArrayList<>(messageView.getKeys())); + attributes.put( + MESSAGING_ROCKETMQ_CLIENT_GROUP, VirtualFieldStore.getConsumerGroupByMessage(messageView)); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + MessageView messageView, + @Nullable ConsumeResult consumeResult, + @Nullable Throwable error) {} +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java new file mode 100644 index 000000000000..e742a9ee6818 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerProcessAttributeGetter.java @@ -0,0 +1,93 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerProcessAttributeGetter + implements MessagingAttributesGetter { + INSTANCE; + + @Nullable + @Override + public String system(MessageView messageView) { + return "rocketmq"; + } + + @Nullable + @Override + public String destinationKind(MessageView messageView) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Nullable + @Override + public String destination(MessageView messageView) { + return messageView.getTopic(); + } + + @Override + public boolean temporaryDestination(MessageView messageView) { + return false; + } + + @Nullable + @Override + public String protocol(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String protocolVersion(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String url(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String conversationId(MessageView messageView) { + return null; + } + + @Nullable + @Override + public Long messagePayloadSize(MessageView messageView) { + return (long) messageView.getBody().remaining(); + } + + @Nullable + @Override + public Long messagePayloadCompressedSize(MessageView messageView) { + return null; + } + + @Nullable + @Override + public String messageId(MessageView messageView, @Nullable ConsumeResult unused) { + return messageView.getMessageId().toString(); + } + + @Override + public List header(MessageView messageView, String name) { + String value = messageView.getProperties().get(name); + if (value != null) { + return Collections.singletonList(value); + } + return Collections.emptyList(); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java new file mode 100644 index 000000000000..379ea4992eb9 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeExtractor.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerReceiveAttributeExtractor + implements AttributesExtractor> { + INSTANCE; + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, ReceiveMessageRequest request) {} + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + ReceiveMessageRequest request, + @Nullable List messageViews, + @Nullable Throwable error) { + String consumerGroup = request.getGroup().getName(); + attributes.put(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java new file mode 100644 index 000000000000..564838fbb151 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqConsumerReceiveAttributeGetter.java @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import apache.rocketmq.v2.ReceiveMessageRequest; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.apis.message.MessageView; + +enum RocketMqConsumerReceiveAttributeGetter + implements MessagingAttributesGetter> { + INSTANCE; + + @Nullable + @Override + public String system(ReceiveMessageRequest request) { + return "rocketmq"; + } + + @Nullable + @Override + public String destinationKind(ReceiveMessageRequest request) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Nullable + @Override + public String destination(ReceiveMessageRequest request) { + return request.getMessageQueue().getTopic().getName(); + } + + @Override + public boolean temporaryDestination(ReceiveMessageRequest request) { + return false; + } + + @Nullable + @Override + public String protocol(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public String protocolVersion(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public String url(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public String conversationId(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public Long messagePayloadSize(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public Long messagePayloadCompressedSize(ReceiveMessageRequest request) { + return null; + } + + @Nullable + @Override + public String messageId(ReceiveMessageRequest request, @Nullable List unused) { + return null; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java index 9c04ced7cad6..af1fa81b0e49 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumentationModule.java @@ -21,6 +21,7 @@ public RocketMqInstrumentationModule() { @Override public List typeInstrumentations() { return asList( - new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation()); + new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(), + new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation()); } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java index 5d6fd7efdb7e..b5149011af8c 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqInstrumenterFactory.java @@ -5,16 +5,22 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor; import java.util.List; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; @@ -25,7 +31,6 @@ private RocketMqInstrumenterFactory() {} public static Instrumenter createProducerInstrumenter( OpenTelemetry openTelemetry, List capturedHeaders) { - RocketMqProducerAttributeGetter getter = RocketMqProducerAttributeGetter.INSTANCE; MessageOperation operation = MessageOperation.SEND; @@ -38,15 +43,62 @@ public static Instrumenter createProduce INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) .addAttributesExtractor(attributesExtractor) - .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE) + .addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE); + return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE); + } + + public static Instrumenter> + createConsumerReceiveInstrumenter( + OpenTelemetry openTelemetry, List capturedHeaders, boolean enabled) { + RocketMqConsumerReceiveAttributeGetter getter = RocketMqConsumerReceiveAttributeGetter.INSTANCE; + MessageOperation operation = MessageOperation.RECEIVE; + + MessagingAttributesExtractor> attributesExtractor = + buildMessagingAttributesExtractor(getter, operation, capturedHeaders); + + InstrumenterBuilder> instrumenterBuilder = + Instrumenter.>builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .setEnabled(enabled) + .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + public static Instrumenter createConsumerProcessInstrumenter( + OpenTelemetry openTelemetry, + List capturedHeaders, + boolean receiveInstrumentationEnabled) { + RocketMqConsumerProcessAttributeGetter getter = RocketMqConsumerProcessAttributeGetter.INSTANCE; + MessageOperation operation = MessageOperation.PROCESS; + + MessagingAttributesExtractor attributesExtractor = + buildMessagingAttributesExtractor(getter, operation, capturedHeaders); + + InstrumenterBuilder instrumenterBuilder = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + MessagingSpanNameExtractor.create(getter, operation)) + .addAttributesExtractor(attributesExtractor) + .addAttributesExtractor(RocketMqConsumerProcessAttributeExtractor.INSTANCE) .setSpanStatusExtractor( - (spanStatusBuilder, message, sendReceipt, error) -> { - if (null != error) { + (spanStatusBuilder, messageView, consumeResult, error) -> { + if (error != null || ConsumeResult.FAILURE.equals(consumeResult)) { spanStatusBuilder.setStatus(StatusCode.ERROR); } }); - return instrumenterBuilder.buildProducerInstrumenter(MapSetter.INSTANCE); + if (receiveInstrumentationEnabled) { + SpanLinksExtractor spanLinksExtractor = + new PropagatorBasedSpanLinksExtractor<>( + openTelemetry.getPropagators().getTextMapPropagator(), MessageMapGetter.INSTANCE); + instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor); + return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + return instrumenterBuilder.buildConsumerInstrumenter(MessageMapGetter.INSTANCE); } private static MessagingAttributesExtractor buildMessagingAttributesExtractor( diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java index 01b6e6c8a7cc..605838c13c2a 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/RocketMqSingletons.java @@ -5,25 +5,52 @@ package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; +import apache.rocketmq.v2.ReceiveMessageRequest; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; +import java.util.List; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; public final class RocketMqSingletons { private static final Instrumenter PRODUCER_INSTRUMENTER; + private static final Instrumenter> + CONSUMER_RECEIVE_INSTRUMENTER; + private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; static { + OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); + List messagingHeaders = ExperimentalConfig.get().getMessagingHeaders(); + boolean receiveInstrumentationEnabled = + ExperimentalConfig.get().messagingReceiveInstrumentationEnabled(); + PRODUCER_INSTRUMENTER = - RocketMqInstrumenterFactory.createProducerInstrumenter( - GlobalOpenTelemetry.get(), ExperimentalConfig.get().getMessagingHeaders()); + RocketMqInstrumenterFactory.createProducerInstrumenter(openTelemetry, messagingHeaders); + CONSUMER_RECEIVE_INSTRUMENTER = + RocketMqInstrumenterFactory.createConsumerReceiveInstrumenter( + openTelemetry, messagingHeaders, receiveInstrumentationEnabled); + CONSUMER_PROCESS_INSTRUMENTER = + RocketMqInstrumenterFactory.createConsumerProcessInstrumenter( + openTelemetry, messagingHeaders, receiveInstrumentationEnabled); } public static Instrumenter producerInstrumenter() { return PRODUCER_INSTRUMENTER; } + public static Instrumenter> + consumerReceiveInstrumenter() { + return CONSUMER_RECEIVE_INSTRUMENTER; + } + + public static Instrumenter consumerProcessInstrumenter() { + return CONSUMER_PROCESS_INSTRUMENTER; + } + private RocketMqSingletons() {} } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java new file mode 100644 index 000000000000..3ccdb88a25ba --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/SendSpanFinishingCallback.java @@ -0,0 +1,37 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl; +import org.apache.rocketmq.client.java.message.PublishingMessageImpl; +import org.apache.rocketmq.shaded.com.google.common.util.concurrent.FutureCallback; + +public final class SendSpanFinishingCallback implements FutureCallback { + private final Instrumenter instrumenter; + private final Context context; + private final PublishingMessageImpl message; + + public SendSpanFinishingCallback( + Instrumenter instrumenter, + Context context, + PublishingMessageImpl message) { + this.instrumenter = instrumenter; + this.context = context; + this.message = message; + } + + @Override + public void onSuccess(SendReceiptImpl sendReceipt) { + instrumenter.end(context, message, sendReceipt, null); + } + + @Override + public void onFailure(Throwable t) { + instrumenter.end(context, message, null, t); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java new file mode 100644 index 000000000000..d9846b6b0572 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/Timer.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0; + +import java.time.Instant; + +public class Timer { + public static Timer start() { + return new Timer(Instant.now(), System.nanoTime()); + } + + private final Instant startTime; + private final long startNanoTime; + + private Timer(Instant startTime, long startNanoTime) { + this.startTime = startTime; + this.startNanoTime = startNanoTime; + } + + public Instant startTime() { + return startTime; + } + + public Instant now() { + long durationNanos = System.nanoTime() - startNanoTime; + return startTime().plusNanos(durationNanos); + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java index cf84acd3fc7e..9f59aef86dd4 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmqclient/v5_0/VirtualFieldStore.java @@ -8,13 +8,18 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.util.VirtualField; import java.util.Map; +import org.apache.rocketmq.client.apis.message.MessageView; import org.apache.rocketmq.client.java.message.PublishingMessageImpl; public class VirtualFieldStore { private static final VirtualField messageContextField = VirtualField.find(PublishingMessageImpl.class, Context.class); + private static final VirtualField messageViewContextField = + VirtualField.find(MessageView.class, Context.class); private static final VirtualField> messageExtraPropertiesField = VirtualField.find(PublishingMessageImpl.class, Map.class); + private static final VirtualField messageConsumerGroupField = + VirtualField.find(MessageView.class, String.class); private VirtualFieldStore() {} @@ -22,10 +27,18 @@ public static Context getContextByMessage(PublishingMessageImpl message) { return messageContextField.get(message); } + public static Context getContextByMessage(MessageView messageView) { + return messageViewContextField.get(messageView); + } + public static void setContextByMessage(PublishingMessageImpl message, Context context) { messageContextField.set(message, context); } + public static void setContextByMessage(MessageView message, Context context) { + messageViewContextField.set(message, context); + } + public static Map getExtraPropertiesByMessage(PublishingMessageImpl message) { return messageExtraPropertiesField.get(message); } @@ -34,4 +47,12 @@ public static void setExtraPropertiesByMessage( PublishingMessageImpl message, Map extraProperties) { messageExtraPropertiesField.set(message, extraProperties); } + + public static String getConsumerGroupByMessage(MessageView messageView) { + return messageConsumerGroupField.get(messageView); + } + + public static void setConsumerGroupByMessage(MessageView messageView, String consumerGroup) { + messageConsumerGroupField.set(messageView, consumerGroup); + } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..be92fd182a39 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/javaagent/src/test/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class RocketMqClientSuppressReceiveSpanTest + extends AbstractRocketMqClientSuppressReceiveSpanTest { + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension testing() { + return testing; + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java new file mode 100644 index 000000000000..36ae660b1fe7 --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientSuppressReceiveSpanTest.java @@ -0,0 +1,154 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.apis.message.Message; +import org.apache.rocketmq.client.apis.producer.Producer; +import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public abstract class AbstractRocketMqClientSuppressReceiveSpanTest { + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); + + protected abstract InstrumentationExtension testing(); + + @BeforeAll + static void setUp() { + container.start(); + } + + @AfterAll + static void tearDown() { + container.close(); + } + + @Test + void testSendAndConsumeMessage() throws Throwable { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-0"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-0"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) + .build()) { + try (Producer producer = + provider + .newProducerBuilder() + .setClientConfiguration(clientConfiguration) + .setTopics(topic) + .build()) { + + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); + + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasKind(SpanKind.PRODUCER) + .hasName(topic + " send") + .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TYPE, NORMAL), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic)), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // As the child of send span. + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } + } + } +} diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java index 0999a8e823e0..58f7e4354024 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/AbstractRocketMqClientTest.java @@ -10,87 +10,110 @@ import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_DESTINATION_KIND; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_CLIENT_GROUP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_KEYS; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TYPE; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MessagingRocketmqMessageTypeValues.NORMAL; -import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.util.ThrowingSupplier; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.apis.ClientConfiguration; -import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.testcontainers.containers.FixedHostPortGenericContainer; -import org.testcontainers.containers.GenericContainer; public abstract class AbstractRocketMqClientTest { + private static final RocketMqProxyContainer container = new RocketMqProxyContainer(); + protected abstract InstrumentationExtension testing(); - // TODO(aaron-ai): replace it by the official image. - private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + @BeforeAll + static void setUp() { + container.start(); + } + + @AfterAll + static void tearDown() { + container.close(); + } - // We still need this container type to do fixed-port-mapping. - @SuppressWarnings({"deprecation", "rawtypes", "resource"}) @Test - public void testSendMessage() throws ClientException { - int proxyPort = PortUtils.findOpenPorts(4); - int brokerPort = proxyPort + 1; - int brokerHaPort = proxyPort + 2; - int namesrvPort = proxyPort + 3; - try (GenericContainer container = - new FixedHostPortGenericContainer(IMAGE_NAME) - .withFixedExposedPort(proxyPort, proxyPort) - .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) - .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) - .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) - .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) - .withExposedPorts(proxyPort)) { - // Start the container. - container.start(); - String endpoints = "127.0.0.1:" + proxyPort; - ClientConfiguration clientConfiguration = - ClientConfiguration.newBuilder().setEndpoints(endpoints).build(); - // Inner topic of the container. - String topic = "normal-topic-0"; - ClientServiceProvider provider = ClientServiceProvider.loadService(); - Producer producer = + void testSendAndConsumeMessage() throws Throwable { + ClientConfiguration clientConfiguration = + ClientConfiguration.newBuilder().setEndpoints(container.endpoints).build(); + // Inner topic of the container. + String topic = "normal-topic-0"; + ClientServiceProvider provider = ClientServiceProvider.loadService(); + String consumerGroup = "group-normal-topic-0"; + String tag = "tagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + try (PushConsumer ignored = + provider + .newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener( + messageView -> { + testing().runWithSpan("child", () -> {}); + return ConsumeResult.SUCCESS; + }) + .build()) { + try (Producer producer = provider .newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(topic) - .build(); + .build()) { - String tag = "tagA"; - String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; - byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); - Message message = - provider - .newMessageBuilder() - .setTopic(topic) - .setTag(tag) - .setKeys(keys) - .setBody(body) - .build(); + String[] keys = new String[] {"yourMessageKey-0", "yourMessageKey-1"}; + byte[] body = "foobar".getBytes(StandardCharsets.UTF_8); + Message message = + provider + .newMessageBuilder() + .setTopic(topic) + .setTag(tag) + .setKeys(keys) + .setBody(body) + .build(); - SendReceipt sendReceipt = producer.send(message); - testing() - .waitAndAssertTraces( - traceAssert -> - traceAssert.hasSpansSatisfyingExactly( - spanDataAssert -> - spanDataAssert + SendReceipt sendReceipt = + testing() + .runWithSpan( + "parent", + (ThrowingSupplier) () -> producer.send(message)); + AtomicReference sendSpanData = new AtomicReference<>(); + testing() + .waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasKind(SpanKind.PRODUCER) .hasName(topic + " send") .hasStatus(StatusData.unset()) + .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), @@ -102,7 +125,51 @@ public void testSendMessage() throws ClientException { equalTo( MESSAGING_DESTINATION_KIND, SemanticAttributes.MessagingDestinationKindValues.TOPIC), - equalTo(MESSAGING_DESTINATION, topic)))); + equalTo(MESSAGING_DESTINATION, topic))); + sendSpanData.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " receive") + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "receive")), + span -> + span.hasKind(SpanKind.CONSUMER) + .hasName(topic + " process") + .hasStatus(StatusData.unset()) + // Link to send span. + .hasLinks(LinkData.create(sendSpanData.get().getSpanContext())) + // As the child of receive span. + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(MESSAGING_ROCKETMQ_CLIENT_GROUP, consumerGroup), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_TAG, tag), + equalTo(MESSAGING_ROCKETMQ_MESSAGE_KEYS, Arrays.asList(keys)), + equalTo( + MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length), + equalTo(MESSAGING_SYSTEM, "rocketmq"), + equalTo( + MESSAGING_MESSAGE_ID, + sendReceipt.getMessageId().toString()), + equalTo( + MESSAGING_DESTINATION_KIND, + SemanticAttributes.MessagingDestinationKindValues.TOPIC), + equalTo(MESSAGING_DESTINATION, topic), + equalTo(MESSAGING_OPERATION, "process")), + span -> + span.hasName("child") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } } } } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java new file mode 100644 index 000000000000..957bea744b4e --- /dev/null +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-5.0/testing/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v5_0/RocketMqProxyContainer.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmqclient.v5_0; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.GenericContainer; + +public class RocketMqProxyContainer { + // TODO(aaron-ai): replace it by the official image. + private static final String IMAGE_NAME = "aaronai/rocketmq-proxy-it:v1.0.0"; + + private final GenericContainer container; + final String endpoints; + + // We still need this container type to do fixed-port-mapping. + @SuppressWarnings({"resource", "deprecation", "rawtypes"}) + RocketMqProxyContainer() { + int proxyPort = PortUtils.findOpenPorts(4); + int brokerPort = proxyPort + 1; + int brokerHaPort = proxyPort + 2; + int namesrvPort = proxyPort + 3; + endpoints = "127.0.0.1:" + proxyPort; + container = + new FixedHostPortGenericContainer(IMAGE_NAME) + .withFixedExposedPort(proxyPort, proxyPort) + .withEnv("rocketmq.broker.port", String.valueOf(brokerPort)) + .withEnv("rocketmq.proxy.port", String.valueOf(proxyPort)) + .withEnv("rocketmq.broker.ha.port", String.valueOf(brokerHaPort)) + .withEnv("rocketmq.namesrv.port", String.valueOf(namesrvPort)) + .withExposedPorts(proxyPort); + } + + void start() { + container.start(); + } + + void close() { + container.close(); + } +}