Skip to content

Commit

Permalink
Polishing #2401
Browse files Browse the repository at this point in the history
Refactor Set of Commands into ReadOnlyPredicate to provide reusable components for predicate composition. Also allow override over Lettuce defaults.

Incorporate optimization of getIntent logic.

Original pull requests: #2447, #2442
  • Loading branch information
mp911de committed Jul 17, 2023
1 parent afa58fa commit c135dc1
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 114 deletions.
71 changes: 29 additions & 42 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.ReadOnlyCommands;
import io.lettuce.core.resource.ClientResources;

/**
Expand All @@ -48,14 +45,14 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final Set<ProtocolKeyword> DEFAULT_EXTRA_READONLY_COMMANDS = Collections.emptySet();

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;

public static final ProtocolVersion DEFAULT_PROTOCOL_VERSION = ProtocolVersion.newestSupported();

public static final ReadOnlyCommands.ReadOnlyPredicate DEFAULT_READ_ONLY_COMMANDS = ReadOnlyCommands.asPredicate();

public static final int DEFAULT_REQUEST_QUEUE_SIZE = Integer.MAX_VALUE;

public static final Charset DEFAULT_SCRIPT_CHARSET = StandardCharsets.UTF_8;
Expand All @@ -76,14 +73,14 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final Set<ProtocolKeyword> extraReadOnlyCommands;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;

private final ProtocolVersion protocolVersion;

private final ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands;

private final int requestQueueSize;

private final Charset scriptCharset;
Expand All @@ -102,10 +99,10 @@ protected ClientOptions(Builder builder) {
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.extraReadOnlyCommands = builder.extraReadOnlyCommands;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
this.readOnlyCommands = builder.readOnlyCommands;
this.requestQueueSize = builder.requestQueueSize;
this.scriptCharset = builder.scriptCharset;
this.socketOptions = builder.socketOptions;
Expand All @@ -119,10 +116,10 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.extraReadOnlyCommands = original.getExtraReadOnlyCommands();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
this.readOnlyCommands = original.getReadOnlyCommands();
this.requestQueueSize = original.getRequestQueueSize();
this.scriptCharset = original.getScriptCharset();
this.socketOptions = original.getSocketOptions();
Expand Down Expand Up @@ -172,14 +169,14 @@ public static class Builder {

private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR;

private Set<ProtocolKeyword> extraReadOnlyCommands = DEFAULT_EXTRA_READONLY_COMMANDS;

private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION;

private ProtocolVersion protocolVersion;

private boolean publishOnScheduler = DEFAULT_PUBLISH_ON_SCHEDULER;

private ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands = DEFAULT_READ_ONLY_COMMANDS;

private int requestQueueSize = DEFAULT_REQUEST_QUEUE_SIZE;

private Charset scriptCharset = DEFAULT_SCRIPT_CHARSET;
Expand Down Expand Up @@ -271,20 +268,6 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Identifies extra commands (module commands) as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
*
* @param extraReadOnlyCommands must not be {@code null}.
* @return {@code this}
*/
public Builder extraReadOnlyCommands(Set<ProtocolKeyword> extraReadOnlyCommands) {

LettuceAssert.notNull(extraReadOnlyCommands, "extraReadOnlyCommands must not be null");
this.extraReadOnlyCommands = Collections.unmodifiableSet(new HashSet<>(extraReadOnlyCommands));
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -338,6 +321,21 @@ public Builder publishOnScheduler(boolean publishOnScheduler) {
return this;
}

/**
* Identifies commands (e.g. module commands) as read-only. Defaults {@link #DEFAULT_READ_ONLY_COMMANDS}, see
* {@link ReadOnlyCommands}.
*
* @param readOnlyCommands must not be {@code null}.
* @return {@code this}
* @see 6.2.4
*/
public Builder readOnlyCommands(ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands) {

LettuceAssert.notNull(readOnlyCommands, "readOnlyCommands must not be null");
this.readOnlyCommands = readOnlyCommands;
return this;
}

/**
* Set the per-connection request queue size. The command invocation will lead to a {@link RedisException} if the queue
* size is exceeded. Setting the {@code requestQueueSize} to a lower value will lead earlier to exceptions during
Expand Down Expand Up @@ -446,7 +444,7 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.extraReadOnlyCommands(getExtraReadOnlyCommands())
.readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
Expand Down Expand Up @@ -516,23 +514,12 @@ public DisconnectedBehavior getDisconnectedBehavior() {
}

/**
* Extra commands (module commands) which are identified as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
*
* @return the set of extra read-only commands
*/
public Set<ProtocolKeyword> getExtraReadOnlyCommands() {
return extraReadOnlyCommands;
}

/**
* Check if a command is identified as an extra read-only command.
*
* @param command
* @return {@code true} if the command is an extra read-only command.
* @return the predicate to identify read-only commands.
*/
public boolean isExtraReadOnlyCommand(ProtocolKeyword command) {
return extraReadOnlyCommands.contains(command);
public ReadOnlyCommands.ReadOnlyPredicate getReadOnlyCommands() {
return readOnlyCommands;
}

/**
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.ReadOnlyCommands;

/**
* Client Options to control the behavior of {@link RedisClusterClient}.
Expand All @@ -38,6 +39,8 @@ public class ClusterClientOptions extends ClientOptions {

public static final boolean DEFAULT_CLOSE_STALE_CONNECTIONS = true;

public static final ReadOnlyCommands.ReadOnlyPredicate DEFAULT_READ_ONLY_COMMANDS = ClusterReadOnlyCommands.asPredicate();

public static final int DEFAULT_MAX_REDIRECTS = 5;

public static final boolean DEFAULT_REFRESH_CLUSTER_VIEW = false;
Expand Down Expand Up @@ -163,6 +166,7 @@ public static class Builder extends ClientOptions.Builder {
private ClusterTopologyRefreshOptions topologyRefreshOptions = null;

protected Builder() {
readOnlyCommands(DEFAULT_READ_ONLY_COMMANDS);
}

@Override
Expand Down Expand Up @@ -246,6 +250,13 @@ public Builder publishOnScheduler(boolean publishOnScheduler) {
return this;
}

@Override
public Builder readOnlyCommands(ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands) {

super.readOnlyCommands(readOnlyCommands);
return this;
}

@Override
public Builder requestQueueSize(int requestQueueSize) {
super.requestQueueSize(requestQueueSize);
Expand Down Expand Up @@ -343,7 +354,8 @@ public ClusterClientOptions.Builder mutate() {
.decodeBufferPolicy(getDecodeBufferPolicy())
.disconnectedBehavior(getDisconnectedBehavior()).maxRedirects(getMaxRedirects())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.protocolVersion(getConfiguredProtocolVersion()).readOnlyCommands(getReadOnlyCommands())
.requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions())
.topologyRefreshOptions(getTopologyRefreshOptions())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.ConnectionIntent;
import io.lettuce.core.protocol.DefaultEndpoint;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.ReadOnlyCommands;
import io.lettuce.core.protocol.RedisCommand;
import io.lettuce.core.resource.ClientResources;

Expand All @@ -70,6 +70,8 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter {

private final ClientOptions clientOptions;

private final ReadOnlyCommands.ReadOnlyPredicate readOnlyCommands;

private final ClusterEventListener clusterEventListener;

private final int executionLimit;
Expand All @@ -93,6 +95,7 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter {

this.defaultWriter = defaultWriter;
this.clientOptions = clientOptions;
this.readOnlyCommands = clientOptions.getReadOnlyCommands();
this.clusterEventListener = clusterEventListener;
}

Expand Down Expand Up @@ -169,7 +172,7 @@ private <K, V, T> RedisCommand<K, V, T> doWrite(RedisCommand<K, V, T> command) {
if (encodedKey != null) {

int hash = getSlot(encodedKey);
ConnectionIntent connectionIntent = getIntent(command.getType());
ConnectionIntent connectionIntent = getIntent(command);

CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
.getConnectionAsync(connectionIntent, hash);
Expand Down Expand Up @@ -342,36 +345,22 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) {
*/
ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {

boolean w = false;
boolean r = false;
ConnectionIntent singleConnectionIntent = ConnectionIntent.WRITE;
if (commands.isEmpty()) {
return ConnectionIntent.WRITE;
}

for (RedisCommand<?, ?, ?> command : commands) {

if (command instanceof ClusterCommand) {
continue;
}

singleConnectionIntent = getIntent(command.getType());
if (singleConnectionIntent == ConnectionIntent.READ) {
r = true;
}

if (singleConnectionIntent == ConnectionIntent.WRITE) {
w = true;
}

if (r && w) {
if (!readOnlyCommands.isReadOnly(command)) {
return ConnectionIntent.WRITE;
}
}

return singleConnectionIntent;
return ConnectionIntent.READ;
}

private ConnectionIntent getIntent(ProtocolKeyword type) {
return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type))
? ConnectionIntent.READ : ConnectionIntent.WRITE;
private ConnectionIntent getIntent(RedisCommand<?, ?, ?> command) {
return readOnlyCommands.isReadOnly(command) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,24 @@

import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.ReadOnlyCommands;

/**
* Contains all command names that are read-only commands.
*
* @author Mark Paluch
* @since 6.2.5
*/
class ReadOnlyCommands {
public class ClusterReadOnlyCommands {

private static final Set<CommandType> READ_ONLY_COMMANDS = EnumSet.noneOf(CommandType.class);

private static final ReadOnlyCommands.ReadOnlyPredicate PREDICATE = command -> isReadOnlyCommand(command.getType());

static {

READ_ONLY_COMMANDS.addAll(ReadOnlyCommands.getReadOnlyCommands());

for (CommandName commandNames : CommandName.values()) {
READ_ONLY_COMMANDS.add(CommandType.valueOf(commandNames.name()));
}
Expand All @@ -52,17 +59,18 @@ public static Set<CommandType> getReadOnlyCommands() {
return Collections.unmodifiableSet(READ_ONLY_COMMANDS);
}

/**
* Return a {@link ReadOnlyCommands.ReadOnlyPredicate} to test against the underlying
* {@link #isReadOnlyCommand(ProtocolKeyword) known commands}.
*
* @return a {@link ReadOnlyCommands.ReadOnlyPredicate} to test against the underlying
* {@link #isReadOnlyCommand(ProtocolKeyword) known commands}.
*/
public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() {
return PREDICATE;
}

enum CommandName {
ASKING, BITCOUNT, BITPOS, CLIENT, COMMAND, DUMP, ECHO, EVAL_RO, EVALSHA_RO, EXISTS, //
GEODIST, GEOPOS, GEORADIUS, GEORADIUS_RO, GEORADIUSBYMEMBER, GEORADIUSBYMEMBER_RO, GEOHASH, GET, GETBIT, //
GETRANGE, HEXISTS, HGET, HGETALL, HKEYS, HLEN, HMGET, HRANDFIELD, HSCAN, HSTRLEN, //
HVALS, INFO, KEYS, LINDEX, LLEN, LPOS, LRANGE, SORT_RO, MGET, PFCOUNT, PTTL, //
RANDOMKEY, READWRITE, SCAN, SCARD, SCRIPT, //
SDIFF, SINTER, SISMEMBER, SMISMEMBER, SMEMBERS, SRANDMEMBER, SSCAN, STRLEN, //
SUNION, TIME, TTL, TYPE, //
XINFO, XLEN, XPENDING, XRANGE, XREVRANGE, XREAD, //
ZCARD, ZCOUNT, ZLEXCOUNT, ZRANGE, //
ZRANDMEMBER, ZRANGEBYLEX, ZRANGEBYSCORE, ZRANK, ZREVRANGE, ZREVRANGEBYLEX, ZREVRANGEBYSCORE, ZREVRANK, ZSCAN, ZSCORE,

// Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE
Expand Down
Loading

0 comments on commit c135dc1

Please sign in to comment.