Skip to content

Commit

Permalink
Fix issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Nov 14, 2022
1 parent 59ab0f3 commit 75ae828
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ dependencies {
tasks {
val testReceiveSpanDisabled by registering(Test::class) {
filter {
includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessageWithReceiveSpanSuppressed")
includeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
include("**/RocketMqClientTest.*")
include("**/RocketMqClientSuppressReceiveSpanTest.*")
}

test {
filter {
includeTestsMatching("RocketMqClientTest.testSendAndConsumeMessage")
excludeTestsMatching("RocketMqClientSuppressReceiveSpanTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.apis.consumer.MessageListener;

final class ConsumeServiceInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
// Instrument ConsumerService instead of MessageListener because lambda could not be enhanced.
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumeService");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
isPublic()
.and(takesArguments(5))
.and(
takesArgument(
1, named("org.apache.rocketmq.client.apis.consumer.MessageListener")))),
ConsumeServiceInstrumentation.class.getName() + "$ConstructorAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 1, readOnly = false) MessageListener messageListener) {
// Replace messageListener by wrapper.
if (!(messageListener instanceof MessageListenerWrapper)) {
messageListener = new MessageListenerWrapper(messageListener);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;

final class RocketMqConsumerInstrumentation implements TypeInstrumentation {
final class ConsumerImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl");
Expand All @@ -36,7 +36,7 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(0, named("apache.rocketmq.v2.ReceiveMessageRequest")))
.and(takesArgument(1, named("org.apache.rocketmq.client.java.route.MessageQueueImpl")))
.and(takesArgument(2, named("java.time.Duration"))),
RocketMqConsumerInstrumentation.class.getName() + "$ReceiveMessageAdvice");
ConsumerImplInstrumentation.class.getName() + "$ReceiveMessageAdvice");
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;

public final class MessageListenerWrapper implements MessageListener {
private final MessageListener delegator;

public MessageListenerWrapper(MessageListener delegator) {
this.delegator = delegator;
}

@Override
public ConsumeResult consume(MessageView messageView) {
Context parentContext = VirtualFieldStore.getContextByMessage(messageView);
if (parentContext == null) {
parentContext = Context.current();
}
Instrumenter<MessageView, ConsumeResult> processInstrumenter =
RocketMqSingletons.consumerProcessInstrumenter();
if (!processInstrumenter.shouldStart(parentContext, messageView)) {
return delegator.consume(messageView);
}
Context context = processInstrumenter.start(parentContext, messageView);
try (Scope ignored = context.makeCurrent()) {
ConsumeResult consumeResult = delegator.consume(messageView);
processInstrumenter.end(context, messageView, consumeResult, null);
return consumeResult;
} catch (Throwable t) {
processInstrumenter.end(context, messageView, null, t);
throw t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.MoreExecutors;
import org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture;

final class RocketMqProducerInstrumentation implements TypeInstrumentation {
final class ProducerImplInstrumentation implements TypeInstrumentation {

/** Match the implementation of RocketMQ producer. */
@Override
Expand All @@ -51,7 +51,7 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(3, List.class))
.and(takesArgument(4, List.class))
.and(takesArgument(5, int.class)),
RocketMqProducerInstrumentation.class.getName() + "$SendAdvice");
ProducerImplInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.rocketmq.client.java.message.MessageImpl;
import org.apache.rocketmq.client.java.message.PublishingMessageImpl;

final class RocketMqPublishingMessageImplInstrumentation implements TypeInstrumentation {
final class PublishingMessageImplInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
Expand All @@ -44,10 +44,10 @@ public void transform(TypeTransformer transformer) {
takesArgument(
1, named("org.apache.rocketmq.client.java.impl.producer.PublishingSettings")))
.and(takesArgument(2, boolean.class)),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$ConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("getProperties")).and(isPublic()),
RocketMqPublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
PublishingMessageImplInstrumentation.class.getName() + "$GetPropertiesAdvice");
}

@SuppressWarnings("unused")
Expand All @@ -56,7 +56,7 @@ public static class ConstructorAdvice {
* The constructor of {@link PublishingMessageImpl} is always called in the same thread that
* user invoke {@link Producer#send(Message)}/{@link Producer#sendAsync(Message)}/{@link
* Producer#send(Message, Transaction)}. Store the {@link Context} here and fetch it in {@link
* RocketMqProducerInstrumentation}.
* ProducerImplInstrumentation}.
*/
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.This PublishingMessageImpl message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public RocketMqInstrumentationModule() {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqPublishingMessageImplInstrumentation(), new RocketMqProducerInstrumentation(),
new RocketMqConsumerInstrumentation(), new RocketMqMessageListenerInstrumentation());
new PublishingMessageImplInstrumentation(), new ProducerImplInstrumentation(),
new ConsumerImplInstrumentation(), new ConsumeServiceInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,7 @@ public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProduce
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, message, sendReceipt, error) -> {
if (error != null) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});
.addAttributesExtractor(RocketMqProducerAttributeExtractor.INSTANCE);
return instrumenterBuilder.buildProducerInstrumenter(MessageMapSetter.INSTANCE);
}

Expand All @@ -69,13 +63,7 @@ public static Instrumenter<PublishingMessageImpl, SendReceiptImpl> createProduce
MessagingSpanNameExtractor.create(getter, operation))
.setEnabled(enabled)
.addAttributesExtractor(attributesExtractor)
.addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE)
.setSpanStatusExtractor(
(spanStatusBuilder, messageView, unused, error) -> {
if (error != null) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
}
});
.addAttributesExtractor(RocketMqConsumerReceiveAttributeExtractor.INSTANCE);
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.rocketmqclient.v5_0;

import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;

public class RocketMqClientSuppressReceiveSpanTest
extends AbstractRocketMqClientSuppressReceiveSpanTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

@Override
protected InstrumentationExtension testing() {
return testing;
}
}
Loading

0 comments on commit 75ae828

Please sign in to comment.