Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  - init TM on CredentialProvider create
  - CredentialProvider stop renamed to shutdow
  - extended example to use 2 separate users
  • Loading branch information
ggivo committed Nov 29, 2024
1 parent 24b901a commit d49f9b8
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package io.lettuce.core;
package io.lettuce.authx;

import io.lettuce.core.RedisCredentials;
import io.lettuce.core.StreamingCredentialsProvider;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import reactor.core.publisher.Sinks;
import redis.clients.authentication.core.Token;
import redis.clients.authentication.core.TokenAuthConfig;
Expand All @@ -18,23 +19,25 @@ public class TokenBasedRedisCredentialsProvider implements StreamingCredentialsP
public TokenBasedRedisCredentialsProvider(TokenAuthConfig tokenAuthConfig) {
this(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));

}

public TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
this.tokenManager = tokenManager;
initializeTokenManager();
}

/**
* Initialize the TokenManager and subscribe to token renewal events.
*/
public void start() {
private void initializeTokenManager() {
TokenListener listener = new TokenListener() {

@Override
public void onTokenRenewed(Token token) {
String username = token.tryGet("oid");
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = new StaticRedisCredentials(username, pass);
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
}

Expand All @@ -54,14 +57,28 @@ public void onError(Exception exception) {

/**
* Resolve the latest available credentials as a Mono.
* <p>
* This method returns a Mono that emits the most recent set of Redis credentials.
* The Mono will complete once the credentials are emitted.
* If no credentials are available at the time of subscription, the Mono will wait until
* credentials are available.
*
* @return a Mono that emits the latest Redis credentials
*/
@Override
public Mono<RedisCredentials> resolveCredentials() {

return credentialsSink.asFlux().next();
}

/**
* Expose the Flux for all credential updates.
* <p>
* This method returns a Flux that emits all updates to the Redis credentials.
* Subscribers will receive the latest credentials whenever they are updated.
* The Flux will continue to emit updates until the provider is shut down.
*
* @return a Flux that emits all updates to the Redis credentials
*/
@Override
public Flux<RedisCredentials> credentials() {
Expand All @@ -70,9 +87,13 @@ public Flux<RedisCredentials> credentials() {
}

/**
* Stop the adapter and clean up resources.
* Stop the credentials provider and clean up resources.
* <p>
* This method stops the TokenManager and completes the credentials sink,
* ensuring that all resources are properly released.
* It should be called when the credentials provider is no longer needed.
*/
public void stop() {
public void shutdown() {
credentialsSink.tryEmitComplete();
tokenManager.stop();
}
Expand Down
13 changes: 6 additions & 7 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,6 @@
*/
package io.lettuce.core;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.json.JsonParser;
Expand All @@ -37,6 +30,12 @@
import io.lettuce.core.resource.ClientResources;
import reactor.core.publisher.Mono;

import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
/**
* Client Options to control the behavior of {@link RedisClient}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Strin
throw new RedisException("NodeId " + nodeId + " does not belong to the cluster");
}

AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider();
AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter()
.getClusterConnectionProvider();

return provider.getConnectionAsync(connectionIntent, nodeId);
}
Expand All @@ -209,7 +210,8 @@ public StatefulRedisConnection<K, V> getConnection(String host, int port, Connec
public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(String host, int port,
ConnectionIntent connectionIntent) {

AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider();
AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter()
.getClusterConnectionProvider();

return provider.getConnectionAsync(connectionIntent, host, port);
}
Expand Down Expand Up @@ -265,8 +267,8 @@ private <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comman
} else {

List<String> stringArgs = CommandArgsAccessor.getStringArguments(command.getArgs());
this.connectionState.setUserNamePassword(
stringArgs.stream().map(String::toCharArray).collect(Collectors.toList()));
this.connectionState
.setUserNamePassword(stringArgs.stream().map(String::toCharArray).collect(Collectors.toList()));
}
}
});
Expand Down
27 changes: 12 additions & 15 deletions src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
package io.lettuce.core;

import static io.lettuce.TestTags.INTEGRATION_TEST;
import static org.assertj.core.api.Assertions.*;

import javax.inject.Inject;
import io.lettuce.test.Delay;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.lettuce.authx.TokenBasedRedisCredentialsProvider;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.output.StatusOutput;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.test.Delay;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.WithPassword;
import io.lettuce.test.condition.EnabledOnCommand;
import io.lettuce.test.settings.TestSettings;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Mono;
import redis.clients.authentication.core.*;
import redis.clients.authentication.core.SimpleToken;

import javax.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static io.lettuce.TestTags.INTEGRATION_TEST;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/**
* Integration test for authentication.
Expand Down Expand Up @@ -99,7 +97,6 @@ void streamingCredentialProvider(RedisClient client) {
// streaming credentials provider that emits redis credentials which will trigger connection re-authentication
// token manager is used to emit updated credentials
TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialsProvider.start();

RedisURI uri = RedisURI.builder().withTimeout(Duration.ofSeconds(1)).withClientName("streaming_cred_test")
.withHost(TestSettings.host()).withPort(TestSettings.port()).withAuthentication(credentialsProvider).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.lettuce.core;

import io.lettuce.authx.TokenBasedRedisCredentialsProvider;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import redis.clients.authentication.core.*;
import redis.clients.authentication.core.SimpleToken;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -24,7 +25,6 @@ public void setUp() {
// Use TestToken manager to emit tokens/errors on request
tokenManager = new TestTokenManager(null, null);
credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialsProvider.start();
}

@Test
Expand Down Expand Up @@ -101,7 +101,7 @@ public void shouldCompleteAllSubscribersOnStop() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
credentialsProvider.stop();
credentialsProvider.shutdown();
}).start();

StepVerifier.create(credentialsFlux1)
Expand Down
99 changes: 68 additions & 31 deletions src/test/java/io/lettuce/examples/TokenBasedAuthExample.java
Original file line number Diff line number Diff line change
@@ -1,60 +1,97 @@
package io.lettuce.examples;

import io.lettuce.core.*;
import io.lettuce.authx.TokenBasedRedisCredentialsProvider;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SocketOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.StringCodec;
import redis.clients.authentication.core.*;
import redis.clients.authentication.entraid.EntraIDTokenAuthConfig;
import redis.clients.authentication.core.IdentityProviderConfig;
import redis.clients.authentication.core.TokenAuthConfig;
import redis.clients.authentication.entraid.EntraIDTokenAuthConfigBuilder;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

import static org.junit.Assert.assertNotNull;

public class TokenBasedAuthExample {

public static void main(String[] args) {
// Configure TokenManager
String authority = "https://login.microsoftonline.com/562f7bf2-f594-47bf-8ac3-a06514b5d434";
String clientId = "<client_id>"; // application id
String secret = "<client_secret>"; // client secret value
Set<String> scopes = Collections.singleton("https://redis.azure.com/.default");
IdentityProviderConfig config = EntraIDTokenAuthConfig.builder().authority(authority).clientId(clientId).secret(secret)
.scopes(scopes).tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig();

TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000)
.expirationRefreshRatio(0.1f).identityProviderConfig(config).build();
String User1_clientId = System.getenv("USER1_CLIENT_ID");
String User1_secret = System.getenv("USER1_SECRET");

String User2_clientId = System.getenv("USER2_CLIENT_ID");
String User2_secret = System.getenv("USER2_SECRET");

//User 1
// from redis-authx-entraind
IdentityProviderConfig config1 = EntraIDTokenAuthConfigBuilder.builder()
.authority(authority)
.clientId(User1_clientId)
.secret(User1_secret)
.scopes(scopes)
.tokenRequestExecTimeoutInMs(10000)
.build().getIdentityProviderConfig();
// from redis-authx-core
TokenAuthConfig tokenAuthConfigUser1 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000)
.expirationRefreshRatio(0.1f).identityProviderConfig(config1).build();
//Create credentials provider user1
// TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there)
TokenBasedRedisCredentialsProvider credentialsUser1 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser1);


// Create RedisURI and set custom credentials provider
TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenAuthConfig);
credentialsProvider.start();
//User2
// from redis-authx-entraind
IdentityProviderConfig config2 = EntraIDTokenAuthConfigBuilder.builder().authority(authority).clientId(User2_clientId)
.secret(User2_secret).scopes(scopes).tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig();
// from redis-authx-core
TokenAuthConfig tokenAuthConfigUser2 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000)
.expirationRefreshRatio(0.1f).identityProviderConfig(config2).build();
//Create credentials provider user2
// TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there)
TokenBasedRedisCredentialsProvider credentialsUser2 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser2);

RedisURI redisURI = RedisURI.create("redis://40.115.58.0:19431");
redisURI.setCredentialsProvider(credentialsProvider);
RedisClient rc = RedisClient.create();

//lettuce-core
RedisURI redisURI1 = RedisURI.create("redis://137.117.167.136:12002");
redisURI1.setCredentialsProvider(credentialsUser1);

RedisURI redisURI2 = RedisURI.create("redis://137.117.167.136:12002");
redisURI2.setCredentialsProvider(credentialsUser2);

// Create RedisClient
RedisClient redisClient = RedisClient.create();
ClientOptions clientOptions = ClientOptions.builder()
.socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build())
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(1))).build();

// Create StatefulRedisConnectionImpl using connectAsync()
try (StatefulRedisConnection<String, String> connection = redisClient.connect(StringCodec.UTF8, redisURI)) {
// RedisClient using user1 credentials by default
RedisClient redisClient = RedisClient.create(redisURI1);
redisClient.setOptions(clientOptions);

RedisCommands<String, String> commands = connection.sync();
commands.set("hello", "Hello, Redis!");
String result = commands.get("hello");
System.out.println("hello: " + result);
// create connection using default URI (authorised as user1)
try (StatefulRedisConnection<String, String> user1 =
redisClient.connect(StringCodec.UTF8)) {

user1.reactive().aclWhoami().doOnNext(System.out::println).block();
}

// another connection using same credentials provider
try (StatefulRedisConnection<String, String> connection = redisClient.connect(StringCodec.UTF8, redisURI);) {
RedisAsyncCommands<String, String> commands = connection.async();
commands.set("hello", "Hello, Redis!").thenCompose(s -> commands.get("hello"))
.thenAccept(result -> System.out.println("async hello: " + result)).toCompletableFuture().join();
// another connection using different authorizations (user2 credentials provider)
try (StatefulRedisConnection<String, String> user2 = redisClient.connect(StringCodec.UTF8, redisURI2);) {
user2.reactive().aclWhoami().doOnNext(System.out::println).block();
}

credentialsProvider.stop();

credentialsUser1.shutdown();
credentialsUser2.shutdown();
// Shutdown Redis client and close connections
redisClient.shutdown();
}

Expand Down

0 comments on commit d49f9b8

Please sign in to comment.