Commit 1a6294e

Make CONSUMER receive span a parent of CONSUMER process spans in kafka-streams (#4151)

Mateusz Rzeszutek authored Sep 18, 2021 · 1 parent 3bc0e07
Showing 12 changed files with 274 additions and 87 deletions.
TracingIterable.java (io.opentelemetry.javaagent.instrumentation.kafkaclients)

@@ -6,11 +6,13 @@
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;
 
 import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper;
 import java.util.Iterator;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-public class TracingIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
+public class TracingIterable<K, V>
+    implements Iterable<ConsumerRecord<K, V>>, KafkaConsumerIterableWrapper<K, V> {
   private final Iterable<ConsumerRecord<K, V>> delegate;
   @Nullable private final SpanContext receiveSpanContext;
   private boolean firstIterator = true;
@@ -36,4 +38,9 @@ public Iterator<ConsumerRecord<K, V>> iterator() {
 
     return it;
   }
+
+  @Override
+  public Iterable<ConsumerRecord<K, V>> unwrap() {
+    return delegate;
+  }
 }
KafkaConsumerIterableWrapper.java (io.opentelemetry.javaagent.instrumentation.kafka, new file)

@@ -0,0 +1,17 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public interface KafkaConsumerIterableWrapper<K, V> {
+
+  /**
+   * Returns the actual, non-tracing iterable. This method is only supposed to be used by other
+   * Kafka consumer instrumentations that want to suppress the kafka-clients one.
+   */
+  Iterable<ConsumerRecord<K, V>> unwrap();
+}
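For context, the javadoc above spells out the intended caller: another consumer instrumentation that wants to suppress the kafka-clients tracing iterable. A minimal sketch of that usage follows; the helper class and method names are hypothetical, not part of this commit.

package io.opentelemetry.javaagent.instrumentation.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical helper (not in this commit): lets a higher-level consumer
// instrumentation bypass the kafka-clients tracing wrapper.
final class KafkaIterableUnwrapper {

  private KafkaIterableUnwrapper() {}

  static <K, V> Iterable<ConsumerRecord<K, V>> maybeUnwrap(
      Iterable<ConsumerRecord<K, V>> iterable) {
    if (iterable instanceof KafkaConsumerIterableWrapper) {
      // return the untraced delegate that TracingIterable wraps
      return ((KafkaConsumerIterableWrapper<K, V>) iterable).unwrap();
    }
    return iterable;
  }
}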
KafkaStreamsTest.groovy

@@ -150,7 +150,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
 
     SpanData producerPending, producerProcessed
 
-    trace(0, 3) {
+    trace(0, 1) {
       // kafka-clients PRODUCER
       span(0) {
         name STREAM_PENDING + " send"
@@ -162,39 +162,10 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
           "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
         }
       }
-      // kafka-stream CONSUMER
-      span(1) {
-        name STREAM_PENDING + " process"
-        kind CONSUMER
-        childOf span(0)
-        attributes {
-          "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
-          "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING
-          "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
-          "${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
-          "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
-          "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
-          "kafka.offset" 0
-          "kafka.record.queue_time_ms" { it >= 0 }
-          "asdf" "testing"
-        }
-      }
-      // kafka-stream PRODUCER
-      span(2) {
-        name STREAM_PROCESSED + " send"
-        kind PRODUCER
-        childOf span(1)
-        attributes {
-          "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
-          "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
-          "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
-        }
-      }
 
       producerPending = span(0)
-      producerProcessed = span(2)
     }
-    trace(1, 2) {
+    trace(1, 3) {
       // kafka-clients CONSUMER receive
       span(0) {
         name STREAM_PENDING + " receive"
@@ -207,7 +178,7 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
           "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
         }
       }
-      // kafka-clients CONSUMER process
+      // kafka-stream CONSUMER
       span(1) {
         name STREAM_PENDING + " process"
         kind CONSUMER
@@ -222,8 +193,22 @@ class KafkaStreamsTest extends AgentInstrumentationSpecification {
           "${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
           "kafka.offset" 0
           "kafka.record.queue_time_ms" { it >= 0 }
+          "asdf" "testing"
         }
       }
+      // kafka-clients PRODUCER
+      span(2) {
+        name STREAM_PROCESSED + " send"
+        kind PRODUCER
+        childOf span(1)
+        attributes {
+          "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
+          "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED
+          "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
+        }
+      }
+
+      producerProcessed = span(2)
     }
     trace(2, 2) {
       // kafka-clients CONSUMER receive
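Taken together with the commit title, the assertions now describe this topology (trace 2 and some attribute blocks are elided above):

  trace 0:  STREAM_PENDING send (PRODUCER, kafka-clients)
  trace 1:  STREAM_PENDING receive (CONSUMER, kafka-clients)
            └─ STREAM_PENDING process (CONSUMER, kafka-streams)
               └─ STREAM_PROCESSED send (PRODUCER, kafka-clients)

The pending-topic producer now ends its trace at the send span; the receive span roots a new trace and parents the process span, which in turn parents the downstream send. Per the KafkaStreamsSingletons change below, the process span keeps a link to the upstream producer span when propagation is enabled.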
KafkaStreamsInstrumentationModule.java

@@ -21,8 +21,10 @@ public KafkaStreamsInstrumentationModule() {
   @Override
   public List<TypeInstrumentation> typeInstrumentations() {
     return asList(
-        new KafkaStreamsSourceNodeRecordDeserializerInstrumentation(),
-        new StreamTaskStartInstrumentation(),
-        new StreamTaskStopInstrumentation());
+        new PartitionGroupInstrumentation(),
+        new RecordDeserializerInstrumentation(),
+        new SourceNodeRecordDeserializerInstrumentation(),
+        new StreamTaskInstrumentation(),
+        new StreamThreadInstrumentation());
   }
 }
KafkaStreamsSingletons.java

@@ -9,6 +9,7 @@
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
 import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
@@ -39,10 +40,12 @@ public final class KafkaStreamsSingletons {
     if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
       builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
     }
-    // TODO: use the local receive span as parent, keep the producer in a link
-    return KafkaPropagation.isPropagationEnabled()
-        ? builder.newConsumerInstrumenter(new KafkaHeadersGetter())
-        : builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
+    if (KafkaPropagation.isPropagationEnabled()) {
+      builder.addSpanLinksExtractor(
+          SpanLinksExtractor.fromUpstreamRequest(
+              GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
+    }
+    return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
   }
 
   public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
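Design note on the hunk above: previously, when propagation was enabled, newConsumerInstrumenter made the kafka-streams CONSUMER span a child of the upstream PRODUCER context extracted from the record headers. Now that producer context is attached as a span link instead, leaving the parent slot free for the local receive span. A minimal sketch of what the wiring amounts to for a single record, written against the plain OpenTelemetry API; the tracer name is arbitrary, and KafkaHeadersGetter is assumed to be a TextMapGetter over ConsumerRecord, as it is used above.

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Sketch only: what the SpanLinksExtractor.fromUpstreamRequest(...) wiring
// above amounts to, expressed with the plain OpenTelemetry API.
final class ProcessSpanSketch {

  private static final Tracer tracer = GlobalOpenTelemetry.getTracer("sketch");

  private ProcessSpanSketch() {}

  static Span startProcessSpan(ConsumerRecord<?, ?> record) {
    // extract the upstream PRODUCER context from the record headers
    Context extracted =
        GlobalOpenTelemetry.getPropagators()
            .getTextMapPropagator()
            .extract(Context.root(), record, new KafkaHeadersGetter());
    SpanContext producerContext = Span.fromContext(extracted).getSpanContext();
    return tracer
        .spanBuilder(record.topic() + " process")
        .setSpanKind(SpanKind.CONSUMER)
        .setParent(Context.current()) // the local receive span, when present
        .addLink(producerContext) // producer kept as a link, not a parent
        .startSpan();
  }
}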
PartitionGroupInstrumentation.java (renamed from StreamTaskStartInstrumentation.java)

@@ -5,23 +5,28 @@
 
 package io.opentelemetry.javaagent.instrumentation.kafkastreams;
 
+import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
+import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.wrapSpan;
 import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter;
 import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.returns;
 
+import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
-import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
+import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.processor.internals.StampedRecord;
 
-public class StreamTaskStartInstrumentation implements TypeInstrumentation {
+// the advice applied by this instrumentation actually starts the span
+public class PartitionGroupInstrumentation implements TypeInstrumentation {
 
   @Override
   public ElementMatcher<TypeDescription> typeMatcher() {
@@ -35,11 +40,11 @@ public void transform(TypeTransformer transformer) {
             .and(isPackagePrivate())
            .and(named("nextRecord"))
            .and(returns(named("org.apache.kafka.streams.processor.internals.StampedRecord"))),
-        StreamTaskStartInstrumentation.class.getName() + "$StartSpanAdvice");
+        PartitionGroupInstrumentation.class.getName() + "$NextRecordAdvice");
   }
 
   @SuppressWarnings("unused")
-  public static class StartSpanAdvice {
+  public static class NextRecordAdvice {
 
     @Advice.OnMethodExit(suppress = Throwable.class)
     public static void onExit(@Advice.Return StampedRecord record) {
@@ -53,7 +58,14 @@ public static void onExit(@Advice.Return StampedRecord record) {
         return;
       }
 
-      Context parentContext = Java8BytecodeBridge.currentContext();
+      // use the receive CONSUMER span as parent if it's available
+      Context parentContext = currentContext();
+      SpanContext receiveSpanContext =
+          InstrumentationContext.get(ConsumerRecord.class, SpanContext.class).get(record.value);
+      if (receiveSpanContext != null) {
+        parentContext = parentContext.with(wrapSpan(receiveSpanContext));
+      }
+
       if (!instrumenter().shouldStart(parentContext, record.value)) {
         return;
       }
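One subtlety in the advice above: only the receive span's SpanContext is stored per record, not a live Span, so wrapSpan turns the bare SpanContext back into a (non-recording) Span that can serve as the parent. A minimal sketch of that step in the plain OpenTelemetry API, assuming Java8BytecodeBridge.wrapSpan corresponds to Span.wrap:

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;

// Sketch of the re-parenting step performed in NextRecordAdvice above.
final class ReparentSketch {

  private ReparentSketch() {}

  static Context withReceiveSpanParent(Context parentContext, SpanContext receiveSpanContext) {
    if (receiveSpanContext == null) {
      return parentContext; // no receive span was recorded for this record
    }
    // Span.wrap produces a non-recording Span carrying only the SpanContext;
    // spans started from the returned Context become children of the receive span.
    return parentContext.with(Span.wrap(receiveSpanContext));
  }
}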
RecordDeserializerInstrumentation.java (io.opentelemetry.javaagent.instrumentation.kafkastreams, new file)

@@ -0,0 +1,66 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.javaagent.instrumentation.kafkastreams;
+
+import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
+import static net.bytebuddy.matcher.ElementMatchers.isInterface;
+import static net.bytebuddy.matcher.ElementMatchers.isMethod;
+import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.not;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
+
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
+import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
+import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+// at some point in time SourceNodeRecordDeserializer was refactored into RecordDeserializer
+public class RecordDeserializerInstrumentation implements TypeInstrumentation {
+
+  @Override
+  public ElementMatcher<ClassLoader> classLoaderOptimization() {
+    return hasClassesNamed("org.apache.kafka.streams.processor.internals.RecordDeserializer");
+  }
+
+  @Override
+  public ElementMatcher<TypeDescription> typeMatcher() {
+    return named("org.apache.kafka.streams.processor.internals.RecordDeserializer")
+        .and(not(isInterface()));
+  }
+
+  @Override
+  public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        isMethod()
+            .and(isPackagePrivate())
+            .and(named("deserialize"))
+            .and(takesArgument(1, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
+            .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
+        RecordDeserializerInstrumentation.class.getName() + "$DeserializeAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class DeserializeAdvice {
+
+    @Advice.OnMethodExit(suppress = Throwable.class)
+    public static void onExit(
+        @Advice.Argument(1) ConsumerRecord<?, ?> incoming,
+        @Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
+
+      // copy the receive CONSUMER span association
+      ContextStore<ConsumerRecord, SpanContext> singleRecordReceiveSpan =
+          InstrumentationContext.get(ConsumerRecord.class, SpanContext.class);
+      singleRecordReceiveSpan.put(result, singleRecordReceiveSpan.get(incoming));
+    }
+  }
+}
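The copy matters because deserialize returns a new ConsumerRecord instance: the receive-span association is keyed on record identity, so without re-attaching it, the lookup in PartitionGroupInstrumentation's advice would come up empty. The same two lines appear again in SourceNodeRecordDeserializerInstrumentation below; here is a sketch of the shared pattern with an explicit null check for readability. The helper class is hypothetical, and note that InstrumentationContext.get is designed to be called from advice code.

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical distillation of the advice bodies (not in this commit).
final class ReceiveSpanAssociation {

  private ReceiveSpanAssociation() {}

  @SuppressWarnings("rawtypes")
  static void copy(ConsumerRecord<?, ?> from, ConsumerRecord<?, ?> to) {
    // maps ConsumerRecord instances to the receive CONSUMER span's SpanContext
    ContextStore<ConsumerRecord, SpanContext> store =
        InstrumentationContext.get(ConsumerRecord.class, SpanContext.class);
    SpanContext receiveSpanContext = store.get(from);
    if (receiveSpanContext != null) {
      store.put(to, receiveSpanContext);
    }
  }
}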
SourceNodeRecordDeserializerInstrumentation.java (renamed from KafkaStreamsSourceNodeRecordDeserializerInstrumentation.java)

@@ -11,17 +11,19 @@
 import static net.bytebuddy.matcher.ElementMatchers.returns;
 import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
 
+import io.opentelemetry.api.trace.SpanContext;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
+import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.record.TimestampType;
 
 // This is necessary because SourceNodeRecordDeserializer drops the headers. :-(
-public class KafkaStreamsSourceNodeRecordDeserializerInstrumentation
-    implements TypeInstrumentation {
+public class SourceNodeRecordDeserializerInstrumentation implements TypeInstrumentation {
 
   @Override
   public ElementMatcher<TypeDescription> typeMatcher() {
@@ -36,8 +38,7 @@ public void transform(TypeTransformer transformer) {
             .and(named("deserialize"))
             .and(takesArgument(0, named("org.apache.kafka.clients.consumer.ConsumerRecord")))
             .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecord"))),
-        KafkaStreamsSourceNodeRecordDeserializerInstrumentation.class.getName()
-            + "$SaveHeadersAdvice");
+        SourceNodeRecordDeserializerInstrumentation.class.getName() + "$SaveHeadersAdvice");
   }
 
   @SuppressWarnings("unused")
@@ -47,6 +48,7 @@ public static class SaveHeadersAdvice {
     public static void saveHeaders(
         @Advice.Argument(0) ConsumerRecord<?, ?> incoming,
         @Advice.Return(readOnly = false) ConsumerRecord<?, ?> result) {
+
       result =
           new ConsumerRecord<>(
               result.topic(),
@@ -60,6 +62,11 @@ public static void saveHeaders(
               result.key(),
               result.value(),
               incoming.headers());
+
+      // copy the receive CONSUMER span association
+      ContextStore<ConsumerRecord, SpanContext> singleRecordReceiveSpan =
+          InstrumentationContext.get(ConsumerRecord.class, SpanContext.class);
+      singleRecordReceiveSpan.put(result, singleRecordReceiveSpan.get(incoming));
     }
   }
 }
(The remaining 4 changed files are not shown.)