From 31341f161817091c0beaf9eb82a4bc3359ff1cd6 Mon Sep 17 00:00:00 2001 From: ggivo Date: Wed, 18 Dec 2024 11:22:47 +0200 Subject: [PATCH] Add authentication handler to ClusterPubSub connections --- .../core/RedisAuthenticationHandler.java | 6 +++++ .../core/cluster/RedisClusterClient.java | 2 ++ .../RedisClientConnectIntegrationTests.java | 18 +++++++++++++++ .../ClusterClientOptionsIntegrationTests.java | 23 +++++++++++++++++++ 4 files changed, 49 insertions(+) diff --git a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java index 7a05a7a60..5a3733381 100644 --- a/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java +++ b/src/main/java/io/lettuce/core/RedisAuthenticationHandler.java @@ -89,6 +89,12 @@ public static RedisAuthenticationHandler createHandler(StatefulRedi RedisCredentialsProvider credentialsProvider, Boolean isPubSubConnection, ClientOptions options) { if (isSupported(options)) { + + if (isPubSubConnection && options.getConfiguredProtocolVersion() == ProtocolVersion.RESP2) { + throw new RedisConnectionException( + "Renewable credentials are not supported with RESP2 protocol on a pub/sub connection."); + } + return new RedisAuthenticationHandler<>(connection, credentialsProvider, isPubSubConnection); } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index e62fbc731..e9e8a2ede 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -780,6 +780,8 @@ private CompletableFuture> con clusterWriter.setClusterConnectionProvider(pooledClusterConnectionProvider); connection.setPartitions(partitions); + connection.setAuthenticationHandler( + createHandler(connection, getFirstUri().getCredentialsProvider(), true, getOptions())); Supplier commandHandlerSupplier = () -> new PubSubCommandHandler<>(getClusterClientOptions(), getResources(), codec, endpoint); diff --git a/src/test/java/io/lettuce/core/RedisClientConnectIntegrationTests.java b/src/test/java/io/lettuce/core/RedisClientConnectIntegrationTests.java index 4e7c281e4..416ffa3a4 100644 --- a/src/test/java/io/lettuce/core/RedisClientConnectIntegrationTests.java +++ b/src/test/java/io/lettuce/core/RedisClientConnectIntegrationTests.java @@ -32,6 +32,7 @@ import javax.inject.Inject; +import io.lettuce.core.protocol.ProtocolVersion; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Tag; @@ -219,6 +220,23 @@ void connectPubSubCodecSentinelMissingHostAndSocketUri() { assertThatThrownBy(() -> client.connectPubSub(UTF8, invalidSentinel())).isInstanceOf(IllegalArgumentException.class); } + @Test + void connectPubSubAsyncReauthNotSupportedWithRESP2() { + ClientOptions.ReauthenticateBehavior reauth = client.getOptions().getReauthenticateBehaviour(); + ProtocolVersion protocolVersion = client.getOptions().getConfiguredProtocolVersion(); + try { + client.setOptions(client.getOptions().mutate().protocolVersion(ProtocolVersion.RESP2) + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build()); + + RedisURI redisURI = redis(host, port).build(); + assertThatThrownBy(() -> client.connectPubSubAsync(UTF8, redisURI)).isInstanceOf(RedisConnectionException.class); + + } finally { + client.setOptions( + client.getOptions().mutate().protocolVersion(protocolVersion).reauthenticateBehavior(reauth).build()); + } + } + /* * Sentinel Stateful */ diff --git a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsIntegrationTests.java index 6eddfa2e0..94cbbe76b 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterClientOptionsIntegrationTests.java @@ -1,6 +1,7 @@ package io.lettuce.core.cluster; import static io.lettuce.TestTags.INTEGRATION_TEST; +import static io.lettuce.core.codec.StringCodec.UTF8; import static org.assertj.core.api.Assertions.*; import java.time.Duration; @@ -8,6 +9,9 @@ import javax.inject.Inject; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.protocol.ProtocolVersion; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -80,4 +84,23 @@ void shouldApplyTimeoutOptionsToPubSubClusterConnection() throws InterruptedExce Thread.sleep(300); } + @Test + void connectPubSubAsyncReauthNotSupportedWithRESP2() { + + ClientOptions.ReauthenticateBehavior reauth = clusterClient.getClusterClientOptions().getReauthenticateBehaviour(); + ProtocolVersion protocolVersion = clusterClient.getClusterClientOptions().getConfiguredProtocolVersion(); + + try { + clusterClient.setOptions(clusterClient.getClusterClientOptions().mutate().protocolVersion(ProtocolVersion.RESP2) + .reauthenticateBehavior(ClientOptions.ReauthenticateBehavior.ON_NEW_CREDENTIALS).build()); + assertThatThrownBy(() -> clusterClient.connectPubSub(UTF8)).isInstanceOf(RedisConnectionException.class); + + } finally { + + clusterClient.setOptions(clusterClient.getClusterClientOptions().mutate().protocolVersion(protocolVersion) + .reauthenticateBehavior(reauth).build()); + } + + } + }