Skip to content

Commit

Permalink
formating
Browse files Browse the repository at this point in the history
  • Loading branch information
ggivo committed Dec 8, 2024
1 parent 5858286 commit 797c75a
Show file tree
Hide file tree
Showing 18 changed files with 109 additions and 92 deletions.
27 changes: 23 additions & 4 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -704,18 +704,37 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Defines the re-authentication behavior of the Redis client in relation to the {@link CredentialsProvider}.
*/s
public enum ReauthenticateBehavior {

/**
* This is the default behavior. The driver whenever needed will pull current credentials from the underlying
* CredentialsProvider.
* This is the default behavior. The client will fetch current credentials from the underlying
* {@link RedisCredentialsProvider} only when required.
*
* <p>No re-authentication is performed automatically when new credentials are emitted by the
* {@link StreamingCredentialsProvider} .</p>
*
* <p>This behavior is suitable for use cases with static credentials or where explicit reconnection
* is required to change credentials.</p>
*/
DEFAULT,

/**
* CredentialsProvider might initiate re-authentication on its own.
* Automatically triggers re-authentication whenever new credentials are emitted by the
* {@link StreamingCredentialsProvider} or any other credentials manager.
*
* <p>When enabled, the client subscribes to the credentials stream provided by the
* {@link StreamingCredentialsProvider} (or other compatible sources) and issues an {@code AUTH}
* command to the Redis server each time new credentials are received. This behavior supports
* dynamic credential scenarios, such as token-based authentication, or credential rotation where credentials
* are refreshed periodically to maintain access.</p>
*
* <p>Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted
* commands, as the client performs re-authentication independently of user command flow.</p>
*/
REAUTHENTICATE_ON_CREDENTIALS_CHANGE
ON_NEW_CREDENTIALS
}

/**
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public void apply(RedisURI redisURI) {
bootstrap.attr(REDIS_URI, redisURI.toString());
}

public void registerAuthenticationHandler(RedisCredentialsProvider credentialsProvider, ConnectionState state,
Boolean isPubSubConnection) {
LettuceAssert.assertState(endpoint != null, "Endpoint must be set");
LettuceAssert.assertState(connection != null, "Connection must be set");
LettuceAssert.assertState(clientResources != null, "ClientResources must be set");

RedisAuthenticationHandler authenticationHandler = new RedisAuthenticationHandler(connection.getChannelWriter(),
credentialsProvider, state, clientResources.eventBus(), isPubSubConnection);
endpoint.registerAuthenticationHandler(authenticationHandler);
}

protected List<ChannelHandler> buildHandlers() {

LettuceAssert.assertState(channelGroup != null, "ChannelGroup must be set");
Expand Down
33 changes: 12 additions & 21 deletions src/main/java/io/lettuce/core/RedisAuthenticationHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* Redis authentication handler. Internally used to authenticate a Redis connection. This class is part of the internal API.
*
* @author Ivo Gaydazhiev
* @since 6.5.2
* @since 6.6.0
*/
public class RedisAuthenticationHandler {

Expand Down Expand Up @@ -70,13 +70,10 @@ public RedisAuthenticationHandler(RedisChannelWriter writer, RedisCredentialsPro
}

/**
* Subscribes to the provided `Flux` of credentials if the given `RedisCredentialsProvider` supports streaming credentials.
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`.
* <p>
* This method subscribes to a stream of credentials provided by the `StreamingCredentialsProvider`. Each time new
* credentials are received, the client is reauthenticated. If the connection is not supported, the method returns without
* subscribing.
* <p>
* The previous subscription, if any, is disposed of before setting the new subscription.
* Each time new credentials are received, the client is re-authenticated. The previous subscription, if any, is disposed of
* before setting the new subscription.
*/
public void subscribe() {
if (credentialsProvider == null) {
Expand Down Expand Up @@ -179,23 +176,17 @@ private String getEpid() {
return "unknown";
}

public static boolean isSupported(ClientOptions clientOptions, RedisCredentialsProvider credentialsProvider) {
public static boolean isSupported(ClientOptions clientOptions) {
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
switch (clientOptions.getReauthenticateBehaviour()) {
case ON_NEW_CREDENTIALS:
return true;

if (credentialsProvider instanceof StreamingCredentialsProvider) {

switch (clientOptions.getReauthenticateBehaviour()) {
case REAUTHENTICATE_ON_CREDENTIALS_CHANGE:
return true;

case DEFAULT:
return false;
case DEFAULT:
return false;

default:
return false;
}
} else {
return false;
default:
return false;
}
}

Expand Down
21 changes: 8 additions & 13 deletions src/main/java/io/lettuce/core/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,8 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone

StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) {
connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(),
connection.getConnectionState(), getResources().eventBus(), false));
}

ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new CommandHandler(getOptions(), getResources(), endpoint));
() -> new CommandHandler(getOptions(), getResources(), endpoint), false);

