Skip to content

Commit

Permalink
Complete write promise successfully on no-op writes #764
Browse files Browse the repository at this point in the history
CommandHandler now completes a write promise if the write completes without actually writing a message. Previously, no-op writes did not complete and rendered an invalid state (e.g. queue size was not decremented).
  • Loading branch information
mp911de committed Apr 23, 2018
1 parent fb05f21 commit d5bae76
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
17 changes: 7 additions & 10 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,23 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}

if (msg instanceof Collection) {

Collection<RedisCommand<?, ?, ?>> batch = (Collection<RedisCommand<?, ?, ?>>) msg;

writeBatch(ctx, batch, promise);
writeBatch(ctx, (Collection<RedisCommand<?, ?, ?>>) msg, promise);
}
}

private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand<?, ?, ?> command, ChannelPromise promise)
throws Exception {
{

if (!isWriteable(command)) {
promise.trySuccess();
return;
}

addToStack(command, promise);
ctx.write(command, promise);
}

private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?, ?>> batch, ChannelPromise promise)
throws Exception {
private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?, ?>> batch, ChannelPromise promise) {

Collection<RedisCommand<?, ?, ?>> deduplicated = new LinkedHashSet<>(batch.size(), 1);

Expand All @@ -390,8 +387,7 @@ private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?,
redisCommand.completeExceptionally(e);
}

promise.setFailure(e);
return;
throw e;
}

for (RedisCommand<?, ?, ?> command : deduplicated) {
Expand All @@ -400,6 +396,8 @@ private void writeBatch(ChannelHandlerContext ctx, Collection<RedisCommand<?, ?,

if (!deduplicated.isEmpty()) {
ctx.write(deduplicated, promise);
} else {
promise.trySuccess();
}
}

Expand All @@ -423,7 +421,6 @@ private void addToStack(RedisCommand<?, ?, ?> command, ChannelPromise promise) {
}
} catch (Exception e) {
command.completeExceptionally(e);
promise.setFailure(e);
throw e;
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/test/java/io/lettuce/core/ConnectionCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,14 @@ public void authEmpty() throws Exception {
public void authReconnect() throws Exception {
new WithPasswordRequired() {
@Override
public void run(RedisClient client) {
public void run(RedisClient client) throws InterruptedException {

RedisCommands<String, String> connection = client.connect().sync();
assertThat(connection.auth(passwd)).isEqualTo("OK");
assertThat(connection.set(key, value)).isEqualTo("OK");
connection.quit();

Thread.sleep(100);
assertThat(connection.get(key)).isEqualTo(value);
}
};
Expand Down
16 changes: 15 additions & 1 deletion src/test/java/io/lettuce/core/protocol/CommandHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,28 @@ public void isConnectedShouldReportFalseForCLOSED() throws Exception {
assertThat(sut.isConnected()).isFalse();
}

@Test
public void shouldNotWriteCancelledCommand() throws Exception {

command.cancel();
sut.write(context, command, promise);

verifyZeroInteractions(context);
assertThat(stack).isEmpty();

verify(promise).trySuccess();
}

@Test
public void shouldNotWriteCancelledCommands() throws Exception {

command.cancel();
sut.write(context, command, null);
sut.write(context, Collections.singleton(command), promise);

verifyZeroInteractions(context);
assertThat(stack).isEmpty();

verify(promise).trySuccess();
}

@Test
Expand Down

0 comments on commit d5bae76

Please sign in to comment.