From 842f58896e14e3b188aff0d31511672596c3dd8e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 13 Dec 2024 16:59:10 +0100 Subject: [PATCH] Add --connection-allocation argument --- .../java/com/rabbitmq/perf/MulticastSet.java | 45 +++++++++++++++---- src/main/java/com/rabbitmq/perf/PerfTest.java | 42 ++++++++++++++++- 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/rabbitmq/perf/MulticastSet.java b/src/main/java/com/rabbitmq/perf/MulticastSet.java index 55e3165d..3392e8aa 100644 --- a/src/main/java/com/rabbitmq/perf/MulticastSet.java +++ b/src/main/java/com/rabbitmq/perf/MulticastSet.java @@ -15,6 +15,8 @@ // info@rabbitmq.com. package com.rabbitmq.perf; +import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.RANDOM; +import static com.rabbitmq.perf.PerfTest.CONNECTION_ALLOCATION.ROUND_ROBIN; import static com.rabbitmq.perf.Utils.isRecoverable; import static java.lang.Math.min; import static java.lang.String.format; @@ -48,8 +50,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.IntStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +106,8 @@ public MulticastSet( completionHandler, new ShutdownService(), new ExpectedMetrics(params, new SimpleMeterRegistry(), "perftest_", Collections.emptyMap()), - InstanceSynchronization.NO_OP); + InstanceSynchronization.NO_OP, + RANDOM); } public MulticastSet( @@ -114,7 +119,8 @@ public MulticastSet( CompletionHandler completionHandler, ShutdownService shutdownService, ExpectedMetrics expectedMetrics, - InstanceSynchronization instanceSynchronization) { + InstanceSynchronization instanceSynchronization, + PerfTest.CONNECTION_ALLOCATION connectionAllocation) { this.performanceMetrics = performanceMetrics; this.factory = factory; this.params = params; @@ -158,7 +164,7 @@ public MulticastSet( input -> Long.valueOf(input)); } - this.connectionCreator = new ConnectionCreator(this.factory, this.uris); + this.connectionCreator = new ConnectionCreator(this.factory, this.uris, connectionAllocation); this.expectedMetrics = expectedMetrics; this.instanceSynchronization = instanceSynchronization; } @@ -952,12 +958,17 @@ private static class ConnectionCreator { private final ConnectionFactory cf; private final List
addresses; + private final UnaryOperator> connectionAllocation; - private ConnectionCreator(ConnectionFactory cf, List uris) { + private ConnectionCreator( + ConnectionFactory cf, + List uris, + PerfTest.CONNECTION_ALLOCATION connectionAllocation) { this.cf = cf; if (uris == null || uris.isEmpty()) { // URI already set on the connection factory, nothing special to do addresses = Collections.emptyList(); + this.connectionAllocation = UnaryOperator.identity(); } else { List
addresses = new ArrayList<>(uris.size()); for (String uri : uris) { @@ -968,6 +979,26 @@ private ConnectionCreator(ConnectionFactory cf, List uris) { } } this.addresses = Collections.unmodifiableList(addresses); + if (connectionAllocation == RANDOM) { + this.connectionAllocation = + l -> { + List
addrs = new ArrayList<>(l); + if (addresses.size() > 1) { + Collections.shuffle(addrs); + } + return addrs; + }; + } else if (connectionAllocation == ROUND_ROBIN) { + AtomicInteger allocationCount = new AtomicInteger(0); + this.connectionAllocation = + l -> { + Address addr = l.get(allocationCount.getAndIncrement() % l.size()); + return Collections.singletonList(addr); + }; + } else { + throw new IllegalArgumentException( + "Unknown connection allocation type: " + connectionAllocation.name()); + } } } @@ -984,11 +1015,7 @@ Connection createConnection(String name) throws IOException, TimeoutException { if (this.addresses.isEmpty()) { connection = this.cf.newConnection(name); } else { - List
addrs = new ArrayList<>(addresses); - if (addresses.size() > 1) { - Collections.shuffle(addrs); - } - connection = this.cf.newConnection(addrs, name); + connection = this.cf.newConnection(this.connectionAllocation.apply(this.addresses), name); } addBlockedListener(connection); return connection; diff --git a/src/main/java/com/rabbitmq/perf/PerfTest.java b/src/main/java/com/rabbitmq/perf/PerfTest.java index 6bc39b60..9fac03e8 100644 --- a/src/main/java/com/rabbitmq/perf/PerfTest.java +++ b/src/main/java/com/rabbitmq/perf/PerfTest.java @@ -242,6 +242,23 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { MulticastParams p = multicastParams(cmd, uris, perfTestOptions); + String connectionAllocationParam = + strArg(cmd, "cal", CONNECTION_ALLOCATION.RANDOM.allocation()); + CONNECTION_ALLOCATION connectionAllocation = + Arrays.stream(CONNECTION_ALLOCATION.values()) + .filter(a -> a.allocation().equals(connectionAllocationParam)) + .findAny() + .orElse(null); + + validate( + () -> connectionAllocation != null, + "--connection-allocation must one of " + + Arrays.stream(CONNECTION_ALLOCATION.values()) + .map(CONNECTION_ALLOCATION::allocation) + .collect(Collectors.joining(", ")), + systemExiter, + consoleErr); + ConcurrentMap completionReasons = new ConcurrentHashMap<>(); MulticastSet.CompletionHandler completionHandler = getCompletionHandler(p, completionReasons); @@ -344,7 +361,8 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) { completionHandler, shutdownService, expectedMetrics, - instanceSynchronization); + instanceSynchronization, + connectionAllocation); set.run(true); statsSummary.run(); @@ -1364,6 +1382,13 @@ static Options getOptions() { options.addOption(new Option("tnd", "tcp-no-delay", true, "value for TCP NODELAY option")); + options.addOption( + new Option( + "cal", + "connection-allocation", + true, + "the way to allocate connection across nodes (random or round-robin), default is random.")); + return options; } @@ -1614,6 +1639,21 @@ enum EXIT_WHEN { IDLE } + enum CONNECTION_ALLOCATION { + RANDOM("random"), + ROUND_ROBIN("round-robin"); + + private final String allocation; + + CONNECTION_ALLOCATION(String allocation) { + this.allocation = allocation; + } + + public String allocation() { + return allocation; + } + } + private static ByteCapacity validateByteCapacity( String value, SystemExiter exiter, PrintStream output) { try {