future.whenComplete((channelHandler, throwable) -> {

Expand All @@ -308,7 +303,7 @@ private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandalone

@SuppressWarnings("unchecked")
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint,
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionBuilder connectionBuilder;
if (redisURI.isSsl()) {
Expand All @@ -331,6 +326,11 @@ private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnecti
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, connection.getConnectionEvents(), redisURI);
connectionBuilder.connectionInitializer(createHandshake(state));

if (RedisAuthenticationHandler.isSupported(getOptions())) {
connectionBuilder.registerAuthenticationHandler(redisURI.getCredentialsProvider(), connection.getConnectionState(),
isPubSub);
}

ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

return future.thenApply(channelHandler -> (S) connection);
Expand Down Expand Up @@ -425,13 +425,8 @@ private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubS

StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);

if (RedisAuthenticationHandler.isSupported(getOptions(), redisURI.getCredentialsProvider())) {
connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, redisURI.getCredentialsProvider(),
connection.getConnectionState(), getResources().eventBus(), true));
}

ConnectionFuture<StatefulRedisPubSubConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
() -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint));
() -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint), true);

return future.whenComplete((conn, throwable) -> {

Expand Down
18 changes: 0 additions & 18 deletions src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class StatefulRedisConnectionImpl<K, V> extends RedisChannelHandler<K, V>

private final PushHandler pushHandler;

private final AtomicReference<RedisAuthenticationHandler> authenticationHandler = new AtomicReference<>();

private final Mono<JsonParser> parser;

protected MultiOutput<K, V> multi;
Expand Down Expand Up @@ -321,27 +319,11 @@ public ConnectionState getConnectionState() {
@Override
public void activated() {
super.activated();
RedisAuthenticationHandler currentHandler = authenticationHandler.get();
if (currentHandler != null) {
currentHandler.subscribe();
}
}

@Override
public void deactivated() {
RedisAuthenticationHandler currentHandler = authenticationHandler.get();
if (currentHandler != null) {
currentHandler.unsubscribe();
}
super.deactivated();
}

public void setAuthenticationHandler(RedisAuthenticationHandler authenticationHandler) {
RedisAuthenticationHandler currentHandler = this.authenticationHandler.getAndSet(authenticationHandler);

if (currentHandler != null) {
currentHandler.unsubscribe();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* new credentials are received.
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
public interface StreamingCredentialsProvider extends RedisCredentialsProvider {

Expand Down
47 changes: 22 additions & 25 deletions src/main/java/io/lettuce/core/cluster/RedisClusterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -556,14 +556,9 @@ <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectToNodeAsync(RedisC
StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec,
getFirstUri().getTimeout(), getClusterClientOptions().getJsonParser());

if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) {
connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(),
connection.getConnectionState(), getResources().eventBus(), false));
}

ConnectionFuture<StatefulRedisConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint,
getFirstUri(), socketAddressSupplier,
() -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint));
() -> new CommandHandler(getClusterClientOptions(), getResources(), endpoint), false);

return connectionFuture.whenComplete((conn, throwable) -> {
if (throwable != null) {
Expand Down Expand Up @@ -626,14 +621,9 @@ <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubToNode
StatefulRedisPubSubConnectionImpl<K, V> connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec,
getFirstUri().getTimeout());

if (RedisAuthenticationHandler.isSupported(getOptions(), getFirstUri().getCredentialsProvider())) {
connection.setAuthenticationHandler(new RedisAuthenticationHandler(writer, getFirstUri().getCredentialsProvider(),
connection.getConnectionState(), getResources().eventBus(), false));
}

ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectionFuture = connectStatefulAsync(connection, endpoint,
getFirstUri(), socketAddressSupplier,
() -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint));
() -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint), true);
return connectionFuture.whenComplete((conn, throwable) -> {
if (throwable != null) {
connection.closeAsync();
Expand Down Expand Up @@ -689,11 +679,11 @@ private <K, V> CompletableFuture<StatefulRedisClusterConnection<K, V>> connectCl
Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
TopologyComparators::sortByClientCount);
Mono<StatefulRedisClusterConnectionImpl<K, V>> connectionMono = Mono
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false));

for (int i = 1; i < getConnectionAttempts(); i++) {
connectionMono = connectionMono
.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, false));
}

