diff --git a/src/main/java/io/lettuce/core/internal/LettuceFactories.java b/src/main/java/io/lettuce/core/internal/LettuceFactories.java index f50010d18b..a303f86da6 100644 --- a/src/main/java/io/lettuce/core/internal/LettuceFactories.java +++ b/src/main/java/io/lettuce/core/internal/LettuceFactories.java @@ -20,7 +20,7 @@ import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; /** @@ -41,13 +41,13 @@ public class LettuceFactories { /** * Creates a new, optionally bounded, {@link Queue} that does not require external synchronization. * - * @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedDeque unbounded queue}. + * @param maxSize queue size. If {@link Integer#MAX_VALUE}, then creates an {@link ConcurrentLinkedQueue unbounded queue}. * @return a new, empty {@link Queue}. */ public static Queue newConcurrentQueue(int maxSize) { if (maxSize == Integer.MAX_VALUE) { - return new ConcurrentLinkedDeque<>(); + return new ConcurrentLinkedQueue<>(); } return maxSize > ARRAY_QUEUE_THRESHOLD ? new LinkedBlockingQueue<>(maxSize) : new ArrayBlockingQueue<>(maxSize); diff --git a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java index d3b9e751fe..7947ff7978 100644 --- a/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java +++ b/src/main/java/io/lettuce/core/protocol/DefaultEndpoint.java @@ -696,7 +696,7 @@ public void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) { logger.debug("{} notifyQueuedCommands adding {} command(s) to buffer", logPrefix(), commands.size()); } - commands.addAll(drainCommands(disconnectedBuffer)); + drainCommands(disconnectedBuffer, commands); for (RedisCommand command : commands) { @@ -745,8 +745,8 @@ protected T doExclusive(Supplier supplier) { List> target = new ArrayList<>(disconnectedBuffer.size() + commandBuffer.size()); - target.addAll(drainCommands(disconnectedBuffer)); - target.addAll(drainCommands(commandBuffer)); + drainCommands(disconnectedBuffer, target); + drainCommands(commandBuffer, target); return target; } @@ -769,9 +769,26 @@ protected T doExclusive(Supplier supplier) { } } + drainCommands(source, target); return target; } + /** + * Drain commands from a queue and return only active commands. + * + * @param source the source queue. + */ + private static void drainCommands(Queue> source, Collection> target) { + + RedisCommand cmd; + while ((cmd = source.poll()) != null) { + + if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) { + target.add(cmd); + } + } + } + private void cancelBufferedCommands(String message) { cancelCommands(message, doExclusive(this::drainCommands), RedisCommand::cancel); }