Skip to content

Commit

Permalink
Create producer span from spring integration instrumentation (open-te…
Browse files Browse the repository at this point in the history
…lemetry#4932)

* Add an option to create producer span from spring integration instrumentation

* Update instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/TracingChannelInterceptor.java

Co-authored-by: Mateusz Rzeszutek <[email protected]>

* add null check

* add doc

* Update instrumentation/spring/README.md

Co-authored-by: Fabrizio Ferri-Benedetti <[email protected]>

Co-authored-by: Mateusz Rzeszutek <[email protected]>
Co-authored-by: Fabrizio Ferri-Benedetti <[email protected]>
  • Loading branch information
3 people authored and RashmiRam committed May 23, 2022
1 parent 8893a8b commit ffacd6f
Show file tree
Hide file tree
Showing 12 changed files with 272 additions and 44 deletions.
3 changes: 2 additions & 1 deletion instrumentation/spring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ In this guide we will be using a running example. In section one and two, we wil
| System property | Type | Default | Description |
|---|---|---|---|
| `otel.instrumentation.spring-integration.global-channel-interceptor-patterns` | List | `*` | An array of Spring channel name patterns that will be intercepted. See [Spring Integration docs](https://docs.spring.io/spring-integration/reference/html/channel.html#global-channel-configuration-interceptors) for more details. |
| `otel.instrumentation.spring-integration.producer.enabled` | Boolean | `false` | Create producer spans when messages are sent to an output channel. Enable when you're using a messaging library that doesn't have its own instrumentation for generating producer spans. Note that the detection of output channels only works for [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) `DirectWithAttributesChannel`. |
| `otel.instrumentation.spring-webflux.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring WebFlux version 5.0. |
| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Sprinv Web MVC 3.1. |
| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring Web MVC 3.1. |

# Manual Instrumentation Guide

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,24 @@ tasks {
jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=true")
}

val testWithProducerInstrumentation by registering(Test::class) {
filter {
includeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
include("**/SpringCloudStreamProducerTest.*")
jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false")
jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=false")
jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true")
}

test {
dependsOn(testWithRabbitInstrumentation)
dependsOn(testWithProducerInstrumentation)

filter {
excludeTestsMatching("SpringIntegrationAndRabbitTest")
excludeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.test.AgentTestTrait

class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,25 @@ dependencies {
}

tasks {
val testWithProducerInstrumentation by registering(Test::class) {
filter {
includeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
include("**/SpringCloudStreamProducerTest.*")
jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true")
}

test {
dependsOn(testWithProducerInstrumentation)

filter {
excludeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
}

withType<Test>().configureEach {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.spring.integration;

import com.google.auto.value.AutoValue;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

Expand All @@ -19,4 +20,18 @@ public abstract class MessageWithChannel {
static MessageWithChannel create(Message<?> message, MessageChannel messageChannel) {
return new AutoValue_MessageWithChannel(message, messageChannel);
}

public String getChannelName() {
final String channelName;
MessageChannel channel = getMessageChannel();
if (channel instanceof AbstractMessageChannel) {
channelName = ((AbstractMessageChannel) channel).getFullChannelName();
} else if (channel instanceof org.springframework.messaging.support.AbstractMessageChannel) {
channelName =
((org.springframework.messaging.support.AbstractMessageChannel) channel).getBeanName();
} else {
channelName = channel.getClass().getSimpleName();
}
return channelName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr
}

private final ContextPropagators propagators;
private final Instrumenter<MessageWithChannel, Void> instrumenter;
private final Instrumenter<MessageWithChannel, Void> consumerInstrumenter;
private final Instrumenter<MessageWithChannel, Void> producerInstrumenter;

SpringIntegrationTracing(
ContextPropagators propagators, Instrumenter<MessageWithChannel, Void> instrumenter) {
ContextPropagators propagators,
Instrumenter<MessageWithChannel, Void> consumerInstrumenter,
Instrumenter<MessageWithChannel, Void> producerInstrumenter) {
this.propagators = propagators;
this.instrumenter = instrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}

/**
Expand All @@ -49,6 +53,6 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr
* @see org.springframework.integration.config.GlobalChannelInterceptor
*/
public ChannelInterceptor newChannelInterceptor() {
return new TracingChannelInterceptor(propagators, instrumenter);
return new TracingChannelInterceptor(propagators, consumerInstrumenter, producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -33,17 +34,37 @@ public SpringIntegrationTracingBuilder addAttributesExtractor(
return this;
}

private static String consumerSpanName(MessageWithChannel messageWithChannel) {
return messageWithChannel.getChannelName() + " process";
}

private static String producerSpanName(MessageWithChannel messageWithChannel) {
return messageWithChannel.getChannelName() + " send";
}

/**
* Returns a new {@link SpringIntegrationTracing} with the settings of this {@link
* SpringIntegrationTracingBuilder}.
*/
public SpringIntegrationTracing build() {
Instrumenter<MessageWithChannel, Void> instrumenter =
Instrumenter<MessageWithChannel, Void> consumerInstrumenter =
Instrumenter.<MessageWithChannel, Void>builder(
openTelemetry, INSTRUMENTATION_NAME, new MessageChannelSpanNameExtractor())
openTelemetry,
INSTRUMENTATION_NAME,
SpringIntegrationTracingBuilder::consumerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(new SpringMessagingAttributesExtractor())
.addAttributesExtractor(SpringMessagingAttributesExtractor.process())
.newConsumerInstrumenter(MessageHeadersGetter.INSTANCE);
return new SpringIntegrationTracing(openTelemetry.getPropagators(), instrumenter);

Instrumenter<MessageWithChannel, Void> producerInstrumenter =
Instrumenter.<MessageWithChannel, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
SpringIntegrationTracingBuilder::producerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(SpringMessagingAttributesExtractor.send())
.newInstrumenter(SpanKindExtractor.alwaysProducer());
return new SpringIntegrationTracing(
openTelemetry.getPropagators(), consumerInstrumenter, producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,23 @@
final class SpringMessagingAttributesExtractor
extends MessagingAttributesExtractor<MessageWithChannel, Void> {

private final MessageOperation messageOperation;

private SpringMessagingAttributesExtractor(MessageOperation messageOperation) {
this.messageOperation = messageOperation;
}

static SpringMessagingAttributesExtractor process() {
return new SpringMessagingAttributesExtractor(MessageOperation.PROCESS);
}

static SpringMessagingAttributesExtractor send() {
return new SpringMessagingAttributesExtractor(MessageOperation.SEND);
}

@Override
public MessageOperation operation() {
return MessageOperation.PROCESS;
return messageOperation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
Expand All @@ -23,16 +29,23 @@

final class TracingChannelInterceptor implements ExecutorChannelInterceptor {

private static final boolean PRODUCER_SPAN_ENABLED =
Config.get().getBoolean("otel.instrumentation.spring-integration.producer.enabled", false);

private static final ThreadLocal<Map<MessageChannel, ContextAndScope>> LOCAL_CONTEXT_AND_SCOPE =
ThreadLocal.withInitial(IdentityHashMap::new);

private final ContextPropagators propagators;
private final Instrumenter<MessageWithChannel, Void> instrumenter;
private final Instrumenter<MessageWithChannel, Void> consumerInstrumenter;
private final Instrumenter<MessageWithChannel, Void> producerInstrumenter;

TracingChannelInterceptor(
ContextPropagators propagators, Instrumenter<MessageWithChannel, Void> instrumenter) {
ContextPropagators propagators,
Instrumenter<MessageWithChannel, Void> consumerInstrumenter,
Instrumenter<MessageWithChannel, Void> producerInstrumenter) {
this.propagators = propagators;
this.instrumenter = instrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}

@Override
Expand All @@ -54,6 +67,8 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
return message;
}

boolean createProducerSpan = createProducerSpan(messageChannel);

Context parentContext = Context.current();
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);

Expand All @@ -70,8 +85,12 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
// that puts something into a messaging queue/system
// 2. another messaging instrumentation has already created a CONSUMER span, in which case this
// instrumentation should not create another one
if (shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
if (!createProducerSpan && shouldStartConsumer(parentContext, messageWithChannel)) {
context = consumerInstrumenter.start(parentContext, messageWithChannel);
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else if (createProducerSpan
&& producerInstrumenter.shouldStart(parentContext, messageWithChannel)) {
context = producerInstrumenter.start(parentContext, messageWithChannel);
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else {
// in case there already was another span in the context: back off and just inject the current
Expand All @@ -86,8 +105,9 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
return createMessageWithHeaders(message, messageHeaderAccessor);
}

private boolean shouldStart(Context parentContext, MessageWithChannel messageWithChannel) {
return instrumenter.shouldStart(parentContext, messageWithChannel)
private boolean shouldStartConsumer(
Context parentContext, MessageWithChannel messageWithChannel) {
return consumerInstrumenter.shouldStart(parentContext, messageWithChannel)
&& Span.fromContextOrNull(parentContext) == null;
}

Expand All @@ -104,6 +124,9 @@ public void afterSendCompletion(

if (context != null) {
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);
boolean createProducerSpan = createProducerSpan(messageChannel);
Instrumenter<MessageWithChannel, Void> instrumenter =
createProducerSpan ? producerInstrumenter : consumerInstrumenter;
instrumenter.end(context, messageWithChannel, null, e);
}
}
Expand Down Expand Up @@ -175,4 +198,70 @@ private static Message<?> createMessageWithHeaders(
.copyHeaders(messageHeaderAccessor.toMessageHeaders())
.build();
}

private static final Class<?> directWithAttributesChannelClass =
getDirectWithAttributesChannelClass();
private static final MethodHandle channelGetAttributeMh =
getChannelAttributeMh(directWithAttributesChannelClass);

private static Class<?> getDirectWithAttributesChannelClass() {
try {
return Class.forName(
"org.springframework.cloud.stream.messaging.DirectWithAttributesChannel");
} catch (ClassNotFoundException ignore) {
return null;
}
}

private static MethodHandle getChannelAttributeMh(Class<?> directWithAttributesChannelClass) {
if (directWithAttributesChannelClass == null) {
return null;
}

try {
return MethodHandles.lookup()
.findVirtual(
directWithAttributesChannelClass,
"getAttribute",
MethodType.methodType(Object.class, String.class));
} catch (NoSuchMethodException | IllegalAccessException exception) {
return null;
}
}

private static boolean createProducerSpan(MessageChannel messageChannel) {
if (!PRODUCER_SPAN_ENABLED) {
return false;
}

messageChannel = unwrapProxy(messageChannel);
if (!directWithAttributesChannelClass.isInstance(messageChannel)) {
// we can only tell if it is an output channel for instances of DirectWithAttributesChannel
// that are used by spring cloud stream
return false;
}

try {
return "output".equals(channelGetAttributeMh.invoke(messageChannel, "type"));
} catch (Throwable throwable) {
return false;
}
}

// unwrap spring aop proxy
// based on org.springframework.test.util.AopTestUtils#getTargetObject
public static <T> T unwrapProxy(T candidate) {
try {
if (AopUtils.isAopProxy(candidate) && candidate instanceof Advised) {
Object target = ((Advised) candidate).getTargetSource().getTarget();
if (target != null) {
return (T) target;
}
}

return candidate;
} catch (Throwable ignore) {
return candidate;
}
}
}
Loading

0 comments on commit ffacd6f

Please sign in to comment.