Skip to content

Commit

Permalink
Custom policies to discard bytes read from CommandHandler#buffer redi…
Browse files Browse the repository at this point in the history
…s#1314

(cherry picked from commit 0263309)

# Conflicts:
#	src/main/java/io/lettuce/core/ClientOptions.java
#	src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java
#	src/main/java/io/lettuce/core/protocol/CommandHandler.java
  • Loading branch information
Shaphan committed Jul 17, 2020
1 parent a273e29 commit 0951d97
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 33 deletions.
46 changes: 35 additions & 11 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@

import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy;
import io.lettuce.core.protocol.ReadBytesDiscardPolicy;
import io.lettuce.core.resource.ClientResources;
import io.netty.buffer.ByteBuf;

/**
* Client Options to control the behavior of {@link RedisClient}.
Expand Down Expand Up @@ -82,7 +85,7 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final int bufferUsageRatio;
private final ReadBytesDiscardPolicy readBytesDiscardPolicy;

protected ClientOptions(Builder builder) {
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
Expand All @@ -97,7 +100,7 @@ protected ClientOptions(Builder builder) {
this.socketOptions = builder.socketOptions;
this.sslOptions = builder.sslOptions;
this.timeoutOptions = builder.timeoutOptions;
this.bufferUsageRatio = builder.bufferUsageRatio;
this.readBytesDiscardPolicy = builder.readBytesDiscardPolicy;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -113,7 +116,7 @@ protected ClientOptions(ClientOptions original) {
this.socketOptions = original.getSocketOptions();
this.sslOptions = original.getSslOptions();
this.timeoutOptions = original.getTimeoutOptions();
this.bufferUsageRatio = original.getBufferUsageRatio();
this.readBytesDiscardPolicy = original.getReadBytesDiscardPolicy();
}

/**
Expand Down Expand Up @@ -173,7 +176,7 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO;
private ReadBytesDiscardPolicy readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(DEFAULT_BUFFER_USAGE_RATIO);

protected Builder() {
}
Expand Down Expand Up @@ -352,17 +355,28 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
* E.g. setting {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75
* percent. See {@link #DEFAULT_BUFFER_USAGE_RATIO}.
*
* @param bufferUsageRatio must greater between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to
* @param bufferUsageRatio must be between 0 and 2^31-1, typically a value between 1 and 10 representing 50% to
* 90%.
* @return {@code this}
* @since 5.2
* @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding
* policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which.
* Please use {@link #readBytesDiscardPolicy(ReadBytesDiscardPolicy)}
*/
public Builder bufferUsageRatio(int bufferUsageRatio) {
this.readBytesDiscardPolicy = new RatioReadBytesDiscardPolicy(bufferUsageRatio);
return this;
}

LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must grater than 0");

this.bufferUsageRatio = bufferUsageRatio;
/**
* Sets the policy managing calls to {@link ByteBuf#discardReadBytes()}
* for {@link io.lettuce.core.protocol.CommandHandler#buffer}
*
* @param readBytesDiscardPolicy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
*/
public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) {
this.readBytesDiscardPolicy = readBytesDiscardPolicy;
return this;
}

