diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index eca9a3789..f8327a4c9 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -704,18 +704,37 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + /** + * Defines the re-authentication behavior of the Redis client in relation to the {@link CredentialsProvider}. + */s public enum ReauthenticateBehavior { /** - * This is the default behavior. The driver whenever needed will pull current credentials from the underlying - * CredentialsProvider. + * This is the default behavior. The client will fetch current credentials from the underlying + * {@link RedisCredentialsProvider} only when required. + * + *

No re-authentication is performed automatically when new credentials are emitted by the + * {@link StreamingCredentialsProvider} .

+ * + *

This behavior is suitable for use cases with static credentials or where explicit reconnection + * is required to change credentials.

*/ DEFAULT, /** - * CredentialsProvider might initiate re-authentication on its own. + * Automatically triggers re-authentication whenever new credentials are emitted by the + * {@link StreamingCredentialsProvider} or any other credentials manager. + * + *

When enabled, the client subscribes to the credentials stream provided by the + * {@link StreamingCredentialsProvider} (or other compatible sources) and issues an {@code AUTH} + * command to the Redis server each time new credentials are received. This behavior supports + * dynamic credential scenarios, such as token-based authentication, or credential rotation where credentials + * are refreshed periodically to maintain access.

+ * + *

Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted + * commands, as the client performs re-authentication independently of user command flow.

*/ - REAUTHENTICATE_ON_CREDENTIALS_CHANGE + ON_NEW_CREDENTIALS } /** diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index 4bb3127b8..444c13f90 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -113,6 +113,17 @@ public void apply(RedisURI redisURI) { bootstrap.attr(REDIS_URI, redisURI.toString()); } + public void registerAuthenticationHandler(RedisCredentialsProvider credentialsProvider, ConnectionState state, + Boolean isPubSubConnection) { + LettuceAssert.assertState(endpoint != null, "Endpoint must be set"); + LettuceAssert.assertState(connection != null, "Connection must be set"); + LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); + + RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection.getChannelWriter(), + credentialsProvider, state, clientResources.eventBus(), isPubSubConnection); + endpoint.registerAuthenticationHandler(authenticationHandler); + } + protected List buildHandlers() { LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set"); diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java index ca7d5e56d..e8cb1d5a0 100644 --- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java +++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java @@ -40,7 +40,7 @@ * Redis authentication handler. Internally used to authenticate a Redis connection. This class is part of the internal API. * * @author Ivo Gaydazhiev - * @since 6.5.2 + * @since 6.6.0 */ public class RedisAuthenticationHandler { @@ -70,13 +70,10 @@ public RedisAuthenticationHandler(RedisChannelWriter writer, RedisCredentialsPro } /** - * Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials. + * This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. *

- * This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. Each time new - * credentials are received, the client is reauthenticated. If the connection is not supported, the method returns without - * subscribing. - *

- * The previous subscription, if any, is disposed of before setting the new subscription. + * Each time new credentials are received, the client is re-authenticated. The previous subscription, if any, is disposed of + * before setting the new subscription. */ public void subscribe() { if (credentialsProvider == null) { @@ -179,23 +176,17 @@ private String getEpid() { return "unknown"; } - public static boolean isSupported(ClientOptions clientOptions, RedisCredentialsProvider credentialsProvider) { + public static boolean isSupported(ClientOptions clientOptions) { LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); + switch (clientOptions.getReauthenticateBehaviour()) { + case ON_NEW_CREDENTIALS: + return true; - if (credentialsProvider instanceof StreamingCredentialsProvider) { - - switch (clientOptions.getReauthenticateBehaviour()) { - case REAUTHENTICATE_ON_CREDENTIALS_CHANGE: - return true; - - case DEFAULT: - return false; + case DEFAULT: + return false; - default: - return false; - } - } else { - return false; + default: + return false; } } diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index b3eecd817..26801d949 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -288,13 +288,8 @@ private ConnectionFuture> connectStandalone StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, endpoint, codec, timeout); - if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) { - connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(), - connection.getConnectionState(), getResources().eventBus(), false)); - } - ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, - () -> new CommandHandler(getOptions(), getResources(), endpoint)); + () -> new CommandHandler(getOptions(), getResources(), endpoint), false); future.whenComplete((channelHandler, throwable) -> { @@ -308,7 +303,7 @@ private ConnectionFuture> connectStandalone @SuppressWarnings("unchecked") private ConnectionFuture connectStatefulAsync(StatefulRedisConnectionImpl connection, Endpoint endpoint, - RedisURI redisURI, Supplier commandHandlerSupplier) { + RedisURI redisURI, Supplier commandHandlerSupplier, Boolean isPubSub) { ConnectionBuilder connectionBuilder; if (redisURI.isSsl()) { @@ -331,6 +326,11 @@ private ConnectionFuture connectStatefulAsync(StatefulRedisConnecti connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI); connectionBuilder.connectionInitializer(createHandshake(state)); + if (RedisAuthenticationHandler.isSupported(getOptions())) { + connectionBuilder.registerAuthenticationHandler(redisURI.getCredentialsProvider(), connection.getConnectionState(), + isPubSub); + } + ConnectionFuture> future = initializeChannelAsync(connectionBuilder); return future.thenApply(channelHandler -> (S) connection); @@ -425,13 +425,8 @@ private ConnectionFuture> connectPubS StatefulRedisPubSubConnectionImpl connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout); - if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) { - connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(), - connection.getConnectionState(), getResources().eventBus(), true)); - } - ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, - () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint)); + () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint), true); return future.whenComplete((conn, throwable) -> { diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index b213936eb..3304cfdd4 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -68,8 +68,6 @@ public class StatefulRedisConnectionImpl extends RedisChannelHandler private final PushHandler pushHandler; - private final AtomicReference authenticationHandler = new AtomicReference<>(); - private final Mono parser; protected MultiOutput multi; @@ -321,27 +319,11 @@ public ConnectionState getConnectionState() { @Override public void activated() { super.activated(); - RedisAuthenticationHandler currentHandler = authenticationHandler.get(); - if (currentHandler != null) { - currentHandler.subscribe(); - } } @Override public void deactivated() { - RedisAuthenticationHandler currentHandler = authenticationHandler.get(); - if (currentHandler != null) { - currentHandler.unsubscribe(); - } super.deactivated(); } - public void setAuthenticationHandler(RedisAuthenticationHandler authenticationHandler) { - RedisAuthenticationHandler currentHandler = this.authenticationHandler.getAndSet(authenticationHandler); - - if (currentHandler != null) { - currentHandler.unsubscribe(); - } - } - } diff --git a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java index dba9302c7..999d35683 100644 --- a/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java +++ b/src/main/java/io/lettuce/core/StreamingCredentialsProvider.java @@ -7,7 +7,7 @@ * new credentials are received. * * @author Ivo Gaydajiev - * @since 6.5.2 + * @since 6.6.0 */ public interface StreamingCredentialsProvider extends RedisCredentialsProvider { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index c3411bb40..f384cbda0 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -556,14 +556,9 @@ ConnectionFuture> connectToNodeAsync(RedisC StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, endpoint, codec, getFirstUri().getTimeout(), getClusterClientOptions().getJsonParser()); - if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) { - connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(), - connection.getConnectionState(), getResources().eventBus(), false)); - } - ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, - () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint)); + () -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint), false); return connectionFuture.whenComplete((conn, throwable) -> { if (throwable != null) { @@ -626,14 +621,9 @@ ConnectionFuture> connectPubSubToNode StatefulRedisPubSubConnectionImpl connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec, getFirstUri().getTimeout()); - if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) { - connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(), - connection.getConnectionState(), getResources().eventBus(), false)); - } - ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, - () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint)); + () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint), true); return connectionFuture.whenComplete((conn, throwable) -> { if (throwable != null) { connection.closeAsync(); @@ -689,11 +679,11 @@ private CompletableFuture> connectCl Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, TopologyComparators::sortByClientCount); Mono> connectionMono = Mono - .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); + .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false)); for (int i = 1; i < getConnectionAttempts(); i++) { connectionMono = connectionMono - .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); + .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false)); } return connectionMono @@ -723,19 +713,20 @@ protected StatefulRedisClusterConnectionImpl newStatefulRedisCluste } private Mono connect(Mono socketAddressSupplier, DefaultEndpoint endpoint, - StatefulRedisClusterConnectionImpl connection, Supplier commandHandlerSupplier) { + StatefulRedisClusterConnectionImpl connection, Supplier commandHandlerSupplier, + Boolean isPubSub) { ConnectionFuture future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, - commandHandlerSupplier); + commandHandlerSupplier, isPubSub); return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage())); } private Mono connect(Mono socketAddressSupplier, DefaultEndpoint endpoint, - StatefulRedisConnectionImpl connection, Supplier commandHandlerSupplier) { + StatefulRedisConnectionImpl connection, Supplier commandHandlerSupplier, Boolean isPubSub) { ConnectionFuture future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, - commandHandlerSupplier); + commandHandlerSupplier, isPubSub); return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage())); } @@ -788,11 +779,11 @@ private CompletableFuture> con Mono socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions, TopologyComparators::sortByClientCount); Mono> connectionMono = Mono - .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); + .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true)); for (int i = 1; i < getConnectionAttempts(); i++) { connectionMono = connectionMono - .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); + .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true)); } return connectionMono @@ -812,10 +803,10 @@ private int getConnectionAttempts() { @SuppressWarnings("unchecked") private , S> ConnectionFuture connectStatefulAsync(T connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, - Supplier commandHandlerSupplier) { + Supplier commandHandlerSupplier, Boolean isPubSub) { ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, - connectionSettings, socketAddressSupplier, commandHandlerSupplier); + connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub); ConnectionFuture> future = initializeChannelAsync(connectionBuilder); @@ -829,10 +820,10 @@ private , S> Connection @SuppressWarnings("unchecked") private , S> ConnectionFuture connectStatefulAsync(T connection, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, - Supplier commandHandlerSupplier) { + Supplier commandHandlerSupplier, Boolean isPubSub) { ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, - connectionSettings, socketAddressSupplier, commandHandlerSupplier); + connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub); ConnectionFuture> future = initializeChannelAsync(connectionBuilder); @@ -841,7 +832,7 @@ private , S> ConnectionFuture< private ConnectionBuilder createConnectionBuilder(RedisChannelHandler connection, ConnectionState state, DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, - Supplier commandHandlerSupplier) { + Supplier commandHandlerSupplier, Boolean isPubSub) { ConnectionBuilder connectionBuilder; if (connectionSettings.isSsl()) { @@ -853,6 +844,7 @@ private ConnectionBuilder createConnectionBuilder(RedisChannelHandler ConnectionBuilder createConnectionBuilder(RedisChannelHandler connection = redisClient.connect(); connection.getPartitions().forEach( diff --git a/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java b/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java index bb59fe885..732e5dcda 100644 --- a/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java +++ b/src/test/java/io/lettuce/core/event/ConnectionEventsTriggeredIntegrationTests.java @@ -58,7 +58,7 @@ void testReauthenticateEvents() { RedisClient client = RedisClient.create(TestClientResources.get(), RedisURI.Builder.redis(host, port).withAuthentication(credentialsProvider).build()); client.setOptions(ClientOptions.builder() - .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.REAUTHENTICATE_ON_CREDENTIALS_CHANGE).build()); + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build()); Flux publisher = client.getResources().eventBus().get() .filter(event -> event instanceof AuthenticateEvent).cast(AuthenticateEvent.class);