diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/CapturedMessageHeadersUtil.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/CapturedMessageHeadersUtil.java new file mode 100644 index 000000000000..e1ec7ae780eb --- /dev/null +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/CapturedMessageHeadersUtil.java @@ -0,0 +1,38 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.instrumenter.messaging; + +import static java.util.Collections.unmodifiableList; + +import io.opentelemetry.api.common.AttributeKey; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; + +final class CapturedMessageHeadersUtil { + + private static final ConcurrentMap>> attributeKeysCache = + new ConcurrentHashMap<>(); + + static List lowercase(List names) { + return unmodifiableList( + names.stream().map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toList())); + } + + static AttributeKey> attributeKey(String headerName) { + return attributeKeysCache.computeIfAbsent(headerName, n -> createKey(n)); + } + + private static AttributeKey> createKey(String headerName) { + // headerName is always lowercase, see MessagingAttributesExtractor + String key = "messaging.header." + headerName.replace('-', '_'); + return AttributeKey.stringArrayKey(key); + } + + private CapturedMessageHeadersUtil() {} +} diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java index d2a790de4a5d..81edc180a841 100644 --- a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractor.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.api.instrumenter.messaging; +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.attributeKey; +import static io.opentelemetry.instrumentation.api.instrumenter.messaging.CapturedMessageHeadersUtil.lowercase; import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.PROCESS; import static io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation.RECEIVE; import static io.opentelemetry.instrumentation.api.internal.AttributesExtractorUtil.internalSet; @@ -16,6 +18,7 @@ import io.opentelemetry.instrumentation.api.internal.SpanKey; import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; import javax.annotation.Nullable; /** @@ -36,20 +39,34 @@ public final class MessagingAttributesExtractor AttributeKey.stringKey("messaging.payload"); /** - * Creates the messaging attributes extractor for the given {@link MessageOperation operation}. + * Creates the messaging attributes extractor for the given {@link MessageOperation operation} + * with default configuration. */ public static MessagingAttributesExtractor create( MessagingAttributesGetter getter, MessageOperation operation) { - return new MessagingAttributesExtractor<>(getter, operation); + return builder(getter, operation).build(); + } + + /** + * Returns a new {@link MessagingAttributesExtractorBuilder} for the given {@link MessageOperation + * operation} that can be used to configure the messaging attributes extractor. + */ + public static MessagingAttributesExtractorBuilder builder( + MessagingAttributesGetter getter, MessageOperation operation) { + return new MessagingAttributesExtractorBuilder<>(getter, operation); } private final MessagingAttributesGetter getter; private final MessageOperation operation; + private final List capturedHeaders; - private MessagingAttributesExtractor( - MessagingAttributesGetter getter, MessageOperation operation) { + MessagingAttributesExtractor( + MessagingAttributesGetter getter, + MessageOperation operation, + List capturedHeaders) { this.getter = getter; this.operation = operation; + this.capturedHeaders = lowercase(capturedHeaders); } @SuppressWarnings("deprecation") // operationName @@ -99,6 +116,13 @@ public void onEnd( @Nullable Throwable error) { internalSet( attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, getter.messageId(request, response)); + + for (String name : capturedHeaders) { + List values = getter.header(request, name); + if (!values.isEmpty()) { + internalSet(attributes, attributeKey(name), values); + } + } } /** diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorBuilder.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorBuilder.java new file mode 100644 index 000000000000..7ad36d671c27 --- /dev/null +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesExtractorBuilder.java @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.instrumenter.messaging; + +import static java.util.Collections.emptyList; + +import java.util.List; + +/** A builder of {@link MessagingAttributesExtractor}. */ +public final class MessagingAttributesExtractorBuilder { + + final MessagingAttributesGetter getter; + final MessageOperation operation; + List capturedHeaders = emptyList(); + + MessagingAttributesExtractorBuilder( + MessagingAttributesGetter getter, MessageOperation operation) { + this.getter = getter; + this.operation = operation; + } + + /** + * Configures the messaging headers that will be captured as span attributes. + * + *

The messaging header values will be captured under the {@code messaging.header.} + * attribute key. The {@code } part in the attribute key is the normalized header name: + * lowercase, with dashes replaced by underscores. + * + * @param capturedHeaders A list of messaging header names. + */ + public MessagingAttributesExtractorBuilder setCapturedHeaders( + List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + + /** + * Returns a new {@link MessagingAttributesExtractor} with the settings of this {@link + * MessagingAttributesExtractorBuilder}. + */ + public MessagingAttributesExtractor build() { + return new MessagingAttributesExtractor<>(getter, operation, capturedHeaders); + } +} diff --git a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesGetter.java b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesGetter.java index 26cfedc40707..18b4293bf56a 100644 --- a/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesGetter.java +++ b/instrumentation-api-semconv/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/messaging/MessagingAttributesGetter.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.api.instrumenter.messaging; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; /** @@ -48,6 +50,17 @@ public interface MessagingAttributesGetter { @Nullable String messageId(REQUEST request, @Nullable RESPONSE response); + /** + * Extracts all values of header named {@code name} from the request, or an empty list if there + * were none. + * + *

Implementations of this method must not return a null value; an empty list should be + * returned instead. + */ + default List header(REQUEST request, String name) { + return Collections.emptyList(); + } + @Nullable default String messagePayload(REQUEST request) { return null; diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageAttributesGetter.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageAttributesGetter.java index 45994eae4178..874d8edad19e 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageAttributesGetter.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsMessageAttributesGetter.java @@ -8,6 +8,8 @@ import static java.util.logging.Level.FINE; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.jms.JMSException; @@ -86,9 +88,22 @@ public Long messagePayloadCompressedSize(MessageWithDestination messageWithDesti public String messageId(MessageWithDestination messageWithDestination, Void unused) { try { return messageWithDestination.message().getJMSMessageID(); - } catch (JMSException e) { - logger.log(FINE, "Failure getting JMS message id", e); + } catch (JMSException exception) { + logger.log(FINE, "Failure getting JMS message id", exception); return null; } } + + @Override + public List header(MessageWithDestination messageWithDestination, String name) { + try { + String value = messageWithDestination.message().getStringProperty(name); + if (value != null) { + return Collections.singletonList(value); + } + } catch (JMSException exception) { + logger.log(FINE, "Failure getting JMS message header", exception); + } + return Collections.emptyList(); + } } diff --git a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java index b1c5e30c0b0b..be32341f0d4f 100644 --- a/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java +++ b/instrumentation/jms-1.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JmsSingletons.java @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; @@ -31,7 +32,7 @@ private static Instrumenter buildProducerInstrumen GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation)) .buildProducerInstrumenter(MessagePropertySetter.INSTANCE); } @@ -44,7 +45,7 @@ private static Instrumenter buildConsumerInstrumen GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation)) .setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } @@ -57,10 +58,19 @@ private static Instrumenter buildListenerInstrumen GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor(buildMessagingAttributesExtractor(getter, operation)) .buildConsumerInstrumenter(MessagePropertyGetter.INSTANCE); } + private static MessagingAttributesExtractor + buildMessagingAttributesExtractor( + MessagingAttributesGetter getter, + MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build(); + } + public static Instrumenter producerInstrumenter() { return PRODUCER_INSTRUMENTER; } diff --git a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy index cb55a4407e33..7df59e550983 100644 --- a/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy +++ b/instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy @@ -267,7 +267,39 @@ class Jms1Test extends AgentInstrumentationSpecification { session.createTemporaryTopic() | "topic" | "(temporary)" } - static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { + def "capture message header as span attribute"() { + setup: + def destinationName = "someQueue" + def destinationType = "queue" + def destination = session.createQueue(destinationName) + def producer = session.createProducer(destination) + def consumer = session.createConsumer(destination) + + def message = session.createTextMessage(messageText) + message.setStringProperty("test-message-header", "test") + message.setIntProperty("test-message-int-header", 1234) + producer.send(message) + + TextMessage receivedMessage = consumer.receive() + String messageId = receivedMessage.getJMSMessageID() + + expect: + receivedMessage.text == messageText + assertTraces(2) { + trace(0, 1) { + producerSpan(it, 0, destinationType, destinationName, true) + } + trace(1, 1) { + consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive", true) + } + } + + cleanup: + producer.close() + consumer.close() + } + + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) { trace.span(index) { name destinationName + " send" kind PRODUCER @@ -280,6 +312,10 @@ class Jms1Test extends AgentInstrumentationSpecification { "$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true } "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + "messaging.header.test_message_int_header" { it == ["1234"] } + } } } } @@ -287,7 +323,7 @@ class Jms1Test extends AgentInstrumentationSpecification { // passing messageId = null will verify message.id is not captured, // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) { + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) { trace.span(index) { name destinationName + " " + operation kind CONSUMER @@ -308,6 +344,10 @@ class Jms1Test extends AgentInstrumentationSpecification { if (destinationName == "(temporary)") { "$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true } + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + "messaging.header.test_message_int_header" { it == ["1234"] } + } } } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index c2820c4190f1..3e2e8898181a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -65,27 +65,28 @@ public static void onExit( Context parentContext = currentContext(); if (consumerReceiveInstrumenter().shouldStart(parentContext, records)) { - Context context = - InstrumenterUtil.startAndEnd( - consumerReceiveInstrumenter(), - parentContext, - records, - null, - error, - timer.startTime(), - timer.now()); - - // we're storing the context of the receive span so that process spans can use it as parent - // context even though the span has ended - // this is the suggested behavior according to the spec batch receive scenario: - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving - VirtualField, Context> consumerRecordsContext = - VirtualField.find(ConsumerRecords.class, Context.class); - consumerRecordsContext.set(records, context); - // disable process tracing and store the receive span for each individual record too boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false); try { + Context context = + InstrumenterUtil.startAndEnd( + consumerReceiveInstrumenter(), + parentContext, + records, + null, + error, + timer.startTime(), + timer.now()); + + // we're storing the context of the receive span so that process spans can use it as + // parent + // context even though the span has ended + // this is the suggested behavior according to the spec batch receive scenario: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving + VirtualField, Context> consumerRecordsContext = + VirtualField.find(ConsumerRecords.class, Context.class); + consumerRecordsContext.set(records, context); + VirtualField, Context> consumerRecordContext = VirtualField.find(ConsumerRecord.class, Context.class); for (ConsumerRecord record : records) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 6db495be87fa..5c8c6e64f578 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -28,6 +28,7 @@ public final class KafkaSingletons { static { KafkaInstrumenterFactory instrumenterFactory = new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setCaptureExperimentalSpanAttributes( InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy index ef8a1e8b877a..6ee69de0dae7 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientDefaultTest.groovy @@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.kafkaclients import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.nio.charset.StandardCharsets import org.apache.kafka.clients.producer.ProducerRecord import java.time.Duration @@ -18,11 +19,15 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { - def "test kafka produce and consume"() { + def "test kafka produce and consume, test headers: #testHeaders"() { when: String greeting = "Hello Kafka!" runWithSpan("parent") { - producer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> + def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting) + if (testHeaders) { + producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)) + } + producer.send(producerRecord) { meta, ex -> if (ex == null) { runWithSpan("producer callback") {} } else { @@ -63,6 +68,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" greeting } } @@ -84,6 +92,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive" + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } span(1) { @@ -100,6 +111,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" greeting } } @@ -109,6 +123,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { } } } + + where: + testHeaders << [false, true] } def "test pass through tombstone"() { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java index 330c84bcff9d..e16135f6ea8c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.kafkaclients; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; @@ -23,6 +25,7 @@ public final class KafkaTelemetryBuilder { new ArrayList<>(); private final List, Void>> consumerAttributesExtractors = new ArrayList<>(); + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes = false; private boolean propagationEnabled = true; @@ -42,6 +45,16 @@ public KafkaTelemetryBuilder addConsumerAttributesExtractors( return this; } + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + public KafkaTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + /** * Sets whether experimental attributes should be set to spans. These attributes may be changed or * removed in the future, so only enable this if you know you do not require attributes filled by @@ -65,6 +78,7 @@ public KafkaTelemetryBuilder setPropagationEnabled(boolean propagationEnabled) { public KafkaTelemetry build() { KafkaInstrumenterFactory instrumenterFactory = new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME) + .setCapturedHeaders(capturedHeaders) .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) .setPropagationEnabled(propagationEnabled); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy index 467a578c1559..8d5f4c41afe3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/WrappersTest.groovy @@ -7,6 +7,7 @@ package io.opentelemetry.instrumentation.kafkaclients import io.opentelemetry.instrumentation.test.LibraryTestTrait import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.nio.charset.StandardCharsets import org.apache.kafka.clients.producer.ProducerRecord import spock.lang.Unroll @@ -15,21 +16,27 @@ import java.time.Duration import static io.opentelemetry.api.trace.SpanKind.CONSUMER import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static java.util.Collections.singletonList class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait { @Unroll - def "test wrappers"() throws Exception { + def "test wrappers, test headers: #testHeaders"() throws Exception { KafkaTelemetry telemetry = KafkaTelemetry.builder(getOpenTelemetry()) - // TODO run tests both with and without experimental span attributes - .setCaptureExperimentalSpanAttributes(true) - .build() + .setCapturedHeaders(singletonList("test-message-header")) + // TODO run tests both with and without experimental span attributes + .setCaptureExperimentalSpanAttributes(true) + .build() when: String greeting = "Hello Kafka!" def wrappedProducer = telemetry.wrap(producer) runWithSpan("parent") { - wrappedProducer.send(new ProducerRecord(SHARED_TOPIC, greeting)) { meta, ex -> + def producerRecord = new ProducerRecord(SHARED_TOPIC, greeting) + if (testHeaders) { + producerRecord.headers().add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)) + } + wrappedProducer.send(producerRecord) { meta, ex -> if (ex == null) { runWithSpan("producer callback") {} } else { @@ -65,6 +72,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait { "$SemanticAttributes.MESSAGING_SYSTEM" "kafka" "$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" greeting } } @@ -81,6 +91,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait { "$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 } "kafka.offset" Long "kafka.record.queue_time_ms" { it >= 0 } + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" greeting } } @@ -91,6 +104,9 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait { } } } + + where: + testHeaders << [false, true] } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java index 15c4306dc1ae..a674025a7eb5 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaBatchProcessAttributesGetter.java @@ -7,8 +7,11 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -82,4 +85,14 @@ public Long messagePayloadCompressedSize(ConsumerRecords records) { public String messageId(ConsumerRecords records, @Nullable Void unused) { return null; } + + @Override + public List header(ConsumerRecords records, String name) { + return StreamSupport.stream(records.spliterator(), false) + .flatMap( + consumerRecord -> + StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java index a2384a43ff6f..ea210d7b72d1 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerAttributesGetter.java @@ -7,6 +7,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -79,6 +83,13 @@ public String messageId(ConsumerRecord consumerRecord, @Nullable Void unus return null; } + @Override + public List header(ConsumerRecord consumerRecord, String name) { + return StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + } + @Nullable @Override public String messagePayload(ConsumerRecord consumerRecord) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java index 0d8dc1f8d94f..e8d146a4dc6a 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaInstrumenterFactory.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.kafka.internal; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; @@ -14,8 +16,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import java.util.Collections; +import java.util.List; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; @@ -29,6 +33,7 @@ public final class KafkaInstrumenterFactory { private final OpenTelemetry openTelemetry; private final String instrumentationName; private ErrorCauseExtractor errorCauseExtractor = ErrorCauseExtractor.jdk(); + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes = false; private boolean propagationEnabled = true; private boolean messagingReceiveInstrumentationEnabled = false; @@ -43,6 +48,11 @@ public KafkaInstrumenterFactory setErrorCauseExtractor(ErrorCauseExtractor error return this; } + public KafkaInstrumenterFactory setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + public KafkaInstrumenterFactory setCaptureExperimentalSpanAttributes( boolean captureExperimentalSpanAttributes) { this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; @@ -74,7 +84,8 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) .addAttributesExtractors(extractors) .addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor()) .setErrorCauseExtractor(errorCauseExtractor) @@ -89,7 +100,8 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) .setErrorCauseExtractor(errorCauseExtractor) .setEnabled(messagingReceiveInstrumentationEnabled) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); @@ -110,7 +122,8 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()) .addAttributesExtractors(extractors) .setErrorCauseExtractor(errorCauseExtractor); @@ -139,11 +152,21 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled( openTelemetry, instrumentationName, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)) .addSpanLinksExtractor( new KafkaBatchProcessSpanLinksExtractor( openTelemetry.getPropagators().getTextMapPropagator())) .setErrorCauseExtractor(errorCauseExtractor) .buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } + + private static MessagingAttributesExtractor buildMessagingAttributesExtractor( + MessagingAttributesGetter getter, + MessageOperation operation, + List capturedHeaders) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(capturedHeaders) + .build(); + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java index 987ecaf67193..0fc98a7370c4 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaProducerAttributesGetter.java @@ -8,6 +8,9 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.kafka.clients.producer.ProducerRecord; @@ -81,6 +84,13 @@ public String messageId(ProducerRecord producerRecord, @Nullable Void unus return null; } + @Override + public List header(ProducerRecord producerRecord, String name) { + return StreamSupport.stream(producerRecord.headers().headers(name).spliterator(), false) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + } + @Nullable @Override public String messagePayload(ProducerRecord producerRecord) { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java index 4365ba09f607..f6d504bfa153 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaReceiveAttributesGetter.java @@ -7,8 +7,11 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; @@ -88,4 +91,14 @@ public Long messagePayloadCompressedSize(ConsumerRecords consumerRecords) public String messageId(ConsumerRecords consumerRecords, @Nullable Void unused) { return null; } + + @Override + public List header(ConsumerRecords records, String name) { + return StreamSupport.stream(records.spliterator(), false) + .flatMap( + consumerRecord -> + StreamSupport.stream(consumerRecord.headers().headers(name).spliterator(), false)) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .collect(Collectors.toList()); + } } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java index b1209af17084..ba09591aa376 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java @@ -18,6 +18,7 @@ public final class KafkaStreamsSingletons { private static final Instrumenter, Void> INSTRUMENTER = new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setCaptureExperimentalSpanAttributes( InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java index bdc7e40b3065..495c4662d25e 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/ChannelAndMethod.java @@ -7,6 +7,7 @@ import com.google.auto.value.AutoValue; import com.rabbitmq.client.Channel; +import java.util.Map; @AutoValue public abstract class ChannelAndMethod { @@ -18,4 +19,14 @@ public static ChannelAndMethod create(Channel channel, String method) { abstract Channel getChannel(); abstract String getMethod(); + + private Map headers; + + public Map getHeaders() { + return headers; + } + + public void setHeaders(Map headers) { + this.headers = headers; + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAndMethodHolder.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAndMethodHolder.java new file mode 100644 index 000000000000..b779cd56bb89 --- /dev/null +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAndMethodHolder.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq; + +public class RabbitChannelAndMethodHolder { + private ChannelAndMethod channelAndMethod; + + public ChannelAndMethod getChannelAndMethod() { + return channelAndMethod; + } + + public void setChannelAndMethod(ChannelAndMethod channelAndMethod) { + this.channelAndMethod = channelAndMethod; + } +} diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesGetter.java index f71595d5a4b7..a76f676c8895 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesGetter.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelAttributesGetter.java @@ -7,6 +7,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; enum RabbitChannelAttributesGetter implements MessagingAttributesGetter { @@ -74,4 +76,15 @@ public Long messagePayloadCompressedSize(ChannelAndMethod channelAndMethod) { public String messageId(ChannelAndMethod channelAndMethod, @Nullable Void unused) { return null; } + + @Override + public List header(ChannelAndMethod channelAndMethod, String name) { + if (channelAndMethod.getHeaders() != null) { + Object value = channelAndMethod.getHeaders().get(name); + if (value != null) { + return Collections.singletonList(value.toString()); + } + } + return Collections.emptyList(); + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java index a2140a58b9b8..9678d6117639 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitChannelInstrumentation.java @@ -114,6 +114,7 @@ public static void onEnter( context = channelInstrumenter().start(parentContext, request); CURRENT_RABBIT_CONTEXT.set(context); + helper().setChannelAndMethod(context, request); scope = context.makeCurrent(); } @@ -159,7 +160,7 @@ public static void setSpanNameAddHeaders( if (props == null) { props = MessageProperties.MINIMAL_BASIC; } - helper().onProps(span, props); + helper().onProps(context, span, props); // We need to copy the BasicProperties and provide a header map we can modify Map headers = props.getHeaders(); diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesGetter.java index dcde8599d2b5..898ea818a3ef 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesGetter.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitDeliveryAttributesGetter.java @@ -7,6 +7,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; @@ -97,4 +99,13 @@ public String messagePayload(DeliveryRequest request) { return null; } + + @Override + public List header(DeliveryRequest request, String name) { + Object value = request.getProperties().getHeaders().get(name); + if (value != null) { + return Collections.singletonList(value.toString()); + } + return Collections.emptyList(); + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java index 8417bcd2928e..1569e7be4500 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitInstrumenterHelper.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.RabbitSingletons.CHANNEL_AND_METHOD_CONTEXT_KEY; + import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Command; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -41,13 +43,18 @@ public void onPublish(Span span, String exchange, String routingKey) { } } - public void onProps(Span span, AMQP.BasicProperties props) { + public void onProps(Context context, Span span, AMQP.BasicProperties props) { if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { Integer deliveryMode = props.getDeliveryMode(); if (deliveryMode != null) { span.setAttribute("rabbitmq.delivery_mode", deliveryMode); } } + RabbitChannelAndMethodHolder channelContext = context.get(CHANNEL_AND_METHOD_CONTEXT_KEY); + ChannelAndMethod channelAndMethod = channelContext.getChannelAndMethod(); + if (channelAndMethod != null) { + channelAndMethod.setHeaders(props.getHeaders()); + } } private static String normalizeExchangeName(String exchange) { @@ -68,4 +75,11 @@ public static void onCommand(Span span, Command command) { public void inject(Context context, Map headers, MapSetter setter) { GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, headers, setter); } + + public void setChannelAndMethod(Context context, ChannelAndMethod channelAndMethod) { + RabbitChannelAndMethodHolder holder = context.get(CHANNEL_AND_METHOD_CONTEXT_KEY); + if (holder != null) { + holder.setChannelAndMethod(channelAndMethod); + } + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesGetter.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesGetter.java index dbd2a2f6ee05..398cefe62a97 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesGetter.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitReceiveAttributesGetter.java @@ -8,6 +8,8 @@ import com.rabbitmq.client.GetResponse; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; @@ -101,4 +103,16 @@ public String messagePayload(ReceiveRequest receiveRequest) { return null; } + + @Override + public List header(ReceiveRequest request, String name) { + GetResponse response = request.getResponse(); + if (response != null) { + Object value = request.getResponse().getProps().getHeaders().get(name); + if (value != null) { + return Collections.singletonList(value.toString()); + } + } + return Collections.emptyList(); + } } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java index 5dc4956f1f0c..6bb758388d71 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java +++ b/instrumentation/rabbitmq-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/RabbitSingletons.java @@ -10,12 +10,15 @@ import com.rabbitmq.client.GetResponse; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.ContextKey; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.ArrayList; import java.util.List; @@ -29,6 +32,8 @@ public final class RabbitSingletons { private static final Instrumenter channelInstrumenter; private static final Instrumenter receiveInstrumenter; private static final Instrumenter deliverInstrumenter; + static final ContextKey CHANNEL_AND_METHOD_CONTEXT_KEY = + ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key"); static { channelInstrumenter = createChannelInstrumenter(); @@ -52,10 +57,13 @@ private static Instrumenter createChannelInstrumenter() return Instrumenter.builder( GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod) .addAttributesExtractor( - MessagingAttributesExtractor.create( + buildMessagingAttributesExtractor( RabbitChannelAttributesGetter.INSTANCE, MessageOperation.SEND)) .addAttributesExtractor( NetClientAttributesExtractor.create(new RabbitChannelNetAttributesGetter())) + .addContextCustomizer( + (context, request, startAttributes) -> + context.with(CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder())) .buildInstrumenter( channelAndMethod -> channelAndMethod.getMethod().equals("Channel.basicPublish") ? PRODUCER : CLIENT); @@ -64,7 +72,7 @@ private static Instrumenter createChannelInstrumenter() private static Instrumenter createReceiveInstrumenter() { List> extractors = new ArrayList<>(); extractors.add( - MessagingAttributesExtractor.create( + buildMessagingAttributesExtractor( RabbitReceiveAttributesGetter.INSTANCE, MessageOperation.RECEIVE)); extractors.add(NetClientAttributesExtractor.create(new RabbitReceiveNetAttributesGetter())); if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { @@ -80,7 +88,7 @@ private static Instrumenter createReceiveInstrument private static Instrumenter createDeliverInstrumenter() { List> extractors = new ArrayList<>(); extractors.add( - MessagingAttributesExtractor.create( + buildMessagingAttributesExtractor( RabbitDeliveryAttributesGetter.INSTANCE, MessageOperation.PROCESS)); extractors.add(new RabbitDeliveryExtraAttributesExtractor()); if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { @@ -93,5 +101,12 @@ private static Instrumenter createDeliverInstrumenter() { .buildConsumerInstrumenter(DeliveryRequestGetter.INSTANCE); } + private static MessagingAttributesExtractor buildMessagingAttributesExtractor( + MessagingAttributesGetter getter, MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build(); + } + private RabbitSingletons() {} } diff --git a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy index 826052fe6e10..8ac7cbe8dcaf 100644 --- a/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy +++ b/instrumentation/rabbitmq-2.7/javaagent/src/test/groovy/RabbitMqTest.groovy @@ -16,6 +16,8 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.instrumentation.test.asserts.TraceAssert import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit import org.springframework.amqp.core.AmqpAdmin import org.springframework.amqp.core.AmqpTemplate import org.springframework.amqp.core.Queue @@ -273,6 +275,44 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb } } + def "capture message header as span attributes"() { + setup: + String queueName = channel.queueDeclare().getQueue() + def properties = new AMQP.BasicProperties.Builder().headers(["test-message-header": "test"]).build() + channel.basicPublish("", queueName, properties, "Hello, world!".getBytes()) + + def latch = new CountDownLatch(1) + def deliveries = [] + + Consumer callback = new DefaultConsumer(channel) { + @Override + void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties props, byte[] body) throws IOException { + deliveries << new String(body) + latch.countDown() + } + } + + channel.basicConsume(queueName, callback) + latch.await(10, TimeUnit.SECONDS) + expect: + deliveries[0] == "Hello, world!" + + and: + assertTraces(3) { + traces.subList(1, 3).sort(orderByRootSpanKind(PRODUCER, CLIENT)) + trace(0, 1) { + rabbitSpan(it, 0, null, null, null, "queue.declare") + } + trace(1, 2) { + rabbitSpan(it, 0, "", null, "send", "", true) + rabbitSpan(it, 1, "", null, "process", "", span(0), null, null, null, false, true) + } + trace(2, 1) { + rabbitSpan(it, 0, null, null, null, "basic.consume") + } + } + } + def rabbitSpan( TraceAssert trace, String exchange, @@ -283,12 +323,24 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb Object linkSpan = null, Throwable exception = null, String errorMsg = null, - Boolean expectTimestamp = false + boolean expectTimestamp = false ) { rabbitSpan(trace, 0, exchange, routingKey, operation, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp) } def rabbitSpan( + TraceAssert trace, + int index, + String exchange, + String routingKey, + String operation, + String resource, + boolean testHeaders + ) { + rabbitSpan(trace, index, exchange, routingKey, operation, resource, null, null, null, null, false, testHeaders) + } + + def rabbitSpan( TraceAssert trace, int index, String exchange, @@ -299,7 +351,8 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb Object linkSpan = null, Throwable exception = null, String errorMsg = null, - Boolean expectTimestamp = false + boolean expectTimestamp = false, + boolean testHeaders = false ) { def spanName = resource @@ -355,6 +408,9 @@ class RabbitMqTest extends AgentInstrumentationSpecification implements WithRabb if (expectTimestamp) { "rabbitmq.record.queue_time_ms" { it instanceof Long && it >= 0 } } + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } switch (trace.span(index).attributes.get(AttributeKey.stringKey("rabbitmq.command"))) { case "basic.publish": diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java index 7cc3bab13d8b..6c7b9a00672a 100644 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientHooks.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.rocketmq.RocketMqTelemetry; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.SendMessageHook; @@ -15,6 +16,7 @@ public final class RocketMqClientHooks { private static final RocketMqTelemetry TELEMETRY = RocketMqTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setPropagationEnabled( InstrumentationConfig.get() .getBoolean("otel.instrumentation.rocketmq-client.propagation", true)) diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerAttributeGetter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerAttributeGetter.java index 9417ce03af07..dc218aa1f386 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerAttributeGetter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerAttributeGetter.java @@ -7,6 +7,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; import org.apache.rocketmq.common.message.MessageExt; @@ -77,6 +79,15 @@ public String messageId(MessageExt request, @Nullable Void unused) { return request.getMsgId(); } + @Override + public List header(MessageExt request, String name) { + String value = request.getProperties().get(name); + if (value != null) { + return Collections.singletonList(value); + } + return Collections.emptyList(); + } + @Nullable @Override public String messagePayload(MessageExt messageExt) { diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java index 60e02fc1a812..7e5eeef13d81 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java @@ -16,7 +16,9 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import java.util.List; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.common.message.MessageExt; @@ -26,6 +28,7 @@ class RocketMqInstrumenterFactory { static Instrumenter createProducerInstrumenter( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean propagationEnabled) { @@ -37,7 +40,8 @@ static Instrumenter createProducerInstrumenter( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)); + .addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)); if (captureExperimentalSpanAttributes) { instrumenterBuilder.addAttributesExtractor( RocketMqProducerExperimentalAttributeExtractor.INSTANCE); @@ -52,6 +56,7 @@ static Instrumenter createProducerInstrumenter( static RocketMqConsumerInstrumenter createConsumerInstrumenter( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean propagationEnabled) { @@ -63,14 +68,23 @@ static RocketMqConsumerInstrumenter createConsumerInstrumenter( return new RocketMqConsumerInstrumenter( createProcessInstrumenter( - openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, false), + openTelemetry, + capturedHeaders, + captureExperimentalSpanAttributes, + propagationEnabled, + false), createProcessInstrumenter( - openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, true), + openTelemetry, + capturedHeaders, + captureExperimentalSpanAttributes, + propagationEnabled, + true), batchReceiveInstrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer())); } private static Instrumenter createProcessInstrumenter( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean propagationEnabled, boolean batch) { @@ -84,7 +98,8 @@ private static Instrumenter createProcessInstrumenter( INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)); - builder.addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)); + builder.addAttributesExtractor( + buildMessagingAttributesExtractor(getter, operation, capturedHeaders)); if (captureExperimentalSpanAttributes) { builder.addAttributesExtractor(RocketMqConsumerExperimentalAttributeExtractor.INSTANCE); } @@ -107,6 +122,15 @@ private static Instrumenter createProcessInstrumenter( } } + private static MessagingAttributesExtractor buildMessagingAttributesExtractor( + MessagingAttributesGetter getter, + MessageOperation operation, + List capturedHeaders) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(capturedHeaders) + .build(); + } + private static String spanNameOnReceive(Void unused) { return "multiple_sources receive"; } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerAttributeGetter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerAttributeGetter.java index 25e9c31a92bc..56039c1d0ffe 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerAttributeGetter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerAttributeGetter.java @@ -7,6 +7,8 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Collections; +import java.util.List; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; import org.apache.rocketmq.client.hook.SendMessageContext; @@ -82,6 +84,15 @@ public String messageId(SendMessageContext request, @Nullable Void unused) { return sendResult == null ? null : sendResult.getMsgId(); } + @Override + public List header(SendMessageContext request, String name) { + String value = request.getMessage().getProperties().get(name); + if (value != null) { + return Collections.singletonList(value); + } + return Collections.emptyList(); + } + @Nullable @Override public String messagePayload(SendMessageContext sendMessageContext) { diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetry.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetry.java index 7e474db50ce9..2430caae141d 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetry.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetry.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.List; import org.apache.rocketmq.client.hook.ConsumeMessageHook; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; @@ -31,14 +32,15 @@ public static RocketMqTelemetryBuilder builder(OpenTelemetry openTelemetry) { RocketMqTelemetry( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean propagationEnabled) { rocketMqConsumerInstrumenter = RocketMqInstrumenterFactory.createConsumerInstrumenter( - openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled); rocketMqProducerInstrumenter = RocketMqInstrumenterFactory.createProducerInstrumenter( - openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled); } /** diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetryBuilder.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetryBuilder.java index 6056e728691f..8931789a50c0 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetryBuilder.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTelemetryBuilder.java @@ -5,13 +5,17 @@ package io.opentelemetry.instrumentation.rocketmq; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.OpenTelemetry; +import java.util.List; /** A builder of {@link RocketMqTelemetry}. */ public final class RocketMqTelemetryBuilder { private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes; private boolean propagationEnabled = true; @@ -39,12 +43,22 @@ public RocketMqTelemetryBuilder setPropagationEnabled(boolean propagationEnabled return this; } + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + public RocketMqTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + /** * Returns a new {@link RocketMqTelemetry} with the settings of this {@link * RocketMqTelemetryBuilder}. */ public RocketMqTelemetry build() { return new RocketMqTelemetry( - openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + openTelemetry, capturedHeaders, captureExperimentalSpanAttributes, propagationEnabled); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy index 2a4e774a34c7..1ef224c2b1a1 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy @@ -10,11 +10,14 @@ import io.opentelemetry.instrumentation.test.LibraryTestTrait import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer import org.apache.rocketmq.client.producer.DefaultMQProducer +import static java.util.Collections.singletonList + class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait { @Override void configureMQProducer(DefaultMQProducer producer) { producer.getDefaultMQProducerImpl().registerSendMessageHook(RocketMqTelemetry.builder(openTelemetry) + .setCapturedHeaders(singletonList("test-message-header")) .setCaptureExperimentalSpanAttributes(true) .build().newTracingSendMessageHook()) } @@ -22,6 +25,7 @@ class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTe @Override void configureMQPushConsumer(DefaultMQPushConsumer consumer) { consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(RocketMqTelemetry.builder(openTelemetry) + .setCapturedHeaders(singletonList("test-message-header")) .setCaptureExperimentalSpanAttributes(true) .build().newTracingConsumeMessageHook()) } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index 27ad81a0a270..d0bfd0d9ad7e 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -303,4 +303,64 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { } } } + + def "capture message header as span attributes"() { + when: + runWithSpan("parent") { + def msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) + msg.putUserProperty("test-message-header", "test") + SendResult sendResult = producer.send(msg) + assert sendResult.sendStatus == SendStatus.SEND_OK + } + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages() + + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + } + span(1) { + name sharedTopic + " send" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" + "$SemanticAttributes.MESSAGING_DESTINATION" sharedTopic + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.send_result" "SEND_OK" + "messaging.header.test_message_header" { it == ["test"] } + } + } + span(2) { + name sharedTopic + " process" + kind CONSUMER + childOf span(1) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq" + "$SemanticAttributes.MESSAGING_DESTINATION" sharedTopic + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "messaging.rocketmq.tags" "TagA" + "messaging.rocketmq.broker_address" String + "messaging.rocketmq.queue_id" Long + "messaging.rocketmq.queue_offset" Long + "messaging.header.test_message_header" { it == ["test"] } + } + } + span(3) { + name "messageListener" + kind INTERNAL + childOf span(2) + } + } + } + } } diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/SpringIntegrationSingletons.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/SpringIntegrationSingletons.java index e7f96144e9cb..99b1faef5af2 100644 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/SpringIntegrationSingletons.java +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/SpringIntegrationSingletons.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.spring.integration.SpringIntegrationTelemetry; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; import java.util.List; import org.springframework.messaging.support.ChannelInterceptor; @@ -23,6 +24,7 @@ public final class SpringIntegrationSingletons { private static final ChannelInterceptor INTERCEPTOR = SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setProducerSpanEnabled( InstrumentationConfig.get() .getBoolean("otel.instrumentation.spring-integration.producer.enabled", false)) diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTelemetryBuilder.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTelemetryBuilder.java index 53152c318714..306cfd5994ce 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTelemetryBuilder.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringIntegrationTelemetryBuilder.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.spring.integration; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; @@ -12,6 +14,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import java.util.ArrayList; import java.util.List; @@ -23,6 +26,7 @@ public final class SpringIntegrationTelemetryBuilder { private final List> additionalAttributeExtractors = new ArrayList<>(); + private List capturedHeaders = emptyList(); private boolean producerSpanEnabled = false; SpringIntegrationTelemetryBuilder(OpenTelemetry openTelemetry) { @@ -39,6 +43,16 @@ public SpringIntegrationTelemetryBuilder addAttributesExtractor( return this; } + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + public SpringIntegrationTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + /** * Sets whether additional {@link SpanKind#PRODUCER PRODUCER} span should be emitted by this * instrumentation. @@ -68,8 +82,10 @@ public SpringIntegrationTelemetry build() { SpringIntegrationTelemetryBuilder::consumerSpanName) .addAttributesExtractors(additionalAttributeExtractors) .addAttributesExtractor( - MessagingAttributesExtractor.create( - SpringMessagingAttributesGetter.INSTANCE, MessageOperation.PROCESS)) + buildMessagingAttributesExtractor( + SpringMessagingAttributesGetter.INSTANCE, + MessageOperation.PROCESS, + capturedHeaders)) .buildConsumerInstrumenter(MessageHeadersGetter.INSTANCE); Instrumenter producerInstrumenter = @@ -79,8 +95,10 @@ public SpringIntegrationTelemetry build() { SpringIntegrationTelemetryBuilder::producerSpanName) .addAttributesExtractors(additionalAttributeExtractors) .addAttributesExtractor( - MessagingAttributesExtractor.create( - SpringMessagingAttributesGetter.INSTANCE, MessageOperation.SEND)) + buildMessagingAttributesExtractor( + SpringMessagingAttributesGetter.INSTANCE, + MessageOperation.SEND, + capturedHeaders)) .buildInstrumenter(SpanKindExtractor.alwaysProducer()); return new SpringIntegrationTelemetry( openTelemetry.getPropagators(), @@ -88,4 +106,14 @@ public SpringIntegrationTelemetry build() { producerInstrumenter, producerSpanEnabled); } + + private static MessagingAttributesExtractor + buildMessagingAttributesExtractor( + MessagingAttributesGetter getter, + MessageOperation operation, + List capturedHeaders) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(capturedHeaders) + .build(); + } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesGetter.java b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesGetter.java index d6565c0ae7d0..21bd3b81e5a0 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesGetter.java +++ b/instrumentation/spring/spring-integration-4.1/library/src/main/java/io/opentelemetry/instrumentation/spring/integration/SpringMessagingAttributesGetter.java @@ -6,6 +6,8 @@ package io.opentelemetry.instrumentation.spring.integration; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; import javax.annotation.Nullable; // this class is needed mostly for correct CONSUMER span suppression @@ -77,4 +79,13 @@ public Long messagePayloadCompressedSize(MessageWithChannel messageWithChannel) public String messageId(MessageWithChannel messageWithChannel, @Nullable Void unused) { return null; } + + @Override + public List header(MessageWithChannel request, String name) { + Object value = request.getMessage().getHeaders().get(name); + if (value != null) { + return Collections.singletonList(value.toString()); + } + return Collections.emptyList(); + } } diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy index 57e1e8dc9cbb..c12f390e1be0 100644 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy @@ -10,12 +10,17 @@ import org.springframework.context.annotation.Configuration import org.springframework.integration.config.GlobalChannelInterceptor import org.springframework.messaging.support.ChannelInterceptor +import static java.util.Collections.singletonList + @Configuration class GlobalInterceptorSpringConfig { @GlobalChannelInterceptor @Bean ChannelInterceptor otelInterceptor() { - SpringIntegrationTelemetry.create(GlobalOpenTelemetry.get()).newChannelInterceptor() + SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(singletonList("test-message-header")) + .build() + .newChannelInterceptor() } } diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy index b24a972b7066..92f26657e2ae 100644 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy @@ -191,6 +191,41 @@ abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpeci channel2.unsubscribe(messageHandler) } + def "capture message header"() { + given: + def channel = applicationContext.getBean("directChannel", SubscribableChannel) + + def messageHandler = new CapturingMessageHandler() + channel.subscribe(messageHandler) + + when: + channel.send(MessageBuilder.withPayload("test") + .setHeader("test-message-header", "test") + .build()) + + then: + def capturedMessage = messageHandler.join() + + assertTraces(1) { + trace(0, 2) { + span(0) { + name "application.directChannel process" + kind CONSUMER + } + span(1) { + name "handler" + childOf span(0) + } + + def interceptorSpan = span(0) + verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan) + } + } + + cleanup: + channel.unsubscribe(messageHandler) + } + static void verifyCorrectSpanWasPropagated(Message capturedMessage, SpanData parentSpan) { def propagatedSpan = capturedMessage.headers.get("traceparent") as String assert propagatedSpan.contains(parentSpan.traceId), "wrong trace id" diff --git a/instrumentation/spring/spring-jms-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/jms/SpringJmsSingletons.java b/instrumentation/spring/spring-jms-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/jms/SpringJmsSingletons.java index a32f847d64e1..cffb4270cc35 100644 --- a/instrumentation/spring/spring-jms-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/jms/SpringJmsSingletons.java +++ b/instrumentation/spring/spring-jms-2.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/jms/SpringJmsSingletons.java @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import io.opentelemetry.javaagent.instrumentation.jms.JmsMessageAttributesGetter; import io.opentelemetry.javaagent.instrumentation.jms.MessagePropertyGetter; import io.opentelemetry.javaagent.instrumentation.jms.MessageWithDestination; @@ -28,7 +29,10 @@ private static Instrumenter buildListenerInstrumen GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build()) .buildConsumerInstrumenter(MessagePropertyGetter.INSTANCE); } diff --git a/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy b/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy index db77ac9a8761..a700455b810c 100644 --- a/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy +++ b/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringListenerTest.groovy @@ -46,7 +46,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification { config << [AnnotatedListenerConfig, ManualListenerConfig] } - static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { + static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName, boolean testHeaders = false) { trace.span(index) { name destinationName + " send" kind PRODUCER @@ -59,6 +59,10 @@ class SpringListenerTest extends AgentInstrumentationSpecification { "$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true } "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + "messaging.header.test_message_int_header" { it == ["1234"] } + } } } } @@ -66,7 +70,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification { // passing messageId = null will verify message.id is not captured, // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), // any other value for messageId will verify that message.id is captured and has that same value - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) { + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation, boolean testHeaders = false) { trace.span(index) { name destinationName + " " + operation kind CONSUMER @@ -87,6 +91,10 @@ class SpringListenerTest extends AgentInstrumentationSpecification { if (destinationName == "(temporary)") { "$SemanticAttributes.MESSAGING_TEMP_DESTINATION" true } + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + "messaging.header.test_message_int_header" { it == ["1234"] } + } } } } diff --git a/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringTemplateTest.groovy b/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringTemplateTest.groovy index 35c2333a62e1..0233c552eb25 100644 --- a/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringTemplateTest.groovy +++ b/instrumentation/spring/spring-jms-2.0/javaagent/src/test/groovy/SpringTemplateTest.groovy @@ -5,6 +5,8 @@ import com.google.common.io.Files import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import javax.jms.JMSException +import javax.jms.Message import org.hornetq.api.core.TransportConfiguration import org.hornetq.api.core.client.HornetQClient import org.hornetq.api.jms.HornetQJMSClient @@ -17,6 +19,7 @@ import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory import org.hornetq.core.server.HornetQServer import org.hornetq.core.server.HornetQServers import org.springframework.jms.core.JmsTemplate +import org.springframework.jms.core.MessagePostProcessor import spock.lang.Shared import javax.jms.Connection @@ -148,4 +151,32 @@ class SpringTemplateTest extends AgentInstrumentationSpecification { destination | destinationType | destinationName session.createQueue("SpringTemplateJms2") | "queue" | "SpringTemplateJms2" } + + def "capture message header as span attribute"() { + setup: + template.convertAndSend(destination, messageText, new MessagePostProcessor() { + @Override + Message postProcessMessage(Message message) throws JMSException { + message.setStringProperty("test_message_header", "test") + message.setIntProperty("test_message_int_header", 1234) + return message + } + }) + TextMessage receivedMessage = template.receive(destination) + + expect: + receivedMessage.text == messageText + assertTraces(2) { + trace(0, 1) { + producerSpan(it, 0, destinationType, destinationName, true) + } + trace(1, 1) { + consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive", true) + } + } + + where: + destination | destinationType | destinationName + session.createQueue("SpringTemplateJms2") | "queue" | "SpringTemplateJms2" + } } diff --git a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java index 4840daae9c86..a3382b307c9c 100644 --- a/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java +++ b/instrumentation/spring/spring-kafka-2.7/library/src/main/java/io/opentelemetry/instrumentation/spring/kafka/v2_7/SpringKafkaTelemetryBuilder.java @@ -5,8 +5,11 @@ package io.opentelemetry.instrumentation.spring.kafka.v2_7; +import static java.util.Collections.emptyList; + import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; +import java.util.List; /** A builder of {@link SpringKafkaTelemetry}. */ public final class SpringKafkaTelemetryBuilder { @@ -14,6 +17,7 @@ public final class SpringKafkaTelemetryBuilder { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes = false; private boolean propagationEnabled = true; private boolean messagingReceiveInstrumentationEnabled = false; @@ -22,6 +26,11 @@ public final class SpringKafkaTelemetryBuilder { this.openTelemetry = openTelemetry; } + public SpringKafkaTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + public SpringKafkaTelemetryBuilder setCaptureExperimentalSpanAttributes( boolean captureExperimentalSpanAttributes) { this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; @@ -46,6 +55,7 @@ public SpringKafkaTelemetryBuilder setMessagingReceiveInstrumentationEnabled( public SpringKafkaTelemetry build() { KafkaInstrumenterFactory factory = new KafkaInstrumenterFactory(openTelemetry, INSTRUMENTATION_NAME) + .setCapturedHeaders(capturedHeaders) .setCaptureExperimentalSpanAttributes(captureExperimentalSpanAttributes) .setPropagationEnabled(propagationEnabled) .setMessagingReceiveInstrumentationEnabled(messagingReceiveInstrumentationEnabled) diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesGetter.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesGetter.java index bbf4eadfa9f3..3f539b8dd907 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesGetter.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitMessageAttributesGetter.java @@ -6,6 +6,8 @@ package io.opentelemetry.javaagent.instrumentation.spring.rabbit; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.Collections; +import java.util.List; import java.nio.charset.StandardCharsets; import javax.annotation.Nullable; import org.springframework.amqp.core.Message; @@ -75,6 +77,15 @@ public String messageId(Message message, @Nullable Void unused) { return message.getMessageProperties().getMessageId(); } + @Override + public List header(Message message, String name) { + Object value = message.getMessageProperties().getHeaders().get(name); + if (value != null) { + return Collections.singletonList(value.toString()); + } + return Collections.emptyList(); + } + @Nullable @Override public String messagePayload(Message message) { diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java index 7019d0be9687..b63ac6763a43 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/rabbit/SpringRabbitSingletons.java @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; import org.springframework.amqp.core.Message; public final class SpringRabbitSingletons { @@ -27,7 +28,10 @@ public final class SpringRabbitSingletons { GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractor(MessagingAttributesExtractor.create(getter, operation)) + .addAttributesExtractor( + MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) + .build()) .buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE); } diff --git a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy index 8d5d491d7b1d..725138c8c547 100644 --- a/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy +++ b/instrumentation/spring/spring-rabbit-1.0/javaagent/src/test/groovy/ContextPropagationTest.groovy @@ -7,7 +7,10 @@ import com.rabbitmq.client.ConnectionFactory import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.instrumentation.testing.GlobalTraceUtil import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.springframework.amqp.AmqpException import org.springframework.amqp.core.AmqpTemplate +import org.springframework.amqp.core.Message +import org.springframework.amqp.core.MessagePostProcessor import org.springframework.amqp.core.Queue import org.springframework.amqp.rabbit.annotation.RabbitListener import org.springframework.boot.SpringApplication @@ -20,6 +23,7 @@ import org.testcontainers.containers.wait.strategy.Wait import spock.lang.Shared import java.time.Duration +import spock.lang.Unroll import static com.google.common.net.InetAddresses.isInetAddress import static io.opentelemetry.api.trace.SpanKind.CLIENT @@ -62,15 +66,27 @@ class ContextPropagationTest extends AgentInstrumentationSpecification { applicationContext?.close() } - def "should propagate context to consumer"() { + @Unroll + def "should propagate context to consumer, test headers: #testHeaders"() { given: def connection = connectionFactory.newConnection() def channel = connection.createChannel() when: runWithSpan("parent") { - applicationContext.getBean(AmqpTemplate) - .convertAndSend(ConsumerConfig.TEST_QUEUE, "test payload") + if (testHeaders) { + applicationContext.getBean(AmqpTemplate) + .convertAndSend(ConsumerConfig.TEST_QUEUE, (Object) "test payload", new MessagePostProcessor() { + @Override + Message postProcessMessage(Message message) throws AmqpException { + message.getMessageProperties().setHeader("test-message-header", "test") + return message + } + }) + } else { + applicationContext.getBean(AmqpTemplate) + .convertAndSend(ConsumerConfig.TEST_QUEUE, "test") + } } then: @@ -94,6 +110,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "queue" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" String + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" "test payload" } } @@ -111,6 +130,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification { "$SemanticAttributes.MESSAGING_OPERATION" "process" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long "$SemanticAttributes.MESSAGING_RABBITMQ_ROUTING_KEY" String + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" "test payload" } } @@ -125,6 +147,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "queue" "$SemanticAttributes.MESSAGING_OPERATION" "process" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + if (testHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } "messaging.payload" "test payload" } } @@ -153,6 +178,9 @@ class ContextPropagationTest extends AgentInstrumentationSpecification { cleanup: channel?.close() connection?.close() + + where: + testHeaders << [false, true] } @SpringBootConfiguration diff --git a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java index 9cf1d4156d28..eaa3cf070c41 100644 --- a/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java +++ b/instrumentation/vertx/vertx-kafka-client-3.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/vertx/kafka/v3_6/VertxKafkaSingletons.java @@ -23,6 +23,7 @@ public final class VertxKafkaSingletons { static { KafkaInstrumenterFactory factory = new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .setCaptureExperimentalSpanAttributes( InstrumentationConfig.get() .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) diff --git a/javaagent-extension-api/src/main/java/io/opentelemetry/javaagent/bootstrap/internal/ExperimentalConfig.java b/javaagent-extension-api/src/main/java/io/opentelemetry/javaagent/bootstrap/internal/ExperimentalConfig.java index 48a887876f76..3179216d4250 100644 --- a/javaagent-extension-api/src/main/java/io/opentelemetry/javaagent/bootstrap/internal/ExperimentalConfig.java +++ b/javaagent-extension-api/src/main/java/io/opentelemetry/javaagent/bootstrap/internal/ExperimentalConfig.java @@ -5,6 +5,10 @@ package io.opentelemetry.javaagent.bootstrap.internal; +import static java.util.Collections.emptyList; + +import java.util.List; + /** * This class is internal and is hence not for public use. Its APIs are unstable and can change at * any time. @@ -15,6 +19,7 @@ public final class ExperimentalConfig { new ExperimentalConfig(InstrumentationConfig.get()); private final InstrumentationConfig config; + private final List messagingHeaders; /** Returns the global agent configuration. */ public static ExperimentalConfig get() { @@ -23,6 +28,8 @@ public static ExperimentalConfig get() { public ExperimentalConfig(InstrumentationConfig config) { this.config = config; + messagingHeaders = + config.getList("otel.instrumentation.messaging.experimental.capture-headers", emptyList()); } public boolean controllerTelemetryEnabled() { @@ -64,4 +71,8 @@ public boolean messagingReceiveInstrumentationEnabled() { "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", !receiveSpansSuppressed); } + + public List getMessagingHeaders() { + return messagingHeaders; + } } diff --git a/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSource.java b/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSource.java new file mode 100644 index 000000000000..bb7c284814a4 --- /dev/null +++ b/testing/agent-exporter/src/main/java/io/opentelemetry/javaagent/testing/messaging/CapturedMessagingHeadersTestConfigSource.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.testing.messaging; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.config.ConfigPropertySource; +import java.util.HashMap; +import java.util.Map; + +@AutoService(ConfigPropertySource.class) +public class CapturedMessagingHeadersTestConfigSource implements ConfigPropertySource { + + @Override + public Map getProperties() { + Map testConfig = new HashMap<>(); + testConfig.put( + "otel.instrumentation.messaging.experimental.capture-headers", + // most tests use "test-message-header", "test_message_header" is used for JMS2 because + // '-' is not allowed in a JMS property name. JMS property name should be a valid java + // identifier. + "test-message-header, test-message-int-header, test_message_header, test_message_int_header"); + return testConfig; + } +}