diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingConsumeMessageHookImpl.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingConsumeMessageHookImpl.java index 94d03c44c764..a1e307019bc5 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingConsumeMessageHookImpl.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingConsumeMessageHookImpl.java @@ -6,11 +6,15 @@ package io.opentelemetry.instrumentation.rocketmqclient.v4_8; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.util.VirtualField; import org.apache.rocketmq.client.hook.ConsumeMessageContext; import org.apache.rocketmq.client.hook.ConsumeMessageHook; final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { + private static final VirtualField contextAndScopeField = + VirtualField.find(ConsumeMessageContext.class, ContextAndScope.class); + private final RocketMqConsumerInstrumenter instrumenter; TracingConsumeMessageHookImpl(RocketMqConsumerInstrumenter instrumenter) { @@ -30,12 +34,9 @@ public void consumeMessageBefore(ConsumeMessageContext context) { Context parentContext = Context.current(); Context newContext = instrumenter.start(parentContext, context.getMsgList()); - // it's safe to store the scope in the rocketMq trace context, both before() and after() methods - // are always called from the same thread; see: - // - ConsumeMessageConcurrentlyService$ConsumeRequest#run() - // - ConsumeMessageOrderlyService$ConsumeRequest#run() if (newContext != parentContext) { - context.setMqTraceContext(ContextAndScope.create(newContext, newContext.makeCurrent())); + contextAndScopeField.set( + context, ContextAndScope.create(newContext, newContext.makeCurrent())); } } @@ -44,8 +45,8 @@ public void consumeMessageAfter(ConsumeMessageContext context) { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - if (context.getMqTraceContext() instanceof ContextAndScope) { - ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext(); + ContextAndScope contextAndScope = contextAndScopeField.get(context); + if (contextAndScope != null) { contextAndScope.close(); instrumenter.end(contextAndScope.getContext(), context.getMsgList()); } diff --git a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingSendMessageHookImpl.java b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingSendMessageHookImpl.java index 494f3104f533..3dafc3e0e3c6 100644 --- a/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingSendMessageHookImpl.java +++ b/instrumentation/rocketmq/rocketmq-client/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmqclient/v4_8/TracingSendMessageHookImpl.java @@ -7,11 +7,15 @@ import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.util.VirtualField; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; final class TracingSendMessageHookImpl implements SendMessageHook { + private static final VirtualField contextField = + VirtualField.find(SendMessageContext.class, Context.class); + private final Instrumenter instrumenter; TracingSendMessageHookImpl(Instrumenter instrumenter) { @@ -32,7 +36,7 @@ public void sendMessageBefore(SendMessageContext context) { if (!instrumenter.shouldStart(parentContext, context)) { return; } - context.setMqTraceContext(instrumenter.start(parentContext, context)); + contextField.set(context, instrumenter.start(parentContext, context)); } @Override @@ -40,9 +44,9 @@ public void sendMessageAfter(SendMessageContext context) { if (context == null) { return; } - if (context.getMqTraceContext() instanceof Context + Context otelContext = contextField.get(context); + if (otelContext != null && (context.getSendResult() != null || context.getException() != null)) { - Context otelContext = (Context) context.getMqTraceContext(); instrumenter.end(otelContext, context, null, context.getException()); } }