diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index 7c4c7b8ae1cb..3cef9e6bbc65 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -8,6 +8,9 @@ import io.opentelemetry.instrumentation.testing.GlobalTraceUtil import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import java.time.Duration +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.boot.SpringApplication @@ -64,12 +67,28 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) when: - runWithSpan("producer") { - // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer - kafkaTemplate.executeInTransaction({ ops -> - ops.send("testTopic", "10", "testSpan1") - ops.send("testTopic", "20", "testSpan2") - }) + // This test assumes that messages are sent and received as a batch. Occasionally it happens + // that the messages are not received as a batch, but one by one. This doesn't match what the + // assertion expects. To reduce flakiness we retry the test when messages weren't received as + // a batch. + def maxAttempts = 5 + for (i in 1..maxAttempts) { + Listener.reset() + + runWithSpan("producer") { + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "testSpan1") + ops.send("testTopic", "20", "testSpan2") + }) + } + + Listener.waitForMessages() + if (Listener.getLastBatchSize() == 2) { + break + } else if (i < maxAttempts) { + ignoreTracesAndClear(2) + System.err.println("Messages weren't received as batch, retrying") + } } then: @@ -242,9 +261,16 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } static class Listener { + static AtomicInteger lastBatchSize = new AtomicInteger() + static CountDownLatch messageReceived = new CountDownLatch(2) @KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory") void listener(List> records) { + lastBatchSize.set(records.size()) + records.size().times { + messageReceived.countDown() + } + GlobalTraceUtil.runWithSpan("consumer") {} records.forEach({ record -> if (record.value() == "error") { @@ -252,5 +278,18 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } }) } + + static void reset() { + messageReceived = new CountDownLatch(2) + lastBatchSize.set(0) + } + + static void waitForMessages() { + messageReceived.await(30, TimeUnit.SECONDS) + } + + static int getLastBatchSize() { + return lastBatchSize.get() + } } }