Skip to content

Commit

Permalink
[fix][test] Fix flaky RGUsageMTAggrWaitForAllMsgsTest (apache#22252)
Browse files Browse the repository at this point in the history
(cherry picked from commit 43f9d2a)
  • Loading branch information
lhotari authored and nodece committed May 15, 2024
1 parent 803e4b4 commit 80ae6b1
Showing 1 changed file with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

import com.google.common.collect.Sets;
import io.prometheus.client.Summary;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
Expand All @@ -45,18 +49,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;


// The tests implement a set of producer/consumer operations on a set of topics.
// [A thread is started for each producer, and each consumer in the test.]
// The tenants and namespaces in those topics are associated with a set of resource-groups (RGs).
// After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics
// are verified on the RGs.
@Slf4j
@Test(groups = "flaky")
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
@BeforeClass
@Override
Expand Down Expand Up @@ -119,9 +119,9 @@ private class ProduceMessages implements Runnable {
private final int numMesgsToProduce;
private final String myProduceTopic;

private int sentNumBytes = 0;
private int sentNumMsgs = 0;
private int numExceptions = 0;
private volatile int sentNumBytes = 0;
private volatile int sentNumMsgs = 0;
private volatile int numExceptions = 0;

ProduceMessages(int prodId, int nMesgs, String[] topics) {
producerId = prodId;
Expand Down Expand Up @@ -202,9 +202,9 @@ private class ConsumeMessages implements Runnable {

private final int recvTimeoutMilliSecs = 1000;
private final int ackTimeoutMilliSecs = 1100; // has to be more than 1 second
private int recvdNumBytes = 0;
private int recvdNumMsgs = 0;
private int numExceptions = 0;
private volatile int recvdNumBytes = 0;
private volatile int recvdNumMsgs = 0;
private volatile int numExceptions = 0;
private volatile boolean allMessagesReceived = false;
private volatile boolean consumerIsReady = false;

Expand Down Expand Up @@ -494,15 +494,15 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception
while (numConsumersDone < NUM_CONSUMERS) {
for (int ix = 0; ix < NUM_CONSUMERS; ix++) {
if (!joinedConsumers[ix]) {
consThr[ix].thread.join();
joinedConsumers[ix] = true;
log.debug("Joined consumer={}", ix);

recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);

consThr[ix].thread.join();
joinedConsumers[ix] = true;
log.debug("Joined consumer={}", ix);

recvdNumBytes += recvdBytes;
recvdNumMsgs += recvdMsgs;
numConsumersDone++;
Expand Down

0 comments on commit 80ae6b1

Please sign in to comment.