Skip to content

Commit

Permalink
Add options to configure size of TCP buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed May 29, 2024
1 parent 24f8ab6 commit 42d0a18
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 53 deletions.
46 changes: 23 additions & 23 deletions src/main/java/com/rabbitmq/perf/PerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
systemExiter.exit(0);
}

int expectedInstances = intArg(cmd, "ei", 0);
int expectedInstances = Utils.intArg(cmd, "ei", 0);
String testID = strArg(cmd, 'd', null);
if (expectedInstances >= 2 && testID == null) {
validate(
Expand Down Expand Up @@ -233,6 +233,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {

factory.setSocketConfigurator(Utils.socketConfigurator(cmd));
if (factory.getNioParams() != null) {
factory.getNioParams().setSocketChannelConfigurator(Utils.socketChannelConfigurator(cmd));
factory.getNioParams().setSslEngineConfigurator(Utils.sslEngineConfigurator(cmd));
}

Expand Down Expand Up @@ -323,7 +324,7 @@ public static void main(String[] args, PerfTestOptions perfTestOptions) {
});

String instanceSyncNamespace = lookUpInstanceSyncNamespace(cmd);
int instanceSyncTimeout = intArg(cmd, "ist", 600);
int instanceSyncTimeout = Utils.intArg(cmd, "ist", 600);
InstanceSynchronization instanceSynchronization =
Utils.defaultInstanceSynchronization(
testID,
Expand Down Expand Up @@ -382,7 +383,7 @@ static MulticastParams multicastParams(
int producerTxSize = intArg(cmd, 'm', 0);
int consumerTxSize = intArg(cmd, 'n', 0);
long confirm = intArg(cmd, 'c', -1);
int confirmTimeout = intArg(cmd, "ct", 30);
int confirmTimeout = Utils.intArg(cmd, "ct", 30);
boolean autoAck = hasOption(cmd, "a");
int multiAckEvery = intArg(cmd, 'A', 0);
int channelPrefetch = intArg(cmd, 'Q', 0);
Expand All @@ -400,9 +401,9 @@ static MulticastParams multicastParams(
boolean useMillis = hasOption(cmd, "ms");
List<String> queueArgs = lstArg(cmd, "qa");
int consumerLatencyInMicroseconds = intArg(cmd, 'L', 0);
int heartbeatSenderThreads = intArg(cmd, "hst", -1);
int heartbeatSenderThreads = Utils.intArg(cmd, "hst", -1);
String messageProperties = strArg(cmd, "mp", null);
int routingKeyCacheSize = intArg(cmd, "rkcs", 0);
int routingKeyCacheSize = Utils.intArg(cmd, "rkcs", 0);
boolean exclusive = hasOption(cmd, "E");
Duration publishingInterval = null;
String publishingIntervalArg = strArg(cmd, "P", null);
Expand All @@ -414,14 +415,14 @@ static MulticastParams multicastParams(
systemExiter.exit(1);
}
}
int producerRandomStartDelayInSeconds = intArg(cmd, "prsd", -1);
int producerSchedulingThreads = intArg(cmd, "pst", -1);
int producerRandomStartDelayInSeconds = Utils.intArg(cmd, "prsd", -1);
int producerSchedulingThreads = Utils.intArg(cmd, "pst", -1);

int consumersThreadPools = intArg(cmd, "ctp", -1);
int shutdownTimeout = intArg(cmd, "st", 5);
int consumersThreadPools = Utils.intArg(cmd, "ctp", -1);
int shutdownTimeout = Utils.intArg(cmd, "st", 5);

int serversStartUpTimeout = intArg(cmd, "sst", -1);
int serversUpLimit = intArg(cmd, "sul", -1);
int serversStartUpTimeout = Utils.intArg(cmd, "sst", -1);
int serversUpLimit = Utils.intArg(cmd, "sul", -1);
String consumerArgs = strArg(cmd, "ca", null);

List<String> variableRates = lstArg(cmd, "vr");
Expand Down Expand Up @@ -467,18 +468,18 @@ static MulticastParams multicastParams(
}

boolean polling = hasOption(cmd, "po");
int pollingInterval = intArg(cmd, "pi", -1);
int pollingInterval = Utils.intArg(cmd, "pi", -1);

boolean nack = hasOption(cmd, "na");
boolean requeue = boolArg(cmd, "re", true);

boolean jsonBody = hasOption(cmd, "jb");
int bodyFieldCount = intArg(cmd, "bfc", 1000);
int bodyFieldCount = Utils.intArg(cmd, "bfc", 1000);
if (bodyFieldCount < 0) {
consoleErr.println("Body field count should greater than 0.");
systemExiter.exit(1);
}
int bodyCount = intArg(cmd, "bc", 100);
int bodyCount = Utils.intArg(cmd, "bc", 100);
if (bodyCount < 0) {
consoleErr.println("Number of pre-generated message bodies should be greater than 0.");
systemExiter.exit(1);
Expand Down Expand Up @@ -524,11 +525,11 @@ static MulticastParams multicastParams(
} else {
exitWhen = EXIT_WHEN.NEVER;
}
Duration consumerStartDelay = Duration.ofSeconds(intArg(cmd, "csd", -1));
Duration consumerStartDelay = Duration.ofSeconds(Utils.intArg(cmd, "csd", -1));

String queuePattern = strArg(cmd, "qp", null);
int from = intArg(cmd, "qpf", -1);
int to = intArg(cmd, "qpt", -1);
int from = Utils.intArg(cmd, "qpf", -1);
int to = Utils.intArg(cmd, "qpt", -1);

if (queuePattern != null || from >= 0 || to >= 0) {
if (queuePattern == null || from < 0 || to < 0) {
Expand Down Expand Up @@ -757,8 +758,8 @@ private static PrintWriter openCsvFileForWriting(

private static ConnectionFactory configureNioIfRequested(
CommandLineProxy cmd, ConnectionFactory factory) {
int nbThreads = intArg(cmd, "niot", -1);
int executorSize = intArg(cmd, "niotp", -1);
int nbThreads = Utils.intArg(cmd, "niot", -1);
int executorSize = Utils.intArg(cmd, "niotp", -1);
if (nbThreads > 0 || executorSize > 0) {
NioParams nioParams = new NioParams();
int[] nbThreadsAndExecutorSize = getNioNbThreadsAndExecutorSize(nbThreads, executorSize);
Expand Down Expand Up @@ -1355,17 +1356,16 @@ static Options getOptions() {
"verbose-full",
false,
"Same as --verbose, but with message headers and body as well. Use only with slow rates."));

options.addOption(new Option("tsbs", "tcp-send-buffer-size", true, "value for TCP SO_SNDBUF option"));
options.addOption(new Option("trbs", "tcp-receive-buffer-size", true, "value for TCP SO_RCVBUF option"));
return options;
}

static int intArg(CommandLineProxy cmd, char opt, int def) {
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
}

static int intArg(CommandLineProxy cmd, String opt, int def) {
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
}

static float floatArg(CommandLineProxy cmd, char opt, float def) {
return Float.parseFloat(cmd.getOptionValue(opt, Float.toString(def)));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/perf/PrometheusMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// [email protected].
package com.rabbitmq.perf;

import static com.rabbitmq.perf.PerfTest.intArg;
import static com.rabbitmq.perf.Utils.intArg;
import static com.rabbitmq.perf.Utils.strArg;

import com.sun.net.httpserver.HttpServer;
Expand Down
75 changes: 46 additions & 29 deletions src/main/java/com/rabbitmq/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,7 @@
package com.rabbitmq.perf;

import com.google.gson.Gson;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.SocketConfigurator;
import com.rabbitmq.client.SocketConfigurators;
import com.rabbitmq.client.SslEngineConfigurator;
import com.rabbitmq.client.SslEngineConfigurators;
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider;
import com.rabbitmq.client.impl.OAuthTokenManagementException;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
Expand Down Expand Up @@ -148,26 +138,49 @@ static List<SNIServerName> sniServerNames(String argumentValue) {
}

static SocketConfigurator socketConfigurator(CommandLineProxy cmd) {
SocketConfigurator socketConfigurator = SocketConfigurators.defaultConfigurator();
List<SNIServerName> serverNames = sniServerNames(strArg(cmd, "sni", null));
if (serverNames.isEmpty()) {
return SocketConfigurators.defaultConfigurator();
} else {
SocketConfigurator socketConfigurator =
socket -> {
if (socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket) socket;
SSLParameters sslParameters =
sslSocket.getSSLParameters() == null
? new SSLParameters()
: sslSocket.getSSLParameters();
sslParameters.setServerNames(serverNames);
sslSocket.setSSLParameters(sslParameters);
} else {
LOGGER.warn("SNI parameter set on a non-TLS connection");
}
};
return SocketConfigurators.defaultConfigurator().andThen(socketConfigurator);
if (!serverNames.isEmpty()) {
socketConfigurator = socketConfigurator.andThen(socket -> {
if (socket instanceof SSLSocket) {
SSLSocket sslSocket = (SSLSocket) socket;
SSLParameters sslParameters =
sslSocket.getSSLParameters() == null
? new SSLParameters()
: sslSocket.getSSLParameters();
sslParameters.setServerNames(serverNames);
sslSocket.setSSLParameters(sslParameters);
} else {
LOGGER.warn("SNI parameter set on a non-TLS connection");
}});
}
int sendBufferSize = intArg(cmd, "tsbs", -1);
int receiveBufferSize = intArg(cmd, "trbs", -1);
socketConfigurator =
socketConfigurator.andThen(
socket -> {
if (sendBufferSize > 0) {
socket.setSendBufferSize(sendBufferSize);
}
if (receiveBufferSize > 0) {
socket.setReceiveBufferSize(receiveBufferSize);
}
});
return socketConfigurator;
}

static SocketChannelConfigurator socketChannelConfigurator(CommandLineProxy cmd) {
int sendBufferSize = intArg(cmd, "tsbs", -1);
int receiveBufferSize = intArg(cmd, "trbs", -1);
return SocketChannelConfigurators.defaultConfigurator().andThen(
socketChannel -> {
if (sendBufferSize > 0) {
socketChannel.socket().setSendBufferSize(sendBufferSize);
}
if (receiveBufferSize > 0) {
socketChannel.socket().setReceiveBufferSize(receiveBufferSize);
}
});
}

static SslEngineConfigurator sslEngineConfigurator(CommandLineProxy cmd) {
Expand Down Expand Up @@ -196,6 +209,10 @@ static String strArg(CommandLineProxy cmd, char opt, String def) {
return cmd.getOptionValue(opt, def);
}

static int intArg(CommandLineProxy cmd, String opt, int def) {
return Integer.parseInt(cmd.getOptionValue(opt, Integer.toString(def)));
}

static void exchangeDeclare(Channel channel, String exchange, String type) throws IOException {
if ("".equals(exchange) || exchange.startsWith("amq.")) {
LOGGER.info("Skipping creation of exchange {}", exchange);
Expand Down

0 comments on commit 42d0a18

Please sign in to comment.