Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for module-based read-only commands #2401 #2447

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.DecodeBufferPolicies;
import io.lettuce.core.protocol.DecodeBufferPolicy;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.resource.ClientResources;

Expand All @@ -31,6 +35,7 @@
*
* @author Mark Paluch
* @author Gavin Cook
* @author Jim Brunner
*/
@SuppressWarnings("serial")
public class ClientOptions implements Serializable {
Expand All @@ -43,6 +48,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final Set<ProtocolKeyword> DEFAULT_EXTRA_READONLY_COMMANDS = Collections.emptySet();

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand All @@ -69,6 +76,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final Set<ProtocolKeyword> extraReadOnlyCommands;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand All @@ -87,11 +96,13 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;


protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.extraReadOnlyCommands = builder.extraReadOnlyCommands;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -108,6 +119,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.extraReadOnlyCommands = original.getExtraReadOnlyCommands();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -160,6 +172,8 @@ public static class Builder {

private DisconnectedBehavior disconnectedBehavior = DEFAULT_DISCONNECTED_BEHAVIOR;

private Set<ProtocolKeyword> extraReadOnlyCommands = DEFAULT_EXTRA_READONLY_COMMANDS;

private boolean pingBeforeActivateConnection = DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION;

private ProtocolVersion protocolVersion;
Expand Down Expand Up @@ -257,6 +271,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Identifies extra commands (module commands) as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
*
* @param extraReadOnlyCommands must not be {@code null}.
* @return {@code this}
*/
public Builder extraReadOnlyCommands(Set<ProtocolKeyword> extraReadOnlyCommands) {

LettuceAssert.notNull(extraReadOnlyCommands, "extraReadOnlyCommands must not be null");
this.extraReadOnlyCommands = Collections.unmodifiableSet(new HashSet<>(extraReadOnlyCommands));
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -418,6 +446,7 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.extraReadOnlyCommands(getExtraReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).socketOptions(getSocketOptions()).sslOptions(getSslOptions())
Expand Down Expand Up @@ -486,6 +515,26 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

/**
* Extra commands (module commands) which are identified as read-only. Defaults to {@code emptySet}. See
* {@link #DEFAULT_EXTRA_READONLY_COMMANDS}.
*
* @return the set of extra read-only commands
*/
public Set<ProtocolKeyword> getExtraReadOnlyCommands() {
return extraReadOnlyCommands;
}

/**
* Check if a command is identified as an extra read-only command.
*
* @param command
* @return {@code true} if the command is an extra read-only command.
*/
public boolean isExtraReadOnlyCommand(ProtocolKeyword command) {
return extraReadOnlyCommands.contains(command);
}

/**
* Request queue size for a connection. This value applies per connection. The command invocation will throw a
* {@link RedisException} if the queue size is exceeded and a new command is requested. Defaults to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,15 @@
* Channel writer for cluster operation. This writer looks up the right partition by hash/slot for the operation.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 3.0
*/
class ClusterDistributionChannelWriter implements RedisChannelWriter {

private final RedisChannelWriter defaultWriter;

private final ClientOptions clientOptions;

private final ClusterEventListener clusterEventListener;

private final int executionLimit;
Expand All @@ -89,6 +92,7 @@ class ClusterDistributionChannelWriter implements RedisChannelWriter {
}

this.defaultWriter = defaultWriter;
this.clientOptions = clientOptions;
this.clusterEventListener = clusterEventListener;
}

Expand Down Expand Up @@ -336,7 +340,7 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) {
* @param commands {@link Collection} of {@link RedisCommand commands}.
* @return the connectionIntent.
*/
static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {
ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {

boolean w = false;
boolean r = false;
Expand Down Expand Up @@ -365,8 +369,9 @@ static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> co
return singleConnectionIntent;
}

private static ConnectionIntent getIntent(ProtocolKeyword type) {
return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
private ConnectionIntent getIntent(ProtocolKeyword type) {
return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type))
? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
* a single {@link RedisURI}.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class AutodiscoveryConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -125,8 +126,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re

connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions());

StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, redisURI.getTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.internal.LettuceAssert;
Expand All @@ -33,21 +35,25 @@
* Channel writer/dispatcher that dispatches commands based on the ConnectionIntent to different connections.
*
* @author Mark Paluch
* @author Jim Brunner
*/
class MasterReplicaChannelWriter implements RedisChannelWriter {

private MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider;

private final ClientResources clientResources;

private final ClientOptions clientOptions;

private boolean closed = false;

private boolean inTransaction;

MasterReplicaChannelWriter(MasterReplicaConnectionProvider<?, ?> masterReplicaConnectionProvider,
ClientResources clientResources) {
ClientResources clientResources, ClientOptions clientOptions) {
this.masterReplicaConnectionProvider = masterReplicaConnectionProvider;
this.clientResources = clientResources;
this.clientOptions = clientOptions;
}

@Override
Expand Down Expand Up @@ -162,7 +168,7 @@ private static <K, V> void writeCommands(Collection<? extends RedisCommand<K, V,
* @param commands {@link Collection} of {@link RedisCommand commands}.
* @return the ConnectionIntent.
*/
static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {
ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> commands) {

boolean w = false;
boolean r = false;
Expand All @@ -187,8 +193,9 @@ static ConnectionIntent getIntent(Collection<? extends RedisCommand<?, ?, ?>> co
return singleIntent;
}

private static ConnectionIntent getIntent(ProtocolKeyword type) {
return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE;
private ConnectionIntent getIntent(ProtocolKeyword type) {
return (ReadOnlyCommands.isReadOnlyCommand(type) || clientOptions.isExtraReadOnlyCommand(type))
? ConnectionIntent.READ : ConnectionIntent.WRITE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* {@link MasterReplicaConnector} to connect a Sentinel-managed Master/Replica setup using a Sentinel {@link RedisURI}.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class SentinelConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -83,7 +84,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re
connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources()) {
redisClient.getResources(), redisClient.getOptions()) {

@Override
public CompletableFuture<Void> closeAsync() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* {@link RedisURI}. This connector determines roles and remains using only the provided endpoints.
*
* @author Mark Paluch
* @author Jim Brunner
* @since 5.1
*/
class StaticMasterReplicaConnector<K, V> implements MasterReplicaConnector<K, V> {
Expand Down Expand Up @@ -81,8 +82,7 @@ private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(Re

connectionProvider.setKnownNodes(nodes);

MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
redisClient.getResources());
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources(), redisClient.getOptions());

StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
channelWriter, codec, seedNode.getTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import io.lettuce.core.ClientOptions;
import io.lettuce.core.CommandListenerWriter;
import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.StatefulRedisConnectionImpl;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
Expand All @@ -60,6 +61,7 @@
*
* @author Mark Paluch
* @author koisyu
* @author Jim Brunner
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
Expand All @@ -74,6 +76,9 @@ class ClusterDistributionChannelWriterUnitTests {
@Mock
private ClientResources clientResources;

@Mock
private ClientOptions clientOptions;

@Mock
private ClusterEventListener clusterEventListener;

Expand Down Expand Up @@ -159,40 +164,48 @@ void shouldParseIPv6MovedTargetCorrectly() {
@Test
void shouldReturnIntentForWriteCommand() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> set = new Command<>(CommandType.SET, null);
RedisCommand<String, String, String> mset = new Command<>(CommandType.MSET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Arrays.asList(set, mset))).isEqualTo(ConnectionIntent.WRITE);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
}

@Test
void shouldReturnDefaultIntentForNoCommands() {

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE);
ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

assertThat(writer.getIntent(Collections.emptyList())).isEqualTo(ConnectionIntent.WRITE);
}

@Test
void shouldReturnIntentForReadCommand() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> get = new Command<>(CommandType.GET, null);
RedisCommand<String, String, String> mget = new Command<>(CommandType.MGET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ);
assertThat(writer.getIntent(Arrays.asList(get, mget))).isEqualTo(ConnectionIntent.READ);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ);
assertThat(writer.getIntent(Collections.singletonList(get))).isEqualTo(ConnectionIntent.READ);
}

@Test
void shouldReturnIntentForMixedCommands() {

ClusterDistributionChannelWriter writer = new ClusterDistributionChannelWriter(clusterDistributionChannelWriter, clientOptions, clusterEventListener);

RedisCommand<String, String, String> set = new Command<>(CommandType.SET, null);
RedisCommand<String, String, String> mget = new Command<>(CommandType.MGET, null);

assertThat(ClusterDistributionChannelWriter.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Arrays.asList(set, mget))).isEqualTo(ConnectionIntent.WRITE);

assertThat(ClusterDistributionChannelWriter.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
assertThat(writer.getIntent(Collections.singletonList(set))).isEqualTo(ConnectionIntent.WRITE);
}

@Test
Expand Down
Loading