Skip to content

Commit

Permalink
Improve mutators for ClientOptions, ClusterClientOptions, and ClientR…
Browse files Browse the repository at this point in the history
…esources #1003

Mutators return decoupled builder instances and do not retain builder state across multiple created instances. Multiple creations do not longer affect each other.

ClientResources also now retains the shared object state across mutations if shared resources (timer, EventLoops) are not altered in the mutation builder.
  • Loading branch information
mp911de committed Mar 21, 2019
1 parent ee53b43 commit 060f1ba
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 22 deletions.
14 changes: 10 additions & 4 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class ClientOptions implements Serializable {
private final SslOptions sslOptions;
private final TimeoutOptions timeoutOptions;
private final int bufferUsageRatio;
private final Builder builder;

protected ClientOptions(Builder builder) {
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
Expand All @@ -66,7 +65,6 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.timeoutOptions = builder.timeoutOptions;
this.bufferUsageRatio = builder.bufferUsageRatio;
this.builder = builder;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -81,7 +79,6 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.timeoutOptions = original.getTimeoutOptions();
this.bufferUsageRatio = original.getBufferUsageRatio();
this.builder = original.builder;
}

/**
Expand Down Expand Up @@ -310,7 +307,16 @@ public ClientOptions build() {
* @since 5.1
*/
public ClientOptions.Builder mutate() {
return this.builder;
Builder builder = new Builder();

builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.disconnectedBehavior(getDisconnectedBehavior()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).requestQueueSize(getRequestQueueSize())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());

return builder;
}

/**
Expand Down
49 changes: 44 additions & 5 deletions src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.lettuce.core.SocketOptions;
import io.lettuce.core.SslOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.internal.LettuceAssert;

/**
* Client Options to control the behavior of {@link RedisClusterClient}.
Expand All @@ -40,7 +41,6 @@ public class ClusterClientOptions extends ClientOptions {
private final boolean validateClusterNodeMembership;
private final int maxRedirects;
private final ClusterTopologyRefreshOptions topologyRefreshOptions;
private final ClusterClientOptions.Builder builder;

protected ClusterClientOptions(Builder builder) {

Expand All @@ -60,7 +60,6 @@ protected ClusterClientOptions(Builder builder) {
}

this.topologyRefreshOptions = refreshOptions;
this.builder = builder;
}

protected ClusterClientOptions(ClusterClientOptions original) {
Expand All @@ -70,7 +69,6 @@ protected ClusterClientOptions(ClusterClientOptions original) {
this.validateClusterNodeMembership = original.validateClusterNodeMembership;
this.maxRedirects = original.maxRedirects;
this.topologyRefreshOptions = original.topologyRefreshOptions;
this.builder = original.builder;
}

/**
Expand All @@ -92,6 +90,35 @@ public static ClusterClientOptions.Builder builder() {
return new ClusterClientOptions.Builder();
}

/**
* Returns a new {@link ClusterClientOptions.Builder} initialized from {@link ClientOptions} to construct
* {@link ClusterClientOptions}.
*
* @return a new {@link ClusterClientOptions.Builder} to construct {@link ClusterClientOptions}.
* @since 5.1.6
*/
public static ClusterClientOptions.Builder builder(ClientOptions clientOptions) {

LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");

if (clientOptions instanceof ClusterClientOptions) {
return ((ClusterClientOptions) clientOptions).mutate();
}

Builder builder = new Builder();
builder.autoReconnect(clientOptions.isAutoReconnect()).bufferUsageRatio(clientOptions.getBufferUsageRatio())
.cancelCommandsOnReconnectFailure(clientOptions.isCancelCommandsOnReconnectFailure())
.disconnectedBehavior(clientOptions.getDisconnectedBehavior())
.publishOnScheduler(clientOptions.isPublishOnScheduler())
.pingBeforeActivateConnection(clientOptions.isPingBeforeActivateConnection())
.requestQueueSize(clientOptions.getRequestQueueSize()).socketOptions(clientOptions.getSocketOptions())
.sslOptions(clientOptions.getSslOptions())
.suspendReconnectOnProtocolFailure(clientOptions.isSuspendReconnectOnProtocolFailure())
.timeoutOptions(clientOptions.getTimeoutOptions());

return builder;
}

/**
* Create a new {@link ClusterClientOptions} using default settings.
*
Expand Down Expand Up @@ -174,7 +201,7 @@ public Builder cancelCommandsOnReconnectFailure(boolean cancelCommandsOnReconnec
}

@Override
public ClientOptions.Builder publishOnScheduler(boolean publishOnScheduler) {
public Builder publishOnScheduler(boolean publishOnScheduler) {
super.publishOnScheduler(publishOnScheduler);
return this;
}
Expand Down Expand Up @@ -235,7 +262,19 @@ public ClusterClientOptions build() {
* @since 5.1
*/
public ClusterClientOptions.Builder mutate() {
return this.builder;

Builder builder = new Builder();

builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.disconnectedBehavior(getDisconnectedBehavior()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).requestQueueSize(getRequestQueueSize())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
.validateClusterNodeMembership(isValidateClusterNodeMembership()).maxRedirects(getMaxRedirects())
.topologyRefreshOptions(getTopologyRefreshOptions());

return builder;
}

/**
Expand Down
43 changes: 34 additions & 9 deletions src/main/java/io/lettuce/core/resource/DefaultClientResources.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,10 @@ public class DefaultClientResources implements ClientResources {
private final NettyCustomizer nettyCustomizer;
private final Tracing tracing;

private final Builder builder;

private volatile boolean shutdownCalled = false;

protected DefaultClientResources(Builder builder) {

this.builder = builder;

if (builder.eventLoopGroupProvider == null) {
int ioThreadPoolSize = builder.ioThreadPoolSize;

Expand All @@ -148,7 +144,7 @@ protected DefaultClientResources(Builder builder) {
this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize);

} else {
this.sharedEventLoopGroupProvider = true;
this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider;
this.eventLoopGroupProvider = builder.eventLoopGroupProvider;
}

Expand All @@ -165,7 +161,7 @@ protected DefaultClientResources(Builder builder) {
computationThreadPoolSize);
sharedEventExecutor = false;
} else {
sharedEventExecutor = true;
sharedEventExecutor = builder.sharedEventExecutor;
eventExecutorGroup = builder.eventExecutorGroup;
}

Expand All @@ -174,7 +170,7 @@ protected DefaultClientResources(Builder builder) {
sharedTimer = false;
} else {
timer = builder.timer;
sharedTimer = true;
sharedTimer = builder.sharedTimer;
}

if (builder.eventBus == null) {
Expand All @@ -198,7 +194,7 @@ protected DefaultClientResources(Builder builder) {

sharedCommandLatencyCollector = false;
} else {
sharedCommandLatencyCollector = true;
sharedCommandLatencyCollector = builder.sharedCommandLatencyCollector;
commandLatencyCollector = builder.commandLatencyCollector;
}

Expand Down Expand Up @@ -251,6 +247,11 @@ public static DefaultClientResources.Builder builder() {
*/
public static class Builder implements ClientResources.Builder {

private boolean sharedEventLoopGroupProvider;
private boolean sharedEventExecutor;
private boolean sharedTimer;
private boolean sharedCommandLatencyCollector;

private int ioThreadPoolSize = DEFAULT_IO_THREADS;
private int computationThreadPoolSize = DEFAULT_COMPUTATION_THREADS;
private EventExecutorGroup eventExecutorGroup;
Expand Down Expand Up @@ -299,6 +300,7 @@ public Builder eventLoopGroupProvider(EventLoopGroupProvider eventLoopGroupProvi

LettuceAssert.notNull(eventLoopGroupProvider, "EventLoopGroupProvider must not be null");

this.sharedEventLoopGroupProvider = true;
this.eventLoopGroupProvider = eventLoopGroupProvider;
return this;
}
Expand Down Expand Up @@ -333,6 +335,7 @@ public Builder eventExecutorGroup(EventExecutorGroup eventExecutorGroup) {

LettuceAssert.notNull(eventExecutorGroup, "EventExecutorGroup must not be null");

this.sharedEventExecutor = true;
this.eventExecutorGroup = eventExecutorGroup;
return this;
}
Expand All @@ -352,6 +355,7 @@ public Builder timer(Timer timer) {

LettuceAssert.notNull(timer, "Timer must not be null");

this.sharedTimer = true;
this.timer = timer;
return this;
}
Expand Down Expand Up @@ -414,6 +418,7 @@ public Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCol

LettuceAssert.notNull(commandLatencyCollector, "CommandLatencyCollector must not be null");

this.sharedCommandLatencyCollector = true;
this.commandLatencyCollector = commandLatencyCollector;
return this;
}
Expand Down Expand Up @@ -531,6 +536,12 @@ public DefaultClientResources build() {
/**
* Returns a builder to create new {@link DefaultClientResources} whose settings are replicated from the current
* {@link DefaultClientResources}.
* <p>
* Note: The resulting {@link DefaultClientResources} retains shared state for {@link Timer},
* {@link CommandLatencyCollector}, {@link EventExecutorGroup}, and {@link EventLoopGroupProvider} if these are left
* unchanged. Thus you need only to shut down the last created {@link ClientResources} instances. Shutdown affects any
* previously created {@link ClientResources}.
* </p>
*
* @return a {@link DefaultClientResources.Builder} to create new {@link DefaultClientResources} whose settings are
* replicated from the current {@link DefaultClientResources}.
Expand All @@ -539,7 +550,21 @@ public DefaultClientResources build() {
*/
@Override
public DefaultClientResources.Builder mutate() {
return this.builder;

Builder builder = new Builder();

builder.eventExecutorGroup(eventExecutorGroup()).timer(timer()).eventBus(eventBus())
.commandLatencyCollector(commandLatencyCollector())
.commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver())
.socketAddressResolver(socketAddressResolver()).reconnectDelay(reconnectDelay)
.nettyCustomizer(nettyCustomizer()).tracing(tracing());

builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider;
builder.sharedEventExecutor = sharedEventExecutor;
builder.sharedEventLoopGroupProvider = sharedEventLoopGroupProvider;
builder.sharedTimer = sharedTimer;

return builder;
}

