From 779edca5c0b1a221f465945d927f35b253be9b1f Mon Sep 17 00:00:00 2001
From: ggivo <ivo.gaydazhiev@redis.com>
Date: Sun, 8 Dec 2024 14:03:23 +0200
Subject: [PATCH] Client setting for enabling reauthentication   - Moved
 Authentication handler to DefaultEndpoint   - updated since 6.6.0

---
 .../java/io/lettuce/core/ClientOptions.java   | 27 +++++++++--
 .../io/lettuce/core/ConnectionBuilder.java    | 11 +++++
 .../core/RedisAuthenticationHandler.java      | 33 +++++--------
 .../java/io/lettuce/core/RedisClient.java     | 21 ++++-----
 .../core/StatefulRedisConnectionImpl.java     | 18 -------
 .../core/StreamingCredentialsProvider.java    |  2 +-
 .../core/cluster/RedisClusterClient.java      | 47 +++++++++----------
 .../event/connection/AuthenticateEvent.java   |  2 +-
 .../core/event/connection/JfrReauthEvent.java |  2 +-
 .../connection/JfrReauthFailedEvent.java      |  2 +-
 .../event/connection/ReauthenticateEvent.java |  2 +-
 .../connection/ReauthenticateFailedEvent.java |  2 +-
 .../core/protocol/DefaultEndpoint.java        | 14 ++++++
 .../io/lettuce/core/protocol/Endpoint.java    |  8 ++++
 .../core/AuthenticationIntegrationTests.java  |  2 +-
 .../MyStreamingRedisCredentialsProvider.java  |  2 +-
 ...gCredentialsProviderlIntegrationTests.java |  4 +-
 ...ectionEventsTriggeredIntegrationTests.java |  2 +-
 18 files changed, 109 insertions(+), 92 deletions(-)

diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java
index eca9a37898..f8327a4c94 100644
--- a/src/main/java/io/lettuce/core/ClientOptions.java
+++ b/src/main/java/io/lettuce/core/ClientOptions.java
@@ -704,18 +704,37 @@ public TimeoutOptions getTimeoutOptions() {
         return timeoutOptions;
     }
 
