From 77c5058d382da0b4a0d07c34d5221adf415eb324 Mon Sep 17 00:00:00 2001 From: ggivo Date: Mon, 2 Dec 2024 18:13:11 +0200 Subject: [PATCH] Token based auth integration with core extension Provide a way for lettuce clients to use token-based authentication. TOKENs come with a TTL. After a Redis client authenticates with a TOKEN, if they didn't renew their authentication we need to evict (close) them. The suggested approach is to leverage the existing CredentialsProvider and add support for streaming credentials to handle token refresh scenarios. Each time a new token is received connection is reauthenticated. --- pom.xml | 35 ++++- .../TokenBasedRedisCredentialsProvider.java | 102 ++++++++++++ ...okenBasedRedisCredentialsProviderTest.java | 146 ++++++++++++++++++ .../core/AuthenticationIntegrationTests.java | 46 ++++++ .../io/lettuce/core/TestTokenManager.java | 50 ++++++ ...gCredentialsProviderlIntegrationTests.java | 1 - .../examples/TokenBasedAuthExample.java | 136 ++++++++++++++++ 7 files changed, 513 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java create mode 100644 src/test/java/io/lettuce/authx/TokenBasedRedisCredentialsProviderTest.java create mode 100644 src/test/java/io/lettuce/core/TestTokenManager.java create mode 100644 src/test/java/io/lettuce/examples/TokenBasedAuthExample.java diff --git a/pom.xml b/pom.xml index e92ed704e..e90411a01 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,19 @@ HEAD + + + sonatype-snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + + false + + + true + + + + ossrh @@ -173,12 +186,30 @@ pom import - + + redis.clients.authentication + redis-authx-core + 0.1.0-SNAPSHOT + + + redis.clients.authentication + redis-authx-entraid + 0.1.0-SNAPSHOT + test + - + + redis.clients.authentication + redis-authx-core + + + redis.clients.authentication + redis-authx-entraid + test + diff --git a/src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java b/src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java new file mode 100644 index 000000000..ec8a36372 --- /dev/null +++ b/src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java @@ -0,0 +1,102 @@ +package io.lettuce.authx; + +import io.lettuce.core.RedisCredentials; +import io.lettuce.core.StreamingCredentialsProvider; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import redis.clients.authentication.core.Token; +import redis.clients.authentication.core.TokenAuthConfig; +import redis.clients.authentication.core.TokenListener; +import redis.clients.authentication.core.TokenManager; + +public class TokenBasedRedisCredentialsProvider implements StreamingCredentialsProvider { + + private final TokenManager tokenManager; + + private final Sinks.Many credentialsSink = Sinks.many().replay().latest(); + + public TokenBasedRedisCredentialsProvider(TokenAuthConfig tokenAuthConfig) { + this(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(), + tokenAuthConfig.getTokenManagerConfig())); + + } + + public TokenBasedRedisCredentialsProvider(TokenManager tokenManager) { + this.tokenManager = tokenManager; + initializeTokenManager(); + } + + /** + * Initialize the TokenManager and subscribe to token renewal events. + */ + private void initializeTokenManager() { + TokenListener listener = new TokenListener() { + + @Override + public void onTokenRenewed(Token token) { + try { + String username = token.tryGet("oid"); + char[] pass = token.getValue().toCharArray(); + RedisCredentials credentials = RedisCredentials.just(username, pass); + credentialsSink.tryEmitNext(credentials); + } catch (Exception e) { + credentialsSink.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST); + } + } + + @Override + public void onError(Exception exception) { + credentialsSink.tryEmitError(exception); + } + + }; + + try { + tokenManager.start(listener, false); + } catch (Exception e) { + credentialsSink.tryEmitError(e); + } + } + + /** + * Resolve the latest available credentials as a Mono. + *

