diff --git a/src/main/java/com/rabbitmq/perf/MulticastSet.java b/src/main/java/com/rabbitmq/perf/MulticastSet.java
index 55e3165d..3392e8aa 100644
--- a/src/main/java/com/rabbitmq/perf/MulticastSet.java
+++ b/src/main/java/com/rabbitmq/perf/MulticastSet.java
@@ -15,6 +15,8 @@
// info@rabbitmq.com.
package com.rabbitmq.perf;
+import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.RANDOM;
+import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.ROUND_ROBIN;
import static com.rabbitmq.perf.Utils.isRecoverable;
import static java.lang.Math.min;
import static java.lang.String.format;
@@ -48,8 +50,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,7 +106,8 @@ public MulticastSet(
completionHandler,
new ShutdownService(),
new ExpectedMetrics(params, new SimpleMeterRegistry(), "perftest_", Collections.emptyMap()),
- InstanceSynchronization.NO_OP);
+ InstanceSynchronization.NO_OP,
+ RANDOM);
}
public MulticastSet(
@@ -114,7 +119,8 @@ public MulticastSet(
CompletionHandler completionHandler,
ShutdownService shutdownService,
ExpectedMetrics expectedMetrics,
- InstanceSynchronization instanceSynchronization) {
+ InstanceSynchronization instanceSynchronization,
+ PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.performanceMetrics = performanceMetrics;
this.factory = factory;
this.params = params;
@@ -158,7 +164,7 @@ public MulticastSet(
input -> Long.valueOf(input));
}
- this.connectionCreator = new ConnectionCreator(this.factory, this.uris);
+ this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation);
this.expectedMetrics = expectedMetrics;
this.instanceSynchronization = instanceSynchronization;
}
@@ -952,12 +958,17 @@ private static class ConnectionCreator {
private final ConnectionFactory cf;
private final List
addresses;
+ private final UnaryOperator> connectionAllocation;
- private ConnectionCreator(ConnectionFactory cf, List uris) {
+ private ConnectionCreator(
+ ConnectionFactory cf,
+ List uris,
+ PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.cf = cf;
if (uris == null || uris.isEmpty()) {
// URI already set on the connection factory, nothing special to do
addresses = Collections.emptyList();
+ this.connectionAllocation = UnaryOperator.identity();
} else {
List addresses = new ArrayList<>(uris.size());
for (String uri : uris) {
@@ -968,6 +979,26 @@ private ConnectionCreator(ConnectionFactory cf, List uris) {
}
}
this.addresses = Collections.unmodifiableList(addresses);
+ if (connectionAllocation == RANDOM) {
+ this.connectionAllocation =
+ l -> {
+ List addrs = new ArrayList<>(l);
+ if (addresses.size() > 1) {
+ Collections.shuffle(addrs);
+ }
+ return addrs;
+ };
+ } else if (connectionAllocation == ROUND_ROBIN) {
+ AtomicInteger allocationCount = new AtomicInteger(0);
+ this.connectionAllocation =
+ l -> {
+ Address addr = l.get(allocationCount.getAndIncrement() % l.size());
+ return Collections.singletonList(addr);
+ };
+ } else {
+ throw new IllegalArgumentException(
+ "Unknown connection allocation type: " + connectionAllocation.name());
+ }
}
}
@@ -984,11 +1015,7 @@ Connection createConnection(String name) throws IOException, TimeoutException {
if (this.addresses.isEmpty()) {
connection = this.cf.newConnection(name);
} else {
- List addrs = new ArrayList<>(addresses);
- if (addresses.size() > 1) {
- Collections.shuffle(addrs);
- }
- connection = this.cf.newConnection(addrs, name);
+ connection = this.cf.newConnection(this.connectionAllocation.apply(this.addresses), name);
}
addBlockedListener(connection);
return connection;
diff --git a/src/main/java/com/rabbitmq/perf/PerfTest.java b/src/main/java/com/rabbitmq/perf/PerfTest.java
index 6bc39b60..9fac03e8 100644
--- a/src/main/java/com/rabbitmq/perf/PerfTest.java
+++ b/src/main/java/com/rabbitmq/perf/PerfTest.java
@@ -242,6 +242,23 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
MulticastParams p = multicastParams(cmd, uris, perfTestOptions);
+ String connectionAllocationParam =
+ strArg(cmd, "cal", CONNECTION_ALLOCATION.RANDOM.allocation());
+ CONNECTION_ALLOCATION connectionAllocation =
+ Arrays.stream(CONNECTION_ALLOCATION.values())
+ .filter(a -> a.allocation().equals(connectionAllocationParam))
+ .findAny()
+ .orElse(null);
+
+ validate(
+ () -> connectionAllocation != null,
+ "--connection-allocation must one of "
+ + Arrays.stream(CONNECTION_ALLOCATION.values())
+ .map(CONNECTION_ALLOCATION::allocation)
+ .collect(Collectors.joining(", ")),
+ systemExiter,
+ consoleErr);
+
ConcurrentMap completionReasons = new ConcurrentHashMap<>();
MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p, completionReasons);
@@ -344,7 +361,8 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
completionHandler,
shutdownService,
expectedMetrics,
- instanceSynchronization);
+ instanceSynchronization,
+ connectionAllocation);
set.run(true);
statsSummary.run();
@@ -1364,6 +1382,13 @@ static Options getOptions() {
options.addOption(new Option("tnd", "tcp-no-delay", true, "value for TCP NODELAY option"));
+ options.addOption(
+ new Option(
+ "cal",
+ "connection-allocation",
+ true,
+ "the way to allocate connection across nodes (random or round-robin), default is random."));
+
return options;
}
@@ -1614,6 +1639,21 @@ enum EXIT_WHEN {
IDLE
}
+ enum CONNECTION_ALLOCATION {
+ RANDOM("random"),
+ ROUND_ROBIN("round-robin");
+
+ private final String allocation;
+
+ CONNECTION_ALLOCATION(String allocation) {
+ this.allocation = allocation;
+ }
+
+ public String allocation() {
+ return allocation;
+ }
+ }
+
private static ByteCapacity validateByteCapacity(
String value, SystemExiter exiter, PrintStream output) {
try {