Skip to content

Commit

Permalink
Retry rocketmq batch test when it fails (#4922)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Dec 17, 2021
1 parent d7756f1 commit 08ba196
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
@Shared
def msgs = new ArrayList<Message>()

@Shared
TracingMessageListener tracingMessageListener = new TracingMessageListener()

abstract void configureMQProducer(DefaultMQProducer producer)

abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
Expand All @@ -53,7 +56,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
msgs.add(msg2)
producer = BaseConf.getProducer(BaseConf.nsAddr)
configureMQProducer(producer)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new TracingMessageListener())
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener)
configureMQPushConsumer(consumer)
}

Expand Down Expand Up @@ -175,8 +178,26 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
consumer.setConsumeMessageBatchMaxSize(2)

when:
runWithSpan("parent") {
producer.send(msgs)
// This test assumes that messages are sent and received as a batch. Occasionally it happens
// that the messages are not received as a batch, but one by one. This doesn't match what the
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
// a batch.
def maxAttempts = 5
for (i in 1..maxAttempts) {
tracingMessageListener.reset()

runWithSpan("parent") {
producer.send(msgs)
}

tracingMessageListener.waitForMessages()
if (tracingMessageListener.getLastBatchSize() == 2) {
break
} else if (i < maxAttempts) {
// if messages weren't received as a batch we get 1 trace instead of 2
ignoreTracesAndClear(1)
System.err.println("Messages weren't received as batch, retrying")
}
}

then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.rocketmq

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
Expand All @@ -13,9 +15,27 @@ import org.apache.rocketmq.common.message.MessageExt
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan

class TracingMessageListener implements MessageListenerOrderly {
private AtomicInteger lastBatchSize = new AtomicInteger()
private CountDownLatch messageReceived = new CountDownLatch(1)

@Override
ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
lastBatchSize.set(list.size())
messageReceived.countDown()
runInternalSpan("messageListener")
return ConsumeOrderlyStatus.SUCCESS
}

void reset() {
messageReceived = new CountDownLatch(1)
lastBatchSize.set(0)
}

void waitForMessages() {
messageReceived.await()
}

int getLastBatchSize() {
return lastBatchSize.get()
}
}

0 comments on commit 08ba196

Please sign in to comment.