diff --git a/instrumentation/spring/README.md b/instrumentation/spring/README.md index b6dc14e01468..0f55f43a2d2b 100644 --- a/instrumentation/spring/README.md +++ b/instrumentation/spring/README.md @@ -17,8 +17,9 @@ In this guide we will be using a running example. In section one and two, we wil | System property | Type | Default | Description | |---|---|---|---| | `otel.instrumentation.spring-integration.global-channel-interceptor-patterns` | List | `*` | An array of Spring channel name patterns that will be intercepted. See [Spring Integration docs](https://docs.spring.io/spring-integration/reference/html/channel.html#global-channel-configuration-interceptors) for more details. | +| `otel.instrumentation.spring-integration.producer.enabled` | Boolean | `false` | Create producer spans when messages are sent to an output channel. Enable when you're using a messaging library that doesn't have its own instrumentation for generating producer spans. Note that the detection of output channels only works for [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) `DirectWithAttributesChannel`. | | `otel.instrumentation.spring-webflux.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring WebFlux version 5.0. | -| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Sprinv Web MVC 3.1. | +| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring Web MVC 3.1. | # Manual Instrumentation Guide diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/build.gradle.kts b/instrumentation/spring/spring-integration-4.1/javaagent/build.gradle.kts index 43575a9f2a66..0f781044deb1 100644 --- a/instrumentation/spring/spring-integration-4.1/javaagent/build.gradle.kts +++ b/instrumentation/spring/spring-integration-4.1/javaagent/build.gradle.kts @@ -45,11 +45,24 @@ tasks { jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=true") } + val testWithProducerInstrumentation by registering(Test::class) { + filter { + includeTestsMatching("SpringCloudStreamProducerTest") + isFailOnNoMatchingTests = false + } + include("**/SpringCloudStreamProducerTest.*") + jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false") + jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=false") + jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true") + } + test { dependsOn(testWithRabbitInstrumentation) + dependsOn(testWithProducerInstrumentation) filter { excludeTestsMatching("SpringIntegrationAndRabbitTest") + excludeTestsMatching("SpringCloudStreamProducerTest") isFailOnNoMatchingTests = false } jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false") diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy new file mode 100644 index 000000000000..07bf969f66a6 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.AgentTestTrait + +class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements AgentTestTrait { + @Override + Class additionalContextClass() { + null + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/build.gradle.kts b/instrumentation/spring/spring-integration-4.1/library/build.gradle.kts index 750aec6d1920..70f4681a28cd 100644 --- a/instrumentation/spring/spring-integration-4.1/library/build.gradle.kts +++ b/instrumentation/spring/spring-integration-4.1/library/build.gradle.kts @@ -17,7 +17,25 @@ dependencies { } tasks { + val testWithProducerInstrumentation by registering(Test::class) { + filter { + includeTestsMatching("SpringCloudStreamProducerTest") + isFailOnNoMatchingTests = false + } + include("**/SpringCloudStreamProducerTest.*") + jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true") + } + test { + dependsOn(testWithProducerInstrumentation) + + filter { + excludeTestsMatching("SpringCloudStreamProducerTest") + isFailOnNoMatchingTests = false + } + } + + withType().configureEach { systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService()) } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageChannelSpanNameExtractor.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageChannelSpanNameExtractor.java deleted file mode 100644 index 8a87d9f65386..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageChannelSpanNameExtractor.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.spring.integration; - -import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; -import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.messaging.MessageChannel; - -final class MessageChannelSpanNameExtractor implements SpanNameExtractor { - @Override - public String extract(MessageWithChannel messageWithChannel) { - final String channelName; - MessageChannel channel = messageWithChannel.getMessageChannel(); - if (channel instanceof AbstractMessageChannel) { - channelName = ((AbstractMessageChannel) channel).getFullChannelName(); - } else if (channel instanceof org.springframework.messaging.support.AbstractMessageChannel) { - channelName = - ((org.springframework.messaging.support.AbstractMessageChannel) channel).getBeanName(); - } else { - channelName = channel.getClass().getSimpleName(); - } - return channelName + " process"; - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageWithChannel.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageWithChannel.java index ee959799e35e..b94b8dc4523a 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageWithChannel.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/MessageWithChannel.java @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.spring.integration; import com.google.auto.value.AutoValue; +import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -19,4 +20,18 @@ public abstract class MessageWithChannel { static MessageWithChannel create(Message message, MessageChannel messageChannel) { return new AutoValue_MessageWithChannel(message, messageChannel); } + + public String getChannelName() { + final String channelName; + MessageChannel channel = getMessageChannel(); + if (channel instanceof AbstractMessageChannel) { + channelName = ((AbstractMessageChannel) channel).getFullChannelName(); + } else if (channel instanceof org.springframework.messaging.support.AbstractMessageChannel) { + channelName = + ((org.springframework.messaging.support.AbstractMessageChannel) channel).getBeanName(); + } else { + channelName = channel.getClass().getSimpleName(); + } + return channelName; + } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracing.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracing.java index a907362d170e..c43abb44f61e 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracing.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracing.java @@ -31,12 +31,16 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr } private final ContextPropagators propagators; - private final Instrumenter instrumenter; + private final Instrumenter consumerInstrumenter; + private final Instrumenter producerInstrumenter; SpringIntegrationTracing( - ContextPropagators propagators, Instrumenter instrumenter) { + ContextPropagators propagators, + Instrumenter consumerInstrumenter, + Instrumenter producerInstrumenter) { this.propagators = propagators; - this.instrumenter = instrumenter; + this.consumerInstrumenter = consumerInstrumenter; + this.producerInstrumenter = producerInstrumenter; } /** @@ -49,6 +53,6 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr * @see org.springframework.integration.config.GlobalChannelInterceptor */ public ChannelInterceptor newChannelInterceptor() { - return new TracingChannelInterceptor(propagators, instrumenter); + return new TracingChannelInterceptor(propagators, consumerInstrumenter, producerInstrumenter); } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracingBuilder.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracingBuilder.java index 87b1b3c9e985..3f273c5ccf49 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracingBuilder.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTracingBuilder.java @@ -8,6 +8,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import java.util.ArrayList; import java.util.List; @@ -33,17 +34,37 @@ public SpringIntegrationTracingBuilder addAttributesExtractor( return this; } + private static String consumerSpanName(MessageWithChannel messageWithChannel) { + return messageWithChannel.getChannelName() + " process"; + } + + private static String producerSpanName(MessageWithChannel messageWithChannel) { + return messageWithChannel.getChannelName() + " send"; + } + /** * Returns a new {@link SpringIntegrationTracing} with the settings of this {@link * SpringIntegrationTracingBuilder}. */ public SpringIntegrationTracing build() { - Instrumenter instrumenter = + Instrumenter consumerInstrumenter = Instrumenter.builder( - openTelemetry, INSTRUMENTATION_NAME, new MessageChannelSpanNameExtractor()) + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTracingBuilder::consumerSpanName) .addAttributesExtractors(additionalAttributeExtractors) - .addAttributesExtractor(new SpringMessagingAttributesExtractor()) + .addAttributesExtractor(SpringMessagingAttributesExtractor.process()) .newConsumerInstrumenter(MessageHeadersGetter.INSTANCE); - return new SpringIntegrationTracing(openTelemetry.getPropagators(), instrumenter); + + Instrumenter producerInstrumenter = + Instrumenter.builder( + openTelemetry, + INSTRUMENTATION_NAME, + SpringIntegrationTracingBuilder::producerSpanName) + .addAttributesExtractors(additionalAttributeExtractors) + .addAttributesExtractor(SpringMessagingAttributesExtractor.send()) + .newInstrumenter(SpanKindExtractor.alwaysProducer()); + return new SpringIntegrationTracing( + openTelemetry.getPropagators(), consumerInstrumenter, producerInstrumenter); } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesExtractor.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesExtractor.java index 2e0dbfe9a23b..3c3a0b9e77ea 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesExtractor.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesExtractor.java @@ -13,9 +13,23 @@ final class SpringMessagingAttributesExtractor extends MessagingAttributesExtractor { + private final MessageOperation messageOperation; + + private SpringMessagingAttributesExtractor(MessageOperation messageOperation) { + this.messageOperation = messageOperation; + } + + static SpringMessagingAttributesExtractor process() { + return new SpringMessagingAttributesExtractor(MessageOperation.PROCESS); + } + + static SpringMessagingAttributesExtractor send() { + return new SpringMessagingAttributesExtractor(MessageOperation.SEND); + } + @Override public MessageOperation operation() { - return MessageOperation.PROCESS; + return messageOperation; } @Override diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java index 9a98844ce67e..c08f200f9986 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java @@ -8,10 +8,16 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import org.springframework.aop.framework.Advised; +import org.springframework.aop.support.AopUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -23,16 +29,23 @@ final class TracingChannelInterceptor implements ExecutorChannelInterceptor { + private static final boolean PRODUCER_SPAN_ENABLED = + Config.get().getBoolean("otel.instrumentation.spring-integration.producer.enabled", false); + private static final ThreadLocal> LOCAL_CONTEXT_AND_SCOPE = ThreadLocal.withInitial(IdentityHashMap::new); private final ContextPropagators propagators; - private final Instrumenter instrumenter; + private final Instrumenter consumerInstrumenter; + private final Instrumenter producerInstrumenter; TracingChannelInterceptor( - ContextPropagators propagators, Instrumenter instrumenter) { + ContextPropagators propagators, + Instrumenter consumerInstrumenter, + Instrumenter producerInstrumenter) { this.propagators = propagators; - this.instrumenter = instrumenter; + this.consumerInstrumenter = consumerInstrumenter; + this.producerInstrumenter = producerInstrumenter; } @Override @@ -54,6 +67,8 @@ public Message preSend(Message message, MessageChannel messageChannel) { return message; } + boolean createProducerSpan = createProducerSpan(messageChannel); + Context parentContext = Context.current(); MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel); @@ -70,8 +85,12 @@ public Message preSend(Message message, MessageChannel messageChannel) { // that puts something into a messaging queue/system // 2. another messaging instrumentation has already created a CONSUMER span, in which case this // instrumentation should not create another one - if (shouldStart(parentContext, messageWithChannel)) { - context = instrumenter.start(parentContext, messageWithChannel); + if (!createProducerSpan && shouldStartConsumer(parentContext, messageWithChannel)) { + context = consumerInstrumenter.start(parentContext, messageWithChannel); + localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent())); + } else if (createProducerSpan + && producerInstrumenter.shouldStart(parentContext, messageWithChannel)) { + context = producerInstrumenter.start(parentContext, messageWithChannel); localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent())); } else { // in case there already was another span in the context: back off and just inject the current @@ -86,8 +105,9 @@ public Message preSend(Message message, MessageChannel messageChannel) { return createMessageWithHeaders(message, messageHeaderAccessor); } - private boolean shouldStart(Context parentContext, MessageWithChannel messageWithChannel) { - return instrumenter.shouldStart(parentContext, messageWithChannel) + private boolean shouldStartConsumer( + Context parentContext, MessageWithChannel messageWithChannel) { + return consumerInstrumenter.shouldStart(parentContext, messageWithChannel) && Span.fromContextOrNull(parentContext) == null; } @@ -104,6 +124,9 @@ public void afterSendCompletion( if (context != null) { MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel); + boolean createProducerSpan = createProducerSpan(messageChannel); + Instrumenter instrumenter = + createProducerSpan ? producerInstrumenter : consumerInstrumenter; instrumenter.end(context, messageWithChannel, null, e); } } @@ -175,4 +198,70 @@ private static Message createMessageWithHeaders( .copyHeaders(messageHeaderAccessor.toMessageHeaders()) .build(); } + + private static final Class directWithAttributesChannelClass = + getDirectWithAttributesChannelClass(); + private static final MethodHandle channelGetAttributeMh = + getChannelAttributeMh(directWithAttributesChannelClass); + + private static Class getDirectWithAttributesChannelClass() { + try { + return Class.forName( + "org.springframework.cloud.stream.messaging.DirectWithAttributesChannel"); + } catch (ClassNotFoundException ignore) { + return null; + } + } + + private static MethodHandle getChannelAttributeMh(Class directWithAttributesChannelClass) { + if (directWithAttributesChannelClass == null) { + return null; + } + + try { + return MethodHandles.lookup() + .findVirtual( + directWithAttributesChannelClass, + "getAttribute", + MethodType.methodType(Object.class, String.class)); + } catch (NoSuchMethodException | IllegalAccessException exception) { + return null; + } + } + + private static boolean createProducerSpan(MessageChannel messageChannel) { + if (!PRODUCER_SPAN_ENABLED) { + return false; + } + + messageChannel = unwrapProxy(messageChannel); + if (!directWithAttributesChannelClass.isInstance(messageChannel)) { + // we can only tell if it is an output channel for instances of DirectWithAttributesChannel + // that are used by spring cloud stream + return false; + } + + try { + return "output".equals(channelGetAttributeMh.invoke(messageChannel, "type")); + } catch (Throwable throwable) { + return false; + } + } + + // unwrap spring aop proxy + // based on org.springframework.test.util.AopTestUtils#getTargetObject + public static T unwrapProxy(T candidate) { + try { + if (AopUtils.isAopProxy(candidate) && candidate instanceof Advised) { + Object target = ((Advised) candidate).getTargetSource().getTarget(); + if (target != null) { + return (T) target; + } + } + + return candidate; + } catch (Throwable ignore) { + return candidate; + } + } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy new file mode 100644 index 000000000000..90b588087a1b --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.LibraryTestTrait + +class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements LibraryTestTrait { + @Override + Class additionalContextClass() { + GlobalInterceptorSpringConfig + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy new file mode 100644 index 000000000000..bcdb01cda4b0 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static org.junit.jupiter.api.Assumptions.assumeTrue + +import io.opentelemetry.instrumentation.test.InstrumentationSpecification + +abstract class AbstractSpringCloudStreamProducerTest extends InstrumentationSpecification implements WithRabbitProducerConsumerTrait { + private static final boolean HAS_PRODUCER_SPAN = Boolean.getBoolean("otel.instrumentation.spring-integration.producer.enabled") + + abstract Class additionalContextClass() + + def setupSpec() { + startRabbit(additionalContextClass()) + } + + def cleanupSpec() { + stopRabbit() + } + + def "has producer span"() { + assumeTrue(HAS_PRODUCER_SPAN) + + when: + producerContext.getBean("producer", Runnable).run() + + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "producer" + } + span(1) { + name "testProducer.output send" + childOf span(0) + kind PRODUCER + } + span(2) { + name "testConsumer.input process" + childOf span(1) + kind CONSUMER + } + span(3) { + name "consumer" + childOf span(2) + } + } + } + } +}