-
Notifications
You must be signed in to change notification settings - Fork 293
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix instrumentation for Kafka Streams 2.6+ #2951
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
212 changes: 212 additions & 0 deletions
212
...agent/instrumentation/kafka-streams-0.11/src/latestDepTest/groovy/KafkaStreamsTest.groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
import datadog.trace.agent.test.AgentTestRunner | ||
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags | ||
import datadog.trace.bootstrap.instrumentation.api.Tags | ||
import org.apache.kafka.clients.consumer.ConsumerRecord | ||
import org.apache.kafka.common.serialization.Serdes | ||
import org.apache.kafka.streams.KafkaStreams | ||
import org.apache.kafka.streams.StreamsBuilder | ||
import org.apache.kafka.streams.StreamsConfig | ||
import org.apache.kafka.streams.kstream.KStream | ||
import org.apache.kafka.streams.kstream.Produced | ||
import org.apache.kafka.streams.kstream.ValueMapper | ||
import org.junit.ClassRule | ||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory | ||
import org.springframework.kafka.core.DefaultKafkaProducerFactory | ||
import org.springframework.kafka.core.KafkaTemplate | ||
import org.springframework.kafka.listener.ContainerProperties | ||
import org.springframework.kafka.listener.KafkaMessageListenerContainer | ||
import org.springframework.kafka.listener.MessageListener | ||
import org.springframework.kafka.test.EmbeddedKafkaBroker | ||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule | ||
import org.springframework.kafka.test.utils.ContainerTestUtils | ||
import org.springframework.kafka.test.utils.KafkaTestUtils | ||
import spock.lang.Shared | ||
|
||
import java.util.concurrent.LinkedBlockingQueue | ||
import java.util.concurrent.TimeUnit | ||
|
||
// End-to-end integration test for the kafka-streams instrumentation:
// a Spring KafkaTemplate produces to STREAM_PENDING, a Kafka Streams topology
// lower-cases the value and forwards it to STREAM_PROCESSED, and a listener
// container consumes the result. Verifies one trace per hop (4 in total) and
// that the Datadog trace context is propagated through the record headers.
class KafkaStreamsTest extends AgentTestRunner {
  // Input topic for the streams topology.
  static final STREAM_PENDING = "test.pending"
  // Output topic the streams topology forwards to.
  static final STREAM_PROCESSED = "test.processed"

  // Single-broker embedded Kafka, shared across the spec; both topics pre-created.
  @Shared
  @ClassRule
  EmbeddedKafkaRule kafkaRule = new EmbeddedKafkaRule(1, true, STREAM_PENDING, STREAM_PROCESSED)
  @Shared
  EmbeddedKafkaBroker embeddedKafka = kafkaRule.embeddedKafka

  def "test kafka produce and consume with streams in-between"() {
    setup:
    def config = new Properties()
    def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
    config.putAll(producerProps)
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application")
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName())

    // CONFIGURE CONSUMER (reads the topology's output topic, STREAM_PROCESSED)
    def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(KafkaTestUtils.consumerProps("sender", "false", embeddedKafka))

