From 1a6294e1be8431b62e9ee3d3ebe49f15116e3942 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Sat, 18 Sep 2021 04:17:00 +0200 Subject: [PATCH] Make CONSUMER receive span a parent of CONSUMER process spans in kafka-streams (#4151) --- .../kafkaclients/TracingIterable.java | 9 ++- .../kafka/KafkaConsumerIterableWrapper.java | 17 +++++ .../groovy/KafkaStreamsTest.groovy | 49 +++++-------- .../KafkaStreamsInstrumentationModule.java | 8 +- .../kafkastreams/KafkaStreamsSingletons.java | 11 ++- ...ava => PartitionGroupInstrumentation.java} | 22 ++++-- .../RecordDeserializerInstrumentation.java | 66 +++++++++++++++++ ...odeRecordDeserializerInstrumentation.java} | 15 +++- ...on.java => StreamTaskInstrumentation.java} | 34 +++++++-- .../StreamThreadInstrumentation.java | 73 +++++++++++++++++++ .../src/test/groovy/KafkaStreamsTest.groovy | 51 +++++-------- .../api/Java8BytecodeBridge.java | 6 ++ 12 files changed, 274 insertions(+), 87 deletions(-) create mode 100644 instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIterableWrapper.java rename instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/{StreamTaskStartInstrumentation.java => PartitionGroupInstrumentation.java} (67%) create mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java rename instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/{KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java => SourceNodeRecordDeserializerInstrumentation.java} (77%) rename instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/{StreamTaskStopInstrumentation.java => StreamTaskInstrumentation.java} (53%) create mode 100644 instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index 8b942138e39c..70da1bc02138 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -6,11 +6,13 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper; import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.checkerframework.checker.nullness.qual.Nullable; -public class TracingIterable implements Iterable> { +public class TracingIterable + implements Iterable>, KafkaConsumerIterableWrapper { private final Iterable> delegate; @Nullable private final SpanContext receiveSpanContext; private boolean firstIterator = true; @@ -36,4 +38,9 @@ public Iterator> iterator() { return it; } + + @Override + public Iterable> unwrap() { + return delegate; + } } diff --git a/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIterableWrapper.java b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIterableWrapper.java new file mode 100644 index 000000000000..257922265216 --- /dev/null +++ b/instrumentation/kafka-clients/kafka-clients-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafka/KafkaConsumerIterableWrapper.java @@ -0,0 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public interface KafkaConsumerIterableWrapper { + + /** + * Returns the actual, non-tracing iterable. This method is only supposed to be used by other + * Kafka consumer instrumentations that want to suppress the kafka-clients one. + */ + Iterable> unwrap(); +} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy index 9327d28179d3..ced88a36a934 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/latestDepTest/groovy/KafkaStreamsTest.groovy @@ -150,7 +150,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { SpanData producerPending, producerProcessed - trace(0, 3) { + trace(0, 1) { // kafka-clients PRODUCER span(0) { name STREAM_PENDING + " send" @@ -162,39 +162,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } - // kafka-stream CONSUMER - span(1) { - name STREAM_PENDING + " process" - kind CONSUMER - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - "asdf" "testing" - } - } - // kafka-stream PRODUCER - span(2) { - name STREAM_PROCESSED + " send" - kind PRODUCER - childOf span(1) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } producerPending = span(0) - producerProcessed = span(2) } - trace(1, 2) { + trace(1, 3) { // kafka-clients CONSUMER receive span(0) { name STREAM_PENDING + " receive" @@ -207,7 +178,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" } } - // kafka-clients CONSUMER process + // kafka-stream CONSUMER span(1) { name STREAM_PENDING + " process" kind CONSUMER @@ -222,8 +193,22 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "kafka.offset" 0 "kafka.record.queue_time_ms" { it >= 0 } + "asdf" "testing" + } + } + // kafka-clients PRODUCER + span(2) { + name STREAM_PROCESSED + " send" + kind PRODUCER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } + + producerProcessed = span(2) } trace(2, 2) { // kafka-clients CONSUMER receive diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java index fcef833ce995..c61cecfed349 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsInstrumentationModule.java @@ -21,8 +21,10 @@ public KafkaStreamsInstrumentationModule() { @Override public List typeInstrumentations() { return asList( - new KafkaStreamsSourceNodeRecordDeserializerInstrumentation(), - new StreamTaskStartInstrumentation(), - new StreamTaskStopInstrumentation()); + new PartitionGroupInstrumentation(), + new RecordDeserializerInstrumentation(), + new SourceNodeRecordDeserializerInstrumentation(), + new StreamTaskInstrumentation(), + new StreamThreadInstrumentation()); } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java index fffd520ec42f..26ca1bb6b755 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java @@ -9,6 +9,7 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; @@ -39,10 +40,12 @@ public final class KafkaStreamsSingletons { if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) { builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor()); } - // TODO: use the local receive span as parent, keep the producer in a link - return KafkaPropagation.isPropagationEnabled() - ? builder.newConsumerInstrumenter(new KafkaHeadersGetter()) - : builder.newInstrumenter(SpanKindExtractor.alwaysConsumer()); + if (KafkaPropagation.isPropagationEnabled()) { + builder.addSpanLinksExtractor( + SpanLinksExtractor.fromUpstreamRequest( + GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter())); + } + return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer()); } public static Instrumenter, Void> instrumenter() { diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java similarity index 67% rename from instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java rename to instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java index e0e33538003e..2685e5f13a41 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStartInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/PartitionGroupInstrumentation.java @@ -5,6 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; +import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.wrapSpan; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,16 +14,19 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.returns; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.processor.internals.StampedRecord; -public class StreamTaskStartInstrumentation implements TypeInstrumentation { +// the advice applied by this instrumentation actually starts the span +public class PartitionGroupInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -35,11 +40,11 @@ public void transform(TypeTransformer transformer) { .and(isPackagePrivate()) .and(named("nextRecord")) .and(returns(named("org.apache.kafka.streams.processor.internals.StampedRecord"))), - StreamTaskStartInstrumentation.class.getName() + "$StartSpanAdvice"); + PartitionGroupInstrumentation.class.getName() + "$NextRecordAdvice"); } @SuppressWarnings("unused") - public static class StartSpanAdvice { + public static class NextRecordAdvice { @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit(@Advice.Return StampedRecord record) { @@ -53,7 +58,14 @@ public static void onExit(@Advice.Return StampedRecord record) { return; } - Context parentContext = Java8BytecodeBridge.currentContext(); + // use the receive CONSUMER span as parent if it's available + Context parentContext = currentContext(); + SpanContext receiveSpanContext = + InstrumentationContext.get(ConsumerRecord.class, SpanContext.class).get(record.value); + if (receiveSpanContext != null) { + parentContext = parentContext.with(wrapSpan(receiveSpanContext)); + } + if (!instrumenter().shouldStart(parentContext, record.value)) { return; } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java new file mode 100644 index 000000000000..e6771f2484b9 --- /dev/null +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/RecordDeserializerInstrumentation.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static net.bytebuddy.matcher.ElementMatchers.isInterface; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +// at some point in time SourceNodeRecordDeserializer was refactored into RecordDeserializer +public class RecordDeserializerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.kafka.streams.processor.internals.RecordDeserializer"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.kafka.streams.processor.internals.RecordDeserializer") + .and(not(isInterface())); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPackagePrivate()) + .and(named("deserialize")) + .and(takesArgument(1, named("org.apache.kafka.clients.consumer.ConsumerRecord"))) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))), + RecordDeserializerInstrumentation.class.getName() + "$DeserializeAdvice"); + } + + @SuppressWarnings("unused") + public static class DeserializeAdvice { + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Argument(1) ConsumerRecord incoming, + @Advice.Return(readOnly = false) ConsumerRecord result) { + + // copy the receive CONSUMER span association + ContextStore singleRecordReceiveSpan = + InstrumentationContext.get(ConsumerRecord.class, SpanContext.class); + singleRecordReceiveSpan.put(result, singleRecordReceiveSpan.get(incoming)); + } + } +} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java similarity index 77% rename from instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java rename to instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java index f1fee3cc8ba1..4a28f47d87e6 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/SourceNodeRecordDeserializerInstrumentation.java @@ -11,8 +11,11 @@ import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -20,8 +23,7 @@ import org.apache.kafka.common.record.TimestampType; // This is necessary because SourceNodeRecordDeserializer drops the headers. :-( -public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation - implements TypeInstrumentation { +public class SourceNodeRecordDeserializerInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -36,8 +38,7 @@ public void transform(TypeTransformer transformer) { .and(named("deserialize")) .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerRecord"))) .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))), - KafkaStreamsSourceNodeRecordDeserializerInstrumentation.class.getName() - + "$SaveHeadersAdvice"); + SourceNodeRecordDeserializerInstrumentation.class.getName() + "$SaveHeadersAdvice"); } @SuppressWarnings("unused") @@ -47,6 +48,7 @@ public static class SaveHeadersAdvice { public static void saveHeaders( @Advice.Argument(0) ConsumerRecord incoming, @Advice.Return(readOnly = false) ConsumerRecord result) { + result = new ConsumerRecord<>( result.topic(), @@ -60,6 +62,11 @@ public static void saveHeaders( result.key(), result.value(), incoming.headers()); + + // copy the receive CONSUMER span association + ContextStore singleRecordReceiveSpan = + InstrumentationContext.get(ConsumerRecord.class, SpanContext.class); + singleRecordReceiveSpan.put(result, singleRecordReceiveSpan.get(incoming)); } } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java similarity index 53% rename from instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java rename to instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java index 96a46398a4e7..840f9547baa5 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskStopInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamTaskInstrumentation.java @@ -7,18 +7,20 @@ import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER; -import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; -public class StreamTaskStopInstrumentation implements TypeInstrumentation { +public class StreamTaskInstrumentation implements TypeInstrumentation { @Override public ElementMatcher typeMatcher() { @@ -28,12 +30,17 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( - isMethod().and(isPublic()).and(named("process")), - StreamTaskStopInstrumentation.class.getName() + "$StopSpanAdvice"); + named("process").and(isPublic()), + StreamTaskInstrumentation.class.getName() + "$ProcessAdvice"); + transformer.applyAdviceToMethod( + named("addRecords").and(isPublic()).and(takesArgument(1, Iterable.class)), + StreamTaskInstrumentation.class.getName() + "$AddRecordsAdvice"); } + // the method decorated by this advice calls PartitionGroup.nextRecord(), which triggers + // PartitionGroupInstrumentation that actually starts the span @SuppressWarnings("unused") - public static class StopSpanAdvice { + public static class ProcessAdvice { @Advice.OnMethodEnter public static StateHolder onEnter() { @@ -54,4 +61,21 @@ public static void stopSpan( } } } + + // this advice removes the CONSUMER spans created by the kafka-clients instrumentation + @SuppressWarnings("unused") + public static class AddRecordsAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 1, readOnly = false) + Iterable> records) { + + // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though + // there's no current CONSUMER span + if (records instanceof KafkaConsumerIterableWrapper) { + records = ((KafkaConsumerIterableWrapper) records).unwrap(); + } + } + } } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java new file mode 100644 index 000000000000..44bbd64b5cfd --- /dev/null +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/StreamThreadInstrumentation.java @@ -0,0 +1,73 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; +import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper; +import java.util.Iterator; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; + +// This instrumentation copies the receive CONSUMER span context from the ConsumerRecords aggregate +// object to each individual record +public class StreamThreadInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.kafka.streams.processor.internals.StreamThread"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("pollRequests") + .and(isPrivate()) + .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), + this.getClass().getName() + "$PollRecordsAdvice"); + } + + @SuppressWarnings("unused") + public static class PollRecordsAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit(@Advice.Return ConsumerRecords records) { + if (records.isEmpty()) { + return; + } + + SpanContext receiveSpanContext = + InstrumentationContext.get(ConsumerRecords.class, SpanContext.class).get(records); + if (receiveSpanContext == null) { + return; + } + + ContextStore singleRecordReceiveSpan = + InstrumentationContext.get(ConsumerRecord.class, SpanContext.class); + + Iterator> it = records.iterator(); + // this will forcefully suppress the kafka-clients CONSUMER instrumentation even though + // there's no current CONSUMER span + if (it instanceof KafkaConsumerIteratorWrapper) { + it = ((KafkaConsumerIteratorWrapper) it).unwrap(); + } + + while (it.hasNext()) { + ConsumerRecord record = it.next(); + singleRecordReceiveSpan.put(record, receiveSpanContext); + } + } + } +} diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy index cab08efa3d81..f29c7592df41 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/javaagent/src/test/groovy/KafkaStreamsTest.groovy @@ -150,7 +150,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { SpanData producerPending, producerProcessed - trace(0, 3) { + trace(0, 1) { // kafka-clients PRODUCER span(0) { name STREAM_PENDING + " send" @@ -162,39 +162,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } - // kafka-stream CONSUMER - span(1) { - name STREAM_PENDING + " process" - kind CONSUMER - childOf span(0) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } - "kafka.offset" 0 - "kafka.record.queue_time_ms" { it >= 0 } - "asdf" "testing" - } - } - // kafka-stream PRODUCER - span(2) { - name STREAM_PROCESSED + " send" - kind PRODUCER - childOf span(1) - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - } - } producerPending = span(0) - producerProcessed = span(2) } - trace(1, 2) { + trace(1, 3) { // kafka-clients CONSUMER receive span(0) { name STREAM_PENDING + " receive" @@ -207,12 +178,12 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" } } - // kafka-clients CONSUMER process + // kafka-stream CONSUMER span(1) { name STREAM_PENDING + " process" kind CONSUMER childOf span(0) - hasLink producerPending + hasLink(producerPending) attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING @@ -222,8 +193,22 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification { "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 } "kafka.offset" 0 "kafka.record.queue_time_ms" { it >= 0 } + "asdf" "testing" + } + } + // kafka-clients PRODUCER + span(2) { + name STREAM_PROCESSED + " send" + kind PRODUCER + childOf span(1) + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } + + producerProcessed = span(2) } trace(2, 2) { // kafka-clients CONSUMER receive diff --git a/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java b/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java index 52ad825f34b4..d884c88db4b6 100644 --- a/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java +++ b/javaagent-instrumentation-api/src/main/java/io/opentelemetry/javaagent/instrumentation/api/Java8BytecodeBridge.java @@ -6,6 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.api; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; /** @@ -37,5 +38,10 @@ public static Span spanFromContext(Context context) { return Span.fromContext(context); } + /** Calls {@link Span#wrap(SpanContext)}. */ + public static Span wrapSpan(SpanContext spanContext) { + return Span.wrap(spanContext); + } + private Java8BytecodeBridge() {} }