diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 4c60726240..8e59538239 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -18,16 +18,13 @@ import java.io.Serializable; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.DecodeBufferPolicies; import io.lettuce.core.protocol.DecodeBufferPolicy; -import io.lettuce.core.protocol.ProtocolKeyword; import io.lettuce.core.protocol.ProtocolVersion; +import io.lettuce.core.protocol.ReadOnlyCommands; import io.lettuce.core.resource.ClientResources; /** @@ -48,14 +45,14 @@ public class ClientOptions implements Serializable { public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT; - public static final Set DEFAULT_EXTRA_READONLY_COMMANDS = Collections.emptySet(); - public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false; public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true; public static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.newestSupported(); + public static final ReadOnlyCommands.ReadOnlyPredicate DEFAULT_READ_ONLY_COMMANDS = ReadOnlyCommands.asPredicate(); + public static final int DEFAULT_REQUEST_QUEUE_SIZE = Integer.MAX_VALUE; public static final Charset DEFAULT_SCRIPT_CHARSET = StandardCharsets.UTF_8; @@ -76,14 +73,14 @@ public class ClientOptions implements Serializable { private final DisconnectedBehavior disconnectedBehavior; - private final Set extraReadOnlyCommands; - private final boolean publishOnScheduler; private final boolean pingBeforeActivateConnection; private final ProtocolVersion protocolVersion; + private final ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands; + private final int requestQueueSize; private final Charset scriptCharset; @@ -102,10 +99,10 @@ protected ClientOptions(Builder builder) { this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure; this.decodeBufferPolicy = builder.decodeBufferPolicy; this.disconnectedBehavior = builder.disconnectedBehavior; - this.extraReadOnlyCommands = builder.extraReadOnlyCommands; this.publishOnScheduler = builder.publishOnScheduler; this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection; this.protocolVersion = builder.protocolVersion; + this.readOnlyCommands = builder.readOnlyCommands; this.requestQueueSize = builder.requestQueueSize; this.scriptCharset = builder.scriptCharset; this.socketOptions = builder.socketOptions; @@ -119,10 +116,10 @@ protected ClientOptions(ClientOptions original) { this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure(); this.decodeBufferPolicy = original.getDecodeBufferPolicy(); this.disconnectedBehavior = original.getDisconnectedBehavior(); - this.extraReadOnlyCommands = original.getExtraReadOnlyCommands(); this.publishOnScheduler = original.isPublishOnScheduler(); this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection(); this.protocolVersion = original.getConfiguredProtocolVersion(); + this.readOnlyCommands = original.getReadOnlyCommands(); this.requestQueueSize = original.getRequestQueueSize(); this.scriptCharset = original.getScriptCharset(); this.socketOptions = original.getSocketOptions(); @@ -172,14 +169,14 @@ public static class Builder { private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR; - private Set extraReadOnlyCommands = DEFAULT_EXTRA_READONLY_COMMANDS; - private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION; private ProtocolVersion protocolVersion; private boolean publishOnScheduler = DEFAULT_PUBLISH_ON_SCHEDULER; + private ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands = DEFAULT_READ_ONLY_COMMANDS; + private int requestQueueSize = DEFAULT_REQUEST_QUEUE_SIZE; private Charset scriptCharset = DEFAULT_SCRIPT_CHARSET; @@ -271,20 +268,6 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) { return this; } - /** - * Identifies extra commands (module commands) as read-only. Defaults to {@code emptySet}. See - * {@link #DEFAULT_EXTRA_READONLY_COMMANDS}. - * - * @param extraReadOnlyCommands must not be {@code null}. - * @return {@code this} - */ - public Builder extraReadOnlyCommands(Set extraReadOnlyCommands) { - - LettuceAssert.notNull(extraReadOnlyCommands, "extraReadOnlyCommands must not be null"); - this.extraReadOnlyCommands = Collections.unmodifiableSet(new HashSet<>(extraReadOnlyCommands)); - return this; - } - /** * Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true} * (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will @@ -338,6 +321,21 @@ public Builder publishOnScheduler(boolean publishOnScheduler) { return this; } + /** + * Identifies commands (e.g. module commands) as read-only. Defaults {@link #DEFAULT_READ_ONLY_COMMANDS}, see + * {@link ReadOnlyCommands}. + * + * @param readOnlyCommands must not be {@code null}. + * @return {@code this} + * @see 6.2.4 + */ + public Builder readOnlyCommands(ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands) { + + LettuceAssert.notNull(readOnlyCommands, "readOnlyCommands must not be null"); + this.readOnlyCommands = readOnlyCommands; + return this; + } + /** * Set the per-connection request queue size. The command invocation will lead to a {@link RedisException} if the queue * size is exceeded. Setting the {@code requestQueueSize} to a lower value will lead earlier to exceptions during @@ -446,7 +444,7 @@ public ClientOptions.Builder mutate() { builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) .decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior()) - .extraReadOnlyCommands(getExtraReadOnlyCommands()) + .readOnlyCommands(getReadOnlyCommands()) .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) .scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions()) @@ -516,23 +514,12 @@ public DisconnectedBehavior getDisconnectedBehavior() { } /** - * Extra commands (module commands) which are identified as read-only. Defaults to {@code emptySet}. See - * {@link #DEFAULT_EXTRA_READONLY_COMMANDS}. + * Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}. * - * @return the set of extra read-only commands - */ - public Set getExtraReadOnlyCommands() { - return extraReadOnlyCommands; - } - - /** - * Check if a command is identified as an extra read-only command. - * - * @param command - * @return {@code true} if the command is an extra read-only command. + * @return the predicate to identify read-only commands. */ - public boolean isExtraReadOnlyCommand(ProtocolKeyword command) { - return extraReadOnlyCommands.contains(command); + public ReadOnlyCommands.ReadOnlyPredicate getReadOnlyCommands() { + return readOnlyCommands; } /** diff --git a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java index a1e3d83fb0..e1b63788c6 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java @@ -27,6 +27,7 @@ import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.DecodeBufferPolicy; import io.lettuce.core.protocol.ProtocolVersion; +import io.lettuce.core.protocol.ReadOnlyCommands; /** * Client Options to control the behavior of {@link RedisClusterClient}. @@ -38,6 +39,8 @@ public class ClusterClientOptions extends ClientOptions { public static final boolean DEFAULT_CLOSE_STALE_CONNECTIONS = true; + public static final ReadOnlyCommands.ReadOnlyPredicate DEFAULT_READ_ONLY_COMMANDS = ClusterReadOnlyCommands.asPredicate(); + public static final int DEFAULT_MAX_REDIRECTS = 5; public static final boolean DEFAULT_REFRESH_CLUSTER_VIEW = false; @@ -163,6 +166,7 @@ public static class Builder extends ClientOptions.Builder { private ClusterTopologyRefreshOptions topologyRefreshOptions = null; protected Builder() { + readOnlyCommands(DEFAULT_READ_ONLY_COMMANDS); } @Override @@ -246,6 +250,13 @@ public Builder publishOnScheduler(boolean publishOnScheduler) { return this; } + @Override + public Builder readOnlyCommands(ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands) { + + super.readOnlyCommands(readOnlyCommands); + return this; + } + @Override public Builder requestQueueSize(int requestQueueSize) { super.requestQueueSize(requestQueueSize); @@ -343,7 +354,8 @@ public ClusterClientOptions.Builder mutate() { .decodeBufferPolicy(getDecodeBufferPolicy()) .disconnectedBehavior(getDisconnectedBehavior()).maxRedirects(getMaxRedirects()) .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) - .protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize()) + .protocolVersion(getConfiguredProtocolVersion()).readOnlyCommands(getReadOnlyCommands()) + .requestQueueSize(getRequestQueueSize()) .scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions()) .suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions()) .topologyRefreshOptions(getTopologyRefreshOptions()) diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 8371ed117c..f1fcc0a656 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -53,7 +53,7 @@ import io.lettuce.core.protocol.ConnectionFacade; import io.lettuce.core.protocol.ConnectionIntent; import io.lettuce.core.protocol.DefaultEndpoint; -import io.lettuce.core.protocol.ProtocolKeyword; +import io.lettuce.core.protocol.ReadOnlyCommands; import io.lettuce.core.protocol.RedisCommand; import io.lettuce.core.resource.ClientResources; @@ -70,6 +70,8 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter { private final ClientOptions clientOptions; + private final ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands; + private final ClusterEventListener clusterEventListener; private final int executionLimit; @@ -93,6 +95,7 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter { this.defaultWriter = defaultWriter; this.clientOptions = clientOptions; + this.readOnlyCommands = clientOptions.getReadOnlyCommands(); this.clusterEventListener = clusterEventListener; } @@ -169,7 +172,7 @@ private RedisCommand doWrite(RedisCommand command) { if (encodedKey != null) { int hash = getSlot(encodedKey); - ConnectionIntent connectionIntent = getIntent(command.getType()); + ConnectionIntent connectionIntent = getIntent(command); CompletableFuture> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider) .getConnectionAsync(connectionIntent, hash); @@ -342,36 +345,22 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) { */ ConnectionIntent getIntent(Collection> commands) { - boolean w = false; - boolean r = false; - ConnectionIntent singleConnectionIntent = ConnectionIntent.WRITE; + if (commands.isEmpty()) { + return ConnectionIntent.WRITE; + } for (RedisCommand command : commands) { - if (command instanceof ClusterCommand) { - continue; - } - - singleConnectionIntent = getIntent(command.getType()); - if (singleConnectionIntent == ConnectionIntent.READ) { - r = true; - } - - if (singleConnectionIntent == ConnectionIntent.WRITE) { - w = true; - } - - if (r && w) { + if (!readOnlyCommands.isReadOnly(command)) { return ConnectionIntent.WRITE; } } - return singleConnectionIntent; + return ConnectionIntent.READ; } - private ConnectionIntent getIntent(ProtocolKeyword type) { - return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type)) - ? ConnectionIntent.READ : ConnectionIntent.WRITE; + private ConnectionIntent getIntent(RedisCommand command) { + return readOnlyCommands.isReadOnly(command) ? ConnectionIntent.READ : ConnectionIntent.WRITE; } static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) { diff --git a/src/main/java/io/lettuce/core/cluster/ReadOnlyCommands.java b/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java similarity index 70% rename from src/main/java/io/lettuce/core/cluster/ReadOnlyCommands.java rename to src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java index dfb3781bf1..e76e9f3943 100644 --- a/src/main/java/io/lettuce/core/cluster/ReadOnlyCommands.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterReadOnlyCommands.java @@ -21,17 +21,24 @@ import io.lettuce.core.protocol.CommandType; import io.lettuce.core.protocol.ProtocolKeyword; +import io.lettuce.core.protocol.ReadOnlyCommands; /** * Contains all command names that are read-only commands. * * @author Mark Paluch + * @since 6.2.5 */ -class ReadOnlyCommands { +public class ClusterReadOnlyCommands { private static final Set READ_ONLY_COMMANDS = EnumSet.noneOf(CommandType.class); + private static final ReadOnlyCommands.ReadOnlyPredicate PREDICATE = command -> isReadOnlyCommand(command.getType()); + static { + + READ_ONLY_COMMANDS.addAll(ReadOnlyCommands.getReadOnlyCommands()); + for (CommandName commandNames : CommandName.values()) { READ_ONLY_COMMANDS.add(CommandType.valueOf(commandNames.name())); } @@ -52,17 +59,18 @@ public static Set getReadOnlyCommands() { return Collections.unmodifiableSet(READ_ONLY_COMMANDS); } + /** + * Return a {@link ReadOnlyCommands.ReadOnlyPredicate} to test against the underlying + * {@link #isReadOnlyCommand(ProtocolKeyword) known commands}. + * + * @return a {@link ReadOnlyCommands.ReadOnlyPredicate} to test against the underlying + * {@link #isReadOnlyCommand(ProtocolKeyword) known commands}. + */ + public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() { + return PREDICATE; + } + enum CommandName { - ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EVAL_RO, EVALSHA_RO, EXISTS, // - GEODIST, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, GEOHASH, GET, GETBIT, // - GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HRANDFIELD, HSCAN, HSTRLEN, // - HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, SORT_RO, MGET, PFCOUNT, PTTL, // - RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT, // - SDIFF, SINTER, SISMEMBER, SMISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN, // - SUNION, TIME, TTL, TYPE, // - XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, // - ZCARD, ZCOUNT, ZLEXCOUNT, ZRANGE, // - ZRANDMEMBER, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREVRANGE, ZREVRANGEBYLEX, ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE, // Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE diff --git a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java index d902b59de0..d30b0ab60f 100644 --- a/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java +++ b/src/main/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriter.java @@ -21,7 +21,6 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.ReadFrom; import io.lettuce.core.RedisChannelWriter; -import io.lettuce.core.RedisClient; import io.lettuce.core.RedisException; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.internal.LettuceAssert; @@ -45,6 +44,8 @@ class MasterReplicaChannelWriter implements RedisChannelWriter { private final ClientOptions clientOptions; + private final io.lettuce.core.protocol.ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands; + private boolean closed = false; private boolean inTransaction; @@ -54,6 +55,7 @@ class MasterReplicaChannelWriter implements RedisChannelWriter { this.masterReplicaConnectionProvider = masterReplicaConnectionProvider; this.clientResources = clientResources; this.clientOptions = clientOptions; + this.readOnlyCommands = clientOptions.getReadOnlyCommands(); } @Override @@ -70,7 +72,8 @@ public RedisCommand write(RedisCommand command) { inTransaction = true; } - ConnectionIntent connectionIntent = inTransaction ? ConnectionIntent.WRITE : getIntent(command.getType()); + ConnectionIntent connectionIntent = inTransaction ? ConnectionIntent.WRITE + : (readOnlyCommands.isReadOnly(command) ? ConnectionIntent.READ : ConnectionIntent.WRITE); CompletableFuture> future = (CompletableFuture) masterReplicaConnectionProvider .getConnectionAsync(connectionIntent); @@ -170,32 +173,18 @@ private static void writeCommands(Collection> commands) { - boolean w = false; - boolean r = false; - ConnectionIntent singleIntent = ConnectionIntent.WRITE; + if (commands.isEmpty()) { + return ConnectionIntent.WRITE; + } for (RedisCommand command : commands) { - singleIntent = getIntent(command.getType()); - if (singleIntent == ConnectionIntent.READ) { - r = true; - } - - if (singleIntent == ConnectionIntent.WRITE) { - w = true; - } - - if (r && w) { + if (!readOnlyCommands.isReadOnly(command)) { return ConnectionIntent.WRITE; } } - return singleIntent; - } - - private ConnectionIntent getIntent(ProtocolKeyword type) { - return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type)) - ? ConnectionIntent.READ : ConnectionIntent.WRITE; + return ConnectionIntent.READ; } @Override diff --git a/src/main/java/io/lettuce/core/protocol/ReadOnlyCommands.java b/src/main/java/io/lettuce/core/protocol/ReadOnlyCommands.java new file mode 100644 index 0000000000..32f9d4aa88 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/ReadOnlyCommands.java @@ -0,0 +1,150 @@ +/* + * Copyright 2020-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; + +import io.lettuce.core.internal.LettuceAssert; + +/** + * Contains all command names that are read-only commands. + * + * @author Mark Paluch + * @since 6.2.5 + */ +public class ReadOnlyCommands { + + private static final Set READ_ONLY_COMMANDS = EnumSet.noneOf(CommandType.class); + + private static final ReadOnlyPredicate PREDICATE = command -> isReadOnlyCommand(command.getType()); + + static { + for (CommandName commandNames : CommandName.values()) { + READ_ONLY_COMMANDS.add(CommandType.valueOf(commandNames.name())); + } + } + + /** + * @param protocolKeyword must not be {@code null}. + * @return {@code true} if {@link ProtocolKeyword} is a read-only command. + */ + public static boolean isReadOnlyCommand(ProtocolKeyword protocolKeyword) { + return READ_ONLY_COMMANDS.contains(protocolKeyword); + } + + /** + * @return an unmodifiable {@link Set} of {@link CommandType read-only} commands. + */ + public static Set getReadOnlyCommands() { + return Collections.unmodifiableSet(READ_ONLY_COMMANDS); + } + + /** + * Return a {@link ReadOnlyPredicate} to test against the underlying {@link #isReadOnlyCommand(ProtocolKeyword) known + * commands}. + * + * @return a {@link ReadOnlyPredicate} to test against the underlying {@link #isReadOnlyCommand(ProtocolKeyword) known + * commands}. + */ + public static ReadOnlyPredicate asPredicate() { + return PREDICATE; + } + + enum CommandName { + ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EVAL_RO, EVALSHA_RO, EXISTS, // + GEODIST, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, GEOHASH, GET, GETBIT, // + GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HRANDFIELD, HSCAN, HSTRLEN, // + HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, SORT_RO, MGET, PFCOUNT, PTTL, // + RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT, // + SDIFF, SINTER, SISMEMBER, SMISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN, // + SUNION, TIME, TTL, TYPE, // + XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, // + ZCARD, ZCOUNT, ZLEXCOUNT, ZRANGE, // + ZRANDMEMBER, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREVRANGE, ZREVRANGEBYLEX, ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE, + } + + /** + * A predicate to determine whether a command qualifies as Read-Only command. + * + * @since 6.2.4 + */ + @FunctionalInterface + public interface ReadOnlyPredicate { + + /** + * Evaluates this predicate on the given {@link RedisCommand}. + * + * @param command the input command. + * @return {@code true} if the input argument matches the predicate, otherwise {@code false} + */ + boolean isReadOnly(RedisCommand command); + + /** + * Returns a composed predicate that represents a short-circuiting logical AND of this predicate and another. When + * evaluating the composed predicate, if this predicate is {@code false}, then the {@code other} predicate is not + * evaluated. + * + *

+ * Any exceptions thrown during evaluation of either predicate are relayed to the caller; if evaluation of this + * predicate throws an exception, the {@code other} predicate will not be evaluated. + * + * @param other a predicate that will be logically-ANDed with this predicate. + * @return a composed predicate that represents the short-circuiting logical AND of this predicate and the {@code other} + * predicate. + * @throws IllegalArgumentException if other is {@code null}. + */ + default ReadOnlyPredicate and(ReadOnlyPredicate other) { + + LettuceAssert.notNull(other, "Other ReadOnlyPredicate must not be null"); + + return (t) -> isReadOnly(t) && other.isReadOnly(t); + } + + /** + * Returns a predicate that represents the logical negation of this predicate. + * + * @return a predicate that represents the logical negation of this predicate. + */ + default ReadOnlyPredicate negate() { + return (t) -> !isReadOnly(t); + } + + /** + * Returns a composed predicate that represents a short-circuiting logical OR of this predicate and another. When + * evaluating the composed predicate, if this predicate is {@code true}, then the {@code other} predicate is not + * evaluated. + * + *

+ * Any exceptions thrown during evaluation of either predicate are relayed to the caller; if evaluation of this + * predicate throws an exception, the {@code other} predicate will not be evaluated. + * + * @param other a predicate that will be logically-ORed with this predicate. + * @return a composed predicate that represents the short-circuiting logical OR of this predicate and the {@code other} + * predicate. + * @throws IllegalArgumentException if other is {@code null}. + */ + default ReadOnlyPredicate or(ReadOnlyPredicate other) { + + LettuceAssert.notNull(other, "Other ReadOnlyPredicate must not be null"); + + return (t) -> isReadOnly(t) || other.isReadOnly(t); + } + + } + +} diff --git a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java index 64e3b3356b..480d428f85 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java @@ -15,12 +15,14 @@ */ package io.lettuce.core; -import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.*; import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Test; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandType; import io.lettuce.core.protocol.ProtocolVersion; /** @@ -35,6 +37,16 @@ void testNew() { checkAssertions(ClientOptions.create()); } + @Test + void testDefault() { + + ClientOptions options = ClientOptions.builder().build(); + + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.SET, null))).isFalse(); + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.PUBLISH, null))).isFalse(); + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.GET, null))).isTrue(); + } + @Test void testBuilder() { ClientOptions options = ClientOptions.builder().scriptCharset(StandardCharsets.US_ASCII).build(); diff --git a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java index ffe28d182a..35f31f3e9e 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsUnitTests.java @@ -24,6 +24,8 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandType; import io.lettuce.core.protocol.ProtocolVersion; /** @@ -39,6 +41,7 @@ void testCopy() { Predicate nodeFilter = it -> true; ClusterClientOptions options = ClusterClientOptions.builder().autoReconnect(false).requestQueueSize(100) .suspendReconnectOnProtocolFailure(true).maxRedirects(1234).validateClusterNodeMembership(false) + .readOnlyCommands(command -> command.getType() == CommandType.PING) .protocolVersion(ProtocolVersion.RESP2).nodeFilter(nodeFilter).build(); ClusterClientOptions copy = ClusterClientOptions.copyOf(options); @@ -55,6 +58,18 @@ void testCopy() { assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects()); assertThat(copy.getScriptCharset()).isEqualTo(StandardCharsets.UTF_8); assertThat(copy.getNodeFilter()).isEqualTo(nodeFilter); + assertThat(copy.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.GET, null))).isFalse(); + assertThat(copy.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.PING, null))).isTrue(); + } + + @Test + void testDefault() { + + ClusterClientOptions options = ClusterClientOptions.builder().build(); + + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.SET, null))).isFalse(); + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.PUBLISH, null))).isTrue(); + assertThat(options.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.GET, null))).isTrue(); } @Test @@ -83,7 +98,8 @@ void builderFromDefaultClientOptions() { void builderFromClusterClientOptions() { ClusterClientOptions options = ClusterClientOptions.builder().maxRedirects(1234).validateClusterNodeMembership(false) - .scriptCharset(StandardCharsets.US_ASCII).build(); + .scriptCharset(StandardCharsets.US_ASCII).readOnlyCommands(command -> command.getType() == CommandType.PING) + .build(); ClusterClientOptions copy = ClusterClientOptions.builder(options).build(); @@ -97,6 +113,8 @@ void builderFromClusterClientOptions() { assertThat(copy.isSuspendReconnectOnProtocolFailure()).isEqualTo(options.isSuspendReconnectOnProtocolFailure()); assertThat(copy.getMaxRedirects()).isEqualTo(options.getMaxRedirects()); assertThat(copy.getScriptCharset()).isEqualTo(options.getScriptCharset()); + assertThat(copy.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.GET, null))).isFalse(); + assertThat(copy.getReadOnlyCommands().isReadOnly(new Command<>(CommandType.PING, null))).isTrue(); assertThat(options.mutate()).isNotSameAs(copy.mutate()); } } diff --git a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java index 6fc4ceaab2..94c267c0bc 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java @@ -23,14 +23,10 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; -import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.models.partitions.Partitions; -import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentMatchers; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -38,10 +34,12 @@ import io.lettuce.core.ClientOptions; import io.lettuce.core.CommandListenerWriter; -import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisURI; import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.event.EventBus; import io.lettuce.core.internal.HostAndPort; @@ -76,8 +74,7 @@ class ClusterDistributionChannelWriterUnitTests { @Mock private ClientResources clientResources; - @Mock - private ClientOptions clientOptions; + private ClientOptions clientOptions = ClientOptions.create(); @Mock private ClusterEventListener clusterEventListener; @@ -94,13 +91,15 @@ class ClusterDistributionChannelWriterUnitTests { @Mock private PooledClusterConnectionProvider pooledClusterConnectionProvider; - @InjectMocks private ClusterDistributionChannelWriter clusterDistributionChannelWriter; @BeforeEach void setUp() { when(defaultWriter.getClientResources()).thenReturn(clientResources); when(clientResources.eventBus()).thenReturn(eventBus); + + clusterDistributionChannelWriter = new ClusterDistributionChannelWriter(defaultWriter, clientOptions, + clusterEventListener); } @Test diff --git a/src/test/java/io/lettuce/core/cluster/ReadOnlyCommandsUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java similarity index 80% rename from src/test/java/io/lettuce/core/cluster/ReadOnlyCommandsUnitTests.java rename to src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java index aaceee1459..c876b04ec9 100644 --- a/src/test/java/io/lettuce/core/cluster/ReadOnlyCommandsUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterReadOnlyCommandsUnitTests.java @@ -23,21 +23,21 @@ import io.lettuce.core.protocol.ProtocolKeyword; /** - * Tests for {@link ReadOnlyCommands}. + * Tests for {@link ClusterReadOnlyCommands}. * * @author Mark Paluch */ -class ReadOnlyCommandsUnitTests { +class ClusterReadOnlyCommandsUnitTests { @Test void testCount() { - assertThat(ReadOnlyCommands.getReadOnlyCommands()).hasSize(83); + assertThat(ClusterReadOnlyCommands.getReadOnlyCommands()).hasSize(83); } @Test void testResolvableCommandNames() { - for (ProtocolKeyword readOnlyCommand : ReadOnlyCommands.getReadOnlyCommands()) { + for (ProtocolKeyword readOnlyCommand : ClusterReadOnlyCommands.getReadOnlyCommands()) { assertThat(readOnlyCommand.name()).isEqualTo(CommandType.valueOf(readOnlyCommand.name()).name()); } } diff --git a/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java index ea34ac684a..3800fba1d4 100644 --- a/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/masterreplica/MasterReplicaChannelWriterUnitTests.java @@ -55,8 +55,7 @@ class MasterReplicaChannelWriterUnitTests { @Mock private ClientResources clientResources; - @Mock - private ClientOptions clientOptions; + private ClientOptions clientOptions = ClientOptions.create(); @Mock private StatefulRedisConnection connection;