Skip to content
This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

Commit

Permalink
make number of keepAlives configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Jul 12, 2017
1 parent c90b88c commit 2ccf61a
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 11 deletions.
17 changes: 16 additions & 1 deletion client/src/main/java/io/atomix/copycat/client/CopycatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ final class Builder implements io.atomix.catalyst.util.Builder<CopycatClient> {
private Serializer serializer;
private Duration sessionTimeout = Duration.ZERO;
private Duration unstabilityTimeout = Duration.ZERO;
private int keepAlivesPerTimeoutInterval = 2;
private ConnectionStrategy connectionStrategy = ConnectionStrategies.ONCE;
private ServerSelectionStrategy serverSelectionStrategy = ServerSelectionStrategies.ANY;
private RecoveryStrategy recoveryStrategy = RecoveryStrategies.CLOSE;
Expand Down Expand Up @@ -623,6 +624,19 @@ public Builder withSessionTimeout(Duration sessionTimeout) {
return this;
}

/**
* Sets the number of keep-alives to send during a session timeout interval. Default value is 2.
*
* @param keepAlivesPerTimeoutInterval The number of keep-alives to send during a session timeout interval
* @return The client builder.
* @throws IllegalArgumentException if the keepAlivesPerTimeoutInterval is not positive
*/
public Builder withKeepAlivesPerTimeoutInterval(int keepAlivesPerTimeoutInterval) {
Assert.arg(keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval must be positive");
this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval;
return this;
}

/**
* Sets the timeout for session unstability. Client is automatically closed if it can not reach servers within
* the given timeout.
Expand Down Expand Up @@ -711,7 +725,8 @@ public CopycatClient build() {
connectionStrategy,
recoveryStrategy,
sessionTimeout,
unstabilityTimeout
unstabilityTimeout,
keepAlivesPerTimeoutInterval
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DefaultCopycatClient implements CopycatClient {
private final AddressSelector selector;
private final Duration sessionTimeout;
private final Duration unstabilityTimeout;
private final int keepAlivesPerTimeoutInterval;
private final ConnectionStrategy connectionStrategy;
private final RecoveryStrategy recoveryStrategy;
private ClientSession session;
Expand All @@ -68,7 +69,10 @@ public class DefaultCopycatClient implements CopycatClient {
private final Set<EventListener<?>> eventListeners = new CopyOnWriteArraySet<>();
private Listener<Session.State> changeListener;

DefaultCopycatClient(String clientId, Collection<Address> cluster, Transport transport, ThreadContext ioContext, ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration unstabilityTimeout) {
DefaultCopycatClient(String clientId, Collection<Address> cluster, Transport transport, ThreadContext ioContext,
ThreadContext eventContext, ServerSelectionStrategy selectionStrategy, ConnectionStrategy
connectionStrategy, RecoveryStrategy recoveryStrategy, Duration sessionTimeout, Duration
unstabilityTimeout, int keepAlivesPerTimeoutInterval) {
this.clientId = Assert.notNull(clientId, "clientId");
this.cluster = Assert.notNull(cluster, "cluster");
this.transport = Assert.notNull(transport, "transport");
Expand All @@ -78,7 +82,8 @@ public class DefaultCopycatClient implements CopycatClient {
this.connectionStrategy = Assert.notNull(connectionStrategy, "connectionStrategy");
this.recoveryStrategy = Assert.notNull(recoveryStrategy, "recoveryStrategy");
this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout");
this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");;
this.unstabilityTimeout = Assert.notNull(unstabilityTimeout, "unstabilityTimeout");
this.keepAlivesPerTimeoutInterval = keepAlivesPerTimeoutInterval;
}

@Override
Expand Down Expand Up @@ -128,7 +133,7 @@ public ThreadContext context() {
*/
private ClientSession newSession() {
ClientSession session = new ClientSession(clientId, transport.client(), selector, ioContext, connectionStrategy, sessionTimeout,
unstabilityTimeout
unstabilityTimeout, keepAlivesPerTimeoutInterval
);

// Update the session change listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public class ClientSession implements Session {
private final ClientSessionListener listener;
private final ClientSessionSubmitter submitter;

public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout) {
this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout);
public ClientSession(String id, Client client, AddressSelector selector, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, Duration unstabilityTimeout, int keepalivesPerTimeoutInterval) {
this(new ClientConnection(id, client, selector), new ClientSessionState(id, unstabilityTimeout), context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval);
}

private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) {
private ClientSession(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepalivesPerTimeoutInterval) {
this.connection = Assert.notNull(connection, "connection");
this.state = Assert.notNull(state, "state");
this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout);
this.manager = new ClientSessionManager(connection, state, context, connectionStrategy, sessionTimeout, keepalivesPerTimeoutInterval);
ClientSequencer sequencer = new ClientSequencer(state);
this.listener = new ClientSessionListener(connection, state, sequencer, context);
this.submitter = new ClientSessionSubmitter(connection, state, sequencer, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ final class ClientSessionManager {
private final ThreadContext context;
private final ConnectionStrategy strategy;
private final Duration sessionTimeout;
private final int keepAlivesPerTimeoutInterval;
private Duration interval;
private Scheduled keepAlive;

ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout) {
ClientSessionManager(ClientConnection connection, ClientSessionState state, ThreadContext context, ConnectionStrategy connectionStrategy, Duration sessionTimeout, int keepAlivesPerTimeoutInterval) {
this.connection = Assert.notNull(connection, "connection");
this.state = Assert.notNull(state, "state");
this.context = Assert.notNull(context, "context");
this.strategy = Assert.notNull(connectionStrategy, "connectionStrategy");
this.sessionTimeout = Assert.notNull(sessionTimeout, "sessionTimeout");
this.keepAlivesPerTimeoutInterval = Assert.arg(keepAlivesPerTimeoutInterval, keepAlivesPerTimeoutInterval > 0, "keepAlivesPerTimeoutInterval");
}

/**
Expand Down Expand Up @@ -94,7 +96,7 @@ private void register(RegisterAttempt attempt) {
if (error == null) {
state.getLogger().trace("Received {}", response);
if (response.status() == Response.Status.OK) {
interval = Duration.ofMillis(response.timeout()).dividedBy(2);
interval = Duration.ofMillis(response.timeout()).dividedBy(keepAlivesPerTimeoutInterval);
connection.reset(response.leader(), response.members());
state.setSessionId(response.session())
.setState(Session.State.OPEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testSessionRegisterUnregister() throws Throwable {
Executor executor = new MockExecutor();
when(context.executor()).thenReturn(executor);

ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO);
ClientSessionManager manager = new ClientSessionManager(connection, state, context, ConnectionStrategies.EXPONENTIAL_BACKOFF, Duration.ZERO, 2);
manager.open().join();

assertEquals(state.getSessionId(), 1);
Expand Down

0 comments on commit 2ccf61a

Please sign in to comment.