    def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED))

    // create a thread safe queue to store the processed message
    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

    // setup a Kafka message listener
    consumerContainer.setupMessageListener(new MessageListener<String, String>() {
      @Override
      void onMessage(ConsumerRecord<String, String> record) {
        // ensure consistent ordering of traces: this is the last processing
        // step, so the 3 earlier traces must have been reported before this
        // span is tagged and the record handed over
        TEST_WRITER.waitForTraces(3)
        TEST_TRACER.activeSpan().setTag("testing", 123)
        records.add(record)
      }
    })

    // start the container and underlying message listener
    consumerContainer.start()

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(consumerContainer, embeddedKafka.getPartitionsPerTopic())

    // CONFIGURE PROCESSOR (topology: STREAM_PENDING -> toLowerCase -> STREAM_PROCESSED)
    StreamsBuilder builder = new StreamsBuilder()
    KStream<String, String> textLines = builder.stream(STREAM_PENDING)
    def values = textLines
      .mapValues(new ValueMapper<String, String>() {
        @Override
        String apply(String textLine) {
          TEST_WRITER.waitForTraces(2) // ensure consistent ordering of traces
          TEST_TRACER.activeSpan().setTag("asdf", "testing")
          return textLine.toLowerCase()
        }
      })

    def producer = Produced.with(Serdes.String(), Serdes.String())
    values.to(STREAM_PROCESSED, producer)
    KafkaStreams streams = new KafkaStreams(builder.build(), config)
    streams.start()

    // CONFIGURE PRODUCER (Spring KafkaTemplate writing to the input topic)
    def producerFactory = new DefaultKafkaProducerFactory<String, String>(producerProps)
    def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

    when:
    String greeting = "TESTING TESTING 123!"
    kafkaTemplate.send(STREAM_PENDING, greeting)

    then:
    // check that the message was received (lower-cased by the topology, key untouched)
    def received = records.poll(10, TimeUnit.SECONDS)
    received.value() == greeting.toLowerCase()
    received.key() == null

    // 4 traces, asserted in reporting order: template produce, consume of
    // STREAM_PENDING, the streams consume+produce pair, final consume of
    // STREAM_PROCESSED
    assertTraces(4) {
      trace(1) {
        // PRODUCER span 0: KafkaTemplate send — root of the whole chain
        span {
          serviceName "kafka"
          operationName "kafka.produce"
          resourceName "Produce Topic $STREAM_PENDING"
          spanType "queue"
          errored false
          parent()
          tags {
            "$Tags.COMPONENT" "java-kafka"
            "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
            defaultTags()
          }
        }
      }
      trace(1) {
        // CONSUMER span 0: consume of STREAM_PENDING, child of the root produce
        span {
          serviceName "kafka"
          operationName "kafka.consume"
          resourceName "Consume Topic $STREAM_PENDING"
          spanType "queue"
          errored false
          childOf trace(0)[0]
          tags {
            "$Tags.COMPONENT" "java-kafka"
            "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
            "$InstrumentationTags.PARTITION" { it >= 0 }
            "$InstrumentationTags.OFFSET" 0
            "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
            defaultTags(true)
          }
        }
      }
      trace(2) {
        sortSpansByStart()

        // STREAMING span 0: consume half of the streams task — carries the
        // "asdf" tag set inside mapValues above
        span {
          serviceName "kafka"
          operationName "kafka.consume"
          resourceName "Consume Topic $STREAM_PENDING"
          spanType "queue"
          errored false
          childOf trace(0)[0]

          tags {
            "$Tags.COMPONENT" "java-kafka"
            "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
            "$InstrumentationTags.PARTITION" { it >= 0 }
            "$InstrumentationTags.OFFSET" 0
            "asdf" "testing"
            defaultTags(true)
          }
        }

        // STREAMING span 1: the topology's forward to STREAM_PROCESSED,
        // nested under the streams consume span
        span {
          serviceName "kafka"
          operationName "kafka.produce"
          resourceName "Produce Topic $STREAM_PROCESSED"
          spanType "queue"
          errored false
          childOf span(0)

          tags {
            "$Tags.COMPONENT" "java-kafka"
            "$Tags.SPAN_KIND" Tags.SPAN_KIND_PRODUCER
            defaultTags()
          }
        }
      }
      trace(1) {
        // CONSUMER span 0: the final listener on STREAM_PROCESSED — carries
        // the "testing" tag set in onMessage above
        span {
          serviceName "kafka"
          operationName "kafka.consume"
          resourceName "Consume Topic $STREAM_PROCESSED"
          spanType "queue"
          errored false
          childOf trace(2)[0]
          tags {
            "$Tags.COMPONENT" "java-kafka"
            "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER
            "$InstrumentationTags.PARTITION" { it >= 0 }
            "$InstrumentationTags.OFFSET" 0
            "$InstrumentationTags.RECORD_QUEUE_TIME_MS" { it >= 0 }
            "testing" 123
            defaultTags(true)
          }
        }
      }
    }

    // trace context must have been propagated into the record headers; the
    // ids are compared against the span at TEST_WRITER index [2][0]
    // (NOTE(review): TEST_WRITER indexing presumably matches the reporting
    // order asserted above — confirm against AgentTestRunner)
    def headers = received.headers()
    headers.iterator().hasNext()
    new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${TEST_WRITER[2][0].traceId}"
    new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${TEST_WRITER[2][0].spanId}"


    cleanup:
    producerFactory?.destroy()
    streams?.close()
    consumerContainer?.stop()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a question: is this how we want to be handling this?
I'm concerned this opens us to overmatching if process is overloaded at a later time.
I think we'd be better off having a match for each signature that we want to instrument.
I'm interested to hear other opinions. I'm also not going to block this PR for this reason alone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There hasn't been much change to this method - previously it took no-arguments, now it takes a single long. Looking at the class and how this method is used I think it's unlikely that this method will be overloaded, but of course not impossible.
In other instrumentations we've generally gone for looser method matching if it's an internal method of an implementation class. If the advice was simple then I wouldn't be concerned - but this is one case where we're grabbing
activeSpan
/activeScope
and trusting it is the right thing to finish/close. So if we did ever match overloaded (or 'bridge') methods then this would risk closing both the scope we wanted to close as well as any surrounding scope. So I would be happier to see more of an explicit match here — or alternatively an extra check in
StopSpanAdvice
that we're closing the right span/scope, even just a check of the type (note there is work planned to address this separately by adding methods to the scope manager, but that work still needs to be scheduled).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you prefer if I changed it to an
or
that checks for either number of args?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with that since we're going to fix the
StopSpanAdvice
soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I've updated the matcher.