From 7f455ec0a69e0796d2bfcffed92103e634ac1fd0 Mon Sep 17 00:00:00 2001 From: ggivo Date: Thu, 31 Oct 2024 16:24:44 +0200 Subject: [PATCH] io.lettuce.core.RedisCommandExecutionException: NOAUTH Authentication required on CLIENT and READONLY command (#3035) Using custom credentials provider can delay providing of credentials. In this case applyPostHandshake and applyConnectionMetadata got executed before handshake and lead to NOAUTH error in the log for CLIENT command. --- pom.xml | 14 ++++- .../java/io/lettuce/core/RedisHandshake.java | 32 ++++++----- .../lettuce/core/RedisHandshakeUnitTests.java | 55 +++++++++++++++++++ 3 files changed, 85 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index da7a50f5c5..ddfc4877b2 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,7 @@ + 4.2.2 3.25.3 2.0.SP1 5.13.11 @@ -103,7 +104,12 @@ - + + org.awaitility + awaitility + ${awaitility.version} + test + io.netty netty-bom @@ -327,6 +333,12 @@ test + + org.awaitility + awaitility + test + + org.hdrhistogram HdrHistogram diff --git a/src/main/java/io/lettuce/core/RedisHandshake.java b/src/main/java/io/lettuce/core/RedisHandshake.java index 21fe829bb4..80dc62096c 100644 --- a/src/main/java/io/lettuce/core/RedisHandshake.java +++ b/src/main/java/io/lettuce/core/RedisHandshake.java @@ -101,21 +101,14 @@ public CompletionStage initialize(Channel channel) { new RedisConnectionException("Protocol version" + this.requestedProtocolVersion + " not supported")); } - // post-handshake commands, whose execution failures would cause the connection to be considered - // unsuccessfully established - CompletableFuture postHandshake = applyPostHandshake(channel); - - // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different - // implementations or versions of the server runtime, and whose execution result (whether a success or a - // failure ) should not alter the outcome of the connection attempt - CompletableFuture connectionMetadata = applyConnectionMetadata(channel).handle((result, error) -> { - if (error != null) { - LOG.debug("Error applying connection metadata", error); - } - return null; - }); - - return handshake.thenCompose(ignore -> postHandshake).thenCompose(ignore -> connectionMetadata); + return handshake + // post-handshake commands, whose execution failures would cause the connection to be considered + // unsuccessfully established + .thenCompose(ignore -> applyPostHandshake(channel)) + // post-handshake commands, executed in a 'fire and forget' manner, to avoid having to react to different + // implementations or versions of the server runtime, and whose execution result (whether a success or a + // failure ) should not alter the outcome of the connection attempt + .thenCompose(ignore -> applyConnectionMetadataSafely(channel)); } private CompletionStage tryHandshakeResp3(Channel channel) { @@ -271,6 +264,15 @@ private CompletableFuture applyPostHandshake(Channel channel) { return dispatch(channel, postHandshake); } + private CompletionStage applyConnectionMetadataSafely(Channel channel) { + return applyConnectionMetadata(channel).handle((result, error) -> { + if (error != null) { + LOG.debug("Error applying connection metadata", error); + } + return null; + }); + } + private CompletableFuture applyConnectionMetadata(Channel channel) { List> postHandshake = new ArrayList<>(); diff --git a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java index d4b968c4f3..b01fd810fa 100644 --- a/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java +++ b/src/test/java/io/lettuce/core/RedisHandshakeUnitTests.java @@ -1,6 +1,7 @@ package io.lettuce.core; import static io.lettuce.TestTags.UNIT_TEST; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.*; import java.nio.ByteBuffer; @@ -8,6 +9,7 @@ import java.util.Map; import java.util.concurrent.CompletionStage; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -15,6 +17,8 @@ import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.ProtocolVersion; import io.netty.channel.embedded.EmbeddedChannel; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; /** * Unit tests for {@link RedisHandshake}. @@ -106,6 +110,42 @@ void handshakeFireAndForgetPostHandshake() { assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); } + @Test + void handshakeDelayedCredentialProvider() { + + DelayedRedisCredentialsProvider cp = new DelayedRedisCredentialsProvider(); + // RedisCredentialsProvider cp = () -> Mono.just(RedisCredentials.just("foo", + // "bar")).delayElement(Duration.ofMillis(3)); + EmbeddedChannel channel = new EmbeddedChannel(true, false); + + ConnectionMetadata connectionMetdata = new ConnectionMetadata(); + connectionMetdata.setLibraryName("library-name"); + connectionMetdata.setLibraryVersion("library-version"); + + ConnectionState state = new ConnectionState(); + state.setCredentialsProvider(cp); + state.apply(connectionMetdata); + RedisHandshake handshake = new RedisHandshake(null, false, state); + CompletionStage handshakeInit = handshake.initialize(channel); + cp.completeCredentials(RedisCredentials.just("foo", "bar")); + + Awaitility.await().atMost(50, MILLISECONDS) // Wait up to 5 seconds + .pollInterval(5, MILLISECONDS) // Poll every 50 milliseconds + .until(() -> !channel.outboundMessages().isEmpty()); + + AsyncCommand> hello = channel.readOutbound(); + helloResponse(hello.getOutput()); + hello.complete(); + + List>> postHandshake = channel.readOutbound(); + postHandshake.get(0).getOutput().setError(ERR_UNKNOWN_COMMAND); + postHandshake.get(0).completeExceptionally(new RedisException(ERR_UNKNOWN_COMMAND)); + postHandshake.get(0).complete(); + + assertThat(postHandshake.size()).isEqualTo(2); + assertThat(handshakeInit.toCompletableFuture().isCompletedExceptionally()).isFalse(); + } + @Test void shouldParseVersionWithCharacters() { @@ -136,4 +176,19 @@ private static void helloResponse(CommandOutput credentialsSink = Sinks.one(); + + @Override + public Mono resolveCredentials() { + return credentialsSink.asMono(); + } + + public void completeCredentials(RedisCredentials credentials) { + credentialsSink.tryEmitValue(credentials); + } + + } + }