Skip to content

Commit

Permalink
Flaky spring kafka batch test (open-telemetry#5740)
Browse files Browse the repository at this point in the history
* Flaky spring kafka batch test

* Update instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy

Co-authored-by: Mateusz Rzeszutek <[email protected]>

Co-authored-by: Mateusz Rzeszutek <[email protected]>
  • Loading branch information
2 people authored and RashmiRam committed May 23, 2022
1 parent 9c87213 commit a77c265
Showing 1 changed file with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.boot.SpringApplication
Expand Down Expand Up @@ -64,12 +67,28 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate)

when:
runWithSpan("producer") {
// wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "testSpan1")
ops.send("testTopic", "20", "testSpan2")
})
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
// a batch.
def maxAttempts = 5
for (i in 1..maxAttempts) {
Listener.reset()

runWithSpan("producer") {
kafkaTemplate.executeInTransaction({ ops ->
ops.send("testTopic", "10", "testSpan1")
ops.send("testTopic", "20", "testSpan2")
})
}

Listener.waitForMessages()
if (Listener.getLastBatchSize() == 2) {
break
} else if (i < maxAttempts) {
ignoreTracesAndClear(2)
System.err.println("Messages weren't received as batch, retrying")
}
}

then:
Expand Down Expand Up @@ -242,15 +261,35 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
}

static class Listener {
static AtomicInteger lastBatchSize = new AtomicInteger()
static CountDownLatch messageReceived = new CountDownLatch(2)

@KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory")
void listener(List<ConsumerRecord<String, String>> records) {
lastBatchSize.set(records.size())
records.size().times {
messageReceived.countDown()
}

GlobalTraceUtil.runWithSpan("consumer") {}
records.forEach({ record ->
if (record.value() == "error") {
throw new IllegalArgumentException("boom")
}
})
}

static void reset() {
messageReceived = new CountDownLatch(2)
lastBatchSize.set(0)
}

static void waitForMessages() {
messageReceived.await(30, TimeUnit.SECONDS)
}

static int getLastBatchSize() {
return lastBatchSize.get()
}
}
}

0 comments on commit a77c265

Please sign in to comment.