Skip to content

Commit

Permalink
Merge pull request #777 from rabbitmq/connection-allocation-argument
Browse files Browse the repository at this point in the history
Add --connection-allocation argument
  • Loading branch information
acogoluegnes authored Dec 13, 2024
2 parents cdc32d2 + 842f588 commit 394bc1e
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 10 deletions.
45 changes: 36 additions & 9 deletions src/main/java/com/rabbitmq/perf/MulticastSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// [email protected].
package com.rabbitmq.perf;

import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.RANDOM;
import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.ROUND_ROBIN;
import static com.rabbitmq.perf.Utils.isRecoverable;
import static java.lang.Math.min;
import static java.lang.String.format;
Expand Down Expand Up @@ -48,8 +50,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -102,7 +106,8 @@ public MulticastSet(
completionHandler,
new ShutdownService(),
new ExpectedMetrics(params, new SimpleMeterRegistry(), "perftest_", Collections.emptyMap()),
InstanceSynchronization.NO_OP);
InstanceSynchronization.NO_OP,
RANDOM);
}

public MulticastSet(
Expand All @@ -114,7 +119,8 @@ public MulticastSet(
CompletionHandler completionHandler,
ShutdownService shutdownService,
ExpectedMetrics expectedMetrics,
InstanceSynchronization instanceSynchronization) {
InstanceSynchronization instanceSynchronization,
PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.performanceMetrics = performanceMetrics;
this.factory = factory;
this.params = params;
Expand Down Expand Up @@ -158,7 +164,7 @@ public MulticastSet(
input -> Long.valueOf(input));
}

this.connectionCreator = new ConnectionCreator(this.factory, this.uris);
this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation);
this.expectedMetrics = expectedMetrics;
this.instanceSynchronization = instanceSynchronization;
}
Expand Down Expand Up @@ -952,12 +958,17 @@ private static class ConnectionCreator {

private final ConnectionFactory cf;
private final List<Address> addresses;
private final UnaryOperator<List<Address>> connectionAllocation;

private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
private ConnectionCreator(
ConnectionFactory cf,
List<String> uris,
PerfTest.CONNECTION_ALLOCATION connectionAllocation) {
this.cf = cf;
if (uris == null || uris.isEmpty()) {
// URI already set on the connection factory, nothing special to do
addresses = Collections.emptyList();
this.connectionAllocation = UnaryOperator.identity();
} else {
List<Address> addresses = new ArrayList<>(uris.size());
for (String uri : uris) {
Expand All @@ -968,6 +979,26 @@ private ConnectionCreator(ConnectionFactory cf, List<String> uris) {
}
}
this.addresses = Collections.unmodifiableList(addresses);
if (connectionAllocation == RANDOM) {
this.connectionAllocation =
l -> {
List<Address> addrs = new ArrayList<>(l);
if (addresses.size() > 1) {
Collections.shuffle(addrs);
}
return addrs;
};
} else if (connectionAllocation == ROUND_ROBIN) {
AtomicInteger allocationCount = new AtomicInteger(0);
this.connectionAllocation =
l -> {
Address addr = l.get(allocationCount.getAndIncrement() % l.size());
return Collections.singletonList(addr);
};
} else {
throw new IllegalArgumentException(
"Unknown connection allocation type: " + connectionAllocation.name());
}
}
}

Expand All @@ -984,11 +1015,7 @@ Connection createConnection(String name) throws IOException, TimeoutException {
if (this.addresses.isEmpty()) {
connection = this.cf.newConnection(name);
} else {
List<Address> addrs = new ArrayList<>(addresses);
if (addresses.size() > 1) {
Collections.shuffle(addrs);
}
connection = this.cf.newConnection(addrs, name);
connection = this.cf.newConnection(this.connectionAllocation.apply(this.addresses), name);
}
addBlockedListener(connection);
return connection;
Expand Down
42 changes: 41 additions & 1 deletion src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,23 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {

MulticastParams p = multicastParams(cmd, uris, perfTestOptions);

String connectionAllocationParam =
strArg(cmd, "cal", CONNECTION_ALLOCATION.RANDOM.allocation());
CONNECTION_ALLOCATION connectionAllocation =
Arrays.stream(CONNECTION_ALLOCATION.values())
.filter(a -> a.allocation().equals(connectionAllocationParam))
.findAny()
.orElse(null);

validate(
() -> connectionAllocation != null,
"--connection-allocation must one of "
+ Arrays.stream(CONNECTION_ALLOCATION.values())
.map(CONNECTION_ALLOCATION::allocation)
.collect(Collectors.joining(", ")),
systemExiter,
consoleErr);

ConcurrentMap<String, Integer> completionReasons = new ConcurrentHashMap<>();

MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p, completionReasons);
Expand Down Expand Up @@ -344,7 +361,8 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
completionHandler,
shutdownService,
expectedMetrics,
instanceSynchronization);
instanceSynchronization,
connectionAllocation);
set.run(true);

statsSummary.run();
Expand Down Expand Up @@ -1364,6 +1382,13 @@ static Options getOptions() {

options.addOption(new Option("tnd", "tcp-no-delay", true, "value for TCP NODELAY option"));

options.addOption(
new Option(
"cal",
"connection-allocation",
true,
"the way to allocate connection across nodes (random or round-robin), default is random."));

return options;
}

Expand Down Expand Up @@ -1614,6 +1639,21 @@ enum EXIT_WHEN {
IDLE
}

enum CONNECTION_ALLOCATION {
RANDOM("random"),
ROUND_ROBIN("round-robin");

private final String allocation;

CONNECTION_ALLOCATION(String allocation) {
this.allocation = allocation;
}

public String allocation() {
return allocation;
}
}

private static ByteCapacity validateByteCapacity(
String value, SystemExiter exiter, PrintStream output) {
try {
Expand Down

0 comments on commit 394bc1e

Please sign in to comment.