From d49f9b8a6810ee1a0ca24ad88b34f25c332e75ed Mon Sep 17 00:00:00 2001 From: ggivo Date: Fri, 29 Nov 2024 18:42:27 +0200 Subject: [PATCH] Clean up - init TM on CredentialProvider create - CredentialProvider stop renamed to shutdow - extended example to use 2 separate users --- .../TokenBasedRedisCredentialsProvider.java | 33 +++++-- .../java/io/lettuce/core/ClientOptions.java | 13 ++- .../StatefulRedisClusterConnectionImpl.java | 10 +- .../core/AuthenticationIntegrationTests.java | 27 +++-- ...okenBasedRedisCredentialsProviderTest.java | 6 +- .../examples/TokenBasedAuthExample.java | 99 +++++++++++++------ 6 files changed, 122 insertions(+), 66 deletions(-) rename src/main/java/io/lettuce/{core => authx}/TokenBasedRedisCredentialsProvider.java (63%) diff --git a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java b/src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java similarity index 63% rename from src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java rename to src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java index 54d857b37d..e191dab7e4 100644 --- a/src/main/java/io/lettuce/core/TokenBasedRedisCredentialsProvider.java +++ b/src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java @@ -1,8 +1,9 @@ -package io.lettuce.core; +package io.lettuce.authx; +import io.lettuce.core.RedisCredentials; +import io.lettuce.core.StreamingCredentialsProvider; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; - import reactor.core.publisher.Sinks; import redis.clients.authentication.core.Token; import redis.clients.authentication.core.TokenAuthConfig; @@ -18,23 +19,25 @@ public class TokenBasedRedisCredentialsProvider implements StreamingCredentialsP public TokenBasedRedisCredentialsProvider(TokenAuthConfig tokenAuthConfig) { this(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(), tokenAuthConfig.getTokenManagerConfig())); + } public TokenBasedRedisCredentialsProvider(TokenManager tokenManager) { this.tokenManager = tokenManager; + initializeTokenManager(); } /** * Initialize the TokenManager and subscribe to token renewal events. */ - public void start() { + private void initializeTokenManager() { TokenListener listener = new TokenListener() { @Override public void onTokenRenewed(Token token) { String username = token.tryGet("oid"); char[] pass = token.getValue().toCharArray(); - RedisCredentials credentials = new StaticRedisCredentials(username, pass); + RedisCredentials credentials = RedisCredentials.just(username, pass); credentialsSink.tryEmitNext(credentials); } @@ -54,14 +57,28 @@ public void onError(Exception exception) { /** * Resolve the latest available credentials as a Mono. + *

+ * This method returns a Mono that emits the most recent set of Redis credentials. + * The Mono will complete once the credentials are emitted. + * If no credentials are available at the time of subscription, the Mono will wait until + * credentials are available. + * + * @return a Mono that emits the latest Redis credentials */ @Override public Mono resolveCredentials() { + return credentialsSink.asFlux().next(); } /** * Expose the Flux for all credential updates. + *

+ * This method returns a Flux that emits all updates to the Redis credentials. + * Subscribers will receive the latest credentials whenever they are updated. + * The Flux will continue to emit updates until the provider is shut down. + * + * @return a Flux that emits all updates to the Redis credentials */ @Override public Flux credentials() { @@ -70,9 +87,13 @@ public Flux credentials() { } /** - * Stop the adapter and clean up resources. + * Stop the credentials provider and clean up resources. + *

+ * This method stops the TokenManager and completes the credentials sink, + * ensuring that all resources are properly released. + * It should be called when the credentials provider is no longer needed. */ - public void stop() { + public void shutdown() { credentialsSink.tryEmitComplete(); tokenManager.stop(); } diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 9f1f1c33d9..9e44795000 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -19,13 +19,6 @@ */ package io.lettuce.core; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.ServiceConfigurationError; -import java.util.ServiceLoader; - import io.lettuce.core.api.StatefulConnection; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.json.JsonParser; @@ -37,6 +30,12 @@ import io.lettuce.core.resource.ClientResources; import reactor.core.publisher.Mono; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.ServiceConfigurationError; +import java.util.ServiceLoader; /** * Client Options to control the behavior of {@link RedisClient}. * diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index b7851a5617..1b0e7b570e 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -194,7 +194,8 @@ public CompletableFuture> getConnectionAsync(Strin throw new RedisException("NodeId " + nodeId + " does not belong to the cluster"); } - AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider(); + AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter() + .getClusterConnectionProvider(); return provider.getConnectionAsync(connectionIntent, nodeId); } @@ -209,7 +210,8 @@ public StatefulRedisConnection getConnection(String host, int port, Connec public CompletableFuture> getConnectionAsync(String host, int port, ConnectionIntent connectionIntent) { - AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter().getClusterConnectionProvider(); + AsyncClusterConnectionProvider provider = (AsyncClusterConnectionProvider) getClusterDistributionChannelWriter() + .getClusterConnectionProvider(); return provider.getConnectionAsync(connectionIntent, host, port); } @@ -265,8 +267,8 @@ private RedisCommand preProcessCommand(RedisCommand comman } else { List stringArgs = CommandArgsAccessor.getStringArguments(command.getArgs()); - this.connectionState.setUserNamePassword( - stringArgs.stream().map(String::toCharArray).collect(Collectors.toList())); + this.connectionState + .setUserNamePassword(stringArgs.stream().map(String::toCharArray).collect(Collectors.toList())); } } }); diff --git a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java index c66b95ec83..0760545c7a 100644 --- a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java +++ b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java @@ -1,33 +1,31 @@ package io.lettuce.core; -import static io.lettuce.TestTags.INTEGRATION_TEST; -import static org.assertj.core.api.Assertions.*; - -import javax.inject.Inject; -import io.lettuce.test.Delay; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - +import io.lettuce.authx.TokenBasedRedisCredentialsProvider; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.output.StatusOutput; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandType; +import io.lettuce.test.Delay; import io.lettuce.test.LettuceExtension; import io.lettuce.test.WithPassword; import io.lettuce.test.condition.EnabledOnCommand; import io.lettuce.test.settings.TestSettings; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import reactor.core.publisher.Mono; -import redis.clients.authentication.core.*; +import redis.clients.authentication.core.SimpleToken; +import javax.inject.Inject; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutionException; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Integration test for authentication. @@ -99,7 +97,6 @@ void streamingCredentialProvider(RedisClient client) { // streaming credentials provider that emits redis credentials which will trigger connection re-authentication // token manager is used to emit updated credentials TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager); - credentialsProvider.start(); RedisURI uri = RedisURI.builder().withTimeout(Duration.ofSeconds(1)).withClientName("streaming_cred_test") .withHost(TestSettings.host()).withPort(TestSettings.port()).withAuthentication(credentialsProvider).build(); diff --git a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java index f36085edfc..4772dde3f2 100644 --- a/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java +++ b/src/test/java/io/lettuce/core/TokenBasedRedisCredentialsProviderTest.java @@ -1,12 +1,13 @@ package io.lettuce.core; +import io.lettuce.authx.TokenBasedRedisCredentialsProvider; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -import redis.clients.authentication.core.*; +import redis.clients.authentication.core.SimpleToken; import java.time.Duration; import java.util.Collections; @@ -24,7 +25,6 @@ public void setUp() { // Use TestToken manager to emit tokens/errors on request tokenManager = new TestTokenManager(null, null); credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager); - credentialsProvider.start(); } @Test @@ -101,7 +101,7 @@ public void shouldCompleteAllSubscribersOnStop() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - credentialsProvider.stop(); + credentialsProvider.shutdown(); }).start(); StepVerifier.create(credentialsFlux1) diff --git a/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java b/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java index cc065af4a7..8f4571d316 100644 --- a/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java +++ b/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java @@ -1,60 +1,97 @@ package io.lettuce.examples; -import io.lettuce.core.*; + import io.lettuce.authx.TokenBasedRedisCredentialsProvider; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SocketOptions; +import io.lettuce.core.TimeoutOptions; import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.async.RedisAsyncCommands; -import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.codec.StringCodec; -import redis.clients.authentication.core.*; -import redis.clients.authentication.entraid.EntraIDTokenAuthConfig; +import redis.clients.authentication.core.IdentityProviderConfig; +import redis.clients.authentication.core.TokenAuthConfig; +import redis.clients.authentication.entraid.EntraIDTokenAuthConfigBuilder; +import java.time.Duration; import java.util.Collections; import java.util.Set; -import static org.junit.Assert.assertNotNull; - public class TokenBasedAuthExample { public static void main(String[] args) { // Configure TokenManager String authority = "https://login.microsoftonline.com/562f7bf2-f594-47bf-8ac3-a06514b5d434"; - String clientId = ""; // application id - String secret = ""; // client secret value Set scopes = Collections.singleton("https://redis.azure.com/.default"); - IdentityProviderConfig config = EntraIDTokenAuthConfig.builder().authority(authority).clientId(clientId).secret(secret) - .scopes(scopes).tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig(); - TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000) - .expirationRefreshRatio(0.1f).identityProviderConfig(config).build(); + String User1_clientId = System.getenv("USER1_CLIENT_ID"); + String User1_secret = System.getenv("USER1_SECRET"); + + String User2_clientId = System.getenv("USER2_CLIENT_ID"); + String User2_secret = System.getenv("USER2_SECRET"); + + //User 1 + // from redis-authx-entraind + IdentityProviderConfig config1 = EntraIDTokenAuthConfigBuilder.builder() + .authority(authority) + .clientId(User1_clientId) + .secret(User1_secret) + .scopes(scopes) + .tokenRequestExecTimeoutInMs(10000) + .build().getIdentityProviderConfig(); + // from redis-authx-core + TokenAuthConfig tokenAuthConfigUser1 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000) + .expirationRefreshRatio(0.1f).identityProviderConfig(config1).build(); + //Create credentials provider user1 + // TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there) + TokenBasedRedisCredentialsProvider credentialsUser1 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser1); + - // Create RedisURI and set custom credentials provider - TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenAuthConfig); - credentialsProvider.start(); + //User2 + // from redis-authx-entraind + IdentityProviderConfig config2 = EntraIDTokenAuthConfigBuilder.builder().authority(authority).clientId(User2_clientId) + .secret(User2_secret).scopes(scopes).tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig(); + // from redis-authx-core + TokenAuthConfig tokenAuthConfigUser2 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000) + .expirationRefreshRatio(0.1f).identityProviderConfig(config2).build(); + //Create credentials provider user2 + // TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there) + TokenBasedRedisCredentialsProvider credentialsUser2 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser2); - RedisURI redisURI = RedisURI.create("redis://40.115.58.0:19431"); - redisURI.setCredentialsProvider(credentialsProvider); + RedisClient rc = RedisClient.create(); + + //lettuce-core + RedisURI redisURI1 = RedisURI.create("redis://137.117.167.136:12002"); + redisURI1.setCredentialsProvider(credentialsUser1); + + RedisURI redisURI2 = RedisURI.create("redis://137.117.167.136:12002"); + redisURI2.setCredentialsProvider(credentialsUser2); // Create RedisClient - RedisClient redisClient = RedisClient.create(); + ClientOptions clientOptions = ClientOptions.builder() + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build()) + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(1))).build(); - // Create StatefulRedisConnectionImpl using connectAsync() - try (StatefulRedisConnection connection = redisClient.connect(StringCodec.UTF8, redisURI)) { + // RedisClient using user1 credentials by default + RedisClient redisClient = RedisClient.create(redisURI1); + redisClient.setOptions(clientOptions); - RedisCommands commands = connection.sync(); - commands.set("hello", "Hello, Redis!"); - String result = commands.get("hello"); - System.out.println("hello: " + result); + // create connection using default URI (authorised as user1) + try (StatefulRedisConnection user1 = + redisClient.connect(StringCodec.UTF8)) { + user1.reactive().aclWhoami().doOnNext(System.out::println).block(); } - // another connection using same credentials provider - try (StatefulRedisConnection connection = redisClient.connect(StringCodec.UTF8, redisURI);) { - RedisAsyncCommands commands = connection.async(); - commands.set("hello", "Hello, Redis!").thenCompose(s -> commands.get("hello")) - .thenAccept(result -> System.out.println("async hello: " + result)).toCompletableFuture().join(); + // another connection using different authorizations (user2 credentials provider) + try (StatefulRedisConnection user2 = redisClient.connect(StringCodec.UTF8, redisURI2);) { + user2.reactive().aclWhoami().doOnNext(System.out::println).block(); } - credentialsProvider.stop(); + + credentialsUser1.shutdown(); + credentialsUser2.shutdown(); + // Shutdown Redis client and close connections redisClient.shutdown(); }