Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create producer span from spring integration instrumentation #4932

Merged
merged 5 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion instrumentation/spring/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ In this guide we will be using a running example. In section one and two, we wil
| System property | Type | Default | Description |
|---|---|---|---|
| `otel.instrumentation.spring-integration.global-channel-interceptor-patterns` | List | `*` | An array of Spring channel name patterns that will be intercepted. See [Spring Integration docs](https://docs.spring.io/spring-integration/reference/html/channel.html#global-channel-configuration-interceptors) for more details. |
| `otel.instrumentation.spring-integration.producer.enabled` | Boolean | `false` | Create producer span in spring integration instrumentation when message is written into an output channel. Detecting output channels works only for [Spring Cloud Stream](https://spring.io/projects/spring-cloud-stream) `DirectWithAttributesChannel`. Use this flag only when you are using a messaging library that does not have instrumentation to create producer spans. |
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

laurit marked this conversation as resolved.
Show resolved Hide resolved
| `otel.instrumentation.spring-webflux.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring WebFlux version 5.0. |
| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Sprinv Web MVC 3.1. |
| `otel.instrumentation.spring-webmvc.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes for Spring Web MVC 3.1. |

# Manual Instrumentation Guide

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,24 @@ tasks {
jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=true")
}

val testWithProducerInstrumentation by registering(Test::class) {
filter {
includeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
include("**/SpringCloudStreamProducerTest.*")
jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false")
jvmArgs("-Dotel.instrumentation.spring-rabbit.enabled=false")
jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true")
Comment on lines +54 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

test {
dependsOn(testWithRabbitInstrumentation)
dependsOn(testWithProducerInstrumentation)

filter {
excludeTestsMatching("SpringIntegrationAndRabbitTest")
excludeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
jvmArgs("-Dotel.instrumentation.rabbitmq.enabled=false")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.instrumentation.test.AgentTestTrait

class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements AgentTestTrait {
@Override
Class<?> additionalContextClass() {
null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,25 @@ dependencies {
}

tasks {
val testWithProducerInstrumentation by registering(Test::class) {
filter {
includeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
include("**/SpringCloudStreamProducerTest.*")
jvmArgs("-Dotel.instrumentation.spring-integration.producer.enabled=true")
}

test {
dependsOn(testWithProducerInstrumentation)

filter {
excludeTestsMatching("SpringCloudStreamProducerTest")
isFailOnNoMatchingTests = false
}
}

withType<Test>().configureEach {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].getService())
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.spring.integration;

import com.google.auto.value.AutoValue;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

Expand All @@ -19,4 +20,18 @@ public abstract class MessageWithChannel {
static MessageWithChannel create(Message<?> message, MessageChannel messageChannel) {
return new AutoValue_MessageWithChannel(message, messageChannel);
}

public String getChannelName() {
final String channelName;
MessageChannel channel = getMessageChannel();
if (channel instanceof AbstractMessageChannel) {
channelName = ((AbstractMessageChannel) channel).getFullChannelName();
} else if (channel instanceof org.springframework.messaging.support.AbstractMessageChannel) {
channelName =
((org.springframework.messaging.support.AbstractMessageChannel) channel).getBeanName();
} else {
channelName = channel.getClass().getSimpleName();
}
return channelName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr
}

private final ContextPropagators propagators;
private final Instrumenter<MessageWithChannel, Void> instrumenter;
private final Instrumenter<MessageWithChannel, Void> consumerInstrumenter;
private final Instrumenter<MessageWithChannel, Void> producerInstrumenter;

SpringIntegrationTracing(
ContextPropagators propagators, Instrumenter<MessageWithChannel, Void> instrumenter) {
ContextPropagators propagators,
Instrumenter<MessageWithChannel, Void> consumerInstrumenter,
Instrumenter<MessageWithChannel, Void> producerInstrumenter) {
this.propagators = propagators;
this.instrumenter = instrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}

/**
Expand All @@ -49,6 +53,6 @@ public static SpringIntegrationTracingBuilder builder(OpenTelemetry openTelemetr
* @see org.springframework.integration.config.GlobalChannelInterceptor
*/
public ChannelInterceptor newChannelInterceptor() {
return new TracingChannelInterceptor(propagators, instrumenter);
return new TracingChannelInterceptor(propagators, consumerInstrumenter, producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -33,17 +34,37 @@ public SpringIntegrationTracingBuilder addAttributesExtractor(
return this;
}

private static String consumerSpanName(MessageWithChannel messageWithChannel) {
return messageWithChannel.getChannelName() + " process";
}

private static String producerSpanName(MessageWithChannel messageWithChannel) {
return messageWithChannel.getChannelName() + " send";
}

/**
* Returns a new {@link SpringIntegrationTracing} with the settings of this {@link
* SpringIntegrationTracingBuilder}.
*/
public SpringIntegrationTracing build() {
Instrumenter<MessageWithChannel, Void> instrumenter =
Instrumenter<MessageWithChannel, Void> consumerInstrumenter =
Instrumenter.<MessageWithChannel, Void>builder(
openTelemetry, INSTRUMENTATION_NAME, new MessageChannelSpanNameExtractor())
openTelemetry,
INSTRUMENTATION_NAME,
SpringIntegrationTracingBuilder::consumerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(new SpringMessagingAttributesExtractor())
.addAttributesExtractor(SpringMessagingAttributesExtractor.process())
.newConsumerInstrumenter(MessageHeadersGetter.INSTANCE);
return new SpringIntegrationTracing(openTelemetry.getPropagators(), instrumenter);

Instrumenter<MessageWithChannel, Void> producerInstrumenter =
Instrumenter.<MessageWithChannel, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
SpringIntegrationTracingBuilder::producerSpanName)
.addAttributesExtractors(additionalAttributeExtractors)
.addAttributesExtractor(SpringMessagingAttributesExtractor.send())
.newInstrumenter(SpanKindExtractor.alwaysProducer());
return new SpringIntegrationTracing(
openTelemetry.getPropagators(), consumerInstrumenter, producerInstrumenter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,23 @@
final class SpringMessagingAttributesExtractor
extends MessagingAttributesExtractor<MessageWithChannel, Void> {

private final MessageOperation messageOperation;

private SpringMessagingAttributesExtractor(MessageOperation messageOperation) {
this.messageOperation = messageOperation;
}

static SpringMessagingAttributesExtractor process() {
return new SpringMessagingAttributesExtractor(MessageOperation.PROCESS);
}

static SpringMessagingAttributesExtractor send() {
return new SpringMessagingAttributesExtractor(MessageOperation.SEND);
}

@Override
public MessageOperation operation() {
return MessageOperation.PROCESS;
return messageOperation;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,16 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
Expand All @@ -23,16 +29,23 @@

final class TracingChannelInterceptor implements ExecutorChannelInterceptor {

private static final boolean PRODUCER_SPAN_ENABLED =
Config.get().getBoolean("otel.instrumentation.spring-integration.producer.enabled", false);
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved

private static final ThreadLocal<Map<MessageChannel, ContextAndScope>> LOCAL_CONTEXT_AND_SCOPE =
ThreadLocal.withInitial(IdentityHashMap::new);

private final ContextPropagators propagators;
private final Instrumenter<MessageWithChannel, Void> instrumenter;
private final Instrumenter<MessageWithChannel, Void> consumerInstrumenter;
private final Instrumenter<MessageWithChannel, Void> producerInstrumenter;

TracingChannelInterceptor(
ContextPropagators propagators, Instrumenter<MessageWithChannel, Void> instrumenter) {
ContextPropagators propagators,
Instrumenter<MessageWithChannel, Void> consumerInstrumenter,
Instrumenter<MessageWithChannel, Void> producerInstrumenter) {
this.propagators = propagators;
this.instrumenter = instrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}

@Override
Expand All @@ -54,6 +67,8 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
return message;
}

boolean createProducerSpan = createProducerSpan(messageChannel);

Context parentContext = Context.current();
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);

Expand All @@ -70,8 +85,12 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
// that puts something into a messaging queue/system
// 2. another messaging instrumentation has already created a CONSUMER span, in which case this
// instrumentation should not create another one
if (shouldStart(parentContext, messageWithChannel)) {
context = instrumenter.start(parentContext, messageWithChannel);
if (!createProducerSpan && shouldStartConsumer(parentContext, messageWithChannel)) {
context = consumerInstrumenter.start(parentContext, messageWithChannel);
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else if (createProducerSpan
&& producerInstrumenter.shouldStart(parentContext, messageWithChannel)) {
context = producerInstrumenter.start(parentContext, messageWithChannel);
localMap.put(messageChannel, ContextAndScope.create(context, context.makeCurrent()));
} else {
// in case there already was another span in the context: back off and just inject the current
Expand All @@ -86,8 +105,9 @@ public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
return createMessageWithHeaders(message, messageHeaderAccessor);
}

private boolean shouldStart(Context parentContext, MessageWithChannel messageWithChannel) {
return instrumenter.shouldStart(parentContext, messageWithChannel)
private boolean shouldStartConsumer(
Context parentContext, MessageWithChannel messageWithChannel) {
return consumerInstrumenter.shouldStart(parentContext, messageWithChannel)
&& Span.fromContextOrNull(parentContext) == null;
}

Expand All @@ -104,6 +124,9 @@ public void afterSendCompletion(

if (context != null) {
MessageWithChannel messageWithChannel = MessageWithChannel.create(message, messageChannel);
boolean createProducerSpan = createProducerSpan(messageChannel);
Instrumenter<MessageWithChannel, Void> instrumenter =
createProducerSpan ? producerInstrumenter : consumerInstrumenter;
instrumenter.end(context, messageWithChannel, null, e);
}
}
Expand Down Expand Up @@ -175,4 +198,70 @@ private static Message<?> createMessageWithHeaders(
.copyHeaders(messageHeaderAccessor.toMessageHeaders())
.build();
}

private static final Class<?> directWithAttributesChannelClass =
getDirectWithAttributesChannelClass();
private static final MethodHandle channelGetAttributeMh =
getChannelAttributeMh(directWithAttributesChannelClass);

private static Class<?> getDirectWithAttributesChannelClass() {
try {
return Class.forName(
"org.springframework.cloud.stream.messaging.DirectWithAttributesChannel");
} catch (ClassNotFoundException ignore) {
return null;
}
}

private static MethodHandle getChannelAttributeMh(Class<?> directWithAttributesChannelClass) {
if (directWithAttributesChannelClass == null) {
return null;
}

try {
return MethodHandles.lookup()
.findVirtual(
directWithAttributesChannelClass,
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
"getAttribute",
MethodType.methodType(Object.class, String.class));
} catch (NoSuchMethodException | IllegalAccessException exception) {
return null;
}
}

private static boolean createProducerSpan(MessageChannel messageChannel) {
if (!PRODUCER_SPAN_ENABLED) {
return false;
}

messageChannel = unwrapProxy(messageChannel);
if (!directWithAttributesChannelClass.isInstance(messageChannel)) {
// we can only tell if it is an output channel for instances of DirectWithAttributesChannel
// that are used by spring cloud stream
return false;
}

try {
return "output".equals(channelGetAttributeMh.invoke(messageChannel, "type"));
} catch (Throwable throwable) {
return false;
}
}

// unwrap spring aop proxy
// based on org.springframework.test.util.AopTestUtils#getTargetObject
public static <T> T unwrapProxy(T candidate) {
try {
if (AopUtils.isAopProxy(candidate) && candidate instanceof Advised) {
Object target = ((Advised) candidate).getTargetSource().getTarget();
if (target != null) {
return (T) target;
}
}

return candidate;
} catch (Throwable ignore) {
return candidate;
}
}
}
Loading