From 666c70c81a7169d32873181071104732469d8a69 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Tue, 24 Aug 2021 10:10:00 +0200 Subject: [PATCH 1/4] Instrument spring-kafka batch message listeners --- .../api/instrumenter/Instrumenter.java | 9 +- .../api/instrumenter/InstrumenterBuilder.java | 10 +- ...=> PropagatorBasedSpanLinksExtractor.java} | 9 +- .../api/instrumenter/SpanLinkExtractor.java | 30 --- .../api/instrumenter/SpanLinksBuilder.java | 33 +++ .../instrumenter/SpanLinksBuilderImpl.java | 30 +++ .../api/instrumenter/SpanLinksExtractor.java | 31 +++ .../api/instrumenter/InstrumenterTest.java | 30 ++- ...ropagatorBasedSpanLinksExtractorTest.java} | 24 +- .../kafkaclients/TracingIterable.java | 12 +- .../kafkaclients/TracingIterator.java | 17 +- .../kafkaclients/TracingList.java | 26 +- .../KafkaConsumerAttributesExtractor.java | 9 +- .../kafka/KafkaConsumerIteratorWrapper.java | 18 ++ .../javaagent/build.gradle.kts | 39 +++ ...ssageListenerContainerInstrumentation.java | 51 ++++ .../BatchConsumerAttributesExtractor.java | 82 ++++++ .../spring/kafka/BatchRecords.java | 33 +++ .../kafka/InstrumentedBatchInterceptor.java | 106 ++++++++ .../kafka/KafkaBatchErrorCauseExtractor.java | 21 ++ .../SpringKafkaInstrumentationModule.java | 25 ++ .../spring/kafka/SpringKafkaSingletons.java | 71 +++++ .../instrumentation/spring/kafka/State.java | 27 ++ .../SpringKafkaInstrumentationTest.groovy | 253 ++++++++++++++++++ settings.gradle.kts | 1 + 25 files changed, 906 insertions(+), 91 deletions(-) rename instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/{PropagatorBasedSpanLinkExtractor.java => PropagatorBasedSpanLinksExtractor.java} (63%) delete mode 100644 instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinkExtractor.java create mode 100644 instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilder.java create mode 100644 instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilderImpl.java create mode 100644 instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksExtractor.java rename instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/{PropagatorBasedSpanLinkExtractorTest.java => PropagatorBasedSpanLinksExtractorTest.java} (69%) create mode 100644 instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaInstrumentationModule.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java create mode 100644 instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java index bb376600102e..0930b1700318 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/Instrumenter.java @@ -66,9 +66,9 @@ public static InstrumenterBuilder newBuil private final SpanNameExtractor spanNameExtractor; private final SpanKindExtractor spanKindExtractor; private final SpanStatusExtractor spanStatusExtractor; + private final List> spanLinksExtractors; private final List> attributesExtractors; - private final List> spanLinkExtractors; private final List requestListeners; private final ErrorCauseExtractor errorCauseExtractor; @Nullable private final StartTimeExtractor startTimeExtractor; @@ -83,8 +83,8 @@ public static InstrumenterBuilder newBuil this.spanNameExtractor = builder.spanNameExtractor; this.spanKindExtractor = builder.spanKindExtractor; this.spanStatusExtractor = builder.spanStatusExtractor; + this.spanLinksExtractors = new ArrayList<>(builder.spanLinksExtractors); this.attributesExtractors = new ArrayList<>(builder.attributesExtractors); - this.spanLinkExtractors = new ArrayList<>(builder.spanLinkExtractors); this.requestListeners = new ArrayList<>(builder.requestListeners); this.errorCauseExtractor = builder.errorCauseExtractor; this.startTimeExtractor = builder.startTimeExtractor; @@ -131,8 +131,9 @@ public Context start(Context parentContext, REQUEST request) { spanBuilder.setStartTimestamp(startTimeExtractor.extract(request)); } - for (SpanLinkExtractor extractor : spanLinkExtractors) { - spanBuilder.addLink(extractor.extract(parentContext, request)); + SpanLinksBuilder spanLinksBuilder = new SpanLinksBuilderImpl(spanBuilder); + for (SpanLinksExtractor spanLinksExtractor : spanLinksExtractors) { + spanLinksExtractor.extract(spanLinksBuilder, parentContext, request); } UnsafeAttributes attributesBuilder = new UnsafeAttributes(); diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java index 93535b07239d..6a461c361b5a 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java @@ -44,9 +44,9 @@ public final class InstrumenterBuilder { final String instrumentationName; final SpanNameExtractor spanNameExtractor; + final List> spanLinksExtractors = new ArrayList<>(); final List> attributesExtractors = new ArrayList<>(); - final List> spanLinkExtractors = new ArrayList<>(); final List requestListeners = new ArrayList<>(); SpanKindExtractor spanKindExtractor = SpanKindExtractor.alwaysInternal(); @@ -100,10 +100,10 @@ public InstrumenterBuilder addAttributesExtractors( return addAttributesExtractors(Arrays.asList(attributesExtractors)); } - /** Adds a {@link SpanLinkExtractor} to extract span link from requests. */ - public InstrumenterBuilder addSpanLinkExtractor( - SpanLinkExtractor spanLinkExtractor) { - spanLinkExtractors.add(spanLinkExtractor); + /** Adds a {@link SpanLinksExtractor} to extract span link from requests. */ + public InstrumenterBuilder addSpanLinksExtractor( + SpanLinksExtractor spanLinksExtractor) { + spanLinksExtractors.add(spanLinksExtractor); return this; } diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractor.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractor.java similarity index 63% rename from instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractor.java rename to instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractor.java index f0fabf11c40f..13ad796c4264 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractor.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractor.java @@ -6,23 +6,22 @@ package io.opentelemetry.instrumentation.api.instrumenter; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.context.propagation.TextMapGetter; -final class PropagatorBasedSpanLinkExtractor implements SpanLinkExtractor { +final class PropagatorBasedSpanLinksExtractor implements SpanLinksExtractor { private final ContextPropagators propagators; private final TextMapGetter getter; - PropagatorBasedSpanLinkExtractor(ContextPropagators propagators, TextMapGetter getter) { + PropagatorBasedSpanLinksExtractor(ContextPropagators propagators, TextMapGetter getter) { this.propagators = propagators; this.getter = getter; } @Override - public SpanContext extract(Context parentContext, REQUEST request) { + public void extract(SpanLinksBuilder spanLinks, Context parentContext, REQUEST request) { Context extracted = propagators.getTextMapPropagator().extract(parentContext, request, getter); - return Span.fromContext(extracted).getSpanContext(); + spanLinks.addLink(Span.fromContext(extracted).getSpanContext()); } } diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinkExtractor.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinkExtractor.java deleted file mode 100644 index 91020e94fc76..000000000000 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinkExtractor.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.api.instrumenter; - -import io.opentelemetry.api.trace.SpanContext; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapGetter; - -/** Extractor of a span link for a request. */ -@FunctionalInterface -public interface SpanLinkExtractor { - /** - * Extract a {@link SpanContext} that should be linked to the newly created span. Returning {@code - * SpanContext.getInvalid()} will not add any link to the span. - */ - SpanContext extract(Context parentContext, REQUEST request); - - /** - * Returns a new {@link SpanLinkExtractor} that will extract a {@link SpanContext} from the - * request using configured {@code propagators}. - */ - static SpanLinkExtractor fromUpstreamRequest( - ContextPropagators propagators, TextMapGetter getter) { - return new PropagatorBasedSpanLinkExtractor<>(propagators, getter); - } -} diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilder.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilder.java new file mode 100644 index 000000000000..328c138c1099 --- /dev/null +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilder.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.instrumenter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; + +/** A builder that exposes methods for adding links to a span. */ +public interface SpanLinksBuilder { + + /** + * Adds a link to the newly created {@code Span}. Invalid {@link SpanContext}s will be skipped. + * + * @param spanContext the context of the linked {@code Span}. + * @return this. + * @see SpanBuilder#addLink(SpanContext) + */ + SpanLinksBuilder addLink(SpanContext spanContext); + + /** + * Adds a link to the newly created {@code Span}. Invalid {@link SpanContext}s will be skipped. + * + * @param spanContext the context of the linked {@code Span}. + * @param attributes the attributes of the {@code Link}. + * @return this. + * @see SpanBuilder#addLink(SpanContext) + */ + SpanLinksBuilder addLink(SpanContext spanContext, Attributes attributes); +} diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilderImpl.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilderImpl.java new file mode 100644 index 000000000000..36959a7fed6d --- /dev/null +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksBuilderImpl.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.instrumenter; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanContext; + +final class SpanLinksBuilderImpl implements SpanLinksBuilder { + private final SpanBuilder spanBuilder; + + SpanLinksBuilderImpl(SpanBuilder spanBuilder) { + this.spanBuilder = spanBuilder; + } + + @Override + public SpanLinksBuilder addLink(SpanContext spanContext) { + spanBuilder.addLink(spanContext); + return this; + } + + @Override + public SpanLinksBuilder addLink(SpanContext spanContext, Attributes attributes) { + spanBuilder.addLink(spanContext, attributes); + return this; + } +} diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksExtractor.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksExtractor.java new file mode 100644 index 000000000000..3b7fba03387d --- /dev/null +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanLinksExtractor.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.api.instrumenter; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapGetter; + +/** Extractor of span links for a request. */ +@FunctionalInterface +public interface SpanLinksExtractor { + + /** + * Extracts {@link SpanContext}s that should be linked to the newly created span and adds them to + * {@code spanLinks}. + */ + void extract(SpanLinksBuilder spanLinks, Context parentContext, REQUEST request); + + /** + * Returns a new {@link SpanLinksExtractor} that will extract a {@link SpanContext} from the + * request using configured {@code propagators}. + */ + static SpanLinksExtractor fromUpstreamRequest( + ContextPropagators propagators, TextMapGetter getter) { + return new PropagatorBasedSpanLinksExtractor<>(propagators, getter); + } +} diff --git a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterTest.java b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterTest.java index d250ee502e02..c5078bc301af 100644 --- a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterTest.java +++ b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterTest.java @@ -103,14 +103,17 @@ protected void onEnd( } } - static class LinkExtractor implements SpanLinkExtractor> { + static class LinksExtractor implements SpanLinksExtractor> { + @Override - public SpanContext extract(Context parentContext, Map request) { - return SpanContext.create( - request.get("linkTraceId"), - request.get("linkSpanId"), - TraceFlags.getSampled(), - TraceState.getDefault()); + public void extract( + SpanLinksBuilder spanLinks, Context parentContext, Map request) { + spanLinks.addLink( + SpanContext.create( + request.get("linkTraceId"), + request.get("linkSpanId"), + TraceFlags.getSampled(), + TraceState.getDefault())); } } @@ -145,7 +148,7 @@ void server() { Instrumenter., Map>newBuilder( otelTesting.getOpenTelemetry(), "test", unused -> "span") .addAttributesExtractors(new AttributesExtractor1(), new AttributesExtractor2()) - .addSpanLinkExtractor(new LinkExtractor()) + .addSpanLinksExtractor(new LinksExtractor()) .newServerInstrumenter(new MapGetter()); Context context = instrumenter.start(Context.root(), REQUEST); @@ -258,7 +261,7 @@ void server_http() { mockNetAttributes, new AttributesExtractor1(), new AttributesExtractor2()) - .addSpanLinkExtractor(new LinkExtractor()) + .addSpanLinksExtractor(new LinksExtractor()) .newServerInstrumenter(new MapGetter()); when(mockNetAttributes.peerIp(REQUEST, null)).thenReturn("2.2.2.2"); @@ -297,7 +300,7 @@ void server_http_xForwardedFor() { mockNetAttributes, new AttributesExtractor1(), new AttributesExtractor2()) - .addSpanLinkExtractor(new LinkExtractor()) + .addSpanLinksExtractor(new LinksExtractor()) .newServerInstrumenter(new MapGetter()); Map request = new HashMap<>(REQUEST); @@ -340,7 +343,7 @@ void server_http_noForwarded() { mockNetAttributes, new AttributesExtractor1(), new AttributesExtractor2()) - .addSpanLinkExtractor(new LinkExtractor()) + .addSpanLinksExtractor(new LinksExtractor()) .newServerInstrumenter(new MapGetter()); Map request = new HashMap<>(REQUEST); @@ -378,7 +381,7 @@ void client() { Instrumenter., Map>newBuilder( otelTesting.getOpenTelemetry(), "test", unused -> "span") .addAttributesExtractors(new AttributesExtractor1(), new AttributesExtractor2()) - .addSpanLinkExtractor(new LinkExtractor()) + .addSpanLinksExtractor(new LinksExtractor()) .newClientInstrumenter(Map::put); Map request = new HashMap<>(REQUEST); @@ -512,7 +515,8 @@ void shouldNotAddInvalidLink() { Instrumenter instrumenter = Instrumenter.newBuilder( otelTesting.getOpenTelemetry(), "test", request -> "test span") - .addSpanLinkExtractor((parentContext, request) -> SpanContext.getInvalid()) + .addSpanLinksExtractor( + (spanLinks, parentContext, request) -> spanLinks.addLink(SpanContext.getInvalid())) .newInstrumenter(); // when diff --git a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractorTest.java b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractorTest.java similarity index 69% rename from instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractorTest.java rename to instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractorTest.java index 2b6d644694b9..d3efff43ec7a 100644 --- a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinkExtractorTest.java +++ b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/PropagatorBasedSpanLinksExtractorTest.java @@ -6,7 +6,7 @@ package io.opentelemetry.instrumentation.api.instrumenter; import static java.util.Collections.singletonMap; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.verify; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; @@ -19,31 +19,37 @@ import io.opentelemetry.context.propagation.TextMapGetter; import java.util.Map; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; -class PropagatorBasedSpanLinkExtractorTest { +@ExtendWith(MockitoExtension.class) +class PropagatorBasedSpanLinksExtractorTest { private static final String TRACE_ID = TraceId.fromLongs(0, 123); private static final String SPAN_ID = SpanId.fromLong(456); + @Mock SpanLinksBuilder spanLinks; + @Test void shouldExtractSpanLink() { // given ContextPropagators propagators = ContextPropagators.create(W3CTraceContextPropagator.getInstance()); - SpanLinkExtractor> underTest = - SpanLinkExtractor.fromUpstreamRequest(propagators, new MapGetter()); + SpanLinksExtractor> underTest = + SpanLinksExtractor.fromUpstreamRequest(propagators, new MapGetter()); Map request = singletonMap("traceparent", String.format("00-%s-%s-01", TRACE_ID, SPAN_ID)); // when - SpanContext link = underTest.extract(Context.root(), request); + underTest.extract(spanLinks, Context.root(), request); // then - assertEquals( - SpanContext.createFromRemoteParent( - TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault()), - link); + verify(spanLinks) + .addLink( + SpanContext.createFromRemoteParent( + TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault())); } static final class MapGetter implements TextMapGetter> { diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index 1660c6ec8535..b5644b36cb01 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -10,22 +10,22 @@ import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingIterable implements Iterable> { - private final Iterable> delegate; +public class TracingIterable implements Iterable> { + private final Iterable> delegate; private boolean firstIterator = true; - public TracingIterable(Iterable> delegate) { + public TracingIterable(Iterable> delegate) { this.delegate = delegate; } @Override - public Iterator> iterator() { - Iterator> it; + public Iterator> iterator() { + Iterator> it; // We should only return one iterator with tracing. // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = new TracingIterator(delegate.iterator(), consumerInstrumenter()); + it = new TracingIterator<>(delegate.iterator(), consumerInstrumenter()); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index 406e0cf57328..e1ae7b6a9d2b 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -8,13 +8,15 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.checkerframework.checker.nullness.qual.Nullable; -public class TracingIterator implements Iterator> { +public class TracingIterator + implements Iterator>, KafkaConsumerIteratorWrapper { - private final Iterator> delegateIterator; + private final Iterator> delegateIterator; private final Instrumenter, Void> instrumenter; private final Context parentContext; @@ -28,7 +30,7 @@ public class TracingIterator implements Iterator> { @Nullable private Scope currentScope; public TracingIterator( - Iterator> delegateIterator, + Iterator> delegateIterator, Instrumenter, Void> instrumenter) { this.delegateIterator = delegateIterator; this.instrumenter = instrumenter; @@ -42,11 +44,11 @@ public boolean hasNext() { } @Override - public ConsumerRecord next() { + public ConsumerRecord next() { // in case they didn't call hasNext()... closeScopeAndEndSpan(); - ConsumerRecord next = delegateIterator.next(); + ConsumerRecord next = delegateIterator.next(); if (next != null && instrumenter.shouldStart(parentContext, next)) { currentRequest = next; currentContext = instrumenter.start(parentContext, currentRequest); @@ -69,4 +71,9 @@ private void closeScopeAndEndSpan() { public void remove() { delegateIterator.remove(); } + + @Override + public Iterator> unwrap() { + return delegateIterator; + } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java index 7a92951b8b52..603ae4c53099 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java @@ -10,10 +10,10 @@ import java.util.ListIterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingList extends TracingIterable implements List> { - private final List> delegate; +public class TracingList extends TracingIterable implements List> { + private final List> delegate; - public TracingList(List> delegate) { + public TracingList(List> delegate) { super(delegate); this.delegate = delegate; } @@ -44,12 +44,12 @@ public T[] toArray(T[] a) { } @Override - public boolean add(ConsumerRecord consumerRecord) { + public boolean add(ConsumerRecord consumerRecord) { return delegate.add(consumerRecord); } @Override - public void add(int index, ConsumerRecord element) { + public void add(int index, ConsumerRecord element) { delegate.add(index, element); } @@ -59,7 +59,7 @@ public boolean remove(Object o) { } @Override - public ConsumerRecord remove(int index) { + public ConsumerRecord remove(int index) { return delegate.remove(index); } @@ -69,12 +69,12 @@ public boolean containsAll(Collection c) { } @Override - public boolean addAll(Collection> c) { + public boolean addAll(Collection> c) { return delegate.addAll(c); } @Override - public boolean addAll(int index, Collection> c) { + public boolean addAll(int index, Collection> c) { return delegate.addAll(index, c); } @@ -94,13 +94,13 @@ public void clear() { } @Override - public ConsumerRecord get(int index) { + public ConsumerRecord get(int index) { // TODO: should this be instrumented as well? return delegate.get(index); } @Override - public ConsumerRecord set(int index, ConsumerRecord element) { + public ConsumerRecord set(int index, ConsumerRecord element) { return delegate.set(index, element); } @@ -115,21 +115,21 @@ public int lastIndexOf(Object o) { } @Override - public ListIterator> listIterator() { + public ListIterator> listIterator() { // TODO: the API for ListIterator is not really good to instrument it in context of Kafka // Consumer so we will not do that for now return delegate.listIterator(); } @Override - public ListIterator> listIterator(int index) { + public ListIterator> listIterator(int index) { // TODO: the API for ListIterator is not really good to instrument it in context of Kafka // Consumer so we will not do that for now return delegate.listIterator(index); } @Override - public List> subList(int fromIndex, int toIndex) { + public List> subList(int fromIndex, int toIndex) { // TODO: the API for subList is not really good to instrument it in context of Kafka // Consumer so we will not do that for now // Kafka is essentially a sequential commit log. We should only enable tracing when traversing diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java index d8243dd392d6..a7dbcd1a0215 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerAttributesExtractor.java @@ -13,6 +13,13 @@ public final class KafkaConsumerAttributesExtractor extends MessagingAttributesExtractor, Void> { + + private final MessageOperation messageOperation; + + public KafkaConsumerAttributesExtractor(MessageOperation messageOperation) { + this.messageOperation = messageOperation; + } + @Override protected String system(ConsumerRecord consumerRecord) { return "kafka"; @@ -65,7 +72,7 @@ protected Long messagePayloadSize(ConsumerRecord consumerRecord) { @Override protected MessageOperation operation(ConsumerRecord consumerRecord) { - return MessageOperation.PROCESS; + return messageOperation; } @Override diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java new file mode 100644 index 000000000000..b0f1bbfd626e --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIteratorWrapper.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import java.util.Iterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public interface KafkaConsumerIteratorWrapper { + + /** + * Returns the actual, non-tracing iterator. This method is only supposed to be used by other + * Kafka consumer instrumentations that want to suppress the kafka-clients one. + */ + Iterator> unwrap(); +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts new file mode 100644 index 000000000000..896be8200e1c --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/build.gradle.kts @@ -0,0 +1,39 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("org.springframework.kafka") + module.set("spring-kafka") + versions.set("[2.7.0,)") + assertInverse.set(true) + } +} + +val versions: Map by project + +dependencies { + compileOnly("com.google.auto.value:auto-value-annotations") + annotationProcessor("com.google.auto.value:auto-value") + + implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent")) + + library("org.springframework.kafka:spring-kafka:2.7.0") + + testInstrumentation(project(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")) + + testImplementation("org.testcontainers:kafka:${versions["org.testcontainers"]}") + + testLibrary("org.springframework.boot:spring-boot-starter-test:2.5.3") + testLibrary("org.springframework.boot:spring-boot-starter:2.5.3") +} + +tasks { + named("test") { + usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) + + // TODO run tests both with and without experimental span attributes + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java new file mode 100644 index 000000000000..1ba195abb51b --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/AbstractMessageListenerContainerInstrumentation.java @@ -0,0 +1,51 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import static net.bytebuddy.matcher.ElementMatchers.isProtected; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.BatchInterceptor; + +public class AbstractMessageListenerContainerInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.springframework.kafka.listener.AbstractMessageListenerContainer"); + } + + @Override + public void transform(TypeTransformer transformer) { + // getBatchInterceptor() is called internally by AbstractMessageListenerContainer + // implementations + transformer.applyAdviceToMethod( + named("getBatchInterceptor") + .and(isProtected()) + .and(takesArguments(0)) + .and(returns(named("org.springframework.kafka.listener.BatchInterceptor"))), + this.getClass().getName() + "$GetBatchInterceptorAdvice"); + } + + @SuppressWarnings("unused") + public static class GetBatchInterceptorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return(readOnly = false) BatchInterceptor interceptor) { + if (!(interceptor instanceof InstrumentedBatchInterceptor)) { + ContextStore contextStore = InstrumentationContext.get(ConsumerRecords.class, State.class); + interceptor = new InstrumentedBatchInterceptor<>(contextStore, interceptor); + } + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java new file mode 100644 index 000000000000..48af8a3537ac --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchConsumerAttributesExtractor.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.kafka.common.TopicPartition; +import org.checkerframework.checker.nullness.qual.Nullable; + +public final class BatchConsumerAttributesExtractor + extends MessagingAttributesExtractor, Void> { + @Override + protected String system(BatchRecords batchRecords) { + return "kafka"; + } + + @Override + protected String destinationKind(BatchRecords batchRecords) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Override + protected @Nullable String destination(BatchRecords batchRecords) { + Set topics = + batchRecords.records().partitions().stream() + .map(TopicPartition::topic) + .collect(Collectors.toSet()); + // only return topic when there's exactly one in the batch + return topics.size() == 1 ? topics.iterator().next() : null; + } + + @Override + protected boolean temporaryDestination(BatchRecords batchRecords) { + return false; + } + + @Override + protected @Nullable String protocol(BatchRecords batchRecords) { + return null; + } + + @Override + protected @Nullable String protocolVersion(BatchRecords batchRecords) { + return null; + } + + @Override + protected @Nullable String url(BatchRecords batchRecords) { + return null; + } + + @Override + protected @Nullable String conversationId(BatchRecords batchRecords) { + return null; + } + + @Override + protected @Nullable Long messagePayloadSize(BatchRecords batchRecords) { + return null; + } + + @Override + protected @Nullable Long messagePayloadCompressedSize(BatchRecords batchRecords) { + return null; + } + + @Override + protected MessageOperation operation(BatchRecords batchRecords) { + return MessageOperation.PROCESS; + } + + @Override + protected @Nullable String messageId(BatchRecords batchRecords, @Nullable Void unused) { + return null; + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java new file mode 100644 index 000000000000..18b3924c48ca --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/BatchRecords.java @@ -0,0 +1,33 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +@AutoValue +public abstract class BatchRecords { + + public static BatchRecords create( + ConsumerRecords consumerRecords, List linkedReceiveSpans) { + return new AutoValue_BatchRecords<>(consumerRecords, linkedReceiveSpans); + } + + public abstract ConsumerRecords records(); + + public abstract List linkedReceiveSpans(); + + public static SpanLinksExtractor> spanLinksExtractor() { + return (spanLinks, parentContext, batchRecords) -> { + batchRecords.linkedReceiveSpans().forEach(spanLinks::addLink); + }; + } + + BatchRecords() {} +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java new file mode 100644 index 000000000000..8068c558d72e --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/InstrumentedBatchInterceptor.java @@ -0,0 +1,106 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter; +import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.receiveInstrumenter; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.springframework.kafka.listener.BatchInterceptor; + +public final class InstrumentedBatchInterceptor implements BatchInterceptor { + private final ContextStore, State> contextStore; + @Nullable private final BatchInterceptor decorated; + + public InstrumentedBatchInterceptor( + ContextStore, State> contextStore, + @Nullable BatchInterceptor decorated) { + this.contextStore = contextStore; + this.decorated = decorated; + } + + @Override + public ConsumerRecords intercept( + ConsumerRecords consumerRecords, Consumer consumer) { + + Context parentContext = Context.current(); + + // create spans for all records received in a batch + List receiveSpanContexts = traceReceivingRecords(parentContext, consumerRecords); + + // then start a span for processing that links all those receive spans + BatchRecords batchRecords = BatchRecords.create(consumerRecords, receiveSpanContexts); + if (processInstrumenter().shouldStart(parentContext, batchRecords)) { + Context context = processInstrumenter().start(parentContext, batchRecords); + Scope scope = context.makeCurrent(); + contextStore.put(consumerRecords, State.create(batchRecords, context, scope)); + } + + return decorated == null ? consumerRecords : decorated.intercept(consumerRecords, consumer); + } + + private List traceReceivingRecords( + Context parentContext, ConsumerRecords records) { + List receiveSpanContexts = new ArrayList<>(); + + Iterator> it = records.iterator(); + // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though there's + // no current CONSUMER span + // this instrumentation will create CONSUMER receive spans for each record instead of + // kafka-clients + if (it instanceof KafkaConsumerIteratorWrapper) { + it = ((KafkaConsumerIteratorWrapper) it).unwrap(); + } + + while (it.hasNext()) { + ConsumerRecord record = it.next(); + if (receiveInstrumenter().shouldStart(parentContext, record)) { + Context context = receiveInstrumenter().start(parentContext, record); + receiveSpanContexts.add(Span.fromContext(context).getSpanContext()); + receiveInstrumenter().end(context, record, null, null); + } + } + + return receiveSpanContexts; + } + + @Override + public void success(ConsumerRecords records, Consumer consumer) { + end(records, null); + if (decorated != null) { + decorated.success(records, consumer); + } + } + + @Override + public void failure(ConsumerRecords records, Exception exception, Consumer consumer) { + end(records, exception); + if (decorated != null) { + decorated.failure(records, exception, consumer); + } + } + + private void end(ConsumerRecords records, @Nullable Throwable error) { + State state = contextStore.get(records); + contextStore.put(records, null); + if (state != null) { + state.scope().close(); + processInstrumenter().end(state.context(), state.request(), null, error); + } + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java new file mode 100644 index 000000000000..d43954afc8ed --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/KafkaBatchErrorCauseExtractor.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import io.opentelemetry.instrumentation.api.instrumenter.ErrorCauseExtractor; +import org.springframework.kafka.listener.ListenerExecutionFailedException; + +public final class KafkaBatchErrorCauseExtractor implements ErrorCauseExtractor { + private final ErrorCauseExtractor delegate = ErrorCauseExtractor.jdk(); + + @Override + public Throwable extractCause(Throwable error) { + if (error instanceof ListenerExecutionFailedException && error.getCause() != null) { + error = error.getCause(); + } + return delegate.extractCause(error); + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaInstrumentationModule.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaInstrumentationModule.java new file mode 100644 index 000000000000..ab0a968ae8ba --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaInstrumentationModule.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import static java.util.Collections.singletonList; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class SpringKafkaInstrumentationModule extends InstrumentationModule { + public SpringKafkaInstrumentationModule() { + super("spring-kafka", "spring-kafka-2.7"); + } + + @Override + public List typeInstrumentations() { + return singletonList(new AbstractMessageListenerContainerInstrumentation()); + } +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java new file mode 100644 index 000000000000..7e836c2cbd55 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/SpringKafkaSingletons.java @@ -0,0 +1,71 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public final class SpringKafkaSingletons { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-kafka-2.7"; + + private static final Instrumenter, Void> RECEIVE_INSTRUMENTER = + buildReceiveInstrumenter(); + private static final Instrumenter, Void> PROCESS_INSTRUMENTER = + buildProcessInstrumenter(); + + private static Instrumenter, Void> buildReceiveInstrumenter() { + KafkaConsumerAttributesExtractor consumerAttributesExtractor = + new KafkaConsumerAttributesExtractor(MessageOperation.RECEIVE); + SpanNameExtractor> spanNameExtractor = + MessagingSpanNameExtractor.create(consumerAttributesExtractor); + + InstrumenterBuilder, Void> builder = + Instrumenter., Void>newBuilder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(consumerAttributesExtractor) + .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor()); + + if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { + builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); + } + + return builder.newConsumerInstrumenter(new KafkaHeadersGetter()); + } + + private static Instrumenter, Void> buildProcessInstrumenter() { + BatchConsumerAttributesExtractor attributesExtractor = new BatchConsumerAttributesExtractor(); + SpanNameExtractor> spanNameExtractor = + MessagingSpanNameExtractor.create(attributesExtractor); + + return Instrumenter., Void>newBuilder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(attributesExtractor) + .addSpanLinksExtractor(BatchRecords.spanLinksExtractor()) + .setErrorCauseExtractor(new KafkaBatchErrorCauseExtractor()) + .newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + public static Instrumenter, Void> receiveInstrumenter() { + return RECEIVE_INSTRUMENTER; + } + + public static Instrumenter, Void> processInstrumenter() { + return PROCESS_INSTRUMENTER; + } + + private SpringKafkaSingletons() {} +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java new file mode 100644 index 000000000000..2162b5521f95 --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/kafka/State.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.kafka; + +import com.google.auto.value.AutoValue; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +@AutoValue +public abstract class State { + + public static State create( + BatchRecords request, Context context, Scope scope) { + return new AutoValue_State<>(request, context, scope); + } + + public abstract BatchRecords request(); + + public abstract Context context(); + + public abstract Scope scope(); + + State() {} +} diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy new file mode 100644 index 000000000000..db470e19a6cb --- /dev/null +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -0,0 +1,253 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan + +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.sdk.trace.data.SpanData +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.boot.SpringApplication +import org.springframework.boot.SpringBootConfiguration +import org.springframework.boot.autoconfigure.EnableAutoConfiguration +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Bean +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.config.TopicBuilder +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.testcontainers.containers.KafkaContainer +import spock.lang.Shared + +class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { + @Shared + static KafkaContainer kafka + @Shared + static ConfigurableApplicationContext applicationContext + + def setupSpec() { + kafka = new KafkaContainer() + kafka.start() + + def app = new SpringApplication(ConsumerConfig) + app.setDefaultProperties([ + "spring.jmx.enabled" : false, + "spring.main.web-application-type" : "none", + "spring.kafka.bootstrap-servers" : kafka.bootstrapServers, + "spring.kafka.consumer.auto-offset-reset": "earliest", + "spring.kafka.consumer.linger-ms" : 10, + ]) + applicationContext = app.run() + } + + def cleanupSpec() { + kafka.stop() + applicationContext?.stop() + } + + def "should create spans for batch receive+process"() { + given: + def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) + + when: + runWithSpan("producer") { + kafkaTemplate.send("testTopic", "10", "testSpan1") + kafkaTemplate.send("testTopic", "20", "testSpan2") + kafkaTemplate.flush() + } + + then: + assertTraces(2) { + SpanData consumer1, consumer2 + + trace(0, 5) { + span(0) { + name "producer" + } + span(1) { + name "testTopic send" + kind PRODUCER + childOf span(0) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + } + } + span(2) { + name "testTopic receive" + kind CONSUMER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 + "kafka.offset" Long + "kafka.record.queue_time_ms" Long + } + } + span(3) { + name "testTopic send" + kind PRODUCER + childOf span(0) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + } + } + span(4) { + name "testTopic receive" + kind CONSUMER + childOf span(3) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 + "kafka.offset" Long + "kafka.record.queue_time_ms" Long + } + } + + consumer1 = span(2) + consumer2 = span(4) + } + trace(1, 2) { + span(0) { + name "testTopic process" + kind CONSUMER + hasLink consumer1 + hasLink consumer2 + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + } + } + span(1) { + name "consumer" + childOf span(0) + } + } + } + } + + def "should handle failure in Kafka listener"() { + given: + def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) + + when: + runWithSpan("producer") { + kafkaTemplate.send("testTopic", "10", "error") + kafkaTemplate.flush() + } + + then: + assertTraces(2) { + SpanData consumer + + trace(0, 3) { + span(0) { + name "producer" + } + span(1) { + name "testTopic send" + kind PRODUCER + childOf span(0) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + } + } + span(2) { + name "testTopic receive" + kind CONSUMER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" 0 + "kafka.offset" Long + "kafka.record.queue_time_ms" Long + } + } + + consumer = span(2) + } + trace(1, 2) { + span(0) { + name "testTopic process" + kind CONSUMER + hasLink consumer + status ERROR + errorEvent IllegalArgumentException, "boom" + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" "testTopic" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + } + } + span(1) { + name "consumer" + childOf span(0) + } + } + } + } + + @SpringBootConfiguration + @EnableAutoConfiguration + static class ConsumerConfig { + + @Bean + NewTopic topic() { + return TopicBuilder.name("testTopic") + .partitions(1) + .replicas(1) + .build() + } + + @KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory") + void listener(List> records) { + runInternalSpan("consumer") + records.forEach({ record -> + if (record.value() == "error") { + throw new IllegalArgumentException("boom") + } + }) + } + + @Bean + ConcurrentKafkaListenerContainerFactory batchFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>() + factory.setConsumerFactory(consumerFactory) + factory.setBatchListener(true) + factory.setAutoStartup(true) + // setting interceptBeforeTx to true eliminates kafka-clients noise - otherwise spans would be created on every ConsumerRecords#iterator() call + factory.setContainerCustomizer({ container -> + container.setInterceptBeforeTx(true) + }) + factory + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 1eef28cfea8e..91575b7d0fe5 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -307,6 +307,7 @@ include(":instrumentation:spring:spring-data-1.8:javaagent") include(":instrumentation:spring:spring-integration-4.1:javaagent") include(":instrumentation:spring:spring-integration-4.1:library") include(":instrumentation:spring:spring-integration-4.1:testing") +include(":instrumentation:spring:spring-kafka-2.7:javaagent") include(":instrumentation:spring:spring-rabbit-1.0:javaagent") include(":instrumentation:spring:spring-scheduling-3.1:javaagent") include(":instrumentation:spring:spring-web-3.1:javaagent") From 5a3bc8f65a4b90791080a13066b1c53bc6fb0bc5 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 26 Aug 2021 10:03:52 +0200 Subject: [PATCH 2/4] Update instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java Co-authored-by: Trask Stalnaker --- .../instrumentation/api/instrumenter/InstrumenterBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java index 6a461c361b5a..09a006347ca2 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/InstrumenterBuilder.java @@ -100,7 +100,7 @@ public InstrumenterBuilder addAttributesExtractors( return addAttributesExtractors(Arrays.asList(attributesExtractors)); } - /** Adds a {@link SpanLinksExtractor} to extract span link from requests. */ + /** Adds a {@link SpanLinksExtractor} to extract span links from requests. */ public InstrumenterBuilder addSpanLinksExtractor( SpanLinksExtractor spanLinksExtractor) { spanLinksExtractors.add(spanLinksExtractor); From 04710878325d1890af6c7a680aff929eb0258ce0 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 26 Aug 2021 10:26:32 +0200 Subject: [PATCH 3/4] fix compilation failure --- .../instrumentation/kafkaclients/KafkaSingletons.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index b713235eb979..709485e84bef 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor; import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor; @@ -39,7 +40,8 @@ public final class KafkaSingletons { } private static Instrumenter, Void> buildConsumerInstrumenter() { - KafkaConsumerAttributesExtractor attributesExtractor = new KafkaConsumerAttributesExtractor(); + KafkaConsumerAttributesExtractor attributesExtractor = + new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS); SpanNameExtractor> spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); From 9d475b9dc76290b9287cda2ca282b8fdc4aca289 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 26 Aug 2021 12:44:27 +0200 Subject: [PATCH 4/4] Suppress nested CONSUMER spans --- .../api/instrumenter/SpanSuppressionStrategy.java | 2 +- .../api/instrumenter/SpanSuppressionStrategyTest.java | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategy.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategy.java index 89f1b2474c0e..3891d5b984e0 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategy.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategy.java @@ -16,7 +16,7 @@ abstract class SpanSuppressionStrategy { private static final SpanSuppressionStrategy SERVER_STRATEGY = new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.SERVER)); private static final SpanSuppressionStrategy CONSUMER_STRATEGY = - new StoreOnlyStrategy(singleton(SpanKey.CONSUMER)); + new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.CONSUMER)); private static final SpanSuppressionStrategy ALL_CLIENTS_STRATEGY = new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.ALL_CLIENTS)); private static final SpanSuppressionStrategy ALL_PRODUCERS_STRATEGY = diff --git a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategyTest.java b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategyTest.java index 24aa4cdb357e..a2cd51a76874 100644 --- a/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategyTest.java +++ b/instrumentation-api/src/test/java/io/opentelemetry/instrumentation/api/instrumenter/SpanSuppressionStrategyTest.java @@ -65,8 +65,7 @@ public void consumerSpan_getSet() { Context context = SpanKey.CONSUMER.storeInContext(Context.root(), SPAN); - // never suppress CONSUMER - assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse(); + assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue(); assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN); allClientSpanKeys().forEach(spanKey -> assertThat(spanKey.fromContextOrNull(context)).isNull()); @@ -188,13 +187,13 @@ public void noKeys_serverIsSuppressed() { } @Test - public void noKeys_consumerIsNeverSuppressed() { + public void noKeys_consumerIsSuppressed() { SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(new HashSet<>()); Context context = strategy.storeInContext(Context.root(), SpanKind.CONSUMER, SPAN); - assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse(); + assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue(); assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN); allClientSpanKeys()