Expand Down Expand Up @@ -546,11 +560,21 @@ public TimeoutOptions getTimeoutOptions() {
* during decoding. In particular, when buffer usage reaches {@code bufferUsageRatio / bufferUsageRatio + 1}. E.g. setting
* {@code bufferUsageRatio} to {@literal 3}, will discard read bytes once the buffer usage reaches 75 percent.
*
* @return the buffer usage ratio.
* @return the buffer usage ratio, greater than zero if {@link RatioReadBytesDiscardPolicy} is used and zero otherwise
* @since 5.2
*
* @deprecated Calls to {@link ByteBuf#discardReadBytes()} are controlled by corresponding
* policies ({@link ReadBytesDiscardPolicy}), {@link RatioReadBytesDiscardPolicy} is one of which
*/
public int getBufferUsageRatio() {
return bufferUsageRatio;
if (readBytesDiscardPolicy instanceof RatioReadBytesDiscardPolicy) {
return ((RatioReadBytesDiscardPolicy) readBytesDiscardPolicy).getBufferUsageRatio();
}
return 0;
}

public ReadBytesDiscardPolicy getReadBytesDiscardPolicy() {
return readBytesDiscardPolicy;
}

/**
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/io/lettuce/core/cluster/ClusterClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ProtocolVersion;
import io.lettuce.core.protocol.ReadBytesDiscardPolicy;

/**
* Client Options to control the behavior of {@link RedisClusterClient}.
Expand Down Expand Up @@ -115,7 +116,8 @@ public static ClusterClientOptions.Builder builder(ClientOptions clientOptions)
}

Builder builder = new Builder();
builder.autoReconnect(clientOptions.isAutoReconnect()).bufferUsageRatio(clientOptions.getBufferUsageRatio())
builder.autoReconnect(clientOptions.isAutoReconnect())
.readBytesDiscardPolicy(clientOptions.getReadBytesDiscardPolicy())
.cancelCommandsOnReconnectFailure(clientOptions.isCancelCommandsOnReconnectFailure())
.disconnectedBehavior(clientOptions.getDisconnectedBehavior()).scriptCharset(clientOptions.getScriptCharset())
.publishOnScheduler(clientOptions.isPublishOnScheduler())
Expand Down Expand Up @@ -266,6 +268,12 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
return this;
}

@Override
public Builder readBytesDiscardPolicy(ReadBytesDiscardPolicy readBytesDiscardPolicy) {
super.readBytesDiscardPolicy(readBytesDiscardPolicy);
return this;
}

/**
* Create a new instance of {@link ClusterClientOptions}
*
Expand All @@ -290,7 +298,8 @@ public ClusterClientOptions.Builder mutate() {

Builder builder = new Builder();

builder.autoReconnect(isAutoReconnect()).bufferUsageRatio(getBufferUsageRatio())
builder.autoReconnect(isAutoReconnect())
.readBytesDiscardPolicy(getReadBytesDiscardPolicy())
.cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.disconnectedBehavior(getDisconnectedBehavior()).scriptCharset(getScriptCharset())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
Expand Down
25 changes: 5 additions & 20 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom

private final boolean tracingEnabled;

private final float discardReadBytesRatio;
private final ReadBytesDiscardPolicy readBytesDiscardPolicy;

private final boolean boundedQueues;

Expand Down Expand Up @@ -137,8 +137,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc

this.tracingEnabled = tracing.isEnabled();

float bufferUsageRatio = clientOptions.getBufferUsageRatio();
this.discardReadBytesRatio = bufferUsageRatio / (bufferUsageRatio + 1);
this.readBytesDiscardPolicy = clientOptions.getReadBytesDiscardPolicy();
}

public Queue<RedisCommand<?, ?, ?>> getStack() {
Expand Down Expand Up @@ -584,7 +583,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

try {
if (!decode(ctx, buffer, pushOutput)) {
discardReadBytesIfNecessary(buffer);
readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer);
return;
}

Expand All @@ -609,7 +608,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

try {
if (!decode(ctx, buffer, command)) {
discardReadBytesIfNecessary(buffer);
readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer);
return;
}
} catch (Exception e) {
Expand Down Expand Up @@ -640,7 +639,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

}

discardReadBytesIfNecessary(buffer);
readBytesDiscardPolicy.discardReadBytesIfNecessary(buffer);
}

protected void notifyPushListeners(PushMessage notification) {
Expand Down Expand Up @@ -925,20 +924,6 @@ private static long nanoTime() {
return System.nanoTime();
}

/**
* Try to discard read bytes when buffer usage reach a higher usage ratio.
*
* @param buffer
*/
private void discardReadBytesIfNecessary(ByteBuf buffer) {

float usedRatio = (float) buffer.readerIndex() / buffer.capacity();

if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}

public enum LifecycleState {
NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2011-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.protocol;

import io.lettuce.core.internal.LettuceAssert;
import io.netty.buffer.ByteBuf;

/**
* A {@link ReadBytesDiscardPolicy} that tries to discard read bytes when buffer usage reaches a higher ratio
*/
public class RatioReadBytesDiscardPolicy implements ReadBytesDiscardPolicy {
private final int bufferUsageRatio;
private final float discardReadBytesRatio;

public RatioReadBytesDiscardPolicy(int bufferUsageRatio) {
LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must be greater than 0");

this.bufferUsageRatio = bufferUsageRatio;
this.discardReadBytesRatio = (float)bufferUsageRatio / (bufferUsageRatio + 1);
}

public int getBufferUsageRatio() {
return bufferUsageRatio;
}

@Override
public void discardReadBytesIfNecessary(ByteBuf buffer) {
float usedRatio = (float) buffer.readerIndex() / buffer.capacity();

if (usedRatio >= discardReadBytesRatio && buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
}
31 changes: 31 additions & 0 deletions src/main/java/io/lettuce/core/protocol/ReadBytesDiscardPolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2011-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;

/**
* Defines the approach to discard bytes read from {@link ByteBuf}
* in {@link CommandHandler#decode(ChannelHandlerContext ctx, ByteBuf buffer)}
*/
public interface ReadBytesDiscardPolicy {
/**
* Calls {@link ByteBuf#discardReadBytes()} if appropriate according to the current discard policy
* @param buffer {@link CommandHandler#buffer}
*/
void discardReadBytesIfNecessary(ByteBuf buffer);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2011-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.protocol;

import io.netty.buffer.ByteBuf;

/**
* A {@link ReadBytesDiscardPolicy} that discards bytes read from a buffer on the sole condition
* that the buffer has not been deallocated yet.
*/
public class SimpleReadBytesDiscardPolicy implements ReadBytesDiscardPolicy {
public static final SimpleReadBytesDiscardPolicy INSTANCE = new SimpleReadBytesDiscardPolicy();

private SimpleReadBytesDiscardPolicy() {
}

@Override
public void discardReadBytesIfNecessary(ByteBuf buffer) {
if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
}
2 changes: 2 additions & 0 deletions src/test/java/io/lettuce/core/ClientOptionsUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.lettuce.core.protocol.RatioReadBytesDiscardPolicy;
import org.junit.jupiter.api.Test;

import io.lettuce.core.protocol.ProtocolVersion;
Expand Down Expand Up @@ -62,5 +63,6 @@ void checkAssertions(ClientOptions sut) {
assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false);
assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT);
assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO);
assertThat(sut.getReadBytesDiscardPolicy()).isInstanceOf(RatioReadBytesDiscardPolicy.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
Expand Down Expand Up @@ -506,4 +507,28 @@ void shouldDiscardReadBytes() throws Exception {
assertThat(internalBuffer.writerIndex()).isEqualTo(0);
sut.channelUnregistered(context);
}

@Test
void shouldCallPolicyToDiscardReadBytes() throws Exception {
ReadBytesDiscardPolicy policy = Mockito.mock(ReadBytesDiscardPolicy.class);

CommandHandler commandHandler = new CommandHandler(ClientOptions.builder().readBytesDiscardPolicy(policy).build(),
clientResources, endpoint);

ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
channelPromise.setSuccess();

commandHandler.channelRegistered(context);
commandHandler.channelActive(context);

commandHandler.getStack().add(new Command<>(CommandType.PING, new StatusOutput<>(StringCodec.UTF8)));

ByteBuf msg = context.alloc().buffer(100);
msg.writeBytes("*1\r\n+OK\r\n".getBytes());

commandHandler.channelRead(context, msg);
commandHandler.channelUnregistered(context);

verify(policy).discardReadBytesIfNecessary(any());
}
}
Loading

0 comments on commit 0951d97

Please sign in to comment.