Add support for Kafka consumer and producer interceptors. (#4065)
* Add support for Kafka consumer and producer interceptors, move common Kafka code to library module.

* Apply feedback

* Apply feedback, #3.

* Apply feedback, #4.

* Add producer / consumer wrappers.

* Move to kafka-clients-2.6.

* Apply feedback #5.

Co-authored-by: Trask Stalnaker <[email protected]>
alesj and trask authored Oct 2, 2021
1 parent 033c20a commit ff0bf0a
Showing 46 changed files with 1,198 additions and 205 deletions.
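The interceptors added here plug into Kafka's standard client interceptor mechanism. As a rough usage sketch (the interceptor class name and package below are assumptions for illustration; the exact names live in the new library module and are not visible in this excerpt), a producer would opt in through the standard interceptor.classes setting:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InterceptorConfigExample {
  // Builds a producer that runs the (assumed) tracing interceptor around every send().
  public static KafkaProducer<String, String> createProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // interceptor.classes is standard Kafka client config; the class name is hypothetical.
    props.put(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        "io.opentelemetry.instrumentation.kafkaclients.TracingProducerInterceptor");
    return new KafkaProducer<>(props);
  }
}

The consumer side is wired the same way through ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.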
[changed file]
@@ -15,11 +15,12 @@ dependencies {
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")

- implementation(project(":instrumentation:kafka-clients:kafka-clients-common:javaagent"))
+ implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))

library("org.apache.kafka:kafka-clients:0.11.0.0")

testImplementation("org.testcontainers:kafka")
+ testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
}

tasks {
[changed file]
@@ -17,6 +17,8 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
+ import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
+ import io.opentelemetry.instrumentation.kafka.Timer;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
[changed file]
@@ -13,10 +13,10 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
+ import io.opentelemetry.instrumentation.kafka.KafkaPropagation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
[changed file]
@@ -5,84 +5,21 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

- import io.opentelemetry.api.GlobalOpenTelemetry;
- import io.opentelemetry.instrumentation.api.config.ExperimentalConfig;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
- import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
- import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
- import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
- import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
- import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
- import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAdditionalAttributesExtractor;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerAttributesExtractor;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerExperimentalAttributesExtractor;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaHeadersGetter;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaPropagation;
+ import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
+ import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
- buildProducerInstrumenter();
+ KafkaInstrumenterBuilder.buildProducerInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
- buildConsumerReceiveInstrumenter();
+ KafkaInstrumenterBuilder.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
- buildConsumerProcessInstrumenter();
-
- private static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter() {
- KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
- SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
- MessagingSpanNameExtractor.create(attributesExtractor);
-
- return Instrumenter.<ProducerRecord<?, ?>, Void>newBuilder(
- GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
- .addAttributesExtractor(attributesExtractor)
- .addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
- .newInstrumenter(SpanKindExtractor.alwaysProducer());
- }
-
- private static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter() {
- KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
- SpanNameExtractor<ReceivedRecords> spanNameExtractor =
- MessagingSpanNameExtractor.create(attributesExtractor);
-
- return Instrumenter.<ReceivedRecords, Void>newBuilder(
- GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
- .addAttributesExtractor(attributesExtractor)
- .setTimeExtractors(ReceivedRecords::startTime, (request, response, error) -> request.now())
- .setDisabled(ExperimentalConfig.get().suppressMessagingReceiveSpans())
- .newInstrumenter(SpanKindExtractor.alwaysConsumer());
- }
-
- private static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter() {
- KafkaConsumerAttributesExtractor attributesExtractor =
- new KafkaConsumerAttributesExtractor(MessageOperation.PROCESS);
- SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
- MessagingSpanNameExtractor.create(attributesExtractor);
-
- InstrumenterBuilder<ConsumerRecord<?, ?>, Void> builder =
- Instrumenter.<ConsumerRecord<?, ?>, Void>newBuilder(
- GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
- .addAttributesExtractor(attributesExtractor)
- .addAttributesExtractor(new KafkaConsumerAdditionalAttributesExtractor());
- if (KafkaConsumerExperimentalAttributesExtractor.isEnabled()) {
- builder.addAttributesExtractor(new KafkaConsumerExperimentalAttributesExtractor());
- }
-
- if (!KafkaPropagation.isPropagationEnabled()) {
- return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
- } else if (ExperimentalConfig.get().suppressMessagingReceiveSpans()) {
- return builder.newConsumerInstrumenter(new KafkaHeadersGetter());
- } else {
- builder.addSpanLinksExtractor(
- SpanLinksExtractor.fromUpstreamRequest(
- GlobalOpenTelemetry.getPropagators(), new KafkaHeadersGetter()));
- return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
- }
- }
+ KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
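KafkaSingletons now only passes its instrumentation name to a builder shared through the new library module. For orientation, the producer half of that shared builder would look roughly like the buildProducerInstrumenter() method deleted above, with the instrumentation name turned into a parameter. This is a sketch reconstructed from the removed code, not the verbatim contents of io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class KafkaInstrumenterBuilder {

  // Same assembly as the deleted KafkaSingletons.buildProducerInstrumenter(), with the
  // instrumentation name supplied by each caller. The Kafka*AttributesExtractor classes
  // are assumed to live alongside this class after the move to the library module.
  public static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter(
      String instrumentationName) {
    KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
    SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
        MessagingSpanNameExtractor.create(attributesExtractor);

    return Instrumenter.<ProducerRecord<?, ?>, Void>newBuilder(
            GlobalOpenTelemetry.get(), instrumentationName, spanNameExtractor)
        .addAttributesExtractor(attributesExtractor)
        .addAttributesExtractor(new KafkaProducerAdditionalAttributesExtractor())
        .newInstrumenter(SpanKindExtractor.alwaysProducer());
  }

  private KafkaInstrumenterBuilder() {}
}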
[changed file]
@@ -6,7 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.trace.SpanContext;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIterableWrapper;
+ import io.opentelemetry.instrumentation.kafka.KafkaConsumerIterableWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
[changed file]
@@ -11,7 +11,7 @@
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
- import io.opentelemetry.javaagent.instrumentation.kafka.KafkaConsumerIteratorWrapper;
+ import io.opentelemetry.instrumentation.kafka.KafkaConsumerIteratorWrapper;
import java.util.Iterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.checkerframework.checker.nullness.qual.Nullable;
[changed file]
@@ -3,10 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

+ package io.opentelemetry.instrumentation.kafkaclients
+
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration
import java.util.concurrent.TimeUnit
@@ -15,7 +16,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

- class KafkaClientDefaultTest extends KafkaClientBaseTest {
+ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {

def "test kafka produce and consume"() {
when:
@@ -190,7 +191,7 @@ class KafkaClientDefaultTest extends KafkaClientBaseTest {
when: "receive messages"
awaitUntilConsumerIsReady()
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
- def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
+ def recordsInPartition = consumerRecords.records(topicPartition)
recordsInPartition.size() == 1
// iterate over records to generate spans
@@ -251,4 +252,4 @@ class KafkaClientDefaultTest extends KafkaClientBaseTest {
}
}
}
- }
+ }
[changed file]
@@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

+ package io.opentelemetry.instrumentation.kafkaclients
+
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord

@@ -11,7 +13,7 @@ import java.time.Duration
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

- class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
+ class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest {

def "should not read remote context when consuming messages if propagation is disabled"() {
when: "send message"
@@ -93,4 +95,4 @@ class KafkaClientPropagationDisabledTest extends KafkaClientBaseTest {
}
}
}
- }
+ }
[changed file]
@@ -3,6 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

+ package io.opentelemetry.instrumentation.kafkaclients
+
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
@@ -13,7 +15,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER

- class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
+ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest {

def "test kafka produce and consume"() {
when:
@@ -29,6 +31,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
}

then:
+ awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
@@ -88,6 +91,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
producer.send(new ProducerRecord<>(SHARED_TOPIC, null))

then:
+ awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
for (record in records) {
@@ -138,6 +142,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientBaseTest {
then: "wait for PRODUCER span"
waitForTraces(1)
+ awaitUntilConsumerIsReady()
when: "receive messages"
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
@@ -179,4 +184,4 @@
}
}
}
- }
+ }
[changed file]
@@ -0,0 +1,13 @@
+ plugins {
+ id("otel.java-conventions")
+ }
+
+ dependencies {
+ api(project(":testing-common"))
+
+ implementation("org.apache.kafka:kafka-clients:0.11.0.0")
+
+ implementation(project(":instrumentation:kafka-clients:kafka-clients-common:library"))
+
+ implementation("org.testcontainers:kafka")
+ }
[remaining changed files not shown]