+    /**
+     * Defines the re-authentication behavior of the Redis client in relation to the {@link CredentialsProvider}.
+     */s
     public enum ReauthenticateBehavior {
 
         /**
-         * This is the default behavior. The driver whenever needed will pull current credentials from the underlying
-         * CredentialsProvider.
+         * This is the default behavior. The client will fetch current credentials from the underlying
+         * {@link RedisCredentialsProvider} only when required.
+         *
+         * <p>No re-authentication is performed automatically when new credentials are emitted by the
+         * {@link StreamingCredentialsProvider} .</p>
+         *
+         * <p>This behavior is suitable for use cases with static credentials or where explicit reconnection
+         * is required to change credentials.</p>
          */
         DEFAULT,
 
         /**
-         * CredentialsProvider might initiate re-authentication on its own.
+         * Automatically triggers re-authentication whenever new credentials are emitted by the
+         * {@link StreamingCredentialsProvider} or any other credentials manager.
+         *
+         * <p>When enabled, the client subscribes to the credentials stream provided by the
+         * {@link StreamingCredentialsProvider} (or other compatible sources) and issues an {@code AUTH}
+         * command to the Redis server each time new credentials are received. This behavior supports
+         * dynamic credential scenarios, such as token-based authentication, or credential rotation where credentials
+         * are refreshed periodically to maintain access.</p>
+         *
+         * <p>Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted
+         * commands, as the client performs re-authentication independently of user command flow.</p>
          */
-        REAUTHENTICATE_ON_CREDENTIALS_CHANGE
+        ON_NEW_CREDENTIALS
     }
 
     /**
diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java
index 4bb3127b88..444c13f907 100644
--- a/src/main/java/io/lettuce/core/ConnectionBuilder.java
+++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java
@@ -113,6 +113,17 @@ public void apply(RedisURI redisURI) {
         bootstrap.attr(REDIS_URI, redisURI.toString());
     }
 
+    public void registerAuthenticationHandler(RedisCredentialsProvider credentialsProvider, ConnectionState state,
+            Boolean isPubSubConnection) {
+        LettuceAssert.assertState(endpoint != null, "Endpoint must be set");
+        LettuceAssert.assertState(connection != null, "Connection must be set");
+        LettuceAssert.assertState(clientResources != null, "ClientResources must be set");
+
+        RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection.getChannelWriter(),
+                credentialsProvider, state, clientResources.eventBus(), isPubSubConnection);
+        endpoint.registerAuthenticationHandler(authenticationHandler);
+    }
+
     protected List<ChannelHandler> buildHandlers() {
 
         LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
index ca7d5e56d2..e8cb1d5a02 100644
--- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
+++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
@@ -40,7 +40,7 @@
  * Redis authentication handler. Internally used to authenticate a Redis connection. This class is part of the internal API.
  *
  * @author Ivo Gaydazhiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public class RedisAuthenticationHandler {
 
@@ -70,13 +70,10 @@ public RedisAuthenticationHandler(RedisChannelWriter writer, RedisCredentialsPro
     }
 
     /**
-     * Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials.
+     * This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`.
      * <p>
-     * This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. Each time new
-     * credentials are received, the client is reauthenticated. If the connection is not supported, the method returns without
-     * subscribing.
-     * <p>
-     * The previous subscription, if any, is disposed of before setting the new subscription.
+     * Each time new credentials are received, the client is re-authenticated. The previous subscription, if any, is disposed of
+     * before setting the new subscription.
      */
     public void subscribe() {
         if (credentialsProvider == null) {
@@ -179,23 +176,17 @@ private String getEpid() {
         return "unknown";
     }
 
-    public static boolean isSupported(ClientOptions clientOptions, RedisCredentialsProvider credentialsProvider) {
+    public static boolean isSupported(ClientOptions clientOptions) {
         LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
+        switch (clientOptions.getReauthenticateBehaviour()) {
+            case ON_NEW_CREDENTIALS:
+                return true;
 
-        if (credentialsProvider instanceof StreamingCredentialsProvider) {
-
-            switch (clientOptions.getReauthenticateBehaviour()) {
-                case REAUTHENTICATE_ON_CREDENTIALS_CHANGE:
-                    return true;
-
-                case DEFAULT:
-                    return false;
+            case DEFAULT:
+                return false;
 
-                default:
-                    return false;
-            }
-        } else {
-            return false;
+            default:
+                return false;
         }
     }
 
diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java
index b3eecd817f..26801d949d 100644
--- a/src/main/java/io/lettuce/core/RedisClient.java
+++ b/src/main/java/io/lettuce/core/RedisClient.java
@@ -288,13 +288,8 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
 
         StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);
 
-        if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) {
-            connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(),
-                    connection.getConnectionState(), getResources().eventBus(), false));
-        }
-
         ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
-                () -> new CommandHandler(getOptions(), getResources(), endpoint));
+                () -> new CommandHandler(getOptions(), getResources(), endpoint), false);
 
         future.whenComplete((channelHandler, throwable) -> {
 
@@ -308,7 +303,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone
 
     @SuppressWarnings("unchecked")
     private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
-            RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
+            RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {
 
         ConnectionBuilder connectionBuilder;
         if (redisURI.isSsl()) {
@@ -331,6 +326,11 @@ private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnecti
         connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);
         connectionBuilder.connectionInitializer(createHandshake(state));
 
+        if (RedisAuthenticationHandler.isSupported(getOptions())) {
+            connectionBuilder.registerAuthenticationHandler(redisURI.getCredentialsProvider(), connection.getConnectionState(),
+                    isPubSub);
+        }
+
         ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
 
         return future.thenApply(channelHandler -> (S) connection);
@@ -425,13 +425,8 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS
 
         StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);
 
-        if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) {
-            connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(),
-                    connection.getConnectionState(), getResources().eventBus(), true));
-        }
-
         ConnectionFuture<StatefulRedisPubSubConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
