Skip to content

Commit

Permalink
Fix spring-integration context leak (open-telemetry#4673)
Browse files Browse the repository at this point in the history
  • Loading branch information
trask authored and RashmiRam committed May 23, 2022
1 parent cb52ddc commit 75d62af
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor {

@Override
public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {

Map<MessageChannel, ContextAndScope> localMap = LOCAL_CONTEXT_AND_SCOPE.get();
if (localMap.get(messageChannel) != null) {
// GlobalChannelInterceptorProcessor.afterSingletonsInstantiated() adds the global
// interceptors for every bean name / channel pair, which means it's possible that this
// interceptor is added twice to the same channel if the channel is registered twice under
// different bean names
//
// there's an option for this class to implement VetoCapableInterceptor and prevent itself
// from being registered if it's already registered, but the VetoCapableInterceptor interface
// broke backwards compatibility in 5.2.0, and the version prior to 5.2.0 takes a parameter
// of type ChannelInterceptorAware which doesn't exist after 5.2.0, and while it's possible to
// implement both at the same time (since we compile using 4.1.0), muzzle doesn't like the
// missing class type when running testLatestDeps
return message;
}

Context parentContext = Context.current();
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);

Expand All @@ -55,16 +72,12 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
// instrumentation should not create another one
if (shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
LOCAL_CONTEXT_AND_SCOPE
.get()
.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else {
// in case there already was another span in the context: back off and just inject the current
// context into the message
context = parentContext;
LOCAL_CONTEXT_AND_SCOPE
.get()
.put(messageChannel, ContextAndScope.create(null, context.makeCurrent()));
localMap.put(messageChannel, ContextAndScope.create(null, context.makeCurrent()));
}

propagators
Expand Down Expand Up @@ -113,14 +126,21 @@ public void afterReceiveCompletion(
@Override
public Message<?> beforeHandle(
Message<?> message, MessageChannel channel, MessageHandler handler) {

Map<MessageChannel, ContextAndScope> localMap = LOCAL_CONTEXT_AND_SCOPE.get();
if (localMap.get(channel) != null) {
// see comment explaining the same conditional in preSend()
return message;
}

MessageWithChannel messageWithChannel = MessageWithChannel.create(message, channel);
Context context =
propagators
.getTextMapPropagator()
.extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE);
// beforeHandle()/afterMessageHandles() always execute in a different thread than send(), so
// there's no real risk of overwriting the send() context
LOCAL_CONTEXT_AND_SCOPE.get().put(channel, ContextAndScope.create(null, context.makeCurrent()));
localMap.put(channel, ContextAndScope.create(null, context.makeCurrent()));
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,41 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
"executorChannel" | "executorChannel process"
}
def "should not add interceptor twice"() {
given:
def channel = applicationContext.getBean("directChannel1", SubscribableChannel)
def messageHandler = new CapturingMessageHandler()
channel.subscribe(messageHandler)
when:
channel.send(MessageBuilder.withPayload("test")
.build())
then:
def capturedMessage = messageHandler.join()
assertTraces(1) {
trace(0, 2) {
span(0) {
// the channel name is overwritten by the last bean registration
name "application.directChannel2 process"
kind CONSUMER
}
span(1) {
name "handler"
childOf span(0)
}
def interceptorSpan = span(0)
verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan)
}
}
cleanup:
channel.unsubscribe(messageHandler)
}
def "should not create a span when there is already a span in the context"() {
given:
def channel = applicationContext.getBean("directChannel", SubscribableChannel)
Expand Down Expand Up @@ -165,11 +200,24 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci
@SpringBootConfiguration
@EnableAutoConfiguration
static class MessageChannelsConfig {
SubscribableChannel problematicSharedChannel = new DirectChannel()
@Bean
SubscribableChannel directChannel() {
new DirectChannel()
}
@Bean
SubscribableChannel directChannel1() {
problematicSharedChannel
}
@Bean
SubscribableChannel directChannel2() {
problematicSharedChannel
}
@Bean
SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) {
def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor())
Expand Down

0 comments on commit 75d62af

Please sign in to comment.