diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index 59761499f7..8a6a2f18c2 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -15,10 +15,10 @@ */ package io.lettuce.core; -import java.io.Serializable; - import io.lettuce.core.internal.LettuceAssert; +import java.io.Serializable; + /** * Client Options to control the behavior of {@link RedisClient}. * @@ -36,6 +36,7 @@ public class ClientOptions implements Serializable { public static final SocketOptions DEFAULT_SOCKET_OPTIONS = SocketOptions.create(); public static final SslOptions DEFAULT_SSL_OPTIONS = SslOptions.create(); public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create(); + public static final int DEFAULT_BUFFER_USAGE_RATIO = 3; private final boolean pingBeforeActivateConnection; private final boolean autoReconnect; @@ -46,6 +47,7 @@ public class ClientOptions implements Serializable { private final SocketOptions socketOptions; private final SslOptions sslOptions; private final TimeoutOptions timeoutOptions; + private final int bufferUsageRatio; private final Builder builder; protected ClientOptions(Builder builder) { @@ -58,6 +60,7 @@ protected ClientOptions(Builder builder) { this.socketOptions = builder.socketOptions; this.sslOptions = builder.sslOptions; this.timeoutOptions = builder.timeoutOptions; + this.bufferUsageRatio = builder.bufferUsageRatio; this.builder = builder; } @@ -71,6 +74,7 @@ protected ClientOptions(ClientOptions original) { this.socketOptions = original.getSocketOptions(); this.sslOptions = original.getSslOptions(); this.timeoutOptions = original.getTimeoutOptions(); + this.bufferUsageRatio = original.getBufferUsageRatio(); this.builder = original.builder; } @@ -116,6 +120,7 @@ public static class Builder { private SocketOptions socketOptions = DEFAULT_SOCKET_OPTIONS; private SslOptions sslOptions = DEFAULT_SSL_OPTIONS; private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; + private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO; protected Builder() { } @@ -238,6 +243,23 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { return this; } + /** + * Sets the buffer usage ratio to {@link io.lettuce.core.protocol.CommandHandler} for determine when to discard + * read bytes.See {@link #DEFAULT_BUFFER_USAGE_RATIO}. + * + * @param bufferUsageRatio must greater between 0 and 2^31-1 + * @return {@code this} + * @since 5.2 + */ + public Builder bufferUsageRatio(int bufferUsageRatio) { + LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE, + "BufferUsageRatio must grater than 0"); + LettuceAssert.isTrue(bufferUsageRatio < Integer.MAX_VALUE, + "BufferUsageRatio must less than " + Integer.MAX_VALUE); + this.bufferUsageRatio = bufferUsageRatio; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -354,6 +376,18 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + /** + * Buffer usage ratio for {@link io.lettuce.core.protocol.CommandHandler}. Discard operation for read bytes occur + * When buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1}. For example, sets 3 to bufferUsageRatio, + * means buffer usage reach 75 percentage, discard operation occur. + * + * @return the buffer usage ratio. + * @since 5.2 + */ + public int getBufferUsageRatio() { + return bufferUsageRatio; + } + /** * Behavior of connections in disconnected state. */ diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 68a78bc26f..211586aa2d 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -588,6 +588,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup try { if (!decode(ctx, buffer, command)) { + discardReadBytesIfNecessary(buffer); return; } } catch (Exception e) { @@ -614,9 +615,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup afterDecode(ctx, command); } - if (buffer.refCnt() != 0) { - buffer.discardReadBytes(); - } + discardReadBytesIfNecessary(buffer); } /** @@ -840,6 +839,19 @@ private static long nanoTime() { return System.nanoTime(); } + /** + * try discard read bytes when buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1} + * @param buffer + */ + private void discardReadBytesIfNecessary(ByteBuf buffer) { + int bufferUsageRatio = clientOptions.getBufferUsageRatio(); + if ((float)buffer.writerIndex() / buffer.capacity() >= (float)bufferUsageRatio / (bufferUsageRatio + 1)) { + if (buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } + } + } + public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, } diff --git a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java index e10a50ab83..febec7b9ec 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java @@ -45,5 +45,6 @@ void checkAssertions(ClientOptions sut) { assertThat(sut.isPingBeforeActivateConnection()).isEqualTo(false); assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false); assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT); + assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO); } } diff --git a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java index bdbdc637db..97cd7e557b 100644 --- a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java @@ -430,4 +430,30 @@ void shouldIgnoreNonReadableBuffers() throws Exception { verify(byteBufMock, never()).release(); } + + @Test + void shouldDiscardReadBytes() throws Exception { + + ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + channelPromise.setSuccess(); + sut.channelRegistered(context); + sut.channelActive(context); + //set the command handler buffer capacity to 1024, make it easy to test + ReflectionTestUtils.setField(sut, "buffer", context.alloc().buffer(1024)); + ByteBuf buffer = (ByteBuf) ReflectionTestUtils.getField(sut, "buffer"); + + //mock a multi reply, which will reach the buffer usage ratio + ByteBuf msg = context.alloc().buffer(1024); + while ((float)msg.writerIndex() / msg.capacity() <= (float)ClientOptions.DEFAULT_BUFFER_USAGE_RATIO / ( + ClientOptions.DEFAULT_BUFFER_USAGE_RATIO + 1)) { + sut.write(context, command, channelPromise); + msg.writeBytes("*1\r\n+OK\r\n".getBytes()); + } + + sut.channelRead(context, msg); + + assertThat(buffer.readerIndex() == 0); + assertThat(buffer.writerIndex() == 0); + sut.channelUnregistered(context); + } }