-                () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint));
+                () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint), true);
 
         return future.whenComplete((conn, throwable) -> {
 
diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
index b213936eb0..3304cfdd41 100644
--- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
+++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
@@ -68,8 +68,6 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>
 
     private final PushHandler pushHandler;
 
-    private final AtomicReference<RedisAuthenticationHandler> authenticationHandler = new AtomicReference<>();
-
     private final Mono<JsonParser> parser;
 
     protected MultiOutput<K, V> multi;
@@ -321,27 +319,11 @@ public ConnectionState getConnectionState() {
     @Override
     public void activated() {
         super.activated();
-        RedisAuthenticationHandler currentHandler = authenticationHandler.get();
-        if (currentHandler != null) {
-            currentHandler.subscribe();
-        }
     }
 
     @Override
     public void deactivated() {
-        RedisAuthenticationHandler currentHandler = authenticationHandler.get();
-        if (currentHandler != null) {
-            currentHandler.unsubscribe();
-        }
         super.deactivated();
     }
 
-    public void setAuthenticationHandler(RedisAuthenticationHandler authenticationHandler) {
-        RedisAuthenticationHandler currentHandler = this.authenticationHandler.getAndSet(authenticationHandler);
-
-        if (currentHandler != null) {
-            currentHandler.unsubscribe();
-        }
-    }
-
 }
diff --git a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
index dba9302c74..999d356837 100644
--- a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
+++ b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java
@@ -7,7 +7,7 @@
  * new credentials are received.
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public interface StreamingCredentialsProvider extends RedisCredentialsProvider {
 
diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
index c3411bb409..f384cbda07 100644
--- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
+++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
@@ -556,14 +556,9 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
         StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec,
                 getFirstUri().getTimeout(), getClusterClientOptions().getJsonParser());
 
-        if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) {
-            connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(),
-                    connection.getConnectionState(), getResources().eventBus(), false));
-        }
-
         ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint,
                 getFirstUri(), socketAddressSupplier,
-                () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint));
+                () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint), false);
 
         return connectionFuture.whenComplete((conn, throwable) -> {
             if (throwable != null) {
@@ -626,14 +621,9 @@ <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNode
         StatefulRedisPubSubConnectionImpl<K, V> connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec,
                 getFirstUri().getTimeout());
 
-        if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) {
-            connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(),
-                    connection.getConnectionState(), getResources().eventBus(), false));
-        }
-
         ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint,
                 getFirstUri(), socketAddressSupplier,
-                () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint));
+                () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint), true);
         return connectionFuture.whenComplete((conn, throwable) -> {
             if (throwable != null) {
                 connection.closeAsync();
@@ -689,11 +679,11 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
         Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
                 TopologyComparators::sortByClientCount);
         Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono
-                .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
+                .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false));
 
         for (int i = 1; i < getConnectionAttempts(); i++) {
             connectionMono = connectionMono
-                    .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
+                    .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false));
         }
 
         return connectionMono
@@ -723,19 +713,20 @@ protected <V, K> StatefulRedisClusterConnectionImpl<K, V> newStatefulRedisCluste
     }
 
     private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint,
-            StatefulRedisClusterConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
+            StatefulRedisClusterConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier,
+            Boolean isPubSub) {
 
         ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier,
-                commandHandlerSupplier);
+                commandHandlerSupplier, isPubSub);
 
         return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
     }
 
     private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint,
-            StatefulRedisConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
+            StatefulRedisConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {
 
         ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier,
-                commandHandlerSupplier);
+                commandHandlerSupplier, isPubSub);
 
         return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
     }
@@ -788,11 +779,11 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con
         Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
                 TopologyComparators::sortByClientCount);
         Mono<StatefulRedisClusterPubSubConnectionImpl<K, V>> connectionMono = Mono
-                .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
+                .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true));
 
         for (int i = 1; i < getConnectionAttempts(); i++) {
             connectionMono = connectionMono
-                    .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
+                    .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true));
         }
 
         return connectionMono
@@ -812,10 +803,10 @@ private int getConnectionAttempts() {
     @SuppressWarnings("unchecked")
     private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
             DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
-            Supplier<CommandHandler> commandHandlerSupplier) {
+            Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {
 
         ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
-                connectionSettings, socketAddressSupplier, commandHandlerSupplier);
+                connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub);
 
         ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
 
@@ -829,10 +820,10 @@ private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> Connection
     @SuppressWarnings("unchecked")
     private <K, V, T extends StatefulRedisConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
             DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
-            Supplier<CommandHandler> commandHandlerSupplier) {
+            Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {
 
         ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
-                connectionSettings, socketAddressSupplier, commandHandlerSupplier);
+                connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub);
 
         ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
 
@@ -841,7 +832,7 @@ private <K, V, T extends StatefulRedisConnectionImpl<K, V>, S> ConnectionFuture<
 
     private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K, V> connection, ConnectionState state,
             DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
