Skip to content

Commit

Permalink
Log AUTH, SELECT, and READONLY failures on connection activation #1201
Browse files Browse the repository at this point in the history
Lettuce now logs failures of asynchronously fired commands during connection activation. Previously, failures during e.g. reconnect went unnoticed and could result in a subsequent NOAUTH Authentication required errors although the password was provided.
  • Loading branch information
mp911de committed Jan 13, 2020
1 parent ac7a431 commit b397c71
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 19 deletions.
31 changes: 25 additions & 6 deletions src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.lettuce.core.output.MultiOutput;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.*;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* A thread-safe connection to a Redis server. Multiple threads may share one {@link StatefulRedisConnectionImpl}
Expand Down Expand Up @@ -127,20 +128,38 @@ public void activated() {
super.activated();
// do not block in here, since the channel flow will be interrupted.
if (password != null) {
async.authAsync(password);
AsyncCommand<K, V, String> command = async.authAsync(password);
command.exceptionally(throwable -> {
return logOnFailure(throwable, "AUTH failed: " + command.getError());
});
}

if (db != 0) {
async.selectAsync(db);
if (db != 0)

{
AsyncCommand<K, V, String> command = async.selectAsync(db);
command.exceptionally(throwable -> {
return logOnFailure(throwable, "SELECT failed: " + command.getError());
});
}

if (clientName != null) {
setClientName(clientName);
}

if (readOnly) {
async.readOnly();
RedisFuture<String> command = async.readOnly();
command.exceptionally(throwable -> {
return logOnFailure(throwable, "READONLY failed: " + command.getError());
});
}
}

private String logOnFailure(Throwable throwable, String message) {
if (throwable instanceof RedisCommandExecutionException) {
InternalLoggerFactory.getInstance(getClass()).warn(message);
}
return "";
}

@Override
Expand Down Expand Up @@ -259,8 +278,8 @@ private <T> RedisCommand<K, V, T> attachOnComplete(RedisCommand<K, V, T> command
public void setClientName(String clientName) {

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName);
AsyncCommand<String, String, String> async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>(
StringCodec.UTF8), args));
AsyncCommand<String, String, String> async = new AsyncCommand<>(
new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args));
this.clientName = clientName;

dispatch((RedisCommand) async);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.*;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl}
Expand All @@ -54,8 +55,8 @@
* @author Mark Paluch
* @since 4.0
*/
public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandler<K, V> implements
StatefulRedisClusterConnection<K, V> {
public class StatefulRedisClusterConnectionImpl<K, V> extends RedisChannelHandler<K, V>
implements StatefulRedisClusterConnection<K, V> {

private Partitions partitions;

Expand Down Expand Up @@ -135,8 +136,8 @@ public StatefulRedisConnection<K, V> getConnection(String nodeId) {
throw new RedisException("NodeId " + nodeId + " does not belong to the cluster");
}

return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection(
ClusterConnectionProvider.Intent.WRITE, nodeId);
return getClusterDistributionChannelWriter().getClusterConnectionProvider()
.getConnection(ClusterConnectionProvider.Intent.WRITE, nodeId);
}

@Override
Expand All @@ -157,8 +158,8 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Strin
@Override
public StatefulRedisConnection<K, V> getConnection(String host, int port) {

return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection(
ClusterConnectionProvider.Intent.WRITE, host, port);
return getClusterDistributionChannelWriter().getClusterConnectionProvider()
.getConnection(ClusterConnectionProvider.Intent.WRITE, host, port);
}

@Override
Expand All @@ -180,23 +181,36 @@ public void activated() {
super.activated();
// do not block in here, since the channel flow will be interrupted.
if (password != null) {
async.authAsync(password);
AsyncCommand<K, V, String> command = async.authAsync(password);
command.exceptionally(throwable -> {
return logOnFailure(throwable, "AUTH failed: " + command.getError());
});
}

if (clientName != null) {
setClientName(clientName);
}

if (readOnly) {
async.readOnly();
RedisFuture<String> command = async.readOnly();
command.exceptionally(throwable -> {
return logOnFailure(throwable, "READONLY failed: " + command.getError());
});
}
}

private String logOnFailure(Throwable throwable, String message) {
if (throwable instanceof RedisCommandExecutionException) {
InternalLoggerFactory.getInstance(getClass()).warn(message);
}
return "";
}

void setClientName(String clientName) {

CommandArgs<String, String> args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName);
AsyncCommand<String, String, String> async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>(
StringCodec.UTF8), args));
AsyncCommand<String, String, String> async = new AsyncCommand<>(
new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args));
this.clientName = clientName;

dispatch((RedisCommand) async);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.util.List;

import io.lettuce.core.RedisChannelWriter;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StatefulRedisConnectionImpl;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.protocol.ConnectionWatchdog;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* An thread-safe pub/sub connection to a Redis server. Multiple threads may share one {@link StatefulRedisPubSubConnectionImpl}
Expand All @@ -40,8 +42,8 @@
* @param <V> Value type.
* @author Mark Paluch
*/
public class StatefulRedisPubSubConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V> implements
StatefulRedisPubSubConnection<K, V> {
public class StatefulRedisPubSubConnectionImpl<K, V> extends StatefulRedisConnectionImpl<K, V>
implements StatefulRedisPubSubConnection<K, V> {

private final PubSubEndpoint<K, V> endpoint;

Expand Down Expand Up @@ -141,6 +143,13 @@ private <T> T[] toArray(Collection<T> c) {
@Override
public void activated() {
super.activated();
resubscribe();
for (RedisFuture<Void> command : resubscribe()) {
command.exceptionally(throwable -> {
if (throwable instanceof RedisCommandExecutionException) {
InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + command.getError());
}
return null;
});
}
}
}

0 comments on commit b397c71

Please sign in to comment.