Skip to content

Commit

Permalink
Allow user to set socket read timeout option (#1188)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jul 26, 2024
1 parent 606699e commit 37d03e8
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ plugins {
id 'signing'
}

def jarVersion = "2.19.2"
def jarVersion = "2.20.0"

def isRelease = System.getenv("BUILD_EVENT") == "release"
def brn = System.getenv("BRANCH_REF_NAME")
Expand Down
49 changes: 45 additions & 4 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ public class Options {
*/
public static final long MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT = 100;

/**
* Constant used for calculating if a socket read timeout is large enough.
*/
public static final long MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT = 100;

/**
* Default server ping interval. The client will send a ping to the server on this interval to insure liveness.
* The server may send pings to the client as well, these are handled automatically by the library,
Expand Down Expand Up @@ -271,6 +276,11 @@ public class Options {
* {@link Builder#connectionTimeout(Duration) connectionTimeout}.
*/
public static final String PROP_CONNECTION_TIMEOUT = PFX + "timeout";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis}.
*/
public static final String PROP_SOCKET_READ_TIMEOUT_MS = PFX + "socket.read.timeout.ms";
/**
* Property used to configure a builder from a Properties object. {@value}, see
* {@link Builder#socketWriteTimeout(long) socketWriteTimeout}.
Expand Down Expand Up @@ -591,6 +601,7 @@ public class Options {
private final Duration reconnectJitter;
private final Duration reconnectJitterTls;
private final Duration connectionTimeout;
private final int socketReadTimeoutMillis;
private final Duration socketWriteTimeout;
private final int socketSoLinger;
private final Duration pingInterval;
Expand Down Expand Up @@ -704,6 +715,7 @@ public static class Builder {
private Duration reconnectJitter = DEFAULT_RECONNECT_JITTER;
private Duration reconnectJitterTls = DEFAULT_RECONNECT_JITTER_TLS;
private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private int socketReadTimeoutMillis = 0;
private Duration socketWriteTimeout = DEFAULT_SOCKET_WRITE_TIMEOUT;
private int socketSoLinger = -1;
private Duration pingInterval = DEFAULT_PING_INTERVAL;
Expand Down Expand Up @@ -840,6 +852,7 @@ public Builder properties(Properties props) {
durationProperty(props, PROP_RECONNECT_JITTER_TLS, DEFAULT_RECONNECT_JITTER_TLS, d -> this.reconnectJitterTls = d);
longProperty(props, PROP_RECONNECT_BUF_SIZE, DEFAULT_RECONNECT_BUF_SIZE, l -> this.reconnectBufferSize = l);
durationProperty(props, PROP_CONNECTION_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT, d -> this.connectionTimeout = d);
intProperty(props, PROP_SOCKET_READ_TIMEOUT_MS, -1, i -> this.socketReadTimeoutMillis = i);
durationProperty(props, PROP_SOCKET_WRITE_TIMEOUT, DEFAULT_SOCKET_WRITE_TIMEOUT, d -> this.socketWriteTimeout = d);
intProperty(props, PROP_SOCKET_SO_LINGER, -1, i -> socketSoLinger = i);

Expand Down Expand Up @@ -1273,6 +1286,16 @@ public Builder connectionTimeout(long connectionTimeoutMillis) {
return this;
}

/**
* Set the timeout to use around socket reads
* @param socketReadTimeoutMillis the timeout milliseconds
* @return the Builder for chaining
*/
public Builder socketReadTimeoutMillis(int socketReadTimeoutMillis) {
this.socketReadTimeoutMillis = socketReadTimeoutMillis;
return this;
}

/**
* Set the timeout to use around socket writes
* @param socketWriteTimeoutMillis the timeout milliseconds
Expand All @@ -1285,7 +1308,7 @@ public Builder socketWriteTimeout(long socketWriteTimeoutMillis) {

/**
* Set the timeout to use around socket writes
* @param socketWriteTimeout the timeout milliseconds
* @param socketWriteTimeout the timeout duration
* @return the Builder for chaining
*/
public Builder socketWriteTimeout(Duration socketWriteTimeout) {
Expand All @@ -1311,7 +1334,7 @@ public Builder socketSoLinger(int socketSoLinger) {
/**
* Set the interval between attempts to pings the server. These pings are automated,
* and capped by {@link #maxPingsOut(int) maxPingsOut()}. As of 2.4.4 the library
* may way up to 2 * time to send a ping. Incoming traffic from the server can postpone
* may wait up to 2 * time to send a ping. Incoming traffic from the server can postpone
* the next ping to avoid pings taking up bandwidth during busy messaging.
* Keep in mind that a ping requires a round trip to the server. Setting this value to a small
* number can result in quick failures due to maxPingsOut being reached, these failures will
Expand All @@ -1323,7 +1346,7 @@ public Builder socketSoLinger(int socketSoLinger) {
* @return the Builder for chaining
*/
public Builder pingInterval(Duration time) {
this.pingInterval = time;
this.pingInterval = time == null ? DEFAULT_PING_INTERVAL : time;
return this;
}

Expand Down Expand Up @@ -1772,6 +1795,15 @@ else if (useDefaultTls) {
new DefaultThreadFactory(threadPrefix));
}

if (socketReadTimeoutMillis > 0) {
long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
if (socketReadTimeoutMillis < srtMin) {
throw new IllegalStateException("Socket Read Timeout must be at least "
+ MINIMUM_SOCKET_READ_TIMEOUT_GT_CONNECTION_TIMEOUT
+ " milliseconds greater than the Ping Interval");
}
}

if (socketWriteTimeout == null || socketWriteTimeout.toMillis() < 1) {
socketWriteTimeout = null;
}
Expand Down Expand Up @@ -1833,6 +1865,7 @@ public Builder(Options o) {
this.reconnectJitter = o.reconnectJitter;
this.reconnectJitterTls = o.reconnectJitterTls;
this.connectionTimeout = o.connectionTimeout;
this.socketReadTimeoutMillis = o.socketReadTimeoutMillis;
this.socketWriteTimeout = o.socketWriteTimeout;
this.socketSoLinger = o.socketSoLinger;
this.pingInterval = o.pingInterval;
Expand Down Expand Up @@ -1896,6 +1929,7 @@ private Options(Builder b) {
this.reconnectJitter = b.reconnectJitter;
this.reconnectJitterTls = b.reconnectJitterTls;
this.connectionTimeout = b.connectionTimeout;
this.socketReadTimeoutMillis = b.socketReadTimeoutMillis;
this.socketWriteTimeout = b.socketWriteTimeout;
this.socketSoLinger = b.socketSoLinger;
this.pingInterval = b.pingInterval;
Expand Down Expand Up @@ -2213,7 +2247,14 @@ public Duration getConnectionTimeout() {
}

/**
* @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout()} in the builder doc
* @return the socketReadTimeoutMillis, see {@link Builder#socketReadTimeoutMillis(int) socketReadTimeoutMillis} in the builder doc
*/
public int getSocketReadTimeoutMillis() {
return socketReadTimeoutMillis;
}

/**
* @return the socketWriteTimeout, see {@link Builder#socketWriteTimeout(long) socketWriteTimeout} in the builder doc
*/
public Duration getSocketWriteTimeout() {
return socketWriteTimeout;
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/nats/client/impl/SocketDataPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public void connect(NatsConnection conn, NatsUri nuri, long timeoutNanos) throws
if (soLinger > -1) {
socket.setSoLinger(true, soLinger);
}
if (options.getSocketReadTimeoutMillis() > 0) {
socket.setSoTimeout(options.getSocketReadTimeoutMillis());
}

if (isWebsocketScheme(nuri.getScheme())) {
if (SECURE_WEBSOCKET_PROTOCOL.equalsIgnoreCase(nuri.getScheme())) {
Expand Down
17 changes: 14 additions & 3 deletions src/test/java/io/nats/client/OptionsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.nats.client.Options.DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE;
import static io.nats.client.Options.*;
import static io.nats.client.support.Encoding.base64UrlEncodeToString;
import static io.nats.client.support.NatsConstants.DEFAULT_PORT;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -93,7 +93,7 @@ private static void _testDefaultOptions(Options o) {

assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait");
assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout");
assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(),
"default cleanup interval");

Expand Down Expand Up @@ -599,7 +599,7 @@ private static void _testDefaultPropertyIntOptions(Options o) {
assertEquals(Options.DEFAULT_MAX_CONTROL_LINE, o.getMaxControlLine(), "default max control line");
assertEquals(Options.DEFAULT_RECONNECT_WAIT, o.getReconnectWait(), "default reconnect wait");
assertEquals(Options.DEFAULT_CONNECTION_TIMEOUT, o.getConnectionTimeout(), "default connection timeout");
assertEquals(Options.DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
assertEquals(DEFAULT_PING_INTERVAL, o.getPingInterval(), "default ping interval");
assertEquals(Options.DEFAULT_REQUEST_CLEANUP_INTERVAL, o.getRequestCleanupInterval(),
"default cleanup interval");
assertEquals(DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE, o.getMaxMessagesInOutgoingQueue(),
Expand Down Expand Up @@ -800,6 +800,17 @@ public void testDefaultDataPort() {
assertEquals(SocketDataPortWithWriteTimeout.class.getCanonicalName(), dataPort.getClass().getCanonicalName(), "new default dataPort");
}

@Test
public void testTimeoutValidations() {
assertThrows(IllegalStateException.class, () -> Options.builder()
.socketReadTimeoutMillis((int)DEFAULT_PING_INTERVAL.toMillis())
.build());

assertThrows(IllegalStateException.class, () -> Options.builder()
.socketWriteTimeout(DEFAULT_CONNECTION_TIMEOUT)
.build());
}

@Test
public void testPropertyDataPortType() {
Properties props = new Properties();
Expand Down

0 comments on commit 37d03e8

Please sign in to comment.