From b397c71d001acc4f175fc54c60b78f3261729d04 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Mon, 13 Jan 2020 09:42:11 +0100 Subject: [PATCH] Log AUTH, SELECT, and READONLY failures on connection activation #1201 Lettuce now logs failures of asynchronously fired commands during connection activation. Previously, failures during e.g. reconnect went unnoticed and could result in a subsequent NOAUTH Authentication required errors although the password was provided. --- .../core/StatefulRedisConnectionImpl.java | 31 +++++++++++++---- .../StatefulRedisClusterConnectionImpl.java | 34 +++++++++++++------ .../StatefulRedisPubSubConnectionImpl.java | 15 ++++++-- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index 15514285ae..cd114c177d 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -33,6 +33,7 @@ import io.lettuce.core.output.MultiOutput; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.*; +import io.netty.util.internal.logging.InternalLoggerFactory; /** * A thread-safe connection to a Redis server. Multiple threads may share one {@link StatefulRedisConnectionImpl} @@ -127,11 +128,19 @@ public void activated() { super.activated(); // do not block in here, since the channel flow will be interrupted. if (password != null) { - async.authAsync(password); + AsyncCommand command = async.authAsync(password); + command.exceptionally(throwable -> { + return logOnFailure(throwable, "AUTH failed: " + command.getError()); + }); } - if (db != 0) { - async.selectAsync(db); + if (db != 0) + + { + AsyncCommand command = async.selectAsync(db); + command.exceptionally(throwable -> { + return logOnFailure(throwable, "SELECT failed: " + command.getError()); + }); } if (clientName != null) { @@ -139,8 +148,18 @@ public void activated() { } if (readOnly) { - async.readOnly(); + RedisFuture command = async.readOnly(); + command.exceptionally(throwable -> { + return logOnFailure(throwable, "READONLY failed: " + command.getError()); + }); + } + } + + private String logOnFailure(Throwable throwable, String message) { + if (throwable instanceof RedisCommandExecutionException) { + InternalLoggerFactory.getInstance(getClass()).warn(message); } + return ""; } @Override @@ -259,8 +278,8 @@ private RedisCommand attachOnComplete(RedisCommand command public void setClientName(String clientName) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName); - AsyncCommand async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>( - StringCodec.UTF8), args)); + AsyncCommand async = new AsyncCommand<>( + new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args)); this.clientName = clientName; dispatch((RedisCommand) async); diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index 65000d5962..6255560931 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -44,6 +44,7 @@ import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.*; +import io.netty.util.internal.logging.InternalLoggerFactory; /** * A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl} @@ -54,8 +55,8 @@ * @author Mark Paluch * @since 4.0 */ -public class StatefulRedisClusterConnectionImpl extends RedisChannelHandler implements - StatefulRedisClusterConnection { +public class StatefulRedisClusterConnectionImpl extends RedisChannelHandler + implements StatefulRedisClusterConnection { private Partitions partitions; @@ -135,8 +136,8 @@ public StatefulRedisConnection getConnection(String nodeId) { throw new RedisException("NodeId " + nodeId + " does not belong to the cluster"); } - return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection( - ClusterConnectionProvider.Intent.WRITE, nodeId); + return getClusterDistributionChannelWriter().getClusterConnectionProvider() + .getConnection(ClusterConnectionProvider.Intent.WRITE, nodeId); } @Override @@ -157,8 +158,8 @@ public CompletableFuture> getConnectionAsync(Strin @Override public StatefulRedisConnection getConnection(String host, int port) { - return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection( - ClusterConnectionProvider.Intent.WRITE, host, port); + return getClusterDistributionChannelWriter().getClusterConnectionProvider() + .getConnection(ClusterConnectionProvider.Intent.WRITE, host, port); } @Override @@ -180,7 +181,10 @@ public void activated() { super.activated(); // do not block in here, since the channel flow will be interrupted. if (password != null) { - async.authAsync(password); + AsyncCommand command = async.authAsync(password); + command.exceptionally(throwable -> { + return logOnFailure(throwable, "AUTH failed: " + command.getError()); + }); } if (clientName != null) { @@ -188,15 +192,25 @@ public void activated() { } if (readOnly) { - async.readOnly(); + RedisFuture command = async.readOnly(); + command.exceptionally(throwable -> { + return logOnFailure(throwable, "READONLY failed: " + command.getError()); + }); + } + } + + private String logOnFailure(Throwable throwable, String message) { + if (throwable instanceof RedisCommandExecutionException) { + InternalLoggerFactory.getInstance(getClass()).warn(message); } + return ""; } void setClientName(String clientName) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName); - AsyncCommand async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>( - StringCodec.UTF8), args)); + AsyncCommand async = new AsyncCommand<>( + new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args)); this.clientName = clientName; dispatch((RedisCommand) async); diff --git a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java index a68dab1e40..684e97f3fd 100644 --- a/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/pubsub/StatefulRedisPubSubConnectionImpl.java @@ -22,6 +22,7 @@ import java.util.List; import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisCommandExecutionException; import io.lettuce.core.RedisFuture; import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.codec.RedisCodec; @@ -29,6 +30,7 @@ import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands; import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands; +import io.netty.util.internal.logging.InternalLoggerFactory; /** * An thread-safe pub/sub connection to a Redis server. Multiple threads may share one {@link StatefulRedisPubSubConnectionImpl} @@ -40,8 +42,8 @@ * @param Value type. * @author Mark Paluch */ -public class StatefulRedisPubSubConnectionImpl extends StatefulRedisConnectionImpl implements - StatefulRedisPubSubConnection { +public class StatefulRedisPubSubConnectionImpl extends StatefulRedisConnectionImpl + implements StatefulRedisPubSubConnection { private final PubSubEndpoint endpoint; @@ -141,6 +143,13 @@ private T[] toArray(Collection c) { @Override public void activated() { super.activated(); - resubscribe(); + for (RedisFuture command : resubscribe()) { + command.exceptionally(throwable -> { + if (throwable instanceof RedisCommandExecutionException) { + InternalLoggerFactory.getInstance(getClass()).warn("Re-subscribe failed: " + command.getError()); + } + return null; + }); + } } }