From 75d62af8d578109db5ce7eed2a2764fa7b411f04 Mon Sep 17 00:00:00 2001 From: Trask Stalnaker Date: Fri, 19 Nov 2021 10:22:42 -0800 Subject: [PATCH] Fix spring-integration context leak (#4673) --- .../TracingChannelInterceptor.java | 34 ++++++++++--- ...bstractSpringIntegrationTracingTest.groovy | 48 +++++++++++++++++++ 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java index 12b816cbaaf4..9a98844ce67e 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java @@ -37,6 +37,23 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor { @Override public Message preSend(Message message, MessageChannel messageChannel) { + + Map localMap = LOCAL_CONTEXT_AND_SCOPE.get(); + if (localMap.get(messageChannel) != null) { + // GlobalChannelInterceptorProcessor.afterSingletonsInstantiated() adds the global + // interceptors for every bean name / channel pair, which means it's possible that this + // interceptor is added twice to the same channel if the channel is registered twice under + // different bean names + // + // there's an option for this class to implement VetoCapableInterceptor and prevent itself + // from being registered if it's already registered, but the VetoCapableInterceptor interface + // broke backwards compatibility in 5.2.0, and the version prior to 5.2.0 takes a parameter + // of type ChannelInterceptorAware which doesn't exist after 5.2.0, and while it's possible to + // implement both at the same time (since we compile using 4.1.0), muzzle doesn't like the + // missing class type when running testLatestDeps + return message; + } + Context parentContext = Context.current(); MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel); @@ -55,16 +72,12 @@ public Message preSend(Message message, MessageChannel messageChannel) { // instrumentation should not create another one if (shouldStart(parentContext, messageWithChannel)) { context = instrumenter.start(parentContext, messageWithChannel); - LOCAL_CONTEXT_AND_SCOPE - .get() - .put(messageChannel, ContextAndScope.create(context, context.makeCurrent())); + localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent())); } else { // in case there already was another span in the context: back off and just inject the current // context into the message context = parentContext; - LOCAL_CONTEXT_AND_SCOPE - .get() - .put(messageChannel, ContextAndScope.create(null, context.makeCurrent())); + localMap.put(messageChannel, ContextAndScope.create(null, context.makeCurrent())); } propagators @@ -113,6 +126,13 @@ public void afterReceiveCompletion( @Override public Message beforeHandle( Message message, MessageChannel channel, MessageHandler handler) { + + Map localMap = LOCAL_CONTEXT_AND_SCOPE.get(); + if (localMap.get(channel) != null) { + // see comment explaining the same conditional in preSend() + return message; + } + MessageWithChannel messageWithChannel = MessageWithChannel.create(message, channel); Context context = propagators @@ -120,7 +140,7 @@ public Message beforeHandle( .extract(Context.current(), messageWithChannel, MessageHeadersGetter.INSTANCE); // beforeHandle()/afterMessageHandles() always execute in a different thread than send(), so // there's no real risk of overwriting the send() context - LOCAL_CONTEXT_AND_SCOPE.get().put(channel, ContextAndScope.create(null, context.makeCurrent())); + localMap.put(channel, ContextAndScope.create(null, context.makeCurrent())); return message; } diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy index 4730ccbf10b2..b24a972b7066 100644 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy @@ -89,6 +89,41 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci "executorChannel" | "executorChannel process" } + def "should not add interceptor twice"() { + given: + def channel = applicationContext.getBean("directChannel1", SubscribableChannel) + + def messageHandler = new CapturingMessageHandler() + channel.subscribe(messageHandler) + + when: + channel.send(MessageBuilder.withPayload("test") + .build()) + + then: + def capturedMessage = messageHandler.join() + + assertTraces(1) { + trace(0, 2) { + span(0) { + // the channel name is overwritten by the last bean registration + name "application.directChannel2 process" + kind CONSUMER + } + span(1) { + name "handler" + childOf span(0) + } + + def interceptorSpan = span(0) + verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan) + } + } + + cleanup: + channel.unsubscribe(messageHandler) + } + def "should not create a span when there is already a span in the context"() { given: def channel = applicationContext.getBean("directChannel", SubscribableChannel) @@ -165,11 +200,24 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci @SpringBootConfiguration @EnableAutoConfiguration static class MessageChannelsConfig { + + SubscribableChannel problematicSharedChannel = new DirectChannel() + @Bean SubscribableChannel directChannel() { new DirectChannel() } + @Bean + SubscribableChannel directChannel1() { + problematicSharedChannel + } + + @Bean + SubscribableChannel directChannel2() { + problematicSharedChannel + } + @Bean SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) { def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor())