diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 2cb2e7715b..7ebd488ea5 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -52,7 +52,6 @@ public class ClientOptions implements Serializable { private final SslOptions sslOptions; private final TimeoutOptions timeoutOptions; private final int bufferUsageRatio; - private final Builder builder; protected ClientOptions(Builder builder) { this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection; @@ -66,7 +65,6 @@ protected ClientOptions(Builder builder) { this.sslOptions = builder.sslOptions; this.timeoutOptions = builder.timeoutOptions; this.bufferUsageRatio = builder.bufferUsageRatio; - this.builder = builder; } protected ClientOptions(ClientOptions original) { @@ -81,7 +79,6 @@ protected ClientOptions(ClientOptions original) { this.sslOptions = original.getSslOptions(); this.timeoutOptions = original.getTimeoutOptions(); this.bufferUsageRatio = original.getBufferUsageRatio(); - this.builder = original.builder; } /** @@ -310,7 +307,16 @@ public ClientOptions build() { * @since 5.1 */ public ClientOptions.Builder mutate() { - return this.builder; + Builder builder = new Builder(); + + builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio()) + .cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) + .disconnectedBehavior(getDisconnectedBehavior()).publishOnScheduler(isPublishOnScheduler()) + .pingBeforeActivateConnection(isPingBeforeActivateConnection()).requestQueueSize(getRequestQueueSize()) + .socketOptions(getSocketOptions()).sslOptions(getSslOptions()) + .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()); + + return builder; } /** diff --git a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java index d1d8d33f88..e653dbd1b1 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java @@ -21,6 +21,7 @@ import io.lettuce.core.SocketOptions; import io.lettuce.core.SslOptions; import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.internal.LettuceAssert; /** * Client Options to control the behavior of {@link RedisClusterClient}. @@ -40,7 +41,6 @@ public class ClusterClientOptions extends ClientOptions { private final boolean validateClusterNodeMembership; private final int maxRedirects; private final ClusterTopologyRefreshOptions topologyRefreshOptions; - private final ClusterClientOptions.Builder builder; protected ClusterClientOptions(Builder builder) { @@ -60,7 +60,6 @@ protected ClusterClientOptions(Builder builder) { } this.topologyRefreshOptions = refreshOptions; - this.builder = builder; } protected ClusterClientOptions(ClusterClientOptions original) { @@ -70,7 +69,6 @@ protected ClusterClientOptions(ClusterClientOptions original) { this.validateClusterNodeMembership = original.validateClusterNodeMembership; this.maxRedirects = original.maxRedirects; this.topologyRefreshOptions = original.topologyRefreshOptions; - this.builder = original.builder; } /** @@ -92,6 +90,35 @@ public static ClusterClientOptions.Builder builder() { return new ClusterClientOptions.Builder(); } + /** + * Returns a new {@link ClusterClientOptions.Builder} initialized from {@link ClientOptions} to construct + * {@link ClusterClientOptions}. + * + * @return a new {@link ClusterClientOptions.Builder} to construct {@link ClusterClientOptions}. + * @since 5.1.6 + */ + public static ClusterClientOptions.Builder builder(ClientOptions clientOptions) { + + LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); + + if (clientOptions instanceof ClusterClientOptions) { + return ((ClusterClientOptions) clientOptions).mutate(); + } + + Builder builder = new Builder(); + builder.autoReconnect(clientOptions.isAutoReconnect()).bufferUsageRatio(clientOptions.getBufferUsageRatio()) + .cancelCommandsOnReconnectFailure(clientOptions.isCancelCommandsOnReconnectFailure()) + .disconnectedBehavior(clientOptions.getDisconnectedBehavior()) + .publishOnScheduler(clientOptions.isPublishOnScheduler()) + .pingBeforeActivateConnection(clientOptions.isPingBeforeActivateConnection()) + .requestQueueSize(clientOptions.getRequestQueueSize()).socketOptions(clientOptions.getSocketOptions()) + .sslOptions(clientOptions.getSslOptions()) + .suspendReconnectOnProtocolFailure(clientOptions.isSuspendReconnectOnProtocolFailure()) + .timeoutOptions(clientOptions.getTimeoutOptions()); + + return builder; + } + /** * Create a new {@link ClusterClientOptions} using default settings. * @@ -174,7 +201,7 @@ public Builder cancelCommandsOnReconnectFailure(boolean cancelCommandsOnReconnec } @Override - public ClientOptions.Builder publishOnScheduler(boolean publishOnScheduler) { + public Builder publishOnScheduler(boolean publishOnScheduler) { super.publishOnScheduler(publishOnScheduler); return this; } @@ -235,7 +262,19 @@ public ClusterClientOptions build() { * @since 5.1 */ public ClusterClientOptions.Builder mutate() { - return this.builder; + + Builder builder = new Builder(); + + builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio()) + .cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) + .disconnectedBehavior(getDisconnectedBehavior()).publishOnScheduler(isPublishOnScheduler()) + .pingBeforeActivateConnection(isPingBeforeActivateConnection()).requestQueueSize(getRequestQueueSize()) + .socketOptions(getSocketOptions()).sslOptions(getSslOptions()) + .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()) + .validateClusterNodeMembership(isValidateClusterNodeMembership()).maxRedirects(getMaxRedirects()) + .topologyRefreshOptions(getTopologyRefreshOptions()); + + return builder; } /** diff --git a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java index 54fc14bc02..ffbf7dadfa 100644 --- a/src/main/java/io/lettuce/core/resource/DefaultClientResources.java +++ b/src/main/java/io/lettuce/core/resource/DefaultClientResources.java @@ -127,14 +127,10 @@ public class DefaultClientResources implements ClientResources { private final NettyCustomizer nettyCustomizer; private final Tracing tracing; - private final Builder builder; - private volatile boolean shutdownCalled = false; protected DefaultClientResources(Builder builder) { - this.builder = builder; - if (builder.eventLoopGroupProvider == null) { int ioThreadPoolSize = builder.ioThreadPoolSize; @@ -148,7 +144,7 @@ protected DefaultClientResources(Builder builder) { this.eventLoopGroupProvider = new DefaultEventLoopGroupProvider(ioThreadPoolSize); } else { - this.sharedEventLoopGroupProvider = true; + this.sharedEventLoopGroupProvider = builder.sharedEventLoopGroupProvider; this.eventLoopGroupProvider = builder.eventLoopGroupProvider; } @@ -165,7 +161,7 @@ protected DefaultClientResources(Builder builder) { computationThreadPoolSize); sharedEventExecutor = false; } else { - sharedEventExecutor = true; + sharedEventExecutor = builder.sharedEventExecutor; eventExecutorGroup = builder.eventExecutorGroup; } @@ -174,7 +170,7 @@ protected DefaultClientResources(Builder builder) { sharedTimer = false; } else { timer = builder.timer; - sharedTimer = true; + sharedTimer = builder.sharedTimer; } if (builder.eventBus == null) { @@ -198,7 +194,7 @@ protected DefaultClientResources(Builder builder) { sharedCommandLatencyCollector = false; } else { - sharedCommandLatencyCollector = true; + sharedCommandLatencyCollector = builder.sharedCommandLatencyCollector; commandLatencyCollector = builder.commandLatencyCollector; } @@ -251,6 +247,11 @@ public static DefaultClientResources.Builder builder() { */ public static class Builder implements ClientResources.Builder { + private boolean sharedEventLoopGroupProvider; + private boolean sharedEventExecutor; + private boolean sharedTimer; + private boolean sharedCommandLatencyCollector; + private int ioThreadPoolSize = DEFAULT_IO_THREADS; private int computationThreadPoolSize = DEFAULT_COMPUTATION_THREADS; private EventExecutorGroup eventExecutorGroup; @@ -299,6 +300,7 @@ public Builder eventLoopGroupProvider(EventLoopGroupProvider eventLoopGroupProvi LettuceAssert.notNull(eventLoopGroupProvider, "EventLoopGroupProvider must not be null"); + this.sharedEventLoopGroupProvider = true; this.eventLoopGroupProvider = eventLoopGroupProvider; return this; } @@ -333,6 +335,7 @@ public Builder eventExecutorGroup(EventExecutorGroup eventExecutorGroup) { LettuceAssert.notNull(eventExecutorGroup, "EventExecutorGroup must not be null"); + this.sharedEventExecutor = true; this.eventExecutorGroup = eventExecutorGroup; return this; } @@ -352,6 +355,7 @@ public Builder timer(Timer timer) { LettuceAssert.notNull(timer, "Timer must not be null"); + this.sharedTimer = true; this.timer = timer; return this; } @@ -414,6 +418,7 @@ public Builder commandLatencyCollector(CommandLatencyCollector commandLatencyCol LettuceAssert.notNull(commandLatencyCollector, "CommandLatencyCollector must not be null"); + this.sharedCommandLatencyCollector = true; this.commandLatencyCollector = commandLatencyCollector; return this; } @@ -531,6 +536,12 @@ public DefaultClientResources build() { /** * Returns a builder to create new {@link DefaultClientResources} whose settings are replicated from the current * {@link DefaultClientResources}. + *

+ * Note: The resulting {@link DefaultClientResources} retains shared state for {@link Timer}, + * {@link CommandLatencyCollector}, {@link EventExecutorGroup}, and {@link EventLoopGroupProvider} if these are left + * unchanged. Thus you need only to shut down the last created {@link ClientResources} instances. Shutdown affects any + * previously created {@link ClientResources}. + *

* * @return a {@link DefaultClientResources.Builder} to create new {@link DefaultClientResources} whose settings are * replicated from the current {@link DefaultClientResources}. @@ -539,7 +550,21 @@ public DefaultClientResources build() { */ @Override public DefaultClientResources.Builder mutate() { - return this.builder; + + Builder builder = new Builder(); + + builder.eventExecutorGroup(eventExecutorGroup()).timer(timer()).eventBus(eventBus()) + .commandLatencyCollector(commandLatencyCollector()) + .commandLatencyPublisherOptions(commandLatencyPublisherOptions()).dnsResolver(dnsResolver()) + .socketAddressResolver(socketAddressResolver()).reconnectDelay(reconnectDelay) + .nettyCustomizer(nettyCustomizer()).tracing(tracing()); + + builder.sharedCommandLatencyCollector = sharedEventLoopGroupProvider; + builder.sharedEventExecutor = sharedEventExecutor; + builder.sharedEventLoopGroupProvider = sharedEventLoopGroupProvider; + builder.sharedTimer = sharedTimer; + + return builder; } @Override diff --git a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java index fb0fcd5ffa..cbff7434e7 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java @@ -36,7 +36,13 @@ void testBuilder() { @Test void testCopy() { - checkAssertions(ClientOptions.copyOf(ClientOptions.builder().build())); + + ClientOptions original = ClientOptions.builder().build(); + ClientOptions copy = ClientOptions.copyOf(original); + + checkAssertions(copy); + + assertThat(original.mutate()).isNotSameAs(copy.mutate()); } void checkAssertions(ClientOptions sut) { diff --git a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java index b9ec99e7bd..16780ca4eb 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java @@ -19,6 +19,8 @@ import org.junit.jupiter.api.Test; +import io.lettuce.core.ClientOptions; + /** * @author Mark Paluch */ @@ -27,9 +29,8 @@ class ClusterClientOptionsUnitTests { @Test void testCopy() { - ClusterClientOptions options = ClusterClientOptions.builder() - .autoReconnect(false).requestQueueSize(100).suspendReconnectOnProtocolFailure(true).maxRedirects(1234) - .validateClusterNodeMembership(false).build(); + ClusterClientOptions options = ClusterClientOptions.builder().autoReconnect(false).requestQueueSize(100) + .suspendReconnectOnProtocolFailure(true).maxRedirects(1234).validateClusterNodeMembership(false).build(); ClusterClientOptions copy = ClusterClientOptions.copyOf(options); @@ -43,4 +44,46 @@ void testCopy() { assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure()); assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects()); } + + @Test + void builderFromDefaultClientOptions() { + + ClientOptions clientOptions = ClientOptions.builder().build(); + ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder(clientOptions).build(); + + assertThat(clusterClientOptions.getDisconnectedBehavior()).isEqualTo(clusterClientOptions.getDisconnectedBehavior()); + assertThat(clusterClientOptions.getSslOptions()).isEqualTo(clusterClientOptions.getSslOptions()); + assertThat(clusterClientOptions.getTimeoutOptions()).isEqualTo(clusterClientOptions.getTimeoutOptions()); + assertThat(clusterClientOptions.getRequestQueueSize()).isEqualTo(clusterClientOptions.getRequestQueueSize()); + assertThat(clusterClientOptions.isAutoReconnect()).isEqualTo(clusterClientOptions.isAutoReconnect()); + assertThat(clusterClientOptions.isCloseStaleConnections()).isEqualTo(clusterClientOptions.isCloseStaleConnections()); + assertThat(clusterClientOptions.isCancelCommandsOnReconnectFailure()).isEqualTo( + clusterClientOptions.isCancelCommandsOnReconnectFailure()); + assertThat(clusterClientOptions.isPingBeforeActivateConnection()).isEqualTo( + clusterClientOptions.isPingBeforeActivateConnection()); + assertThat(clusterClientOptions.isPublishOnScheduler()).isEqualTo(clusterClientOptions.isPublishOnScheduler()); + assertThat(clusterClientOptions.isSuspendReconnectOnProtocolFailure()).isEqualTo( + clusterClientOptions.isSuspendReconnectOnProtocolFailure()); + assertThat(clusterClientOptions.mutate()).isNotNull(); + } + + @Test + void builderFromClusterClientOptions() { + + ClusterClientOptions options = ClusterClientOptions.builder().maxRedirects(1234).validateClusterNodeMembership(false) + .build(); + + ClusterClientOptions copy = ClusterClientOptions.builder(options).build(); + + assertThat(copy.getRefreshPeriod()).isEqualTo(options.getRefreshPeriod()); + assertThat(copy.isCloseStaleConnections()).isEqualTo(options.isCloseStaleConnections()); + assertThat(copy.isRefreshClusterView()).isEqualTo(options.isRefreshClusterView()); + assertThat(copy.isValidateClusterNodeMembership()).isEqualTo(options.isValidateClusterNodeMembership()); + assertThat(copy.getRequestQueueSize()).isEqualTo(options.getRequestQueueSize()); + assertThat(copy.isAutoReconnect()).isEqualTo(options.isAutoReconnect()); + assertThat(copy.isCancelCommandsOnReconnectFailure()).isEqualTo(options.isCancelCommandsOnReconnectFailure()); + assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure()); + assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects()); + assertThat(options.mutate()).isNotSameAs(copy.mutate()); + } } diff --git a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java index 79d167bb83..b8972f2b6e 100644 --- a/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java +++ b/src/test/java/io/lettuce/core/resource/DefaultClientResourcesUnitTests.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; import reactor.test.StepVerifier; import io.lettuce.core.event.Event; @@ -34,6 +35,7 @@ import io.lettuce.test.Futures; import io.lettuce.test.resource.FastShutdown; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; @@ -214,4 +216,41 @@ void reconnectDelayReturnsSameInstanceForStatelessDelays() { FastShutdown.shutdown(resources); } + + @Test + void considersSharedStateFromMutation() { + + ClientResources clientResources = ClientResources.create(); + HashedWheelTimer timer = (HashedWheelTimer) clientResources.timer(); + + assertThat(ReflectionTestUtils.getField(timer, "workerState")).isEqualTo(0); + + ClientResources copy = clientResources.mutate().build(); + assertThat(copy.timer()).isSameAs(timer); + + copy.shutdown().awaitUninterruptibly(); + + assertThat(ReflectionTestUtils.getField(timer, "workerState")).isEqualTo(2); + } + + @Test + void considersDecoupledSharedStateFromMutation() { + + ClientResources clientResources = ClientResources.create(); + HashedWheelTimer timer = (HashedWheelTimer) clientResources.timer(); + + assertThat(ReflectionTestUtils.getField(timer, "workerState")).isEqualTo(0); + + ClientResources copy = clientResources.mutate().timer(new HashedWheelTimer()).build(); + HashedWheelTimer copyTimer = (HashedWheelTimer) copy.timer(); + assertThat(copy.timer()).isNotSameAs(timer); + + copy.shutdown().awaitUninterruptibly(); + + assertThat(ReflectionTestUtils.getField(timer, "workerState")).isEqualTo(0); + assertThat(ReflectionTestUtils.getField(copyTimer, "workerState")).isEqualTo(0); + + copyTimer.stop(); + timer.stop(); + } }