Skip to content

Commit

Permalink
Support for module-based read-only commands #2401
Browse files Browse the repository at this point in the history
Original pull requests: #2447, #2442
  • Loading branch information
JimB123 authored and mp911de committed Jul 17, 2023
1 parent 66671e2 commit afa58fa
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 31 deletions.
49 changes: 49 additions & 0 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.resource.ClientResources;

Expand All @@ -31,6 +35,7 @@
*
* @author Mark Paluch
* @author Gavin Cook
* @author Jim Brunner
*/
@SuppressWarnings("serial")
public class ClientOptions implements Serializable {
Expand All @@ -43,6 +48,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final Set<ProtocolKeyword> DEFAULT_EXTRA_READONLY_COMMANDS = Collections.emptySet();

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand All @@ -69,6 +76,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final Set<ProtocolKeyword> extraReadOnlyCommands;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand All @@ -87,11 +96,13 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;


protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.extraReadOnlyCommands = builder.extraReadOnlyCommands;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -108,6 +119,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.extraReadOnlyCommands = original.getExtraReadOnlyCommands();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -160,6 +172,8 @@ public static class Builder {

private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR;

private Set<ProtocolKeyword> extraReadOnlyCommands = DEFAULT_EXTRA_READONLY_COMMANDS;

private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION;

private ProtocolVersion protocolVersion;
Expand Down Expand Up @@ -257,6 +271,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Identifies extra commands (module commands) as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
*
* @param extraReadOnlyCommands must not be {@code null}.
* @return {@code this}
*/
public Builder extraReadOnlyCommands(Set<ProtocolKeyword> extraReadOnlyCommands) {

LettuceAssert.notNull(extraReadOnlyCommands, "extraReadOnlyCommands must not be null");
this.extraReadOnlyCommands = Collections.unmodifiableSet(new HashSet<>(extraReadOnlyCommands));
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -418,6 +446,7 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.extraReadOnlyCommands(getExtraReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
Expand Down Expand Up @@ -486,6 +515,26 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

/**
* Extra commands (module commands) which are identified as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
*
* @return the set of extra read-only commands
*/
public Set<ProtocolKeyword> getExtraReadOnlyCommands() {
return extraReadOnlyCommands;
}

/**
* Check if a command is identified as an extra read-only command.
*
* @param command
* @return {@code true} if the command is an extra read-only command.
*/
public boolean isExtraReadOnlyCommand(ProtocolKeyword command) {
return extraReadOnlyCommands.contains(command);
}

/**
* Request queue size for a connection. This value applies per connection. The command invocation will throw a
* {@link RedisException} if the queue size is exceeded and a new command is requested. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@
* Channel writer for cluster operation. This writer looks up the right partition by hash/slot for the operation.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 3.0
*/
class ClusterDistributionChannelWriter implements RedisChannelWriter {

private final RedisChannelWriter defaultWriter;

private final ClientOptions clientOptions;

private final ClusterEventListener clusterEventListener;

private final int executionLimit;
Expand All @@ -89,6 +92,7 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter {
}

this.defaultWriter = defaultWriter;
this.clientOptions = clientOptions;
this.clusterEventListener = clusterEventListener;
}

Expand Down Expand Up @@ -336,7 +340,7 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) {
* @param commands {@link Collection} of {@link RedisCommand commands}.
* @return the connectionIntent.
*/
static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {
ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {

boolean w = false;
boolean r = false;
Expand Down Expand Up @@ -365,8 +369,9 @@ static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> co
return singleConnectionIntent;
}

private static ConnectionIntent getIntent(ProtocolKeyword type) {
return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
private ConnectionIntent getIntent(ProtocolKeyword type) {
return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type))
? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* a single {@link RedisURI}.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class AutodiscoveryConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -125,8 +126,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re

connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions());

StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, redisURI.getTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.internal.LettuceAssert;
Expand All @@ -33,21 +35,25 @@
* Channel writer/dispatcher that dispatches commands based on the ConnectionIntent to different connections.
*
* @author Mark Paluch
* @author Jim Brunner
*/
class MasterReplicaChannelWriter implements RedisChannelWriter {

private MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider;

private final ClientResources clientResources;

private final ClientOptions clientOptions;

private boolean closed = false;

private boolean inTransaction;

MasterReplicaChannelWriter(MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider,
ClientResources clientResources) {
ClientResources clientResources, ClientOptions clientOptions) {
this.masterReplicaConnectionProvider = masterReplicaConnectionProvider;
this.clientResources = clientResources;
this.clientOptions = clientOptions;
}

@Override
Expand Down Expand Up @@ -162,7 +168,7 @@ private static <K, V> void writeCommands(Collection<? extends RedisCommand<K, V,
* @param commands {@link Collection} of {@link RedisCommand commands}.
* @return the ConnectionIntent.
*/
static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {
ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {

boolean w = false;
boolean r = false;
Expand All @@ -187,8 +193,9 @@ static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> co
return singleIntent;
}

private static ConnectionIntent getIntent(ProtocolKeyword type) {
return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
private ConnectionIntent getIntent(ProtocolKeyword type) {
return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type))
? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* {@link MasterReplicaConnector} to connect a Sentinel-managed Master/Replica setup using a Sentinel {@link RedisURI}.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class SentinelConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -83,7 +84,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re
connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources()) {
redisClient.getResources(), redisClient.getOptions()) {

@Override
public CompletableFuture<Void> closeAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* {@link RedisURI}. This connector determines roles and remains using only the provided endpoints.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class StaticMasterReplicaConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -81,8 +82,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re

connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions());

StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, seedNode.getTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import io.lettuce.core.ClientOptions;
import io.lettuce.core.CommandListenerWriter;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.StatefulRedisConnectionImpl;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
Expand All @@ -60,6 +61,7 @@
*
* @author Mark Paluch
* @author koisyu
* @author Jim Brunner
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand All @@ -74,6 +76,9 @@ class ClusterDistributionChannelWriterUnitTests {
@Mock
private ClientResources clientResources;

@Mock
private ClientOptions clientOptions;

@Mock
private ClusterEventListener clusterEventListener;

Expand Down Expand Up @@ -159,40 +164,48 @@ void shouldParseIPv6MovedTargetCorrectly() {
@Test
void shouldReturnIntentForWriteCommand() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> set = new Command<>(CommandType.SET, null);
RedisCommand<String, String, String> mset = new Command<>(CommandType.MSET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
}

@Test
void shouldReturnDefaultIntentForNoCommands() {

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE);
ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

assertThat(writer.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE);
}

@Test
void shouldReturnIntentForReadCommand() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> get = new Command<>(CommandType.GET, null);
RedisCommand<String, String, String> mget = new Command<>(CommandType.MGET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ);
assertThat(writer.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ);
assertThat(writer.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ);
}

@Test
void shouldReturnIntentForMixedCommands() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> set = new Command<>(CommandType.SET, null);
RedisCommand<String, String, String> mget = new Command<>(CommandType.MGET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
}

@Test
Expand Down
Loading

0 comments on commit afa58fa

Please sign in to comment.