diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index d3f7638673..1b94332799 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -18,7 +18,10 @@ import java.io.Serializable; import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy; +import io.lettuce.core.protocol.ReadBytesDiscardPolicy; import io.lettuce.core.resource.ClientResources; +import io.netty.buffer.ByteBuf; /** * Client Options to control the behavior of {@link RedisClient}. @@ -71,7 +74,7 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; - private final int bufferUsageRatio; + private final ReadBytesDiscardPolicy readBytesDiscardPolicy; protected ClientOptions(Builder builder) { this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection; @@ -84,7 +87,7 @@ protected ClientOptions(Builder builder) { this.socketOptions = builder.socketOptions; this.sslOptions = builder.sslOptions; this.timeoutOptions = builder.timeoutOptions; - this.bufferUsageRatio = builder.bufferUsageRatio; + this.readBytesDiscardPolicy = builder.readBytesDiscardPolicy; } protected ClientOptions(ClientOptions original) { @@ -98,7 +101,7 @@ protected ClientOptions(ClientOptions original) { this.socketOptions = original.getSocketOptions(); this.sslOptions = original.getSslOptions(); this.timeoutOptions = original.getTimeoutOptions(); - this.bufferUsageRatio = original.getBufferUsageRatio(); + this.readBytesDiscardPolicy = original.getReadBytesDiscardPolicy(); } /** @@ -154,7 +157,7 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; - private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO; + private ReadBytesDiscardPolicy readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(DEFAULT_BUFFER_USAGE_RATIO); protected Builder() { } @@ -303,17 +306,28 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { * E.g. setting {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75 * percent. See {@link #DEFAULT_BUFFER_USAGE_RATIO}. * - * @param bufferUsageRatio must greater between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to + * @param bufferUsageRatio must be between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to * 90%. * @return {@code this} * @since 5.2 + * @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding + * policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which. + * Please use {@link #readBytesDiscardPolicy(ReadBytesDiscardPolicy)} */ public Builder bufferUsageRatio(int bufferUsageRatio) { + this.readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(bufferUsageRatio); + return this; + } - LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE, - "BufferUsageRatio must grater than 0"); - - this.bufferUsageRatio = bufferUsageRatio; + /** + * Sets the policy managing calls to {@link ByteBuf#discardReadBytes()} + * for {@link io.lettuce.core.protocol.CommandHandler#buffer} + * + * @param readBytesDiscardPolicy the policy to use in {@link io.lettuce.core.protocol.CommandHandler} + * @return {@code this} + */ + public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) { + this.readBytesDiscardPolicy = readBytesDiscardPolicy; return this; } @@ -463,11 +477,21 @@ public TimeoutOptions getTimeoutOptions() { * during decoding. In particular, when buffer usage reaches {@code bufferUsageRatio / bufferUsageRatio + 1}. E.g. setting * {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75 percent. * - * @return the buffer usage ratio. + * @return the buffer usage ratio, greater than zero if {@link RatioReadBytesDiscardPolicy} is used and zero otherwise * @since 5.2 + * + * @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding + * policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which */ public int getBufferUsageRatio() { - return bufferUsageRatio; + if (readBytesDiscardPolicy instanceof RatioReadBytesDiscardPolicy) { + return ((RatioReadBytesDiscardPolicy) readBytesDiscardPolicy).getBufferUsageRatio(); + } + return 0; + } + + public ReadBytesDiscardPolicy getReadBytesDiscardPolicy() { + return readBytesDiscardPolicy; } /** diff --git a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java index a76d6557da..562f49ba2f 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java @@ -22,6 +22,7 @@ import io.lettuce.core.SslOptions; import io.lettuce.core.TimeoutOptions; import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.ReadBytesDiscardPolicy; /** * Client Options to control the behavior of {@link RedisClusterClient}. @@ -113,7 +114,8 @@ public static ClusterClientOptions.Builder builder(ClientOptions clientOptions) } Builder builder = new Builder(); - builder.autoReconnect(clientOptions.isAutoReconnect()).bufferUsageRatio(clientOptions.getBufferUsageRatio()) + builder.autoReconnect(clientOptions.isAutoReconnect()) + .readBytesDiscardPolicy(clientOptions.getReadBytesDiscardPolicy()) .cancelCommandsOnReconnectFailure(clientOptions.isCancelCommandsOnReconnectFailure()) .disconnectedBehavior(clientOptions.getDisconnectedBehavior()) .publishOnScheduler(clientOptions.isPublishOnScheduler()) @@ -252,6 +254,12 @@ public Builder bufferUsageRatio(int bufferUsageRatio) { return this; } + @Override + public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) { + super.readBytesDiscardPolicy(readBytesDiscardPolicy); + return this; + } + /** * Create a new instance of {@link ClusterClientOptions} * @@ -275,7 +283,8 @@ public ClusterClientOptions.Builder mutate() { Builder builder = new Builder(); - builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio()) + builder.autoReconnect(isAutoReconnect()) + .readBytesDiscardPolicy(getReadBytesDiscardPolicy()) .cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) .disconnectedBehavior(getDisconnectedBehavior()).publishOnScheduler(isPublishOnScheduler()) .pingBeforeActivateConnection(isPingBeforeActivateConnection()).requestQueueSize(getRequestQueueSize()) diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index e52ed68c48..178dc5afbb 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -91,7 +91,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final boolean includeCommandArgsInSpanTags; - private final float discardReadBytesRatio; + private final ReadBytesDiscardPolicy readBytesDiscardPolicy; private final boolean boundedQueues; @@ -135,8 +135,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.tracingEnabled = tracing.isEnabled(); this.includeCommandArgsInSpanTags = tracing.includeCommandArgsInSpanTags(); - float bufferUsageRatio = clientOptions.getBufferUsageRatio(); - this.discardReadBytesRatio = bufferUsageRatio / (bufferUsageRatio + 1); + this.readBytesDiscardPolicy = clientOptions.getReadBytesDiscardPolicy(); } public Queue> getStack() { @@ -620,7 +619,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup try { if (!decode(ctx, buffer, command)) { - discardReadBytesIfNecessary(buffer); + readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer); return; } } catch (Exception e) { @@ -647,7 +646,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup afterDecode(ctx, command); } - discardReadBytesIfNecessary(buffer); + readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer); } /** @@ -872,20 +871,6 @@ private static long nanoTime() { return System.nanoTime(); } - /** - * Try to discard read bytes when buffer usage reach a higher usage ratio. - * - * @param buffer - */ - private void discardReadBytesIfNecessary(ByteBuf buffer) { - - float usedRatio = (float) buffer.readerIndex() / buffer.capacity(); - - if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) { - buffer.discardReadBytes(); - } - } - public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, } diff --git a/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..c88db9f938 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java @@ -0,0 +1,48 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.lettuce.core.internal.LettuceAssert; +import io.netty.buffer.ByteBuf; + +/** + * A {@link ReadBytesDiscardPolicy} that tries to discard read bytes when buffer usage reaches a higher ratio + */ +public class RatioReadBytesDiscardPolicy implements ReadBytesDiscardPolicy { + private final int bufferUsageRatio; + private final float discardReadBytesRatio; + + public RatioReadBytesDiscardPolicy(int bufferUsageRatio) { + LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE, + "BufferUsageRatio must be greater than 0"); + + this.bufferUsageRatio = bufferUsageRatio; + this.discardReadBytesRatio = (float)bufferUsageRatio / (bufferUsageRatio + 1); + } + + public int getBufferUsageRatio() { + return bufferUsageRatio; + } + + @Override + public void discardReadBytesIfNecessary(ByteBuf buffer) { + float usedRatio = (float) buffer.readerIndex() / buffer.capacity(); + + if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } + } +} diff --git a/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..9c1eef7343 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java @@ -0,0 +1,31 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * Defines the approach to discard bytes read from {@link ByteBuf} + * in {@link CommandHandler#decode(ChannelHandlerContext ctx, ByteBuf buffer)} + */ +public interface ReadBytesDiscardPolicy { + /** + * Calls {@link ByteBuf#discardReadBytes()} if appropriate according to the current discard policy + * @param buffer {@link CommandHandler#buffer} + */ + void discardReadBytesIfNecessary(ByteBuf buffer); +} diff --git a/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..45dabfab7a --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java @@ -0,0 +1,36 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ReadBytesDiscardPolicy} that discards bytes read from a buffer on the sole condition + * that the buffer has not been deallocated yet. + */ +public class SimpleReadBytesDiscardPolicy implements ReadBytesDiscardPolicy { + public static final SimpleReadBytesDiscardPolicy INSTANCE = new SimpleReadBytesDiscardPolicy(); + + private SimpleReadBytesDiscardPolicy() { + } + + @Override + public void discardReadBytesIfNecessary(ByteBuf buffer) { + if (buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } + } +} diff --git a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java index b7ffd7d3ee..250959fb37 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy; import org.junit.jupiter.api.Test; /** @@ -52,6 +53,7 @@ void checkAssertions(ClientOptions sut) { assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false); assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT); assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO); + assertThat(sut.getReadBytesDiscardPolicy()).isInstanceOf(RatioReadBytesDiscardPolicy.class); } } diff --git a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java index d87c752d5b..8a5706150b 100644 --- a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -491,4 +492,28 @@ void shouldDiscardReadBytes() throws Exception { sut.channelUnregistered(context); } + @Test + void shouldCallPolicyToDiscardReadBytes() throws Exception { + ReadBytesDiscardPolicy policy = Mockito.mock(ReadBytesDiscardPolicy.class); + + CommandHandler commandHandler = new CommandHandler(ClientOptions.builder().readBytesDiscardPolicy(policy).build(), + clientResources, endpoint); + + ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + channelPromise.setSuccess(); + + commandHandler.channelRegistered(context); + commandHandler.channelActive(context); + + commandHandler.getStack().add(new Command<>(CommandType.PING, new StatusOutput<>(StringCodec.UTF8))); + + ByteBuf msg = context.alloc().buffer(100); + msg.writeBytes("*1\r\n+OK\r\n".getBytes()); + + commandHandler.channelRead(context, msg); + commandHandler.channelUnregistered(context); + + verify(policy).discardReadBytesIfNecessary(any()); + } + } diff --git a/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java b/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java new file mode 100644 index 0000000000..75a5825b34 --- /dev/null +++ b/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith({MockitoExtension.class}) +class RatioReadBytesDiscardPolicyTest { + @Mock + private ByteBuf buffer; + + private RatioReadBytesDiscardPolicy policy = new RatioReadBytesDiscardPolicy(3); + + @Test + void shouldNotDiscardReadBytesWhenDidntReachUsageRatio() { + when(buffer.readerIndex()).thenReturn(7); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verifyNoMoreInteractions(buffer); + } + + @Test + void shouldDiscardReadBytesWhenReachedUsageRatio() { + when(buffer.refCnt()).thenReturn(1); + when(buffer.readerIndex()).thenReturn(9); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verify(buffer).discardReadBytes(); + } + + @Test + void shouldNotDiscardReadBytesWhenReachedUsageRatioButBufferReleased() { + when(buffer.refCnt()).thenReturn(0); + when(buffer.readerIndex()).thenReturn(9); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verifyNoMoreInteractions(buffer); + } +} \ No newline at end of file diff --git a/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java b/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java new file mode 100644 index 0000000000..e2d7c29348 --- /dev/null +++ b/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SimpleReadBytesDiscardPolicyTest { + + @Mock + private ByteBuf buffer; + + private final SimpleReadBytesDiscardPolicy policy = SimpleReadBytesDiscardPolicy.INSTANCE; + + @Test + void shouldDiscardReadBytesWhenRefCntGreaterThanZero() { + when(buffer.refCnt()).thenReturn(1); + policy.discardReadBytesIfNecessary(buffer); + verify(buffer).discardReadBytes(); + } + + @Test + void shouldNotDiscardReadBytesWhenRefCntIsZero() { + when(buffer.refCnt()).thenReturn(0); + policy.discardReadBytesIfNecessary(buffer); + verifyNoMoreInteractions(buffer); + } +} \ No newline at end of file