-            Supplier<CommandHandler> commandHandlerSupplier) {
+            Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {
 
         ConnectionBuilder connectionBuilder;
         if (connectionSettings.isSsl()) {
@@ -853,6 +844,7 @@ private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K,
         }
 
         state.apply(connectionSettings);
+
         connectionBuilder.connectionInitializer(createHandshake(state));
 
         connectionBuilder.reconnectionListener(new ReconnectEventListener(topologyRefreshScheduler));
@@ -861,6 +853,11 @@ private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K,
         connectionBuilder.clientResources(getResources());
         connectionBuilder.endpoint(endpoint);
         connectionBuilder.commandHandler(commandHandlerSupplier);
+
+        if (RedisAuthenticationHandler.isSupported(getOptions())) {
+            connectionBuilder.registerAuthenticationHandler(connectionSettings.getCredentialsProvider(), state, isPubSub);
+        }
+
         connectionBuilder(socketAddressSupplier, connectionBuilder, connection.getConnectionEvents(), connectionSettings);
 
         return connectionBuilder;
diff --git a/src/main/java/io/lettuce/core/event/connection/AuthenticateEvent.java b/src/main/java/io/lettuce/core/event/connection/AuthenticateEvent.java
index a2928cae50..6a02be3998 100644
--- a/src/main/java/io/lettuce/core/event/connection/AuthenticateEvent.java
+++ b/src/main/java/io/lettuce/core/event/connection/AuthenticateEvent.java
@@ -6,7 +6,7 @@
  * Interface for Connection authentication events
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public interface AuthenticateEvent extends Event {
 
diff --git a/src/main/java/io/lettuce/core/event/connection/JfrReauthEvent.java b/src/main/java/io/lettuce/core/event/connection/JfrReauthEvent.java
index 6f86e82f48..8faaad8f63 100644
--- a/src/main/java/io/lettuce/core/event/connection/JfrReauthEvent.java
+++ b/src/main/java/io/lettuce/core/event/connection/JfrReauthEvent.java
@@ -9,7 +9,7 @@
  * Flight recorder event variant of {@link ReauthenticateEvent}.
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 @Category({ "Lettuce", "Connection Events" })
 @Label("Reauthenticate to a Redis server")
diff --git a/src/main/java/io/lettuce/core/event/connection/JfrReauthFailedEvent.java b/src/main/java/io/lettuce/core/event/connection/JfrReauthFailedEvent.java
index d0e3e69967..b1b5d78994 100644
--- a/src/main/java/io/lettuce/core/event/connection/JfrReauthFailedEvent.java
+++ b/src/main/java/io/lettuce/core/event/connection/JfrReauthFailedEvent.java
@@ -9,7 +9,7 @@
  * Flight recorder event variant of {@link ReauthEvent}.
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 @Category({ "Lettuce", "Connection Events" })
 @Label("Reauthenticate to a Redis server failed")
diff --git a/src/main/java/io/lettuce/core/event/connection/ReauthenticateEvent.java b/src/main/java/io/lettuce/core/event/connection/ReauthenticateEvent.java
index caf1d29325..20d737066f 100644
--- a/src/main/java/io/lettuce/core/event/connection/ReauthenticateEvent.java
+++ b/src/main/java/io/lettuce/core/event/connection/ReauthenticateEvent.java
@@ -4,7 +4,7 @@
  * Event fired on successfull connection re-authentication. see {@link io.lettuce.core.StreamingCredentialsProvider}
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public class ReauthenticateEvent implements AuthenticateEvent {
 
diff --git a/src/main/java/io/lettuce/core/event/connection/ReauthenticateFailedEvent.java b/src/main/java/io/lettuce/core/event/connection/ReauthenticateFailedEvent.java
index f808daa716..aa40481dc9 100644
--- a/src/main/java/io/lettuce/core/event/connection/ReauthenticateFailedEvent.java
+++ b/src/main/java/io/lettuce/core/event/connection/ReauthenticateFailedEvent.java
@@ -5,7 +5,7 @@
  * {@link io.lettuce.core.StreamingCredentialsProvider}
  * 
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public class ReauthenticateFailedEvent implements AuthenticateEvent {
 
diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
index 07fbacf77a..1a64d7b417 100644
--- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
+++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
@@ -37,6 +37,7 @@
 
 import io.lettuce.core.ClientOptions;
 import io.lettuce.core.ConnectionEvents;
+import io.lettuce.core.RedisAuthenticationHandler;
 import io.lettuce.core.RedisChannelWriter;
 import io.lettuce.core.RedisConnectionException;
 import io.lettuce.core.RedisException;
@@ -109,6 +110,8 @@ public class DefaultEndpoint implements RedisChannelWriter, Endpoint, PushHandle
 
     private boolean inActivation = false;
 
+    private RedisAuthenticationHandler authenticationHandler;
+
     private ConnectionWatchdog connectionWatchdog;
 
     private ConnectionFacade connectionFacade;
@@ -473,6 +476,9 @@ public void notifyChannelActive(Channel channel) {
 
                 try {
                     inActivation = true;
+                    if (authenticationHandler != null) {
+                        authenticationHandler.subscribe();
+                    }
                     connectionFacade.activated();
                 } finally {
                     inActivation = false;
@@ -508,6 +514,9 @@ public void notifyChannelInactive(Channel channel) {
                 logger.debug("{} deactivating endpoint handler", logPrefix());
             }
 
+            if (authenticationHandler != null) {
+                authenticationHandler.unsubscribe();
+            }
             connectionFacade.deactivated();
         });
 
@@ -535,6 +544,11 @@ public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) {
         this.connectionWatchdog = connectionWatchdog;
     }
 
+    @Override
+    public void registerAuthenticationHandler(RedisAuthenticationHandler authenticationHandler) {
+        this.authenticationHandler = authenticationHandler;
+    }
+
     @Override
     @SuppressWarnings({ "rawtypes", "unchecked" })
     public void flushCommands() {
diff --git a/src/main/java/io/lettuce/core/protocol/Endpoint.java b/src/main/java/io/lettuce/core/protocol/Endpoint.java
index aa55ac43b7..2dbf067d42 100644
--- a/src/main/java/io/lettuce/core/protocol/Endpoint.java
+++ b/src/main/java/io/lettuce/core/protocol/Endpoint.java
@@ -1,5 +1,6 @@
 package io.lettuce.core.protocol;
 
+import io.lettuce.core.RedisAuthenticationHandler;
 import io.netty.channel.Channel;
 
 /**
@@ -52,6 +53,13 @@ public interface Endpoint extends PushHandler {
      */
     void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog);
 
+    /**
+     * Associate a {@link RedisAuthenticationHandler} with the {@link Endpoint}.
+     *
+     * @param authenticationHandler the connection watchdog.
+     */
+    void registerAuthenticationHandler(RedisAuthenticationHandler authenticationHandler);
+
     /**
      * @return the endpoint Id.
      * @since 6.1
diff --git a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java
index badd948f36..1890113f72 100644
--- a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java
+++ b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java
@@ -91,7 +91,7 @@ void streamingCredentialProvider(RedisClient client) {
         TestCommandListener listener = new TestCommandListener();
         client.addListener(listener);
         client.setOptions(client.getOptions().mutate()
-                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.REAUTHENTICATE_ON_CREDENTIALS_CHANGE).build());
+                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());
 
         // Build RedisURI with streaming credentials provider
         MyStreamingRedisCredentialsProvider credentialsProvider = new MyStreamingRedisCredentialsProvider();
diff --git a/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java b/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java
index 6a2a436976..e5b0eaa937 100644
--- a/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java
+++ b/src/test/java/io/lettuce/core/MyStreamingRedisCredentialsProvider.java
@@ -8,7 +8,7 @@
  * A provider for streaming credentials that can be used to authorize a Redis connection
  *
  * @author Ivo Gaydajiev
- * @since 6.5.2
+ * @since 6.6.0
  */
 public class MyStreamingRedisCredentialsProvider implements StreamingCredentialsProvider {
 
diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java
index cedbe64e98..aeb7dd49bc 100644
--- a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java
+++ b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java
@@ -141,9 +141,9 @@ void nodeSelectionApiShouldWork() {
     void shouldPerformNodeConnectionReauth() {
         ClusterClientOptions origClientOptions = redisClient.getClusterClientOptions();
         origClientOptions.mutate()
-                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.REAUTHENTICATE_ON_CREDENTIALS_CHANGE).build();
+                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build();
         redisClient.setOptions(origClientOptions.mutate()
-                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.REAUTHENTICATE_ON_CREDENTIALS_CHANGE).build());
+                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());
 
         StatefulRedisClusterConnection<String, String> connection = redisClient.connect();
         connection.getPartitions().forEach(
diff --git a/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java b/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java
index bb59fe885a..732e5dcda5 100644
--- a/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java
+++ b/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java
@@ -58,7 +58,7 @@ void testReauthenticateEvents() {
         RedisClient client = RedisClient.create(TestClientResources.get(),
                 RedisURI.Builder.redis(host, port).withAuthentication(credentialsProvider).build());
         client.setOptions(ClientOptions.builder()
-                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.REAUTHENTICATE_ON_CREDENTIALS_CHANGE).build());
+                .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build());
 
         Flux<AuthenticateEvent> publisher = client.getResources().eventBus().get()
                 .filter(event -> event instanceof AuthenticateEvent).cast(AuthenticateEvent.class);