Skip to content

Commit

Permalink
Fix race in RedisPublisher while demand state #634.
Browse files Browse the repository at this point in the history
This commit fixes a race condition between two concurrent threads that are calling DEMAND.request(…) and DEMAND.onDataAvailable(…) resulting in registered demand but no further emission. The publishing part now checks whether new demand was registered after the state transition and the requesting thread checks for a state transition after registering demand. If one of the newly introduced conditions yields true, the outstanding signals are processed.

There are three key components to create the race:

* the command is completed
* the requesting thread does not pull because of DEMAND state
* the emitting thread completes prematurely

The race is possible because the requesting thread calls request(…) in the DEMAND state after the publishing thread determined there was no further demand. The publishing did not check whether demand was registered after its state transition to NO_DEMAND, the requesting code did not check whether a state transition occurred meanwhile so the publisher never completed.
  • Loading branch information
mp911de committed Oct 26, 2017
1 parent bd69f8e commit 302dfa1
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 19 deletions.
51 changes: 32 additions & 19 deletions src/main/java/io/lettuce/core/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.lettuce.core;

import java.io.IOException;
import java.util.Collection;
import java.util.Objects;
import java.util.Queue;
Expand Down Expand Up @@ -330,7 +329,7 @@ void checkOnDataAvailable() {
*
* @return {@literal true} if there is more demand, {@literal false} otherwise.
*/
private boolean readAndPublish() throws IOException {
private boolean readAndPublish() {

while (hasDemand()) {

Expand Down Expand Up @@ -466,35 +465,49 @@ void request(RedisSubscription<?> subscription, long n) {
@Override
void onDataAvailable(RedisSubscription<?> subscription) {

if (subscription.changeState(this, READING)) {

try {
boolean demandAvailable = subscription.readAndPublish();
if (demandAvailable) {
subscription.changeState(READING, DEMAND);
subscription.checkOnDataAvailable();
} else {
do {

if (subscription.data.isEmpty() && subscription.allDataRead) {
subscription.onAllDataRead();
} else {
subscription.changeState(READING, NO_DEMAND);
}
}
} catch (IOException ex) {
onError(subscription, ex);
if (!read(subscription)) {
return;
}
}
} while (subscription.hasDemand() && subscription.changeState(NO_DEMAND, this));
}

@Override
void request(RedisSubscription<?> subscription, long n) {

if (Operators.validate(n)) {
Operators.addCap(RedisSubscription.DEMAND, subscription, n);

if (subscription.changeState(NO_DEMAND, DEMAND)) {
read(subscription);
}
}
}

private boolean read(RedisSubscription<?> subscription) {

if (subscription.changeState(this, READING)) {

subscription.readAndPublish();

if (subscription.data.isEmpty() && subscription.allDataRead) {
subscription.onAllDataRead();
return true;
}

if (subscription.readAndPublish()) {
subscription.changeState(READING, DEMAND);
subscription.checkOnDataAvailable();
} else {
subscription.changeState(READING, NO_DEMAND);
}

return true;
}

return false;
}
},

READING {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.Assume.assumeTrue;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

Expand All @@ -31,8 +32,10 @@
import reactor.test.StepVerifier;
import io.lettuce.KeysAndValues;
import io.lettuce.RedisConditions;
import io.lettuce.Wait;
import io.lettuce.core.*;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
Expand Down Expand Up @@ -230,6 +233,25 @@ public void keys() {
.consumeRecordedWith(actual -> assertThat(actual).contains(KEY_ON_NODE_1, KEY_ON_NODE_2)).verifyComplete();
}

@Test
public void keysDoesNotRunIntoRaceConditions() {

List<RedisFuture> futures = new ArrayList<>();
RedisClusterAsyncCommands<String, String> async = commands.getStatefulConnection().async();

for (int i = 0; i < 10000; i++) {
futures.add(async.set("key-" + i, "value-" + i));
}

futures.forEach(f -> f.toCompletableFuture().join());

for (int i = 0; i < 20; i++) {

CompletableFuture<Long> future = commands.keys("*").count().toFuture();
Wait.untilTrue(future::isDone).waitOrTimeout();
}
}

@Test
public void keysStreaming() {

Expand Down

0 comments on commit 302dfa1

Please sign in to comment.