@Override
Expand Down
8 changes: 7 additions & 1 deletion src/test/java/io/lettuce/core/ClientOptionsUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ void testBuilder() {

@Test
void testCopy() {
checkAssertions(ClientOptions.copyOf(ClientOptions.builder().build()));

ClientOptions original = ClientOptions.builder().build();
ClientOptions copy = ClientOptions.copyOf(original);

checkAssertions(copy);

assertThat(original.mutate()).isNotSameAs(copy.mutate());
}

void checkAssertions(ClientOptions sut) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.junit.jupiter.api.Test;

import io.lettuce.core.ClientOptions;

/**
* @author Mark Paluch
*/
Expand All @@ -27,9 +29,8 @@ class ClusterClientOptionsUnitTests {
@Test
void testCopy() {

ClusterClientOptions options = ClusterClientOptions.builder()
.autoReconnect(false).requestQueueSize(100).suspendReconnectOnProtocolFailure(true).maxRedirects(1234)
.validateClusterNodeMembership(false).build();
ClusterClientOptions options = ClusterClientOptions.builder().autoReconnect(false).requestQueueSize(100)
.suspendReconnectOnProtocolFailure(true).maxRedirects(1234).validateClusterNodeMembership(false).build();

ClusterClientOptions copy = ClusterClientOptions.copyOf(options);

Expand All @@ -43,4 +44,46 @@ void testCopy() {
assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure());
assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects());
}

@Test
void builderFromDefaultClientOptions() {

ClientOptions clientOptions = ClientOptions.builder().build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder(clientOptions).build();

assertThat(clusterClientOptions.getDisconnectedBehavior()).isEqualTo(clusterClientOptions.getDisconnectedBehavior());
assertThat(clusterClientOptions.getSslOptions()).isEqualTo(clusterClientOptions.getSslOptions());
assertThat(clusterClientOptions.getTimeoutOptions()).isEqualTo(clusterClientOptions.getTimeoutOptions());
assertThat(clusterClientOptions.getRequestQueueSize()).isEqualTo(clusterClientOptions.getRequestQueueSize());
assertThat(clusterClientOptions.isAutoReconnect()).isEqualTo(clusterClientOptions.isAutoReconnect());
assertThat(clusterClientOptions.isCloseStaleConnections()).isEqualTo(clusterClientOptions.isCloseStaleConnections());
assertThat(clusterClientOptions.isCancelCommandsOnReconnectFailure()).isEqualTo(
clusterClientOptions.isCancelCommandsOnReconnectFailure());
assertThat(clusterClientOptions.isPingBeforeActivateConnection()).isEqualTo(
clusterClientOptions.isPingBeforeActivateConnection());
assertThat(clusterClientOptions.isPublishOnScheduler()).isEqualTo(clusterClientOptions.isPublishOnScheduler());
assertThat(clusterClientOptions.isSuspendReconnectOnProtocolFailure()).isEqualTo(
clusterClientOptions.isSuspendReconnectOnProtocolFailure());
assertThat(clusterClientOptions.mutate()).isNotNull();
}

@Test
void builderFromClusterClientOptions() {

ClusterClientOptions options = ClusterClientOptions.builder().maxRedirects(1234).validateClusterNodeMembership(false)
.build();

ClusterClientOptions copy = ClusterClientOptions.builder(options).build();

assertThat(copy.getRefreshPeriod()).isEqualTo(options.getRefreshPeriod());
assertThat(copy.isCloseStaleConnections()).isEqualTo(options.isCloseStaleConnections());
assertThat(copy.isRefreshClusterView()).isEqualTo(options.isRefreshClusterView());
assertThat(copy.isValidateClusterNodeMembership()).isEqualTo(options.isValidateClusterNodeMembership());
assertThat(copy.getRequestQueueSize()).isEqualTo(options.getRequestQueueSize());
assertThat(copy.isAutoReconnect()).isEqualTo(options.isAutoReconnect());
assertThat(copy.isCancelCommandsOnReconnectFailure()).isEqualTo(options.isCancelCommandsOnReconnectFailure());
assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure());
assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects());
assertThat(options.mutate()).isNotSameAs(copy.mutate());
}
}
Loading

0 comments on commit 060f1ba

Please sign in to comment.