From 2ccf61a9b3bcc247c32dd5d54386623545d113b2 Mon Sep 17 00:00:00 2001 From: James Watson Date: Wed, 12 Jul 2017 11:29:15 +0100 Subject: [PATCH] make number of keepAlives configurable --- .../io/atomix/copycat/client/CopycatClient.java | 17 ++++++++++++++++- .../copycat/client/DefaultCopycatClient.java | 11 ++++++++--- .../copycat/client/session/ClientSession.java | 8 ++++---- .../client/session/ClientSessionManager.java | 6 ++++-- .../session/ClientSessionManagerTest.java | 2 +- 5 files changed, 33 insertions(+), 11 deletions(-) diff --git a/client/src/main/java/io/atomix/copycat/client/CopycatClient.java b/client/src/main/java/io/atomix/copycat/client/CopycatClient.java index b1231884..ddb3947d 100644 --- a/client/src/main/java/io/atomix/copycat/client/CopycatClient.java +++ b/client/src/main/java/io/atomix/copycat/client/CopycatClient.java @@ -560,6 +560,7 @@ final class Builder implements io.atomix.catalyst.util.Builder { private Serializer serializer; private Duration sessionTimeout = Duration.ZERO; private Duration unstabilityTimeout = Duration.ZERO; + private int keepAlivesPerTimeoutInterval = 2; private ConnectionStrategy connectionStrategy = ConnectionStrategies.ONCE; private ServerSelectionStrategy serverSelectionStrategy = ServerSelectionStrategies.ANY; private RecoveryStrategy recoveryStrategy = RecoveryStrategies.CLOSE; @@ -623,6 +624,19 @@ public Builder withSessionTimeout(Duration sessionTimeout) { return this; } + /** + * Sets the number of keep-alives to send during a session timeout interval. Default value is 2. + * + * @param keepAlivesPerTimeoutInterval The number of keep-alives to send during a session timeout interval + * @return The client builder. + * @throws IllegalArgumentException if the keepAlivesPerTimeoutInterval is not positive + */ + public Builder withKeepAlivesPerTimeoutInterval(int keepAlivesPerTimeoutInterval) { + Assert.arg(keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval must be positive"); + this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval; + return this; + } + /** * Sets the timeout for session unstability. Client is automatically closed if it can not reach servers within * the given timeout. @@ -711,7 +725,8 @@ public CopycatClient build() { connectionStrategy, recoveryStrategy, sessionTimeout, - unstabilityTimeout + unstabilityTimeout, + keepAlivesPerTimeoutInterval ); } } diff --git a/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java b/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java index b938aa69..3615fc48 100644 --- a/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java +++ b/client/src/main/java/io/atomix/copycat/client/DefaultCopycatClient.java @@ -57,6 +57,7 @@ public class DefaultCopycatClient implements CopycatClient { private final AddressSelector selector; private final Duration sessionTimeout; private final Duration unstabilityTimeout; + private final int keepAlivesPerTimeoutInterval; private final ConnectionStrategy connectionStrategy; private final RecoveryStrategy recoveryStrategy; private ClientSession session; @@ -68,7 +69,10 @@ public class DefaultCopycatClient implements CopycatClient { private final Set> eventListeners = new CopyOnWriteArraySet<>(); private Listener changeListener; - DefaultCopycatClient(String clientId, Collection
cluster, Transport transport, ThreadContext ioContext, ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration unstabilityTimeout) { + DefaultCopycatClient(String clientId, Collection
cluster, Transport transport, ThreadContext ioContext, + ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy + connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration + unstabilityTimeout, int keepAlivesPerTimeoutInterval) { this.clientId = Assert.notNull(clientId, "clientId"); this.cluster = Assert.notNull(cluster, "cluster"); this.transport = Assert.notNull(transport, "transport"); @@ -78,7 +82,8 @@ public class DefaultCopycatClient implements CopycatClient { this.connectionStrategy = Assert.notNull(connectionStrategy, "connectionStrategy"); this.recoveryStrategy = Assert.notNull(recoveryStrategy, "recoveryStrategy"); this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout"); - this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");; + this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout"); + this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval; } @Override @@ -128,7 +133,7 @@ public ThreadContext context() { */ private ClientSession newSession() { ClientSession session = new ClientSession(clientId, transport.client(), selector, ioContext, connectionStrategy, sessionTimeout, - unstabilityTimeout + unstabilityTimeout, keepAlivesPerTimeoutInterval ); // Update the session change listener. diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java b/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java index ba5860cc..6cc8fbc7 100644 --- a/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java +++ b/client/src/main/java/io/atomix/copycat/client/session/ClientSession.java @@ -59,14 +59,14 @@ public class ClientSession implements Session { private final ClientSessionListener listener; private final ClientSessionSubmitter submitter; - public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout) { - this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout); + public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout, int keepalivesPerTimeoutInterval) { + this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval); } - private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) { + private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepalivesPerTimeoutInterval) { this.connection = Assert.notNull(connection, "connection"); this.state = Assert.notNull(state, "state"); - this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout); + this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval); ClientSequencer sequencer = new ClientSequencer(state); this.listener = new ClientSessionListener(connection, state, sequencer, context); this.submitter = new ClientSessionSubmitter(connection, state, sequencer, context); diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java b/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java index 21aede07..33a68e79 100644 --- a/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java +++ b/client/src/main/java/io/atomix/copycat/client/session/ClientSessionManager.java @@ -40,15 +40,17 @@ final class ClientSessionManager { private final ThreadContext context; private final ConnectionStrategy strategy; private final Duration sessionTimeout; + private final int keepAlivesPerTimeoutInterval; private Duration interval; private Scheduled keepAlive; - ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) { + ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepAlivesPerTimeoutInterval) { this.connection = Assert.notNull(connection, "connection"); this.state = Assert.notNull(state, "state"); this.context = Assert.notNull(context, "context"); this.strategy = Assert.notNull(connectionStrategy, "connectionStrategy"); this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout"); + this.keepAlivesPerTimeoutInterval = Assert.arg(keepAlivesPerTimeoutInterval, keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval"); } /** @@ -94,7 +96,7 @@ private void register(RegisterAttempt attempt) { if (error == null) { state.getLogger().trace("Received {}", response); if (response.status() == Response.Status.OK) { - interval = Duration.ofMillis(response.timeout()).dividedBy(2); + interval = Duration.ofMillis(response.timeout()).dividedBy(keepAlivesPerTimeoutInterval); connection.reset(response.leader(), response.members()); state.setSessionId(response.session()) .setState(Session.State.OPEN); diff --git a/client/src/test/java/io/atomix/copycat/client/session/ClientSessionManagerTest.java b/client/src/test/java/io/atomix/copycat/client/session/ClientSessionManagerTest.java index 5c175f8d..71585bb9 100644 --- a/client/src/test/java/io/atomix/copycat/client/session/ClientSessionManagerTest.java +++ b/client/src/test/java/io/atomix/copycat/client/session/ClientSessionManagerTest.java @@ -73,7 +73,7 @@ public void testSessionRegisterUnregister() throws Throwable { Executor executor = new MockExecutor(); when(context.executor()).thenReturn(executor); - ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO); + ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO, 2); manager.open().join(); assertEquals(state.getSessionId(), 1);