diff --git a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java index 03ad4fa465..22142db25c 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterDistributionChannelWriter.java @@ -33,10 +33,12 @@ import io.lettuce.core.RedisChannelHandler; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.cluster.event.AskRedirectionEvent; import io.lettuce.core.cluster.event.MovedRedirectionEvent; import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.event.Event; import io.lettuce.core.internal.Futures; @@ -110,7 +112,6 @@ private RedisCommand doWrite(RedisCommand command) { ClusterCommand clusterCommand = (ClusterCommand) command; if (clusterCommand.isMoved() || clusterCommand.isAsk()) { - HostAndPort target; boolean asking; ByteBuffer firstEncodedKey = clusterCommand.getArgs().getFirstEncodedKey(); @@ -125,7 +126,7 @@ private RedisCommand doWrite(RedisCommand command) { if (clusterCommand.isMoved()) { - target = getMoveTarget(clusterCommand.getError()); + target = getMoveTarget(partitions, clusterCommand.getError()); clusterEventListener.onMovedRedirection(); asking = false; @@ -329,8 +330,8 @@ private static RedisChannelWriter getWriterToUse(RedisChannelWriter writer) { /** * Optimization: Determine command intents and optimize for bulk execution preferring one node. *

- * If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than one connectionIntent, then - * use {@link ConnectionIntent#WRITE}. + * If there is only one connectionIntent, then we take the connectionIntent derived from the commands. If there is more than + * one connectionIntent, then use {@link ConnectionIntent#WRITE}. * * @param commands {@link Collection} of {@link RedisCommand commands}. * @return the connectionIntent. @@ -368,7 +369,7 @@ private static ConnectionIntent getIntent(ProtocolKeyword type) { return ReadOnlyCommands.isReadOnlyCommand(type) ? ConnectionIntent.READ : ConnectionIntent.WRITE; } - static HostAndPort getMoveTarget(String errorMessage) { + static HostAndPort getMoveTarget(Partitions partitions, String errorMessage) { LettuceAssert.notEmpty(errorMessage, "ErrorMessage must not be empty"); LettuceAssert.isTrue(errorMessage.startsWith(CommandKeyword.MOVED.name()), @@ -376,8 +377,30 @@ static HostAndPort getMoveTarget(String errorMessage) { String[] movedMessageParts = errorMessage.split(" "); LettuceAssert.isTrue(movedMessageParts.length >= 3, "ErrorMessage must consist of 3 tokens (" + errorMessage + ")"); + String redirectTarget = movedMessageParts[2]; - return HostAndPort.parseCompat(movedMessageParts[2]); + if (redirectTarget.startsWith(":")) { + + // unknown redirection hostname. We attempt discovering the hostname from Partitions + + int redirectPort = Integer.parseInt(redirectTarget.substring(1)); + for (RedisClusterNode partition : partitions) { + + RedisURI uri = partition.getUri(); + if (uri.getPort() == redirectPort) { + return HostAndPort.of(uri.getHost(), redirectPort); + } + } + + int slot = Integer.parseInt(movedMessageParts[1]); + RedisClusterNode partition = partitions.getPartitionBySlot(slot); + if (partition != null) { + RedisURI uri = partition.getUri(); + return HostAndPort.of(uri.getHost(), redirectPort); + } + } + + return HostAndPort.parseCompat(redirectTarget); } static HostAndPort getAskTarget(String errorMessage) { diff --git a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java index 1d0cf45d76..eeb670924e 100644 --- a/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/ClusterDistributionChannelWriterUnitTests.java @@ -23,6 +23,9 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.models.partitions.Partitions; +import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -116,16 +119,38 @@ void shouldParseIPv6AskTargetCorrectly() { @Test void shouldParseMovedTargetCorrectly() { - HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 127.0.0.1:6381"); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 127.0.0.1:6381"); assertThat(moveTarget.getHostText()).isEqualTo("127.0.0.1"); assertThat(moveTarget.getPort()).isEqualTo(6381); } + @Test + void shouldParseMovedTargetWithoutHostnameCorrectly() { + + Partitions partitions = new Partitions(); + partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:6381"), "foo", false,null,0,0,0,Collections.emptyList(), Collections.emptySet())); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381"); + + assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4"); + assertThat(moveTarget.getPort()).isEqualTo(6381); + } + + @Test + void shouldParseMovedTargetWithoutHostnameUsingSlotFallbackCorrectly() { + + Partitions partitions = new Partitions(); + partitions.add(new RedisClusterNode(RedisURI.create("redis://1.2.3.4:5678"), "foo", false,null,0,0,0, Collections.singletonList(1234), Collections.emptySet())); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(partitions, "MOVED 1234 :6381"); + + assertThat(moveTarget.getHostText()).isEqualTo("1.2.3.4"); + assertThat(moveTarget.getPort()).isEqualTo(6381); + } + @Test void shouldParseIPv6MovedTargetCorrectly() { - HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget("MOVED 1234-2020 1:2:3:4::6:6381"); + HostAndPort moveTarget = ClusterDistributionChannelWriter.getMoveTarget(new Partitions(), "MOVED 1234-2020 1:2:3:4::6:6381"); assertThat(moveTarget.getHostText()).isEqualTo("1:2:3:4::6"); assertThat(moveTarget.getPort()).isEqualTo(6381);