diff --git a/src/main/java/io/lettuce/core/RedisPublisher.java b/src/main/java/io/lettuce/core/RedisPublisher.java index 522de2dafb..186e7db765 100644 --- a/src/main/java/io/lettuce/core/RedisPublisher.java +++ b/src/main/java/io/lettuce/core/RedisPublisher.java @@ -490,14 +490,14 @@ private boolean read(RedisSubscription subscription) { if (subscription.changeState(this, READING)) { - subscription.readAndPublish(); + boolean hasDemand = subscription.readAndPublish(); if (subscription.data.isEmpty() && subscription.allDataRead) { subscription.onAllDataRead(); return true; } - if (subscription.readAndPublish()) { + if (hasDemand) { subscription.changeState(READING, DEMAND); subscription.checkOnDataAvailable(); } else { @@ -514,10 +514,7 @@ private boolean read(RedisSubscription subscription) { READING { @Override void request(RedisSubscription subscription, long n) { - - if (!Operators.request(RedisSubscription.DEMAND, subscription, n)) { - onError(subscription, Exceptions.nullOrNegativeRequestException(n)); - } + DEMAND.request(subscription, n); } }, diff --git a/src/test/java/io/lettuce/core/cluster/AdvancedClusterReactiveTest.java b/src/test/java/io/lettuce/core/cluster/AdvancedClusterReactiveTest.java index ffa0115e03..134ed19e6e 100644 --- a/src/test/java/io/lettuce/core/cluster/AdvancedClusterReactiveTest.java +++ b/src/test/java/io/lettuce/core/cluster/AdvancedClusterReactiveTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -32,7 +33,6 @@ import reactor.test.StepVerifier; import io.lettuce.KeysAndValues; import io.lettuce.RedisConditions; -import io.lettuce.Wait; import io.lettuce.core.*; import io.lettuce.core.api.sync.RedisCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; @@ -234,21 +234,21 @@ public void keys() { } @Test - public void keysDoesNotRunIntoRaceConditions() { + public void keysDoesNotRunIntoRaceConditions() throws Exception { List futures = new ArrayList<>(); RedisClusterAsyncCommands async = commands.getStatefulConnection().async(); + async.flushall().get(); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 1000; i++) { futures.add(async.set("key-" + i, "value-" + i)); } futures.forEach(f -> f.toCompletableFuture().join()); - for (int i = 0; i < 20; i++) { - + for (int i = 0; i < 100; i++) { CompletableFuture future = commands.keys("*").count().toFuture(); - Wait.untilTrue(future::isDone).waitOrTimeout(); + future.get(5, TimeUnit.SECONDS); } }