diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index b5145188db..d794d968ab 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -35,12 +35,11 @@ import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.internal.logging.InternalLogger; @@ -65,24 +64,20 @@ */ public abstract class AbstractRedisClient { - protected static final PooledByteBufAllocator BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT; - protected static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractRedisClient.class); - protected final Map, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<>(2); protected final ConnectionEvents connectionEvents = new ConnectionEvents(); protected final Set closeableResources = ConcurrentHashMap.newKeySet(); - protected final EventExecutorGroup genericWorkerPool; - protected final HashedWheelTimer timer; protected final ChannelGroup channels; - protected final ClientResources clientResources; - - protected volatile ClientOptions clientOptions = ClientOptions.builder().build(); - - protected Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION; + private final ClientResources clientResources; + private final Map, EventLoopGroup> eventLoopGroups = new ConcurrentHashMap<>(2); private final boolean sharedResources; private final AtomicBoolean shutdown = new AtomicBoolean(); + private volatile ClientOptions clientOptions = ClientOptions.create(); + private volatile Duration defaultTimeout = RedisURI.DEFAULT_TIMEOUT_DURATION; + /** * Create a new instance with client resources. * @@ -92,16 +87,27 @@ public abstract class AbstractRedisClient { protected AbstractRedisClient(ClientResources clientResources) { if (clientResources == null) { - sharedResources = false; + this.sharedResources = false; this.clientResources = DefaultClientResources.create(); } else { - sharedResources = true; + this.sharedResources = true; this.clientResources = clientResources; } - genericWorkerPool = this.clientResources.eventExecutorGroup(); - channels = new DefaultChannelGroup(genericWorkerPool.next()); - timer = (HashedWheelTimer) this.clientResources.timer(); + this.channels = new DefaultChannelGroup(this.clientResources.eventExecutorGroup().next()); + } + + protected int getChannelCount() { + return channels.size(); + } + + /** + * Returns the default {@link Duration timeout} for commands. + * + * @return the default {@link Duration timeout} for commands. + */ + public Duration getDefaultTimeout() { + return defaultTimeout; } /** @@ -116,7 +122,7 @@ public void setDefaultTimeout(Duration timeout) { LettuceAssert.notNull(timeout, "Timeout duration must not be null"); LettuceAssert.isTrue(!timeout.isNegative(), "Timeout duration must be greater or equal to zero"); - this.timeout = timeout; + this.defaultTimeout = timeout; } /** @@ -132,6 +138,65 @@ public void setDefaultTimeout(long timeout, TimeUnit unit) { setDefaultTimeout(Duration.ofNanos(unit.toNanos(timeout))); } + /** + * Returns the {@link ClientOptions} which are valid for that client. Connections inherit the current options at the moment + * the connection is created. Changes to options will not affect existing connections. + * + * @return the {@link ClientOptions} for this client + */ + public ClientOptions getOptions() { + return clientOptions; + } + + /** + * Set the {@link ClientOptions} for the client. + * + * @param clientOptions client options for the client and connections that are created after setting the options + */ + protected void setOptions(ClientOptions clientOptions) { + LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); + this.clientOptions = clientOptions; + } + + /** + * Returns the {@link ClientResources} which are used with that client. + * + * @return the {@link ClientResources} for this client. + * @since 6.0 + * + */ + public ClientResources getResources() { + return clientResources; + } + + protected int getResourceCount() { + return closeableResources.size(); + } + + /** + * Add a listener for the RedisConnectionState. The listener is notified every time a connect/disconnect/IO exception + * happens. The listeners are not bound to a specific connection, so every time a connection event happens on any + * connection, the listener will be notified. The corresponding netty channel handler (async connection) is passed on the + * event. + * + * @param listener must not be {@literal null} + */ + public void addListener(RedisConnectionStateListener listener) { + LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null"); + connectionEvents.addListener(listener); + } + + /** + * Removes a listener. + * + * @param listener must not be {@literal null} + */ + public void removeListener(RedisConnectionStateListener listener) { + + LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null"); + connectionEvents.removeListener(listener); + } + /** * Populate connection builder with necessary resources. * @@ -143,9 +208,7 @@ protected void connectionBuilder(Mono socketAddressSupplier, Conn RedisURI redisURI) { Bootstrap redisBootstrap = new Bootstrap(); - redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024); - redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024); - redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR); + redisBootstrap.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT); ClientOptions clientOptions = getOptions(); SocketOptions socketOptions = clientOptions.getSocketOptions(); @@ -161,7 +224,7 @@ protected void connectionBuilder(Mono socketAddressSupplier, Conn connectionBuilder.apply(redisURI); connectionBuilder.bootstrap(redisBootstrap); - connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer); + connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents); connectionBuilder.socketAddressSupplier(socketAddressSupplier); } @@ -512,58 +575,6 @@ private CompletableFuture closeClientResources(long quietPeriod, long time return Futures.allOf(groupCloseFutures); } - protected int getResourceCount() { - return closeableResources.size(); - } - - protected int getChannelCount() { - return channels.size(); - } - - /** - * Add a listener for the RedisConnectionState. The listener is notified every time a connect/disconnect/IO exception - * happens. The listeners are not bound to a specific connection, so every time a connection event happens on any - * connection, the listener will be notified. The corresponding netty channel handler (async connection) is passed on the - * event. - * - * @param listener must not be {@literal null} - */ - public void addListener(RedisConnectionStateListener listener) { - LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null"); - connectionEvents.addListener(listener); - } - - /** - * Removes a listener. - * - * @param listener must not be {@literal null} - */ - public void removeListener(RedisConnectionStateListener listener) { - - LettuceAssert.notNull(listener, "RedisConnectionStateListener must not be null"); - connectionEvents.removeListener(listener); - } - - /** - * Returns the {@link ClientOptions} which are valid for that client. Connections inherit the current options at the moment - * the connection is created. Changes to options will not affect existing connections. - * - * @return the {@link ClientOptions} for this client - */ - public ClientOptions getOptions() { - return clientOptions; - } - - /** - * Set the {@link ClientOptions} for the client. - * - * @param clientOptions client options for the client and connections that are created after setting the options - */ - protected void setOptions(ClientOptions clientOptions) { - LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); - this.clientOptions = clientOptions; - } - protected RedisHandshake createHandshake(ConnectionState state) { return new RedisHandshake(clientOptions.getConfiguredProtocolVersion(), clientOptions.isPingBeforeActivateConnection(), state); diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 297aea2f30..5b48868b3b 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -45,7 +45,6 @@ public class ConnectionBuilder { private Endpoint endpoint; private Supplier commandHandlerSupplier; private ChannelGroup channelGroup; - private Timer timer; private Bootstrap bootstrap; private ClientOptions clientOptions; private Duration timeout; @@ -105,10 +104,10 @@ protected ConnectionWatchdog createConnectionWatchdog() { } LettuceAssert.assertState(bootstrap != null, "Bootstrap must be set for autoReconnect=true"); - LettuceAssert.assertState(timer != null, "Timer must be set for autoReconnect=true"); LettuceAssert.assertState(socketAddressSupplier != null, "SocketAddressSupplier must be set for autoReconnect=true"); - ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer, + ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, + clientResources.timer(), clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection, clientResources.eventBus()); @@ -173,11 +172,6 @@ public ConnectionBuilder commandHandler(Supplier supplier) { return this; } - public ConnectionBuilder timer(Timer timer) { - this.timer = timer; - return this; - } - public ConnectionBuilder bootstrap(Bootstrap bootstrap) { this.bootstrap = bootstrap; return this; diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index aae42bdda5..9b80eac787 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -46,6 +46,8 @@ import io.lettuce.core.resource.ClientResources; import io.lettuce.core.sentinel.StatefulRedisSentinelConnectionImpl; import io.lettuce.core.sentinel.api.StatefulRedisSentinelConnection; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; /** * A scalable and thread-safe Redis client supporting synchronous, asynchronous and reactive @@ -60,9 +62,9 @@ *
  • Redis Sentinel, Master connections
  • * * - * Redis Cluster is used through {@link io.lettuce.core.cluster.RedisClusterClient}. Master/Slave connections through - * {@link io.lettuce.core.masterslave.MasterSlave} provide connections to Redis Master/Slave setups which run either in a static - * Master/Slave setup or are managed by Redis Sentinel. + * Redis Cluster is used through {@link io.lettuce.core.cluster.RedisClusterClient}. Master/Replica connections through + * {@link io.lettuce.core.masterreplica.MasterReplica} provide connections to Redis Master/Replica setups which run either in a + * static Master/Replica setup or are managed by Redis Sentinel. *

    * {@link RedisClient} is an expensive resource. It holds a set of netty's {@link io.netty.channel.EventLoopGroup}'s that use * multiple threads. Reuse this instance as much as possible or share a {@link ClientResources} instance amongst multiple client @@ -78,11 +80,13 @@ * @see RedisCodec * @see ClientOptions * @see ClientResources - * @see io.lettuce.core.masterslave.MasterSlave + * @see io.lettuce.core.masterreplica.MasterReplica * @see io.lettuce.core.cluster.RedisClusterClient */ public class RedisClient extends AbstractRedisClient { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class); + private static final RedisURI EMPTY_URI = new RedisURI(); private final RedisURI redisURI; @@ -206,7 +210,7 @@ public StatefulRedisConnection connect(RedisCodec codec) { checkForRedisURI(); - return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout)); + return getConnection(connectStandaloneAsync(codec, this.redisURI, getDefaultTimeout())); } /** @@ -265,16 +269,16 @@ private ConnectionFuture> connectStandalone logger.debug("Trying to get a Redis connection for: " + redisURI); - DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources); + DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getOptions())) { + writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, codec, timeout); ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, - () -> new CommandHandler(clientOptions, clientResources, endpoint)); + () -> new CommandHandler(getOptions(), getResources(), endpoint)); future.whenComplete((channelHandler, throwable) -> { @@ -304,8 +308,8 @@ private ConnectionFuture connectStatefulAsync(StatefulRedisConnecti state.setDb(redisURI.getDatabase()); connectionBuilder.connection(connection); - connectionBuilder.clientOptions(clientOptions); - connectionBuilder.clientResources(clientResources); + connectionBuilder.clientOptions(getOptions()); + connectionBuilder.clientResources(getResources()); connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint); connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); @@ -323,7 +327,7 @@ private ConnectionFuture connectStatefulAsync(StatefulRedisConnecti * @return A new stateful pub/sub connection */ public StatefulRedisPubSubConnection connectPubSub() { - return getConnection(connectPubSubAsync(newStringStringCodec(), redisURI, timeout)); + return getConnection(connectPubSubAsync(newStringStringCodec(), redisURI, getDefaultTimeout())); } /** @@ -350,7 +354,7 @@ public StatefulRedisPubSubConnection connectPubSub(RedisURI redi */ public StatefulRedisPubSubConnection connectPubSub(RedisCodec codec) { checkForRedisURI(); - return getConnection(connectPubSubAsync(codec, redisURI, timeout)); + return getConnection(connectPubSubAsync(codec, redisURI, getDefaultTimeout())); } /** @@ -393,17 +397,17 @@ private ConnectionFuture> connectPubS assertNotNull(codec); checkValidRedisURI(redisURI); - PubSubEndpoint endpoint = new PubSubEndpoint<>(clientOptions, clientResources); + PubSubEndpoint endpoint = new PubSubEndpoint<>(getOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getOptions())) { + writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } StatefulRedisPubSubConnectionImpl connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout); ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, - () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint)); + () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint)); return future.whenComplete((conn, throwable) -> { @@ -433,7 +437,7 @@ public StatefulRedisSentinelConnection connectSentinel() { */ public StatefulRedisSentinelConnection connectSentinel(RedisCodec codec) { checkForRedisURI(); - return getConnection(connectSentinelAsync(codec, redisURI, timeout)); + return getConnection(connectSentinelAsync(codec, redisURI, getDefaultTimeout())); } /** @@ -547,13 +551,13 @@ private ConnectionFuture> doConnect ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder(); connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions())); - connectionBuilder.clientResources(clientResources); + connectionBuilder.clientResources(getResources()); - DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources); + DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getOptions())) { + writer = new CommandExpiryWriter(writer, getOptions(), getResources()); } StatefulRedisSentinelConnectionImpl connection = newStatefulRedisSentinelConnection(writer, codec, timeout); @@ -564,7 +568,7 @@ private ConnectionFuture> doConnect logger.debug("Connecting to Redis Sentinel, address: " + redisURI); - connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(clientOptions, clientResources, endpoint)) + connectionBuilder.endpoint(endpoint).commandHandler(() -> new CommandHandler(getOptions(), getResources(), endpoint)) .connection(connection); connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); @@ -591,15 +595,6 @@ public void setOptions(ClientOptions clientOptions) { super.setOptions(clientOptions); } - /** - * Returns the {@link ClientResources} which are used with that client. - * - * @return the {@link ClientResources} for this client - */ - public ClientResources getResources() { - return clientResources; - } - // ------------------------------------------------------------------------- // Implementation hooks and helper methods // ------------------------------------------------------------------------- @@ -679,7 +674,7 @@ protected Mono getSocketAddress(RedisURI redisURI) { "Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId()))); } else { - return Mono.fromCallable(() -> clientResources.socketAddressResolver().resolve((redisURI))); + return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve((redisURI))); } }); } @@ -718,6 +713,8 @@ private Mono getSocketAddressSupplier(RedisURI redisURI) { private Mono lookupRedis(RedisURI sentinelUri) { + Duration timeout = getDefaultTimeout(); + Mono> connection = Mono .fromCompletionStage(() -> connectSentinelAsync(newStringStringCodec(), sentinelUri, timeout)); @@ -729,7 +726,7 @@ private Mono lookupRedis(RedisURI sentinelUri) { if (it instanceof InetSocketAddress) { InetSocketAddress isa = (InetSocketAddress) it; - SocketAddress resolved = clientResources.socketAddressResolver() + SocketAddress resolved = getResources().socketAddressResolver() .resolve(RedisURI.create(isa.getHostString(), isa.getPort())); logger.debug("Resolved Master {} SocketAddress {}:{} to {}", sentinelMasterId, isa.getHostString(), @@ -739,7 +736,7 @@ private Mono lookupRedis(RedisURI sentinelUri) { } return it; - }).timeout(this.timeout) // + }).timeout(timeout) // .flatMap(it -> Mono.fromCompletionStage(c::closeAsync) // .thenReturn(it)); }); diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 5038b30648..6667afa750 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -295,6 +295,52 @@ public static RedisClusterClient create(ClientResources clientResources, Iterabl return new RedisClusterClient(clientResources, redisURIs); } + /** + * Set the {@link ClusterClientOptions} for the client. + * + * @param clientOptions client options for the client and connections that are created after setting the options + */ + public void setOptions(ClusterClientOptions clientOptions) { + super.setOptions(clientOptions); + } + + /** + * Retrieve the cluster view. Partitions are shared amongst all connections opened by this client instance. + * + * @return the partitions. + */ + public Partitions getPartitions() { + if (partitions == null) { + get(initializePartitions(), e -> new RedisException("Cannot obtain initial Redis Cluster topology", e)); + } + return partitions; + } + + /** + * Returns the seed {@link RedisURI} for the topology refreshing. This method is called before each topology refresh to + * provide an {@link Iterable} of {@link RedisURI} that is used to perform the next topology refresh. + *

    + * Subclasses of {@link RedisClusterClient} may override that method. + * + * @return {@link Iterable} of {@link RedisURI} for the next topology refresh. + */ + protected Iterable getTopologyRefreshSource() { + + boolean initialSeedNodes = !useDynamicRefreshSources(); + + Iterable seed; + if (initialSeedNodes || partitions == null || partitions.isEmpty()) { + seed = this.initialUris; + } else { + List uris = new ArrayList<>(); + for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) { + uris.add(partition.getUri()); + } + seed = uris; + } + return seed; + } + /** * Connect to a Redis Cluster and treat keys and values as UTF-8 strings. *

    @@ -472,18 +518,19 @@ ConnectionFuture> connectToNodeAsync(RedisC assertNotEmpty(initialUris); LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null"); - ClusterNodeEndpoint endpoint = new ClusterNodeEndpoint(clientOptions, getResources(), clusterWriter); + ClusterNodeEndpoint endpoint = new ClusterNodeEndpoint(getClusterClientOptions(), getResources(), clusterWriter); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { + writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } - StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl<>(writer, codec, timeout); + StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl<>(writer, codec, getDefaultTimeout()); ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, - getFirstUri(), socketAddressSupplier, () -> new CommandHandler(clientOptions, clientResources, endpoint)); + getFirstUri(), socketAddressSupplier, + () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint)); return connectionFuture.whenComplete((conn, throwable) -> { if (throwable != null) { @@ -512,20 +559,20 @@ ConnectionFuture> connectPubSubToNode logger.debug("connectPubSubToNode(" + nodeId + ")"); - PubSubEndpoint endpoint = new PubSubEndpoint<>(clientOptions, clientResources); + PubSubEndpoint endpoint = new PubSubEndpoint<>(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { + writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } StatefulRedisPubSubConnectionImpl connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec, - timeout); + getDefaultTimeout()); ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, - () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint)); + () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint)); return connectionFuture.whenComplete((conn, throwable) -> { if (throwable != null) { connection.close(); @@ -554,14 +601,14 @@ private CompletableFuture> connectCl Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources); + DefaultEndpoint endpoint = new DefaultEndpoint(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { + writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } - ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(clientOptions, writer, + ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer, topologyRefreshScheduler); PooledClusterConnectionProvider pooledClusterConnectionProvider = new PooledClusterConnectionProvider<>(this, clusterWriter, codec, topologyRefreshScheduler); @@ -569,12 +616,13 @@ private CompletableFuture> connectCl clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); StatefulRedisClusterConnectionImpl connection = new StatefulRedisClusterConnectionImpl<>(clusterWriter, codec, - timeout); + getDefaultTimeout()); connection.setReadFrom(ReadFrom.MASTER); connection.setPartitions(partitions); - Supplier commandHandlerSupplier = () -> new CommandHandler(clientOptions, clientResources, endpoint); + Supplier commandHandlerSupplier = () -> new CommandHandler(getClusterClientOptions(), getResources(), + endpoint); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -637,18 +685,18 @@ private CompletableFuture> con Mono socketAddressSupplier = getSocketAddressSupplier(TopologyComparators::sortByClientCount); - PubSubClusterEndpoint endpoint = new PubSubClusterEndpoint<>(clientOptions, clientResources); + PubSubClusterEndpoint endpoint = new PubSubClusterEndpoint<>(getClusterClientOptions(), getResources()); RedisChannelWriter writer = endpoint; - if (CommandExpiryWriter.isSupported(clientOptions)) { - writer = new CommandExpiryWriter(writer, clientOptions, clientResources); + if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { + writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); } - ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(clientOptions, writer, + ClusterDistributionChannelWriter clusterWriter = new ClusterDistributionChannelWriter(getClusterClientOptions(), writer, topologyRefreshScheduler); StatefulRedisClusterPubSubConnectionImpl connection = new StatefulRedisClusterPubSubConnectionImpl<>(endpoint, - clusterWriter, codec, timeout); + clusterWriter, codec, getDefaultTimeout()); ClusterPubSubConnectionProvider pooledClusterConnectionProvider = new ClusterPubSubConnectionProvider<>(this, clusterWriter, codec, connection.getUpstreamListener(), topologyRefreshScheduler); @@ -656,8 +704,8 @@ private CompletableFuture> con clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); connection.setPartitions(partitions); - Supplier commandHandlerSupplier = () -> new PubSubCommandHandler<>(clientOptions, clientResources, - codec, endpoint); + Supplier commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(), + getResources(), codec, endpoint); Mono> connectionMono = Mono .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); @@ -734,9 +782,9 @@ private ConnectionBuilder createConnectionBuilder(RedisChannelHandler new RedisException("Cannot obtain initial Redis Cluster topology", e)); - } - return partitions; - } - /** * Retrieve partitions. Nodes within {@link Partitions} are ordered by latency. Lower latency nodes come first. * @@ -892,17 +928,6 @@ protected CompletableFuture loadPartitionsAsync() { return future; } - private static Throwable unwrap(Throwable throwable) { - - Throwable ex = throwable; - - while (ex instanceof CompletionException || ex instanceof ExecutionException) { - ex = ex.getCause(); - } - - return ex; - } - private CompletionStage fetchPartitions(Iterable topologyRefreshSource) { CompletionStage> topology = refresh.loadViews(topologyRefreshSource, @@ -931,24 +956,6 @@ private CompletionStage fetchPartitions(Iterable topologyR }); } - /** - * Resolve a {@link RedisURI} from a map of cluster views by {@link Partitions} as key - * - * @param map the map - * @param partitions the key - * @return a {@link RedisURI} or null - */ - private static RedisURI getViewedBy(Map map, Partitions partitions) { - - for (Map.Entry entry : map.entrySet()) { - if (entry.getValue() == partitions) { - return entry.getKey(); - } - } - - return null; - } - /** * Determines a {@link Partitions topology view} based on the current and the obtain topology views. * @@ -974,15 +981,6 @@ public void setPartitions(Partitions partitions) { this.partitions = partitions; } - /** - * Returns the {@link ClientResources} which are used with that client. - * - * @return the {@link ClientResources} for this client - */ - public ClientResources getResources() { - return clientResources; - } - /** * Shutdown this client and close all open connections asynchronously. The client should be discarded after calling * shutdown. @@ -1001,15 +999,6 @@ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, Tim return super.shutdownAsync(quietPeriod, timeout, timeUnit); } - /** - * Set the {@link ClusterClientOptions} for the client. - * - * @param clientOptions client options for the client and connections that are created after setting the options - */ - public void setOptions(ClusterClientOptions clientOptions) { - super.setOptions(clientOptions); - } - // ------------------------------------------------------------------------- // Implementation hooks and helper methods // ------------------------------------------------------------------------- @@ -1037,13 +1026,13 @@ protected Mono getSocketAddressSupplier(Function { if (partitions.isEmpty()) { return Mono.fromCallable(() -> { - SocketAddress socketAddress = clientResources.socketAddressResolver().resolve(getFirstUri()); + SocketAddress socketAddress = getResources().socketAddressResolver().resolve(getFirstUri()); logger.debug("Resolved SocketAddress {} using {}", socketAddress, getFirstUri()); return socketAddress; }); @@ -1095,31 +1084,6 @@ protected void forEachCloseable(Predicate - * Subclasses of {@link RedisClusterClient} may override that method. - * - * @return {@link Iterable} of {@link RedisURI} for the next topology refresh. - */ - protected Iterable getTopologyRefreshSource() { - - boolean initialSeedNodes = !useDynamicRefreshSources(); - - Iterable seed; - if (initialSeedNodes || partitions == null || partitions.isEmpty()) { - seed = this.initialUris; - } else { - List uris = new ArrayList<>(); - for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) { - uris.add(partition.getUri()); - } - seed = uris; - } - return seed; - } - /** * Returns {@literal true} if {@link ClusterTopologyRefreshOptions#useDynamicRefreshSources() dynamic refresh sources} are * enabled. @@ -1146,6 +1110,35 @@ protected RedisCodec newStringStringCodec() { return StringCodec.UTF8; } + /** + * Resolve a {@link RedisURI} from a map of cluster views by {@link Partitions} as key + * + * @param map the map + * @param partitions the key + * @return a {@link RedisURI} or null + */ + private static RedisURI getViewedBy(Map map, Partitions partitions) { + + for (Map.Entry entry : map.entrySet()) { + if (entry.getValue() == partitions) { + return entry.getKey(); + } + } + + return null; + } + + private static Throwable unwrap(Throwable throwable) { + + Throwable ex = throwable; + + while (ex instanceof CompletionException || ex instanceof ExecutionException) { + ex = ex.getCause(); + } + + return ex; + } + ClusterClientOptions getClusterClientOptions() { return (ClusterClientOptions) getOptions(); }