Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for StreamingCredentials #3068

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 78 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final ReauthenticateBehavior DEFAULT_REAUTHENTICATE_BEHAVIOUR = ReauthenticateBehavior.DEFAULT;

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand Down Expand Up @@ -95,6 +97,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final ReauthenticateBehavior reauthenticateBehavior;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand Down Expand Up @@ -124,6 +128,7 @@ protected ClientOptions(Builder builder) {
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.reauthenticateBehavior = builder.reauthenticateBehavior;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -143,6 +148,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.reauthenticateBehavior = original.getReauthenticateBehaviour();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -220,6 +226,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private ReauthenticateBehavior reauthenticateBehavior = DEFAULT_REAUTHENTICATE_BEHAVIOUR;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
Expand Down Expand Up @@ -301,6 +309,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Configure the {@link ReauthenticateBehavior} of the Lettuce driver. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @param reauthenticateBehavior the {@link ReauthenticateBehavior} to use. Must not be {@code null}.
* @return {@code this}
*/
public Builder reauthenticateBehavior(ReauthenticateBehavior reauthenticateBehavior) {

LettuceAssert.notNull(reauthenticateBehavior, "ReuthenticatBehavior must not be null");
this.reauthenticateBehavior = reauthenticateBehavior;
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -505,11 +527,12 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand Down Expand Up @@ -573,6 +596,16 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

/**
* Behavior for re-authentication when the {@link RedisCredentialsProvider} emits new credentials. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @return the currently set {@link ReauthenticateBehavior}.
*/
public ReauthenticateBehavior getReauthenticateBehaviour() {
return reauthenticateBehavior;
}

/**
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
*
Expand Down Expand Up @@ -704,6 +737,46 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Defines the re-authentication behavior of the Redis client.
* <p/>
* Certain implementations of the {@link RedisCredentialsProvider} could emit new credentials at runtime. This setting
* controls how the driver reacts to these newly emitted credentials.
*/
public enum ReauthenticateBehavior {

/**
* This is the default behavior. The client will fetch current credentials from the underlying
* {@link RedisCredentialsProvider} only when the driver needs to, e.g. when the connection is first established or when
* it is re-established after a disconnect.
* <p/>
* <p>
* No re-authentication is performed when new credentials are emitted by a {@link RedisCredentialsProvider} that
* supports streaming. The client does not subscribe to or react to any updates in the credential stream provided by
* {@link RedisCredentialsProvider#credentials()}.
* </p>
*/
DEFAULT,

/**
* Automatically triggers re-authentication whenever new credentials are emitted by a {@link RedisCredentialsProvider}
* that supports streaming, as indicated by {@link RedisCredentialsProvider#supportsStreaming()}.
*
* <p>
* When this behavior is enabled, the client subscribes to the credential stream provided by
* {@link RedisCredentialsProvider#credentials()} and issues an {@code AUTH} command to the Redis server each time new
* credentials are received. This behavior supports dynamic credential scenarios, such as token-based authentication, or
* credential rotation where credentials are refreshed periodically to maintain access.
* </p>
*
* <p>
* Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted commands, as the
* client performs re-authentication independently of user command flow.
* </p>
*/
ON_NEW_CREDENTIALS
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
Expand Down
Loading
Loading