From 53417d8f077bb8c4494f9f4245a1aa4582fcc4c6 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Sat, 2 Apr 2022 14:36:14 +0300 Subject: [PATCH 1/2] Flaky spring kafka batch test --- .../SpringKafkaInstrumentationTest.groovy | 52 ++++++++++++++++--- 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index 7c4c7b8ae1cb..491fe86f6989 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -8,6 +8,9 @@ import io.opentelemetry.instrumentation.testing.GlobalTraceUtil import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import java.time.Duration +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.consumer.ConsumerRecord import org.springframework.boot.SpringApplication @@ -64,12 +67,29 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate) when: - runWithSpan("producer") { - // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer - kafkaTemplate.executeInTransaction({ ops -> - ops.send("testTopic", "10", "testSpan1") - ops.send("testTopic", "20", "testSpan2") - }) + // This test assumes that messages are sent and received as a batch. Occasionally it happens + // that the messages are not received as a batch, but one by one. This doesn't match what the + // assertion expects. To reduce flakiness we retry the test when messages weren't received as + // a batch. + def maxAttempts = 5 + for (i in 1..maxAttempts) { + Listener.reset() + + runWithSpan("producer") { + // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer + kafkaTemplate.executeInTransaction({ ops -> + ops.send("testTopic", "10", "testSpan1") + ops.send("testTopic", "20", "testSpan2") + }) + } + + Listener.waitForMessages() + if (Listener.getLastBatchSize() == 2) { + break + } else if (i < maxAttempts) { + ignoreTracesAndClear(2) + System.err.println("Messages weren't received as batch, retrying") + } } then: @@ -242,9 +262,16 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } static class Listener { + static AtomicInteger lastBatchSize = new AtomicInteger() + static CountDownLatch messageReceived = new CountDownLatch(2) @KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory") void listener(List> records) { + lastBatchSize.set(records.size()) + records.size().times { + messageReceived.countDown() + } + GlobalTraceUtil.runWithSpan("consumer") {} records.forEach({ record -> if (record.value() == "error") { @@ -252,5 +279,18 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { } }) } + + static void reset() { + messageReceived = new CountDownLatch(2) + lastBatchSize.set(0) + } + + static void waitForMessages() { + messageReceived.await(30, TimeUnit.SECONDS) + } + + static int getLastBatchSize() { + return lastBatchSize.get() + } } } From 201e9bc014a8a09e4cf4a6ac4b6c744d8d5db278 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 4 Apr 2022 14:11:48 +0300 Subject: [PATCH 2/2] Update instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy Co-authored-by: Mateusz Rzeszutek --- .../src/test/groovy/SpringKafkaInstrumentationTest.groovy | 1 - 1 file changed, 1 deletion(-) diff --git a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy index 491fe86f6989..3cef9e6bbc65 100644 --- a/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy +++ b/instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy @@ -76,7 +76,6 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification { Listener.reset() runWithSpan("producer") { - // wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer kafkaTemplate.executeInTransaction({ ops -> ops.send("testTopic", "10", "testSpan1") ops.send("testTopic", "20", "testSpan2")