Skip to content

Commit

Permalink
[improve][broker] Avoid share same random in multi-threads due to per…
Browse files Browse the repository at this point in the history
…formance issue (apache#18660)
  • Loading branch information
Shawyeok authored and lifepuzzlefun committed Jan 10, 2023
1 parent 4f7cacc commit bbf8e26
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
Expand All @@ -45,8 +45,6 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
.newUpdater(AbstractDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}
Expand Down Expand Up @@ -157,7 +155,7 @@ public Consumer getRandomConsumer() {
return null;
}

return consumerList.get(random.nextInt(consumerList.size()));
return consumerList.get(ThreadLocalRandom.current().nextInt(consumerList.size()));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -392,7 +393,6 @@ public static void main(String[] args) throws Exception {
msgRatePerThread,
payloadByteList,
payloadBytes,
random,
doneLatch
);
});
Expand Down Expand Up @@ -529,7 +529,6 @@ private static void runProducer(int producerId,
int msgRate,
List<byte[]> payloadByteList,
byte[] payloadBytes,
Random random,
CountDownLatch doneLatch) {
PulsarClient client = null;
try {
Expand Down Expand Up @@ -626,9 +625,10 @@ private static void runProducer(int producerId,
if (arguments.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage(arguments.producerName, totalSent,
payloadByteList.get(random.nextInt(payloadByteList.size())));
payloadByteList.get(ThreadLocalRandom.current().nextInt(payloadByteList.size())));
} else {
payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
payloadData = payloadByteList.get(
ThreadLocalRandom.current().nextInt(payloadByteList.size()));
}
} else {
payloadData = payloadBytes;
Expand Down Expand Up @@ -656,7 +656,7 @@ private static void runProducer(int producerId,
}
//generate msg key
if (msgKeyMode == MessageKeyGenerationMode.random) {
messageBuilder.key(String.valueOf(random.nextInt()));
messageBuilder.key(String.valueOf(ThreadLocalRandom.current().nextInt()));
} else if (msgKeyMode == MessageKeyGenerationMode.autoIncrement) {
messageBuilder.key(String.valueOf(totalSent));
}
Expand Down

0 comments on commit bbf8e26

Please sign in to comment.