Skip to content

Commit

Permalink
Complete write promise successfully on no-op writes #764
Browse files Browse the repository at this point in the history
CommandHandler now completes a write promise if the write completes without actually writing a message. Previously, no-op writes did not complete and rendered an invalid state (e.g. queue size was not decremented).
  • Loading branch information
mp911de committed Apr 23, 2018
1 parent 0366ad7 commit 275f609
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -799,19 +799,18 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}
}

private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command, ChannelPromise promise)
throws Exception {
private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<K, V, ?> command, ChannelPromise promise) {

if (!isWriteable(command)) {
promise.trySuccess();
return;
}

addToStack(command, promise);
ctx.write(command, promise);
}

private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V, ?>> batch, ChannelPromise promise)
throws Exception {
private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V, ?>> batch, ChannelPromise promise) {

Collection<RedisCommand<K, V, ?>> deduplicated = new LinkedHashSet<>(batch.size(), 1);

Expand Down Expand Up @@ -842,6 +841,8 @@ private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<K, V,

if (!deduplicated.isEmpty()) {
ctx.write(deduplicated, promise);
} else {
promise.trySuccess();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ public void authEmpty() throws Exception {
public void authReconnect() throws Exception {
new WithPasswordRequired() {
@Override
public void run(RedisClient client) {
public void run(RedisClient client) throws InterruptedException {

RedisConnection<String, String> connection = client.connect().sync();
assertThat(connection.auth(passwd)).isEqualTo("OK");
assertThat(connection.set(key, value)).isEqualTo("OK");
connection.quit();

Thread.sleep(100);
assertThat(connection.get(key)).isEqualTo(value);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,14 +456,27 @@ public void shouldCancelCommandsOnEncoderException() throws Exception {
assertThat(command.exception).isInstanceOf(EncoderException.class);
}

@Test
public void shouldNotWriteCancelledCommand() throws Exception {

command.cancel();
sut.write(context, command, promise);

verifyZeroInteractions(context);
assertThat(disconnectedBuffer).isEmpty();
verify(promise).trySuccess();

}

@Test
public void shouldNotWriteCancelledCommands() throws Exception {

command.cancel();
sut.write(context, command, null);
sut.write(context, Collections.singleton(command), promise);

verifyZeroInteractions(context);
assertThat(disconnectedBuffer).isEmpty();
verify(promise).trySuccess();
}

@Test
Expand Down

0 comments on commit 275f609

Please sign in to comment.