diff --git a/src/main/java/io/lettuce/core/RedisPublisher.java b/src/main/java/io/lettuce/core/RedisPublisher.java index 8e8f08863f..722bde5474 100644 --- a/src/main/java/io/lettuce/core/RedisPublisher.java +++ b/src/main/java/io/lettuce/core/RedisPublisher.java @@ -566,12 +566,16 @@ void request(RedisSubscription subscription, long n) { @Override void onDataAvailable(RedisSubscription subscription) { - do { + while (subscription.hasDemand()) { + + if (subscription.state() == NO_DEMAND && !subscription.changeState(NO_DEMAND, DEMAND)) { + return; + } if (!read(subscription)) { return; } - } while (subscription.hasDemand() && subscription.changeState(NO_DEMAND, this)); + } } @Override @@ -589,28 +593,41 @@ void request(RedisSubscription subscription, long n) { } } + /** + * @param subscription + * @return {@literal true} if the {@code read()} call was able to perform a read and whether this method should be + * called again to emit remaining data. + */ private boolean read(RedisSubscription subscription) { - if (subscription.changeState(this, READING)) { + // concurrency/entry guard + if (!subscription.changeState(this, READING)) { + return false; + } + + boolean hasDemand = subscription.readAndPublish(); - boolean hasDemand = subscription.readAndPublish(); + try { - if (subscription.allDataRead && subscription.data.isEmpty()) { - subscription.onAllDataRead(); - return true; + if (subscription.data.isEmpty()) { + + if (subscription.allDataRead) { + subscription.onAllDataRead(); + } + + return false; } + return true; + } finally { + + // concurrency/leave guard if (hasDemand) { subscription.changeState(READING, DEMAND); - subscription.checkOnDataAvailable(); } else { subscription.changeState(READING, NO_DEMAND); } - - return true; } - - return false; } },