diff --git a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java index 9c8b52849..1c2ae525b 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java @@ -58,8 +58,6 @@ import java.util.Set; import static io.lettuce.core.protocol.CommandType.EXEC; -import static io.lettuce.core.protocol.CommandType.GEORADIUS; -import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER; import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO; import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO; @@ -1140,13 +1138,13 @@ public RedisFuture> geopos(K key, V... members) { @Override public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name())); + return georadius_ro(key, longitude, latitude, distance, unit); } @Override public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs)); + return georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @Override @@ -1166,13 +1164,13 @@ protected RedisFuture>> georadius_ro(K key, double longitude, @Override public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name())); + return georadiusbymember_ro(key, member, distance, unit); } @Override public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs)); + return georadiusbymember_ro(key, member, distance, unit, geoArgs); } @Override diff --git a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java index eb7e911ca..6d6c82e1a 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java +++ b/src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java @@ -66,8 +66,6 @@ import java.util.function.Supplier; import static io.lettuce.core.protocol.CommandType.EXEC; -import static io.lettuce.core.protocol.CommandType.GEORADIUS; -import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER; import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO; import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO; @@ -1203,14 +1201,14 @@ public Flux> geopos(K key, V... members) { } @Override - public Flux georadius(K key, double longitude, double latitude, double distance, Unit unit) { - return createDissolvingFlux(() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name())); + public Flux georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { + return georadius_ro(key, longitude, latitude, distance, unit); } @Override - public Flux> georadius(K key, double longitude, double latitude, double distance, Unit unit, GeoArgs geoArgs) { - return createDissolvingFlux( - () -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs)); + public Flux> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, + GeoArgs geoArgs) { + return georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @Override @@ -1231,15 +1229,13 @@ protected Flux> georadius_ro(K key, double longitude, double latitu } @Override - public Flux georadiusbymember(K key, V member, double distance, Unit unit) { - return createDissolvingFlux( - () -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name())); + public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { + return georadiusbymember_ro(key, member, distance, unit); } @Override - public Flux> georadiusbymember(K key, V member, double distance, Unit unit, GeoArgs geoArgs) { - return createDissolvingFlux( - () -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs)); + public Flux> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { + return georadiusbymember_ro(key, member, distance, unit, geoArgs); } @Override diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java index 14295db78..06c9a3ee7 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java @@ -249,28 +249,6 @@ public RedisFuture flushdb(FlushMode flushMode) { .firstOfAsync(executeOnUpstream(kvRedisClusterAsyncCommands -> kvRedisClusterAsyncCommands.flushdb(flushMode))); } - @Override - public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, - GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public RedisFuture> keys(K pattern) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java index 0517feb4c..0ebb1475e 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java @@ -235,27 +235,6 @@ public Mono flushdb(FlushMode flushMode) { return Flux.merge(publishers.values()).last(); } - @Override - public Flux georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public Flux> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public Flux> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public Flux keys(K pattern) { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java index 28a2f972e..a9163f947 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubAsyncCommandsImpl.java @@ -65,28 +65,6 @@ public RedisClusterPubSubAsyncCommandsImpl(StatefulRedisPubSubConnection c super(connection, codec); } - @Override - public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - return super.georadius_ro(key, longitude, latitude, distance, unit); - } - - @Override - public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, - GeoArgs.Unit unit, GeoArgs geoArgs) { - return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); - } - - @Override - public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - return super.georadiusbymember_ro(key, member, distance, unit); - } - - @Override - public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, - GeoArgs geoArgs) { - return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); - } - @Override public StatefulRedisClusterPubSubConnectionImpl getStatefulConnection() { return (StatefulRedisClusterPubSubConnectionImpl) super.getStatefulConnection(); diff --git a/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java new file mode 100644 index 000000000..3a176e475 --- /dev/null +++ b/src/test/java/io/lettuce/core/commands/GeoMasterReplicaIntegrationTests.java @@ -0,0 +1,140 @@ +package io.lettuce.core.commands; + +import io.lettuce.core.*; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.masterreplica.MasterReplica; +import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection; +import io.lettuce.core.models.role.RedisInstance; +import io.lettuce.core.models.role.RoleParser; +import io.lettuce.test.LettuceExtension; +import io.lettuce.test.condition.EnabledOnCommand; +import io.lettuce.test.settings.TestSettings; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import static io.lettuce.TestTags.INTEGRATION_TEST; +import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * @author Mark Paluch + */ +@Tag(INTEGRATION_TEST) +@ExtendWith(LettuceExtension.class) +@EnabledOnCommand("GEOADD") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class GeoMasterReplicaIntegrationTests extends AbstractRedisClientTest { + + private StatefulRedisMasterReplicaConnection masterReplica; + + private RedisCommands upstream; + + private RedisCommands connection1; + + private RedisCommands connection2; + + @BeforeEach + void before() { + + RedisURI node1 = RedisURI.Builder.redis(host, TestSettings.port(3)).withDatabase(2).build(); + RedisURI node2 = RedisURI.Builder.redis(host, TestSettings.port(4)).withDatabase(2).build(); + + connection1 = client.connect(node1).sync(); + connection2 = client.connect(node2).sync(); + + RedisInstance node1Instance = RoleParser.parse(this.connection1.role()); + RedisInstance node2Instance = RoleParser.parse(this.connection2.role()); + + if (node1Instance.getRole().isUpstream() && node2Instance.getRole().isReplica()) { + upstream = connection1; + } else if (node2Instance.getRole().isUpstream() && node1Instance.getRole().isReplica()) { + upstream = connection2; + } else { + assumeTrue(false, + String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s", + node1Instance, node2Instance)); + } + upstream.flushall(); + + masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2)); + masterReplica.setReadFrom(ReadFrom.REPLICA); + + } + + @AfterEach + void after() { + + if (connection1 != null) { + connection1.getStatefulConnection().close(); + } + + if (connection2 != null) { + connection2.getStatefulConnection().close(); + } + + if (masterReplica != null) { + masterReplica.close(); + } + } + + @BeforeEach + void setUp() { + this.redis.flushall(); + } + + @Test + void georadiusReadFromReplica() { + + prepareGeo(upstream); + + upstream.waitForReplication(1, 1000); + + Set georadius = masterReplica.sync().georadius(key, 8.6582861, 49.5285695, 1, GeoArgs.Unit.km); + assertThat(georadius).hasSize(1).contains("Weinheim"); + } + + @Test + void georadiusWithArgsReadFromReplica() { + + prepareGeo(upstream); + + upstream.waitForReplication(1, 1000); + + GeoArgs geoArgs = new GeoArgs().withHash().withCoordinates().withDistance().withCount(1).desc(); + + List> result = masterReplica.sync().georadius(key, 8.665351, 49.553302, 5, GeoArgs.Unit.km, geoArgs); + assertThat(result).hasSize(1); + } + + @Test + void georadiusbymemberReadFromReplica() { + + prepareGeo(upstream); + upstream.waitForReplication(1, 100); + + Set empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km); + assertThat(empty).hasSize(1).contains("Bahn"); + } + + @Test + void georadiusbymemberWithArgsReadFromReplica() { + + prepareGeo(upstream); + upstream.waitForReplication(1, 100); + + List> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km, + new GeoArgs().withHash().withCoordinates().withDistance().desc()); + assertThat(empty).isNotEmpty(); + } + + protected void prepareGeo(RedisCommands redis) { + redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim"); + redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn"); + } + +}