+ * This method returns a Mono that emits the most recent set of Redis credentials. The Mono will complete once the + * credentials are emitted. If no credentials are available at the time of subscription, the Mono will wait until + * credentials are available. + * + * @return a Mono that emits the latest Redis credentials + */ + @Override + public Mono resolveCredentials() { + + return credentialsSink.asFlux().next(); + } + + /** + * Expose the Flux for all credential updates. + *

+ * This method returns a Flux that emits all updates to the Redis credentials. Subscribers will receive the latest + * credentials whenever they are updated. The Flux will continue to emit updates until the provider is shut down. + * + * @return a Flux that emits all updates to the Redis credentials + */ + @Override + public Flux credentials() { + + return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials + } + + /** + * Stop the credentials provider and clean up resources. + *

+ * This method stops the TokenManager and completes the credentials sink, ensuring that all resources are properly released. + * It should be called when the credentials provider is no longer needed. + */ + public void shutdown() { + credentialsSink.tryEmitComplete(); + tokenManager.stop(); + } + +} diff --git a/src/test/java/io/lettuce/authx/TokenBasedRedisCredentialsProviderTest.java b/src/test/java/io/lettuce/authx/TokenBasedRedisCredentialsProviderTest.java new file mode 100644 index 000000000..2d43bb9b1 --- /dev/null +++ b/src/test/java/io/lettuce/authx/TokenBasedRedisCredentialsProviderTest.java @@ -0,0 +1,146 @@ +package io.lettuce.authx; + +import io.lettuce.core.RedisCredentials; +import io.lettuce.core.TestTokenManager; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import redis.clients.authentication.core.SimpleToken; + +import java.time.Duration; +import java.util.Collections; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TokenBasedRedisCredentialsProviderTest { + + private TestTokenManager tokenManager; + + private TokenBasedRedisCredentialsProvider credentialsProvider; + + @BeforeEach + public void setUp() { + // Use TestToken manager to emit tokens/errors on request + tokenManager = new TestTokenManager(null, null); + credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager); + } + + @Test + public void shouldReturnPreviouslyEmittedTokenWhenResolved() { + tokenManager.emitToken(testToken("test-user", "token-1")); + + Mono credentials = credentialsProvider.resolveCredentials(); + + StepVerifier.create(credentials).assertNext(actual -> { + assertThat(actual.getUsername()).isEqualTo("test-user"); + assertThat(new String(actual.getPassword())).isEqualTo("token-1"); + }).verifyComplete(); + } + + @Test + public void shouldReturnLatestEmittedTokenWhenResolved() { + tokenManager.emitToken(testToken("test-user", "token-2")); + tokenManager.emitToken(testToken("test-user", "token-3")); // Latest token + + Mono credentials = credentialsProvider.resolveCredentials(); + + StepVerifier.create(credentials).assertNext(actual -> { + assertThat(actual.getUsername()).isEqualTo("test-user"); + assertThat(new String(actual.getPassword())).isEqualTo("token-3"); + }).verifyComplete(); + } + + @Test + public void shouldReturnTokenEmittedBeforeSubscription() { + + tokenManager.emitToken(testToken("test-user", "token-1")); + + // Test resolveCredentials + Mono credentials1 = credentialsProvider.resolveCredentials(); + + StepVerifier.create(credentials1).assertNext(actual -> { + assertThat(actual.getUsername()).isEqualTo("test-user"); + assertThat(new String(actual.getPassword())).isEqualTo("token-1"); + }).verifyComplete(); + + // Emit second token and subscribe another + tokenManager.emitToken(testToken("test-user", "token-2")); + tokenManager.emitToken(testToken("test-user", "token-3")); + Mono credentials2 = credentialsProvider.resolveCredentials(); + StepVerifier.create(credentials2).assertNext(actual -> { + assertThat(actual.getUsername()).isEqualTo("test-user"); + assertThat(new String(actual.getPassword())).isEqualTo("token-3"); + }).verifyComplete(); + } + + @Test + public void shouldWaitForAndReturnTokenWhenEmittedLater() { + Mono result = credentialsProvider.resolveCredentials(); + + tokenManager.emitTokenWithDelay(testToken("test-user", "delayed-token"), 100); // Emit token after 100ms + StepVerifier.create(result) + .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("delayed-token")) + .verifyComplete(); + } + + @Test + public void shouldCompleteAllSubscribersOnStop() { + Flux credentialsFlux1 = credentialsProvider.credentials(); + Flux credentialsFlux2 = credentialsProvider.credentials(); + + Disposable subscription1 = credentialsFlux1.subscribe(); + Disposable subscription2 = credentialsFlux2.subscribe(); + + tokenManager.emitToken(testToken("test-user", "token-1")); + + new Thread(() -> { + try { + Thread.sleep(100); // Delay of 100 milliseconds + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + credentialsProvider.shutdown(); + }).start(); + + StepVerifier.create(credentialsFlux1) + .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token-1")) + .verifyComplete(); + + StepVerifier.create(credentialsFlux2) + .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token-1")) + .verifyComplete(); + } + + @Test + public void shouldPropagateMultipleTokensOnStream() { + + Flux result = credentialsProvider.credentials(); + StepVerifier.create(result).then(() -> tokenManager.emitToken(testToken("test-user", "token1"))) + .then(() -> tokenManager.emitToken(testToken("test-user", "token2"))) + .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token1")) + .assertNext(credentials -> assertThat(String.valueOf(credentials.getPassword())).isEqualTo("token2")) + .thenCancel().verify(Duration.ofMillis(100)); + } + + @Test + public void shouldHandleTokenRequestErrorGracefully() { + Exception simulatedError = new RuntimeException("Token request failed"); + tokenManager.emitError(simulatedError); + + Flux result = credentialsProvider.credentials(); + + StepVerifier.create(result).expectErrorMatches( + throwable -> throwable instanceof RuntimeException && "Token request failed".equals(throwable.getMessage())) + .verify(); + } + + private SimpleToken testToken(String username, String value) { + return new SimpleToken(value, System.currentTimeMillis() + 5000, // expires in 5 seconds + System.currentTimeMillis(), Collections.singletonMap("oid", username)); + + } + +} diff --git a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java index 1890113f7..e54d935d0 100644 --- a/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java +++ b/src/test/java/io/lettuce/core/AuthenticationIntegrationTests.java @@ -5,6 +5,7 @@ import javax.inject.Inject; +import io.lettuce.authx.TokenBasedRedisCredentialsProvider; import io.lettuce.core.event.command.CommandListener; import io.lettuce.core.event.command.CommandSucceededEvent; import io.lettuce.core.protocol.RedisCommand; @@ -24,9 +25,12 @@ import io.lettuce.test.condition.EnabledOnCommand; import io.lettuce.test.settings.TestSettings; import reactor.core.publisher.Mono; +import redis.clients.authentication.core.SimpleToken; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -121,6 +125,43 @@ void streamingCredentialProvider(RedisClient client) { client.getOptions().mutate().reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.DEFAULT).build()); } + @Test + @Inject + void tokenBasedCredentialProvider(RedisClient client) { + + TestCommandListener listener = new TestCommandListener(); + client.addListener(listener); + client.setOptions(client.getOptions().mutate() + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build()); + + TestTokenManager tokenManager = new TestTokenManager(null, null); + TokenBasedRedisCredentialsProvider credentialsProvider = new TokenBasedRedisCredentialsProvider(tokenManager); + + // Build RedisURI with streaming credentials provider + RedisURI uri = RedisURI.builder().withHost(TestSettings.host()).withPort(TestSettings.port()) + .withClientName("streaming_cred_test").withAuthentication(credentialsProvider) + .withTimeout(Duration.ofSeconds(5)).build(); + tokenManager.emitToken(testToken(TestSettings.username(), TestSettings.password().toString().toCharArray())); + + StatefulRedisConnection connection = client.connect(StringCodec.UTF8, uri); + assertThat(connection.sync().aclWhoami()).isEqualTo(TestSettings.username()); + + // rotate the credentials + tokenManager.emitToken(testToken("steave", "foobared".toCharArray())); + + Awaitility.await().atMost(Duration.ofSeconds(1)).until(() -> listener.succeeded.stream() + .anyMatch(command -> isAuthCommandWithCredentials(command, "steave", "foobared".toCharArray()))); + + // verify that the connection is re-authenticated with the new user credentials + assertThat(connection.sync().aclWhoami()).isEqualTo("steave"); + + credentialsProvider.shutdown(); + connection.close(); + client.removeListener(listener); + client.setOptions( + client.getOptions().mutate().reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.DEFAULT).build()); + } + static class TestCommandListener implements CommandListener { final List> succeeded = new ArrayList<>(); @@ -142,4 +183,9 @@ private boolean isAuthCommandWithCredentials(RedisCommand command, Stri return false; } + private SimpleToken testToken(String username, char[] password) { + return new SimpleToken(String.valueOf(password), Instant.now().plusMillis(500).toEpochMilli(), + Instant.now().toEpochMilli(), Collections.singletonMap("oid", username)); + } + } diff --git a/src/test/java/io/lettuce/core/TestTokenManager.java b/src/test/java/io/lettuce/core/TestTokenManager.java new file mode 100644 index 000000000..391b6302b --- /dev/null +++ b/src/test/java/io/lettuce/core/TestTokenManager.java @@ -0,0 +1,50 @@ +package io.lettuce.core; + +import redis.clients.authentication.core.IdentityProvider; +import redis.clients.authentication.core.SimpleToken; +import redis.clients.authentication.core.TokenListener; +import redis.clients.authentication.core.TokenManager; +import redis.clients.authentication.core.TokenManagerConfig; + +public class TestTokenManager extends TokenManager { + + private TokenListener listener; + + public TestTokenManager(IdentityProvider identityProvider, TokenManagerConfig tokenManagerConfig) { + super(identityProvider, tokenManagerConfig); + } + + @Override + public void start(TokenListener listener, boolean waitForToken) { + this.listener = listener; + } + + @Override + public void stop() { + // Cleanup logic if needed + } + + public void emitToken(SimpleToken token) { + if (listener != null) { + listener.onTokenRenewed(token); + } + } + + public void emitError(Exception exception) { + if (listener != null) { + listener.onError(exception); + } + } + + public void emitTokenWithDelay(SimpleToken token, long delayMillis) { + new Thread(() -> { + try { + Thread.sleep(delayMillis); + emitToken(token); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }).start(); + } + +} diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java index 3c8f20a96..908ec7583 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterStreamingCredentialsProviderlIntegrationTests.java @@ -140,7 +140,6 @@ void nodeSelectionApiShouldWork() { @Test void shouldPerformNodeConnectionReauth() { ClusterClientOptions origClientOptions = redisClient.getClusterClientOptions(); - origClientOptions.mutate().reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build(); redisClient.setOptions(origClientOptions.mutate() .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build()); diff --git a/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java b/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java new file mode 100644 index 000000000..26b60caa8 --- /dev/null +++ b/src/test/java/io/lettuce/examples/TokenBasedAuthExample.java @@ -0,0 +1,136 @@ +package io.lettuce.examples; + +import io.lettuce.authx.TokenBasedRedisCredentialsProvider; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisClient; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SocketOptions; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.sync.NodeSelection; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import io.lettuce.core.codec.StringCodec; +import redis.clients.authentication.core.IdentityProviderConfig; +import redis.clients.authentication.core.TokenAuthConfig; +import redis.clients.authentication.entraid.EntraIDTokenAuthConfigBuilder; + +import java.time.Duration; +import java.util.Collections; +import java.util.Set; + +public class TokenBasedAuthExample { + + public static final String REDIS_URI = "redis://108.143.40.70:12002"; + + public static void main(String[] args) throws Exception { + // Configure TokenManager + String authority = "https://login.microsoftonline.com/562f7bf2-f594-47bf-8ac3-a06514b5d434"; + Set scopes = Collections.singleton("https://redis.azure.com/.default"); + + String User1_clientId = System.getenv("USER1_CLIENT_ID"); + String User1_secret = System.getenv("USER1_SECRET"); + + String User2_clientId = System.getenv("USER2_CLIENT_ID"); + String User2_secret = System.getenv("USER2_SECRET"); + // User 1 + // from redis-authx-entraind + IdentityProviderConfig config1; + try (EntraIDTokenAuthConfigBuilder builder = EntraIDTokenAuthConfigBuilder.builder()) { + config1 = builder.authority(authority).clientId(User1_clientId).secret(User1_secret).scopes(scopes) + .tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig(); + } + + // from redis-authx-core + TokenAuthConfig tokenAuthConfigUser1 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000) + .expirationRefreshRatio(0.1f).identityProviderConfig(config1).build(); + // Create credentials provider user1 + TokenBasedRedisCredentialsProvider credentialsUser1 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser1); + + // User2 + // from redis-authx-entraind + IdentityProviderConfig config2 = EntraIDTokenAuthConfigBuilder.builder().authority(authority).clientId(User2_clientId) + .secret(User2_secret).scopes(scopes).tokenRequestExecTimeoutInMs(10000).build().getIdentityProviderConfig(); + // from redis-authx-core + TokenAuthConfig tokenAuthConfigUser2 = TokenAuthConfig.builder().tokenRequestExecTimeoutInMs(10000) + .expirationRefreshRatio(0.1f).identityProviderConfig(config2).build(); + // Create credentials provider user2 + // TODO: lettuce-autx-tba ( TokenBasedRedisCredentialsProvider & Example there) + TokenBasedRedisCredentialsProvider credentialsUser2 = new TokenBasedRedisCredentialsProvider(tokenAuthConfigUser2); + + // lettuce-core + RedisURI redisURI1 = RedisURI.create(REDIS_URI); + redisURI1.setCredentialsProvider(credentialsUser1); + + RedisURI redisURI2 = RedisURI.create(REDIS_URI); + redisURI2.setCredentialsProvider(credentialsUser2); + + // Create RedisClient + ClientOptions clientOptions = ClientOptions.builder() + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build()) + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(1))) + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build(); + try { + + // RedisClient using user1 credentials by default + RedisClient redisClient = RedisClient.create(redisURI1); + redisClient.setOptions(clientOptions); + + // create connection using default URI (authorised as user1) + try (StatefulRedisConnection user1 = redisClient.connect(StringCodec.UTF8)) { + + user1.reactive().aclWhoami().doOnNext(System.out::println).block(); + } + + // another connection using different authorizations (user2 credentials provider) + try (StatefulRedisConnection user2 = redisClient.connect(StringCodec.UTF8, redisURI2);) { + user2.reactive().aclWhoami().doOnNext(System.out::println).block(); + } + + // Shutdown Redis client and close connections + redisClient.shutdown(); + + ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() + .socketOptions(SocketOptions.builder().connectTimeout(Duration.ofSeconds(5)).build()) + .disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS) + .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(1))) + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build(); + + // RedisClient using user1 credentials by default + RedisClusterClient redisClusterClient = RedisClusterClient.create(redisURI1); + redisClusterClient.setOptions(clusterClientOptions); + + // create connection using default URI (authorised as user1) + try (StatefulRedisClusterConnection clusterConnection = redisClusterClient.connect(StringCodec.UTF8)) { + + String info = clusterConnection.sync().clusterInfo(); + System.out.println("Cluster Info :" + info); + + String nodes = clusterConnection.sync().clusterNodes(); + System.out.println("Cluster Nodes :" + nodes); + + clusterConnection.sync().set("cluster-key", "cluster-value"); + System.out.println("set " + clusterConnection.sync().get("cluster-key")); + + RedisAdvancedClusterCommands sync = clusterConnection.sync(); + NodeSelection upstream = sync.upstream(); + + upstream.commands().clientId().forEach((v) -> { System.out.println("Client Id : " + v);}); + + System.out.println(" whoami :" + clusterConnection.getConnection(clusterConnection.getPartitions().getPartition(0).getNodeId()).sync() + .aclWhoami()); + } + // Shutdown Redis client and close connections + redisClusterClient.shutdown(); + } finally { + credentialsUser1.shutdown(); + credentialsUser2.shutdown(); + + } + + } + +}