diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index b3d078a5a7..6797851df3 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -21,7 +21,10 @@ import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.ProtocolVersion; +import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy; +import io.lettuce.core.protocol.ReadBytesDiscardPolicy; import io.lettuce.core.resource.ClientResources; +import io.netty.buffer.ByteBuf; /** * Client Options to control the behavior of {@link RedisClient}. @@ -82,7 +85,7 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; - private final int bufferUsageRatio; + private final ReadBytesDiscardPolicy readBytesDiscardPolicy; protected ClientOptions(Builder builder) { this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection; @@ -97,7 +100,7 @@ protected ClientOptions(Builder builder) { this.socketOptions = builder.socketOptions; this.sslOptions = builder.sslOptions; this.timeoutOptions = builder.timeoutOptions; - this.bufferUsageRatio = builder.bufferUsageRatio; + this.readBytesDiscardPolicy = builder.readBytesDiscardPolicy; } protected ClientOptions(ClientOptions original) { @@ -113,7 +116,7 @@ protected ClientOptions(ClientOptions original) { this.socketOptions = original.getSocketOptions(); this.sslOptions = original.getSslOptions(); this.timeoutOptions = original.getTimeoutOptions(); - this.bufferUsageRatio = original.getBufferUsageRatio(); + this.readBytesDiscardPolicy = original.getReadBytesDiscardPolicy(); } /** @@ -173,7 +176,7 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; - private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO; + private ReadBytesDiscardPolicy readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(DEFAULT_BUFFER_USAGE_RATIO); protected Builder() { } @@ -352,17 +355,28 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { * E.g. setting {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75 * percent. See {@link #DEFAULT_BUFFER_USAGE_RATIO}. * - * @param bufferUsageRatio must greater between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to + * @param bufferUsageRatio must be between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to * 90%. * @return {@code this} * @since 5.2 + * @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding + * policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which. + * Please use {@link #readBytesDiscardPolicy(ReadBytesDiscardPolicy)} */ public Builder bufferUsageRatio(int bufferUsageRatio) { + this.readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(bufferUsageRatio); + return this; + } - LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE, - "BufferUsageRatio must grater than 0"); - - this.bufferUsageRatio = bufferUsageRatio; + /** + * Sets the policy managing calls to {@link ByteBuf#discardReadBytes()} + * for {@link io.lettuce.core.protocol.CommandHandler#buffer} + * + * @param readBytesDiscardPolicy the policy to use in {@link io.lettuce.core.protocol.CommandHandler} + * @return {@code this} + */ + public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) { + this.readBytesDiscardPolicy = readBytesDiscardPolicy; return this; } @@ -546,11 +560,21 @@ public TimeoutOptions getTimeoutOptions() { * during decoding. In particular, when buffer usage reaches {@code bufferUsageRatio / bufferUsageRatio + 1}. E.g. setting * {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75 percent. * - * @return the buffer usage ratio. + * @return the buffer usage ratio, greater than zero if {@link RatioReadBytesDiscardPolicy} is used and zero otherwise * @since 5.2 + * + * @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding + * policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which */ public int getBufferUsageRatio() { - return bufferUsageRatio; + if (readBytesDiscardPolicy instanceof RatioReadBytesDiscardPolicy) { + return ((RatioReadBytesDiscardPolicy) readBytesDiscardPolicy).getBufferUsageRatio(); + } + return 0; + } + + public ReadBytesDiscardPolicy getReadBytesDiscardPolicy() { + return readBytesDiscardPolicy; } /** diff --git a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java index 8e9ccc8301..59f777ceab 100644 --- a/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java +++ b/src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java @@ -24,6 +24,7 @@ import io.lettuce.core.TimeoutOptions; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.ProtocolVersion; +import io.lettuce.core.protocol.ReadBytesDiscardPolicy; /** * Client Options to control the behavior of {@link RedisClusterClient}. @@ -115,7 +116,8 @@ public static ClusterClientOptions.Builder builder(ClientOptions clientOptions) } Builder builder = new Builder(); - builder.autoReconnect(clientOptions.isAutoReconnect()).bufferUsageRatio(clientOptions.getBufferUsageRatio()) + builder.autoReconnect(clientOptions.isAutoReconnect()) + .readBytesDiscardPolicy(clientOptions.getReadBytesDiscardPolicy()) .cancelCommandsOnReconnectFailure(clientOptions.isCancelCommandsOnReconnectFailure()) .disconnectedBehavior(clientOptions.getDisconnectedBehavior()).scriptCharset(clientOptions.getScriptCharset()) .publishOnScheduler(clientOptions.isPublishOnScheduler()) @@ -266,6 +268,12 @@ public Builder bufferUsageRatio(int bufferUsageRatio) { return this; } + @Override + public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) { + super.readBytesDiscardPolicy(readBytesDiscardPolicy); + return this; + } + /** * Create a new instance of {@link ClusterClientOptions} * @@ -290,7 +298,8 @@ public ClusterClientOptions.Builder mutate() { Builder builder = new Builder(); - builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio()) + builder.autoReconnect(isAutoReconnect()) + .readBytesDiscardPolicy(getReadBytesDiscardPolicy()) .cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure()) .disconnectedBehavior(getDisconnectedBehavior()).scriptCharset(getScriptCharset()) .publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection()) diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 6794847fb9..c398dd1dff 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -90,7 +90,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final boolean tracingEnabled; - private final float discardReadBytesRatio; + private final ReadBytesDiscardPolicy readBytesDiscardPolicy; private final boolean boundedQueues; @@ -137,8 +137,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.tracingEnabled = tracing.isEnabled(); - float bufferUsageRatio = clientOptions.getBufferUsageRatio(); - this.discardReadBytesRatio = bufferUsageRatio / (bufferUsageRatio + 1); + this.readBytesDiscardPolicy = clientOptions.getReadBytesDiscardPolicy(); } public Queue> getStack() { @@ -584,7 +583,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup try { if (!decode(ctx, buffer, pushOutput)) { - discardReadBytesIfNecessary(buffer); + readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer); return; } @@ -609,7 +608,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup try { if (!decode(ctx, buffer, command)) { - discardReadBytesIfNecessary(buffer); + readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer); return; } } catch (Exception e) { @@ -640,7 +639,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup } - discardReadBytesIfNecessary(buffer); + readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer); } protected void notifyPushListeners(PushMessage notification) { @@ -925,20 +924,6 @@ private static long nanoTime() { return System.nanoTime(); } - /** - * Try to discard read bytes when buffer usage reach a higher usage ratio. - * - * @param buffer - */ - private void discardReadBytesIfNecessary(ByteBuf buffer) { - - float usedRatio = (float) buffer.readerIndex() / buffer.capacity(); - - if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) { - buffer.discardReadBytes(); - } - } - public enum LifecycleState { NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED, } diff --git a/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..c88db9f938 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicy.java @@ -0,0 +1,48 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.lettuce.core.internal.LettuceAssert; +import io.netty.buffer.ByteBuf; + +/** + * A {@link ReadBytesDiscardPolicy} that tries to discard read bytes when buffer usage reaches a higher ratio + */ +public class RatioReadBytesDiscardPolicy implements ReadBytesDiscardPolicy { + private final int bufferUsageRatio; + private final float discardReadBytesRatio; + + public RatioReadBytesDiscardPolicy(int bufferUsageRatio) { + LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE, + "BufferUsageRatio must be greater than 0"); + + this.bufferUsageRatio = bufferUsageRatio; + this.discardReadBytesRatio = (float)bufferUsageRatio / (bufferUsageRatio + 1); + } + + public int getBufferUsageRatio() { + return bufferUsageRatio; + } + + @Override + public void discardReadBytesIfNecessary(ByteBuf buffer) { + float usedRatio = (float) buffer.readerIndex() / buffer.capacity(); + + if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } + } +} diff --git a/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..9c1eef7343 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java @@ -0,0 +1,31 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; + +/** + * Defines the approach to discard bytes read from {@link ByteBuf} + * in {@link CommandHandler#decode(ChannelHandlerContext ctx, ByteBuf buffer)} + */ +public interface ReadBytesDiscardPolicy { + /** + * Calls {@link ByteBuf#discardReadBytes()} if appropriate according to the current discard policy + * @param buffer {@link CommandHandler#buffer} + */ + void discardReadBytesIfNecessary(ByteBuf buffer); +} diff --git a/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java b/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java new file mode 100644 index 0000000000..45dabfab7a --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicy.java @@ -0,0 +1,36 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; + +/** + * A {@link ReadBytesDiscardPolicy} that discards bytes read from a buffer on the sole condition + * that the buffer has not been deallocated yet. + */ +public class SimpleReadBytesDiscardPolicy implements ReadBytesDiscardPolicy { + public static final SimpleReadBytesDiscardPolicy INSTANCE = new SimpleReadBytesDiscardPolicy(); + + private SimpleReadBytesDiscardPolicy() { + } + + @Override + public void discardReadBytesIfNecessary(ByteBuf buffer) { + if (buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } + } +} diff --git a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java index b322f29ca2..74e7ee4202 100644 --- a/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java +++ b/src/test/java/io/lettuce/core/ClientOptionsUnitTests.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy; import org.junit.jupiter.api.Test; import io.lettuce.core.protocol.ProtocolVersion; @@ -62,5 +63,6 @@ void checkAssertions(ClientOptions sut) { assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false); assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT); assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO); + assertThat(sut.getReadBytesDiscardPolicy()).isInstanceOf(RatioReadBytesDiscardPolicy.class); } } diff --git a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java index b0a2262460..0ecd95404f 100644 --- a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -506,4 +507,28 @@ void shouldDiscardReadBytes() throws Exception { assertThat(internalBuffer.writerIndex()).isEqualTo(0); sut.channelUnregistered(context); } + + @Test + void shouldCallPolicyToDiscardReadBytes() throws Exception { + ReadBytesDiscardPolicy policy = Mockito.mock(ReadBytesDiscardPolicy.class); + + CommandHandler commandHandler = new CommandHandler(ClientOptions.builder().readBytesDiscardPolicy(policy).build(), + clientResources, endpoint); + + ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + channelPromise.setSuccess(); + + commandHandler.channelRegistered(context); + commandHandler.channelActive(context); + + commandHandler.getStack().add(new Command<>(CommandType.PING, new StatusOutput<>(StringCodec.UTF8))); + + ByteBuf msg = context.alloc().buffer(100); + msg.writeBytes("*1\r\n+OK\r\n".getBytes()); + + commandHandler.channelRead(context, msg); + commandHandler.channelUnregistered(context); + + verify(policy).discardReadBytesIfNecessary(any()); + } } diff --git a/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java b/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java new file mode 100644 index 0000000000..75a5825b34 --- /dev/null +++ b/src/test/java/io/lettuce/core/protocol/RatioReadBytesDiscardPolicyTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith({MockitoExtension.class}) +class RatioReadBytesDiscardPolicyTest { + @Mock + private ByteBuf buffer; + + private RatioReadBytesDiscardPolicy policy = new RatioReadBytesDiscardPolicy(3); + + @Test + void shouldNotDiscardReadBytesWhenDidntReachUsageRatio() { + when(buffer.readerIndex()).thenReturn(7); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verifyNoMoreInteractions(buffer); + } + + @Test + void shouldDiscardReadBytesWhenReachedUsageRatio() { + when(buffer.refCnt()).thenReturn(1); + when(buffer.readerIndex()).thenReturn(9); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verify(buffer).discardReadBytes(); + } + + @Test + void shouldNotDiscardReadBytesWhenReachedUsageRatioButBufferReleased() { + when(buffer.refCnt()).thenReturn(0); + when(buffer.readerIndex()).thenReturn(9); + when(buffer.capacity()).thenReturn(10); + + policy.discardReadBytesIfNecessary(buffer); + + verifyNoMoreInteractions(buffer); + } +} \ No newline at end of file diff --git a/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java b/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java new file mode 100644 index 0000000000..e2d7c29348 --- /dev/null +++ b/src/test/java/io/lettuce/core/protocol/SimpleReadBytesDiscardPolicyTest.java @@ -0,0 +1,50 @@ +/* + * Copyright 2011-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBuf; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class SimpleReadBytesDiscardPolicyTest { + + @Mock + private ByteBuf buffer; + + private final SimpleReadBytesDiscardPolicy policy = SimpleReadBytesDiscardPolicy.INSTANCE; + + @Test + void shouldDiscardReadBytesWhenRefCntGreaterThanZero() { + when(buffer.refCnt()).thenReturn(1); + policy.discardReadBytesIfNecessary(buffer); + verify(buffer).discardReadBytes(); + } + + @Test + void shouldNotDiscardReadBytesWhenRefCntIsZero() { + when(buffer.refCnt()).thenReturn(0); + policy.discardReadBytesIfNecessary(buffer); + verifyNoMoreInteractions(buffer); + } +} \ No newline at end of file