Merge pull request #2951 from DataDog/tyler/kafka-streams-latest
Fix instrumentation for Kafka Streams 2.6+
tylerbenson authored Jul 29, 2021
2 parents 27100a0 + aa611e7 commit ee5187d
Showing 4 changed files with 231 additions and 38 deletions.
build.gradle
@@ -11,9 +11,7 @@ apply from: "$rootDir/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'

testSets {
-latestDepTest {
-dirName = 'test'
-}
+latestDepTest
}

dependencies {
@@ -33,12 +31,10 @@

// Include latest version of kafka itself along with latest version of client libs.
// This seems to help with jar compatibility hell.
-latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.11', version: '2.3.+'
-// (Pinning to 2.3.x: 2.4.0 introduces an error when executing compileLatestDepTestGroovy)
-// Caused by: java.lang.NoClassDefFoundError: org.I0Itec.zkclient.ZkClient
-latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.+'
-latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.3.+'
-latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.2.+'
-latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.2.+'
-latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.19.+'
+latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka_2.13', version: '2.+'
+latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.+'
+latestDepTestImplementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.+'
+latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.+'
+latestDepTestImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '2.+'
+latestDepTestImplementation group: 'org.assertj', name: 'assertj-core', version: '3.+'
}
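
Dropping dirName = 'test' is the pivotal build change: with the org.unbroken-dome.test-sets plugin, a test set with no explicit dirName compiles sources from its own directory rather than reusing src/test. A minimal sketch of the effective convention (directory names assumed from the plugin's defaults):

  testSets {
    // latestDepTest now compiles src/latestDepTest/groovy (and .../resources);
    // the removed dirName = 'test' previously pointed this set back at src/test
    latestDepTest
  }

That split lets the new KafkaStreamsTest.groovy below target the current Kafka Streams API directly while the existing test keeps the pre-2.x API, which in turn lets the version ranges loosen from 2.3.+ to 2.+.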
KafkaStreamsTest.groovy (new file, latestDepTest)
@@ -0,0 +1,212 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueMapper
import org.junit.ClassRule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.rule.EmbeddedKafkaRule
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Shared

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

class KafkaStreamsTest extends AgentTestRunner {
static final STREAM_PENDING = "test.pending"
static final STREAM_PROCESSED = "test.processed"

@Shared
@ClassRule
EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED)
@Shared
EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
config.putAll(producerProps)
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())

// CONFIGURE CONSUMER
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka))

def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED))

// create a thread safe queue to store the processed message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

// setup a Kafka message listener
consumerContainer.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
// ensure consistent ordering of traces:
// this is the last processing step, so 3 traces should already be reported
TEST_WRITER.waitForTraces(3)
TEST_TRACER.activeSpan().setTag("testing", 123)
records.add(record)
}
})

// start the container and underlying message listener
consumerContainer.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic())

// CONFIGURE PROCESSOR
StreamsBuilder builder = new StreamsBuilder()
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@Override
String apply(String textLine) {
TEST_WRITER.waitForTraces(2) // ensure consistent ordering of traces
TEST_TRACER.activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase()
}
})

def producer = Produced.with(Serdes.String(), Serdes.String())
values.to(STREAM_PROCESSED, producer)
KafkaStreams streams = new KafkaStreams(builder.build(), config)
streams.start()

// CONFIGURE PRODUCER
def producerFactory = new DefaultKafkaProducerFactory<String, String>(producerProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

when:
String greeting = "TESTING TESTING 123!"
kafkaTemplate.send(STREAM_PENDING, greeting)

then:
// check that the message was received
def received = records.poll(10, TimeUnit.SECONDS)
received.value() == greeting.toLowerCase()
received.key() == null

assertTraces(4) {
trace(1) {
// PRODUCER span 0
span {
serviceName "kafka"
operationName "kafka.produce"
resourceName "Produce Topic $STREAM_PENDING"
spanType "queue"
errored false
parent()
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
defaultTags()
}
}
}
trace(1) {
// CONSUMER span 0
span {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $STREAM_PENDING"
spanType "queue"
errored false
childOf trace(0)[0]
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"$InstrumentationTags.PARTITION" { it >= 0 }
"$InstrumentationTags.OFFSET" 0
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
defaultTags(true)
}
}
}
trace(2) {
sortSpansByStart()

// STREAMING span 0
span {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $STREAM_PENDING"
spanType "queue"
errored false
childOf trace(0)[0]

tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"$InstrumentationTags.PARTITION" { it >= 0 }
"$InstrumentationTags.OFFSET" 0
"asdf" "testing"
defaultTags(true)
}
}

// STREAMING span 1
span {
serviceName "kafka"
operationName "kafka.produce"
resourceName "Produce Topic $STREAM_PROCESSED"
spanType "queue"
errored false
childOf span(0)

tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
defaultTags()
}
}
}
trace(1) {
// CONSUMER span 0
span {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $STREAM_PROCESSED"
spanType "queue"
errored false
childOf trace(2)[0]
tags {
"$Tags.COMPONENT" "java-kafka"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
"$InstrumentationTags.PARTITION" { it >= 0 }
"$InstrumentationTags.OFFSET" 0
"$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
"testing" 123
defaultTags(true)
}
}
}
}

def headers = received.headers()
headers.iterator().hasNext()
new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[2][0].traceId}"
new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[2][0].spanId}"


cleanup:
producerFactory?.destroy()
streams?.close()
consumerContainer?.stop()
}
}
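
The closing assertions above verify that the streams producer propagated the trace context via record headers. For reference, a standalone sketch of the same extraction using only the Kafka Headers API already imported here (the helper name is hypothetical):

  // hypothetical helper: read one propagation header off a consumed record
  static String headerValue(ConsumerRecord<String, String> record, String name) {
    def values = record.headers().headers(name).iterator()
    return values.hasNext() ? new String(values.next().value()) : null
  }

  // usage mirroring the test's assertion; TEST_WRITER[2] is the streams trace
  assert headerValue(received, "x-datadog-trace-id") == "${TEST_WRITER[2][0].traceId}"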
Kafka Streams instrumentation (Java)
@@ -13,6 +13,7 @@
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
+import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
@@ -103,7 +104,11 @@ public String[] helperClassNames() {
@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
-isMethod().and(isPublic()).and(named("process")).and(takesArguments(0)),
+isMethod()
+.and(isPublic())
+.and(named("process"))
+// Method signature changed in 2.6.
+.and(takesArguments(0).or(takesArguments(1).and(takesArgument(0, long.class)))),
StopInstrumentation.class.getName() + "$StopSpanAdvice");
}

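The widened matcher tracks the upstream change called out in the comment above: starting with Kafka Streams 2.6, the instrumented process method takes a single long argument instead of none. A hedged sketch of the two shapes the matcher must accept (the class is a stand-in, not the real Kafka Streams internal, and the parameter name is assumed):

  class ProcessShapes {
    // pre-2.6 shape: no arguments
    public boolean process() {
      return true;
    }

    // 2.6+ shape: same name, one long argument
    public boolean process(final long wallClockTime) {
      return true;
    }
  }

takesArguments(0).or(takesArguments(1).and(takesArgument(0, long.class))) matches exactly these two overloads and nothing else, so the same StopSpanAdvice applies across versions.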
KafkaStreamsTest.groovy (existing test)
@@ -6,13 +6,15 @@ import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
+import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.kstream.ValueMapper
import org.junit.ClassRule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
+import org.springframework.kafka.listener.config.ContainerProperties
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
@@ -41,14 +43,7 @@ class KafkaStreamsTest extends AgentTestRunner {
// CONFIGURE CONSUMER
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka))

-def containerProperties
-try {
-// Different class names for test and latestDepTest.
-containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(STREAM_PROCESSED)
-} catch (ClassNotFoundException | NoClassDefFoundError e) {
-containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(STREAM_PROCESSED)
-}
-def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED))

// create a thread safe queue to store the processed message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
@@ -72,13 +67,7 @@ class KafkaStreamsTest extends AgentTestRunner {
ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic())

// CONFIGURE PROCESSOR
-def builder
-try {
-// Different class names for test and latestDepTest.
-builder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
-} catch (ClassNotFoundException | NoClassDefFoundError e) {
-builder = Class.forName("org.apache.kafka.streams.StreamsBuilder").newInstance()
-}
+def builder = new KStreamBuilder()
KStream<String, String> textLines = builder.stream(STREAM_PENDING)
def values = textLines
.mapValues(new ValueMapper<String, String>() {
@@ -90,17 +79,8 @@
}
})

-KafkaStreams streams
-try {
-// Different api for test and latestDepTest.
-values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
-streams = new KafkaStreams(builder, config)
-} catch (MissingMethodException e) {
-def producer = Class.forName("org.apache.kafka.streams.kstream.Produced")
-.with(Serdes.String(), Serdes.String())
-values.to(STREAM_PROCESSED, producer)
-streams = new KafkaStreams(builder.build(), config)
-}
+values.to(Serdes.String(), Serdes.String(), STREAM_PROCESSED)
+KafkaStreams streams = new KafkaStreams(builder, config)
streams.start()

// CONFIGURE PRODUCER
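With latestDepTest compiled from its own source set, the reflective shims in this older test are reverted rather than extended: each test now compiles directly against a single API generation. In miniature (both forms taken from the diff above, variable names ours):

  // before: resolved reflectively so one test body could compile against either API
  def reflectiveBuilder = Class.forName("org.apache.kafka.streams.kstream.KStreamBuilder").newInstance()
  // after: a plain compile-time reference to the pre-2.x API
  def directBuilder = new KStreamBuilder()

The direct reference fails at compile time if the pinned API drifts, whereas Class.forName only fails at runtime.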
