Skip to content

Commit

Permalink
Fix race in RedisPublisher while reading state #634
Browse files Browse the repository at this point in the history
  • Loading branch information
mp911de committed Oct 26, 2017
1 parent 33f77ba commit e23235c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
9 changes: 3 additions & 6 deletions src/main/java/io/lettuce/core/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,14 @@ private boolean read(RedisSubscription<?> subscription) {

if (subscription.changeState(this, READING)) {

subscription.readAndPublish();
boolean hasDemand = subscription.readAndPublish();

if (subscription.data.isEmpty() && subscription.allDataRead) {
subscription.onAllDataRead();
return true;
}

if (subscription.readAndPublish()) {
if (hasDemand) {
subscription.changeState(READING, DEMAND);
subscription.checkOnDataAvailable();
} else {
Expand All @@ -514,10 +514,7 @@ private boolean read(RedisSubscription<?> subscription) {
READING {
@Override
void request(RedisSubscription<?> subscription, long n) {

if (!Operators.request(RedisSubscription.DEMAND, subscription, n)) {
onError(subscription, Exceptions.nullOrNegativeRequestException(n));
}
DEMAND.request(subscription, n);
}
},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -32,7 +33,6 @@
import reactor.test.StepVerifier;
import io.lettuce.KeysAndValues;
import io.lettuce.RedisConditions;
import io.lettuce.Wait;
import io.lettuce.core.*;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
Expand Down Expand Up @@ -234,21 +234,21 @@ public void keys() {
}

@Test
public void keysDoesNotRunIntoRaceConditions() {
public void keysDoesNotRunIntoRaceConditions() throws Exception {

List<RedisFuture> futures = new ArrayList<>();
RedisClusterAsyncCommands<String, String> async = commands.getStatefulConnection().async();
async.flushall().get();

for (int i = 0; i < 10000; i++) {
for (int i = 0; i < 1000; i++) {
futures.add(async.set("key-" + i, "value-" + i));
}

futures.forEach(f -> f.toCompletableFuture().join());

for (int i = 0; i < 20; i++) {

for (int i = 0; i < 100; i++) {
CompletableFuture<Long> future = commands.keys("*").count().toFuture();
Wait.untilTrue(future::isDone).waitOrTimeout();
future.get(5, TimeUnit.SECONDS);
}
}

Expand Down

0 comments on commit e23235c

Please sign in to comment.