Skip to content

Commit

Permalink
Add commands to stack through write promise #616
Browse files Browse the repository at this point in the history
Lettuce now appends written commands to the command stack using the write promise. Appending the command with the promise does not require command dequeueing on write failures but increases GC pressure.
  • Loading branch information
mp911de committed Oct 11, 2017
1 parent 47e9940 commit d08843b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 14 deletions.
16 changes: 3 additions & 13 deletions src/main/java/io/lettuce/core/protocol/AsyncCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,26 +205,16 @@ public boolean equals(Object o) {
return false;
}

RedisCommand<?, ?, ?> left = command;
while (left instanceof DecoratedCommand) {
left = CommandWrapper.unwrap(left);
}

RedisCommand<?, ?, ?> right = (RedisCommand<?, ?, ?>) o;
while (right instanceof DecoratedCommand) {
right = CommandWrapper.unwrap(right);
}
RedisCommand<?, ?, ?> left = CommandWrapper.unwrap(command);
RedisCommand<?, ?, ?> right = CommandWrapper.unwrap((RedisCommand<?, ?, ?>) o);

return left == right;
}

@Override
public int hashCode() {

RedisCommand<?, ?, ?> toHash = command;
while (toHash instanceof DecoratedCommand) {
toHash = CommandWrapper.unwrap(toHash);
}
RedisCommand<?, ?, ?> toHash = CommandWrapper.unwrap(command);

return toHash != null ? toHash.hashCode() : 0;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ private void flushCommands(Queue<RedisCommand<?, ?, ?>> queue) {

if (!commands.isEmpty()) {

QUEUE_SIZE.addAndGet(this, commands.size());

if (reliability == Reliability.AT_MOST_ONCE) {
// cancel on exceptions and remove from queue, because there is no housekeeping
writeAndFlush(commands).addListener(new AtMostOnceWriteListener(commands));
Expand Down Expand Up @@ -732,7 +734,7 @@ public void operationComplete(Future<Void> future) throws Exception {
}
}

if (!future.isSuccess() && !(cause instanceof ClosedChannelException)) {
if (!success && !(cause instanceof ClosedChannelException)) {

String message = "Unexpected exception during request: {}";
InternalLogLevel logLevel = InternalLogLevel.WARN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;

import java.io.IOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
Expand Down

0 comments on commit d08843b

Please sign in to comment.