diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
index 96abaff08..b4e29f8c2 100644
--- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
+++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
@@ -19,125 +19,34 @@
*/
package io.lettuce.core;
-import io.lettuce.core.codec.StringCodec;
-import io.lettuce.core.protocol.AsyncCommand;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.protocol.ProtocolVersion;
-import io.lettuce.core.protocol.RedisCommand;
-import io.lettuce.core.pubsub.StatefulRedisPubSubConnectionImpl;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
-import reactor.core.Disposable;
-import reactor.core.publisher.Flux;
-
-import java.nio.CharBuffer;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Handles reauthentication of a connection each time a new authentication token is provided by
* `RenewableRedisCredentialsProvider`.
- *
+ *
* This class is part of the internal API.
*
* @author Ivo Gaydajiev
*/
-class RedisAuthenticationHandler {
-
- private static final InternalLogger log = InternalLoggerFactory.getInstance(RedisAuthenticationHandler.class);
-
- private final RedisCommandBuilder commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8);
+class RedisAuthenticationHandler extends BaseRedisAuthenticationHandler> {
- private final StatefulRedisConnectionImpl, ?> connection;
-
- private final AtomicReference credentialsSubscription = new AtomicReference<>();
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);
public RedisAuthenticationHandler(StatefulRedisConnectionImpl, ?> connection) {
- this.connection = connection;
- }
-
- /**
- * Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials.
- *
- * Each time new credentials are received, the client is reauthenticated.
- *
- * @param credentialsProvider the credentials provider to subscribe to
- */
- public void subscribe(RedisCredentialsProvider credentialsProvider) {
- if (credentialsProvider instanceof RenewableRedisCredentialsProvider) {
- if (!isSupportedConnection()) {
- return;
- }
-
- Flux credentialsFlux = ((RenewableRedisCredentialsProvider) credentialsProvider)
- .credentialsStream();
-
- Disposable subscription = credentialsFlux.subscribe(this::onNext, this::onError, this::complete);
-
- Disposable oldSubscription = credentialsSubscription.getAndSet(subscription);
- if (oldSubscription != null && !oldSubscription.isDisposed()) {
- oldSubscription.dispose();
- }
- }
- }
-
- /**
- * Unsubscribes from the current credentials stream.
- */
- public void unsubscribe() {
- Disposable subscription = credentialsSubscription.getAndSet(null);
- if (subscription != null && !subscription.isDisposed()) {
- subscription.dispose();
- }
- }
-
- private void complete() {
- log.debug("Credentials stream completed");
- }
-
- public void onNext(RedisCredentials credentials) {
- reauthenticate(credentials);
- }
-
- public void onError(Throwable e) {
- log.error("Credentials renew failed.", e);
- }
-
- /**
- * Performs re-authentication with the provided credentials.
- *
- * @param credentials the new credentials
- */
- private void reauthenticate(RedisCredentials credentials) {
- CharSequence password = CharBuffer.wrap(credentials.getPassword());
- if (credentials.hasUsername()) {
- dispatchAuth(new AsyncCommand<>(commandBuilder.auth(credentials.getUsername(), password)))
- .exceptionally(throwable -> {
- log.error("Re-authentication with username failed.", throwable);
- return null;
- });
- } else {
- dispatchAuth(new AsyncCommand<>(commandBuilder.auth(password))).exceptionally(throwable -> {
- log.error("Re-authentication without username failed.", throwable);
- return null;
- });
- }
+ super(connection);
}
protected boolean isSupportedConnection() {
- if (connection instanceof StatefulRedisPubSubConnectionImpl
+ if (connection instanceof StatefulRedisClusterPubSubConnection
&& ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
- log.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
+ logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
return false;
}
return true;
}
- public AsyncCommand dispatchAuth(RedisCommand cmd) {
- AsyncCommand asyncCommand = new AsyncCommand<>(cmd);
- RedisCommand dispatched = connection.getChannelWriter().write(asyncCommand);
- if (dispatched instanceof AsyncCommand) {
- return (AsyncCommand) dispatched;
- }
- return asyncCommand;
- }
-
}
diff --git a/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java b/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java
deleted file mode 100644
index 61340ea9c..000000000
--- a/src/main/java/io/lettuce/core/RenewableRedisCredentialsProvider.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package io.lettuce.core;
-
-import reactor.core.publisher.Flux;
-
-public interface RenewableRedisCredentialsProvider extends RedisCredentialsProvider {
-
- Flux credentialsStream();
-
-}
diff --git a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
new file mode 100644
index 000000000..08ab89850
--- /dev/null
+++ b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
@@ -0,0 +1,15 @@
+package io.lettuce.core;
+
+import reactor.core.publisher.Flux;
+
+public interface StreamingCredentialsProvider extends RedisCredentialsProvider {
+
+ /**
+ * Returns a {@link Flux} emitting {@link RedisCredentials} that can be used to authorize a Redis connection. This
+ * credential provider supports streaming credentials, meaning that it can emit multiple credentials over time.
+ *
+ * @return
+ */
+ Flux credentials();
+
+}
diff --git a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java b/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java
index 875200e74..54d857b37 100644
--- a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java
+++ b/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java
@@ -1,7 +1,5 @@
package io.lettuce.core;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -11,7 +9,7 @@
import redis.clients.authentication.core.TokenListener;
import redis.clients.authentication.core.TokenManager;
-public class TokenBasedRedisCredentialsProvider implements RenewableRedisCredentialsProvider {
+public class TokenBasedRedisCredentialsProvider implements StreamingCredentialsProvider {
private final TokenManager tokenManager;
@@ -66,7 +64,7 @@ public Mono resolveCredentials() {
* Expose the Flux for all credential updates.
*/
@Override
- public Flux credentialsStream() {
+ public Flux credentials() {
return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials
}
diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java b/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java
new file mode 100644
index 000000000..29584c404
--- /dev/null
+++ b/src/main/java/io/lettuce/core/cluster/RedisClusterAuthenticationHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019-Present, Redis Ltd. and Contributors
+ * All rights reserved.
+ *
+ * Licensed under the MIT License.
+ *
+ * This file contains contributions from third-party contributors
+ * licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.lettuce.core.cluster;
+
+import io.lettuce.core.*;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
+
+import io.lettuce.core.protocol.*;
+import io.netty.util.internal.logging.InternalLogger;
+import io.netty.util.internal.logging.InternalLoggerFactory;
+
+class RedisClusterAuthenticationHandler extends BaseRedisAuthenticationHandler> {
+
+ private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);
+
+ public RedisClusterAuthenticationHandler(StatefulRedisClusterConnectionImpl, ?> connection) {
+ super(connection);
+ }
+
+ protected boolean isSupportedConnection() {
+ if (connection instanceof StatefulRedisClusterPubSubConnection
+ && ProtocolVersion.RESP2 == connection.getConnectionState().getNegotiatedProtocolVersion()) {
+ logger.warn("Renewable credentials are not supported with RESP2 protocol on a pub/sub connection.");
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java
index df30f4888..b7851a561 100644
--- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java
+++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java
@@ -19,24 +19,13 @@
*/
package io.lettuce.core.cluster;
-import static io.lettuce.core.protocol.CommandType.*;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionState;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisChannelHandler;
import io.lettuce.core.RedisChannelWriter;
+import io.lettuce.core.RedisCredentialsProvider;
import io.lettuce.core.RedisException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
@@ -60,9 +49,23 @@
import io.lettuce.core.protocol.RedisCommand;
import reactor.core.publisher.Mono;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static io.lettuce.core.protocol.CommandType.AUTH;
+import static io.lettuce.core.protocol.CommandType.READONLY;
+import static io.lettuce.core.protocol.CommandType.READWRITE;
+
/**
* A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl}
- *
+ *
* A {@link ConnectionWatchdog} monitors each connection and reconnects automatically until {@link #close} is called. All
* pending commands will be (re)sent after successful reconnection.
*
@@ -88,6 +91,8 @@ public class StatefulRedisClusterConnectionImpl extends RedisChannelHandle
private volatile Partitions partitions;
+ private final RedisClusterAuthenticationHandler authHandler;
+
/**
* Initialize a new connection.
*
@@ -107,6 +112,8 @@ public StatefulRedisClusterConnectionImpl(RedisChannelWriter writer, ClusterPush
this.async = newRedisAdvancedClusterAsyncCommandsImpl();
this.sync = newRedisAdvancedClusterCommandsImpl();
this.reactive = newRedisAdvancedClusterReactiveCommandsImpl();
+
+ this.authHandler = new RedisClusterAuthenticationHandler(this);
}
protected RedisAdvancedClusterReactiveCommandsImpl newRedisAdvancedClusterReactiveCommandsImpl() {
@@ -187,8 +194,7 @@ public CompletableFuture> getConnectionAsync(Strin
throw new RedisException("NodeId " + nodeId + " does not belong to the cluster");
}
- AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter()
- .getClusterConnectionProvider();
+ AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider();
return provider.getConnectionAsync(connectionIntent, nodeId);
}
@@ -203,8 +209,7 @@ public StatefulRedisConnection getConnection(String host, int port, Connec
public CompletableFuture> getConnectionAsync(String host, int port,
ConnectionIntent connectionIntent) {
- AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter()
- .getClusterConnectionProvider();
+ AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider();
return provider.getConnectionAsync(connectionIntent, host, port);
}
@@ -213,6 +218,17 @@ public CompletableFuture> getConnectionAsync(Strin
public void activated() {
super.activated();
async.clusterMyId().thenAccept(connectionState::setNodeId);
+ RedisCredentialsProvider credentialsProvider = connectionState.getCredentialsProvider();
+ if (credentialsProvider != null && authHandler != null) {
+ authHandler.subscribe(credentialsProvider);
+ }
+ }
+
+ @Override
+ public void deactivated() {
+ if (authHandler != null) {
+ authHandler.unsubscribe();
+ }
}
ClusterDistributionChannelWriter getClusterDistributionChannelWriter() {
@@ -249,8 +265,8 @@ private RedisCommand preProcessCommand(RedisCommand comman
} else {
List stringArgs = CommandArgsAccessor.getStringArguments(command.getArgs());
- this.connectionState
- .setUserNamePassword(stringArgs.stream().map(String::toCharArray).collect(Collectors.toList()));
+ this.connectionState.setUserNamePassword(
+ stringArgs.stream().map(String::toCharArray).collect(Collectors.toList()));
}
}
});
diff --git a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java
index e336a6d9d..f36085edf 100644
--- a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java
+++ b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java
@@ -87,8 +87,8 @@ public void shouldWaitForAndReturnTokenWhenEmittedLater() {
@Test
public void shouldCompleteAllSubscribersOnStop() {
- Flux credentialsFlux1 = credentialsProvider.credentialsStream();
- Flux credentialsFlux2 = credentialsProvider.credentialsStream();
+ Flux credentialsFlux1 = credentialsProvider.credentials();
+ Flux credentialsFlux2 = credentialsProvider.credentials();
Disposable subscription1 = credentialsFlux1.subscribe();
Disposable subscription2 = credentialsFlux2.subscribe();
@@ -116,7 +116,7 @@ public void shouldCompleteAllSubscribersOnStop() {
@Test
public void shouldPropagateMultipleTokensOnStream() {
- Flux result = credentialsProvider.credentialsStream();
+ Flux result = credentialsProvider.credentials();
StepVerifier.create(result).then(() -> tokenManager.emitToken(testToken("test-user", "token1")))
.then(() -> tokenManager.emitToken(testToken("test-user", "token2")))
.assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token1"))
@@ -129,7 +129,7 @@ public void shouldHandleTokenRequestErrorGracefully() {
Exception simulatedError = new RuntimeException("Token request failed");
tokenManager.emitError(simulatedError);
- Flux result = credentialsProvider.credentialsStream();
+ Flux result = credentialsProvider.credentials();
StepVerifier.create(result).expectErrorMatches(
throwable -> throwable instanceof RuntimeException && "Token request failed".equals(throwable.getMessage()))