From 37d03e82aa4d297f85224754d9a9847f3c53b2e7 Mon Sep 17 00:00:00 2001 From: Scott Fauerbach Date: Fri, 26 Jul 2024 12:31:28 -0400 Subject: [PATCH] Allow user to set socket read timeout option (#1188) --- build.gradle | 2 +- src/main/java/io/nats/client/Options.java | 49 +++++++++++++++++-- .../io/nats/client/impl/SocketDataPort.java | 3 ++ .../java/io/nats/client/OptionsTests.java | 17 +++++-- 4 files changed, 63 insertions(+), 8 deletions(-) diff --git a/build.gradle b/build.gradle index ed90f80b6..8c9110421 100644 --- a/build.gradle +++ b/build.gradle @@ -13,7 +13,7 @@ plugins { id 'signing' } -def jarVersion = "2.19.2" +def jarVersion = "2.20.0" def isRelease = System.getenv("BUILD_EVENT") == "release" def brn = System.getenv("BRANCH_REF_NAME") diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index cd39a9620..99213bfc5 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -125,6 +125,11 @@ public class Options { */ public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 100; + /** + * Constant used for calculating if a socket read timeout is large enough. + */ + public static final long MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT = 100; + /** * Default server ping interval. The client will send a ping to the server on this interval to insure liveness. * The server may send pings to the client as well, these are handled automatically by the library, @@ -271,6 +276,11 @@ public class Options { * {@link Builder#connectionTimeout(Duration) connectionTimeout}. */ public static final String PROP_CONNECTION_TIMEOUT = PFX + "timeout"; + /** + * Property used to configure a builder from a Properties object. {@value}, see + * {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis}. + */ + public static final String PROP_SOCKET_READ_TIMEOUT_MS = PFX + "socket.read.timeout.ms"; /** * Property used to configure a builder from a Properties object. {@value}, see * {@link Builder#socketWriteTimeout(long) socketWriteTimeout}. @@ -591,6 +601,7 @@ public class Options { private final Duration reconnectJitter; private final Duration reconnectJitterTls; private final Duration connectionTimeout; + private final int socketReadTimeoutMillis; private final Duration socketWriteTimeout; private final int socketSoLinger; private final Duration pingInterval; @@ -704,6 +715,7 @@ public static class Builder { private Duration reconnectJitter = DEFAULT_RECONNECT_JITTER; private Duration reconnectJitterTls = DEFAULT_RECONNECT_JITTER_TLS; private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; + private int socketReadTimeoutMillis = 0; private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT; private int socketSoLinger = -1; private Duration pingInterval = DEFAULT_PING_INTERVAL; @@ -840,6 +852,7 @@ public Builder properties(Properties props) { durationProperty(props, PROP_RECONNECT_JITTER_TLS, DEFAULT_RECONNECT_JITTER_TLS, d -> this.reconnectJitterTls = d); longProperty(props, PROP_RECONNECT_BUF_SIZE, DEFAULT_RECONNECT_BUF_SIZE, l -> this.reconnectBufferSize = l); durationProperty(props, PROP_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, d -> this.connectionTimeout = d); + intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i); durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d); intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i); @@ -1273,6 +1286,16 @@ public Builder connectionTimeout(long connectionTimeoutMillis) { return this; } + /** + * Set the timeout to use around socket reads + * @param socketReadTimeoutMillis the timeout milliseconds + * @return the Builder for chaining + */ + public Builder socketReadTimeoutMillis(int socketReadTimeoutMillis) { + this.socketReadTimeoutMillis = socketReadTimeoutMillis; + return this; + } + /** * Set the timeout to use around socket writes * @param socketWriteTimeoutMillis the timeout milliseconds @@ -1285,7 +1308,7 @@ public Builder socketWriteTimeout(long socketWriteTimeoutMillis) { /** * Set the timeout to use around socket writes - * @param socketWriteTimeout the timeout milliseconds + * @param socketWriteTimeout the timeout duration * @return the Builder for chaining */ public Builder socketWriteTimeout(Duration socketWriteTimeout) { @@ -1311,7 +1334,7 @@ public Builder socketSoLinger(int socketSoLinger) { /** * Set the interval between attempts to pings the server. These pings are automated, * and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library - * may way up to 2 * time to send a ping. Incoming traffic from the server can postpone + * may wait up to 2 * time to send a ping. Incoming traffic from the server can postpone * the next ping to avoid pings taking up bandwidth during busy messaging. * Keep in mind that a ping requires a round trip to the server. Setting this value to a small * number can result in quick failures due to maxPingsOut being reached, these failures will @@ -1323,7 +1346,7 @@ public Builder socketSoLinger(int socketSoLinger) { * @return the Builder for chaining */ public Builder pingInterval(Duration time) { - this.pingInterval = time; + this.pingInterval = time == null ? DEFAULT_PING_INTERVAL : time; return this; } @@ -1772,6 +1795,15 @@ else if (useDefaultTls) { new DefaultThreadFactory(threadPrefix)); } + if (socketReadTimeoutMillis > 0) { + long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT; + if (socketReadTimeoutMillis < srtMin) { + throw new IllegalStateException("Socket Read Timeout must be at least " + + MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT + + " milliseconds greater than the Ping Interval"); + } + } + if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) { socketWriteTimeout = null; } @@ -1833,6 +1865,7 @@ public Builder(Options o) { this.reconnectJitter = o.reconnectJitter; this.reconnectJitterTls = o.reconnectJitterTls; this.connectionTimeout = o.connectionTimeout; + this.socketReadTimeoutMillis = o.socketReadTimeoutMillis; this.socketWriteTimeout = o.socketWriteTimeout; this.socketSoLinger = o.socketSoLinger; this.pingInterval = o.pingInterval; @@ -1896,6 +1929,7 @@ private Options(Builder b) { this.reconnectJitter = b.reconnectJitter; this.reconnectJitterTls = b.reconnectJitterTls; this.connectionTimeout = b.connectionTimeout; + this.socketReadTimeoutMillis = b.socketReadTimeoutMillis; this.socketWriteTimeout = b.socketWriteTimeout; this.socketSoLinger = b.socketSoLinger; this.pingInterval = b.pingInterval; @@ -2213,7 +2247,14 @@ public Duration getConnectionTimeout() { } /** - * @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout()} in the builder doc + * @return the socketReadTimeoutMillis, see {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis} in the builder doc + */ + public int getSocketReadTimeoutMillis() { + return socketReadTimeoutMillis; + } + + /** + * @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout} in the builder doc */ public Duration getSocketWriteTimeout() { return socketWriteTimeout; diff --git a/src/main/java/io/nats/client/impl/SocketDataPort.java b/src/main/java/io/nats/client/impl/SocketDataPort.java index f6d472a14..f3f546665 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPort.java +++ b/src/main/java/io/nats/client/impl/SocketDataPort.java @@ -85,6 +85,9 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws if (soLinger > -1) { socket.setSoLinger(true, soLinger); } + if (options.getSocketReadTimeoutMillis() > 0) { + socket.setSoTimeout(options.getSocketReadTimeoutMillis()); + } if (isWebsocketScheme(nuri.getScheme())) { if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) { diff --git a/src/test/java/io/nats/client/OptionsTests.java b/src/test/java/io/nats/client/OptionsTests.java index af2094fb2..cf2108185 100644 --- a/src/test/java/io/nats/client/OptionsTests.java +++ b/src/test/java/io/nats/client/OptionsTests.java @@ -37,7 +37,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static io.nats.client.Options.DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE; +import static io.nats.client.Options.*; import static io.nats.client.support.Encoding.base64UrlEncodeToString; import static io.nats.client.support.NatsConstants.DEFAULT_PORT; import static org.junit.jupiter.api.Assertions.*; @@ -93,7 +93,7 @@ private static void _testDefaultOptions(Options o) { assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait"); assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout"); - assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval"); + assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval"); assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(), "default cleanup interval"); @@ -599,7 +599,7 @@ private static void _testDefaultPropertyIntOptions(Options o) { assertEquals(Options.DEFAULT_MAX_CONTROL_LINE, o.getMaxControlLine(), "default max control line"); assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait"); assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout"); - assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval"); + assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval"); assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(), "default cleanup interval"); assertEquals(DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE, o.getMaxMessagesInOutgoingQueue(), @@ -800,6 +800,17 @@ public void testDefaultDataPort() { assertEquals(SocketDataPortWithWriteTimeout.class.getCanonicalName(), dataPort.getClass().getCanonicalName(), "new default dataPort"); } + @Test + public void testTimeoutValidations() { + assertThrows(IllegalStateException.class, () -> Options.builder() + .socketReadTimeoutMillis((int)DEFAULT_PING_INTERVAL.toMillis()) + .build()); + + assertThrows(IllegalStateException.class, () -> Options.builder() + .socketWriteTimeout(DEFAULT_CONNECTION_TIMEOUT) + .build()); + } + @Test public void testPropertyDataPortType() { Properties props = new Properties();