Skip to content

Commit

Permalink
Prefer drain-loop instead of recursion in RedisSubscription #1140
Browse files Browse the repository at this point in the history
RedisSubscription.onDataAvailable(…) and RedisSubscription.read(…) now use a drain-loop instead of recursive reads for element emission.
Recursive emission is error prone if the response contains many response elements.

onDataAvailable(…) calls read(…) if there is demand and if data is available.
  • Loading branch information
mp911de committed Oct 25, 2019
1 parent a8afab9 commit b54bd5b
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions src/main/java/io/lettuce/core/RedisPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -566,12 +566,16 @@ void request(RedisSubscription<?> subscription, long n) {
@Override
void onDataAvailable(RedisSubscription<?> subscription) {

do {
while (subscription.hasDemand()) {

if (subscription.state() == NO_DEMAND && !subscription.changeState(NO_DEMAND, DEMAND)) {
return;
}

if (!read(subscription)) {
return;
}
} while (subscription.hasDemand() && subscription.changeState(NO_DEMAND, this));
}
}

@Override
Expand All @@ -589,28 +593,41 @@ void request(RedisSubscription<?> subscription, long n) {
}
}

/**
* @param subscription
* @return {@literal true} if the {@code read()} call was able to perform a read and whether this method should be
* called again to emit remaining data.
*/
private boolean read(RedisSubscription<?> subscription) {

if (subscription.changeState(this, READING)) {
// concurrency/entry guard
if (!subscription.changeState(this, READING)) {
return false;
}

boolean hasDemand = subscription.readAndPublish();

boolean hasDemand = subscription.readAndPublish();
try {

if (subscription.allDataRead && subscription.data.isEmpty()) {
subscription.onAllDataRead();
return true;
if (subscription.data.isEmpty()) {

if (subscription.allDataRead) {
subscription.onAllDataRead();
}

return false;
}

return true;
} finally {

// concurrency/leave guard
if (hasDemand) {
subscription.changeState(READING, DEMAND);
subscription.checkOnDataAvailable();
} else {
subscription.changeState(READING, NO_DEMAND);
}

return true;
}

return false;
}
},

Expand Down

0 comments on commit b54bd5b

Please sign in to comment.