Skip to content

Commit

Permalink
Don't use rocketmq trace context
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Oct 21, 2022
1 parent 47cbdf3 commit 663f4a0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@
package io.opentelemetry.instrumentation.rocketmqclient.v4_8;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;

final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {

private static final VirtualField<ConsumeMessageContext, ContextAndScope> contextAndScopeField =
VirtualField.find(ConsumeMessageContext.class, ContextAndScope.class);

private final RocketMqConsumerInstrumenter instrumenter;

TracingConsumeMessageHookImpl(RocketMqConsumerInstrumenter instrumenter) {
Expand All @@ -30,12 +34,9 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
Context parentContext = Context.current();
Context newContext = instrumenter.start(parentContext, context.getMsgList());

// it's safe to store the scope in the rocketMq trace context, both before() and after() methods
// are always called from the same thread; see:
// - ConsumeMessageConcurrentlyService$ConsumeRequest#run()
// - ConsumeMessageOrderlyService$ConsumeRequest#run()
if (newContext != parentContext) {
context.setMqTraceContext(ContextAndScope.create(newContext, newContext.makeCurrent()));
contextAndScopeField.set(
context, ContextAndScope.create(newContext, newContext.makeCurrent()));
}
}

Expand All @@ -44,8 +45,8 @@ public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
ContextAndScope contextAndScope = contextAndScopeField.get(context);
if (contextAndScope != null) {
contextAndScope.close();
instrumenter.end(contextAndScope.getContext(), context.getMsgList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;

final class TracingSendMessageHookImpl implements SendMessageHook {

private static final VirtualField<SendMessageContext, Context> contextField =
VirtualField.find(SendMessageContext.class, Context.class);

private final Instrumenter<SendMessageContext, Void> instrumenter;

TracingSendMessageHookImpl(Instrumenter<SendMessageContext, Void> instrumenter) {
Expand All @@ -32,17 +36,17 @@ public void sendMessageBefore(SendMessageContext context) {
if (!instrumenter.shouldStart(parentContext, context)) {
return;
}
context.setMqTraceContext(instrumenter.start(parentContext, context));
contextField.set(context, instrumenter.start(parentContext, context));
}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null) {
return;
}
if (context.getMqTraceContext() instanceof Context
Context otelContext = contextField.get(context);
if (otelContext != null
&& (context.getSendResult() != null || context.getException() != null)) {
Context otelContext = (Context) context.getMqTraceContext();
instrumenter.end(otelContext, context, null, context.getException());
}
}
Expand Down

0 comments on commit 663f4a0

Please sign in to comment.