return connectionMono
Expand Down Expand Up @@ -723,19 +713,20 @@ protected <V, K> StatefulRedisClusterConnectionImpl<K, V> newStatefulRedisCluste
}

private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint,
StatefulRedisClusterConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
StatefulRedisClusterConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier,
Boolean isPubSub) {

ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier,
commandHandlerSupplier);
commandHandlerSupplier, isPubSub);

return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}

private <T, K, V> Mono<T> connect(Mono<SocketAddress> socketAddressSupplier, DefaultEndpoint endpoint,
StatefulRedisConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier) {
StatefulRedisConnectionImpl<K, V> connection, Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionFuture<T> future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier,
commandHandlerSupplier);
commandHandlerSupplier, isPubSub);

return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage()));
}
Expand Down Expand Up @@ -788,11 +779,11 @@ private <K, V> CompletableFuture<StatefulRedisClusterPubSubConnection<K, V>> con
Mono<SocketAddress> socketAddressSupplier = getSocketAddressSupplier(connection::getPartitions,
TopologyComparators::sortByClientCount);
Mono<StatefulRedisClusterPubSubConnectionImpl<K, V>> connectionMono = Mono
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
.defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true));

for (int i = 1; i < getConnectionAttempts(); i++) {
connectionMono = connectionMono
.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier));
.onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier, true));
}

return connectionMono
Expand All @@ -812,10 +803,10 @@ private int getConnectionAttempts() {
@SuppressWarnings("unchecked")
private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
Supplier<CommandHandler> commandHandlerSupplier) {
Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
connectionSettings, socketAddressSupplier, commandHandlerSupplier);
connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub);

ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

Expand All @@ -829,10 +820,10 @@ private <K, V, T extends StatefulRedisClusterConnectionImpl<K, V>, S> Connection
@SuppressWarnings("unchecked")
private <K, V, T extends StatefulRedisConnectionImpl<K, V>, S> ConnectionFuture<S> connectStatefulAsync(T connection,
DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
Supplier<CommandHandler> commandHandlerSupplier) {
Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint,
connectionSettings, socketAddressSupplier, commandHandlerSupplier);
connectionSettings, socketAddressSupplier, commandHandlerSupplier, isPubSub);

ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

Expand All @@ -841,7 +832,7 @@ private <K, V, T extends StatefulRedisConnectionImpl<K, V>, S> ConnectionFuture<

private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K, V> connection, ConnectionState state,
DefaultEndpoint endpoint, RedisURI connectionSettings, Mono<SocketAddress> socketAddressSupplier,
Supplier<CommandHandler> commandHandlerSupplier) {
Supplier<CommandHandler> commandHandlerSupplier, Boolean isPubSub) {

ConnectionBuilder connectionBuilder;
if (connectionSettings.isSsl()) {
Expand All @@ -853,6 +844,7 @@ private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K,
}

state.apply(connectionSettings);

connectionBuilder.connectionInitializer(createHandshake(state));

connectionBuilder.reconnectionListener(new ReconnectEventListener(topologyRefreshScheduler));
Expand All @@ -861,6 +853,11 @@ private <K, V> ConnectionBuilder createConnectionBuilder(RedisChannelHandler<K,
connectionBuilder.clientResources(getResources());
connectionBuilder.endpoint(endpoint);
connectionBuilder.commandHandler(commandHandlerSupplier);

if (RedisAuthenticationHandler.isSupported(getOptions())) {
connectionBuilder.registerAuthenticationHandler(connectionSettings.getCredentialsProvider(), state, isPubSub);
}

connectionBuilder(socketAddressSupplier, connectionBuilder, connection.getConnectionEvents(), connectionSettings);

return connectionBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* Interface for Connection authentication events
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
public interface AuthenticateEvent extends Event {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* Flight recorder event variant of {@link ReauthenticateEvent}.
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
@Category({ "Lettuce", "Connection Events" })
@Label("Reauthenticate to a Redis server")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* Flight recorder event variant of {@link ReauthEvent}.
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
@Category({ "Lettuce", "Connection Events" })
@Label("Reauthenticate to a Redis server failed")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Event fired on successfull connection re-authentication. see {@link io.lettuce.core.StreamingCredentialsProvider}
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
public class ReauthenticateEvent implements AuthenticateEvent {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* {@link io.lettuce.core.StreamingCredentialsProvider}
*
* @author Ivo Gaydajiev
* @since 6.5.2
* @since 6.6.0
*/
public class ReauthenticateFailedEvent implements AuthenticateEvent {

Expand Down
Loading

0 comments on commit 797c75a

Please sign in to comment.