Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuration of discardReadBytes through ClientOptions.bufferUsageRatio #916

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package io.lettuce.core;

import java.io.Serializable;

import io.lettuce.core.internal.LettuceAssert;

import java.io.Serializable;

/**
* Client Options to control the behavior of {@link RedisClient}.
*
Expand All @@ -36,6 +36,7 @@ public class ClientOptions implements Serializable {
public static final SocketOptions DEFAULT_SOCKET_OPTIONS = SocketOptions.create();
public static final SslOptions DEFAULT_SSL_OPTIONS = SslOptions.create();
public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();
public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;

private final boolean pingBeforeActivateConnection;
private final boolean autoReconnect;
Expand All @@ -46,6 +47,7 @@ public class ClientOptions implements Serializable {
private final SocketOptions socketOptions;
private final SslOptions sslOptions;
private final TimeoutOptions timeoutOptions;
private final int bufferUsageRatio;
private final Builder builder;

protected ClientOptions(Builder builder) {
Expand All @@ -58,6 +60,7 @@ protected ClientOptions(Builder builder) {
this.socketOptions = builder.socketOptions;
this.sslOptions = builder.sslOptions;
this.timeoutOptions = builder.timeoutOptions;
this.bufferUsageRatio = builder.bufferUsageRatio;
this.builder = builder;
}

Expand All @@ -71,6 +74,7 @@ protected ClientOptions(ClientOptions original) {
this.socketOptions = original.getSocketOptions();
this.sslOptions = original.getSslOptions();
this.timeoutOptions = original.getTimeoutOptions();
this.bufferUsageRatio = original.getBufferUsageRatio();
this.builder = original.builder;
}

Expand Down Expand Up @@ -116,6 +120,7 @@ public static class Builder {
private SocketOptions socketOptions = DEFAULT_SOCKET_OPTIONS;
private SslOptions sslOptions = DEFAULT_SSL_OPTIONS;
private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;
private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO;

protected Builder() {
}
Expand Down Expand Up @@ -238,6 +243,23 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Sets the buffer usage ratio to {@link io.lettuce.core.protocol.CommandHandler} for determine when to discard
* read bytes.See {@link #DEFAULT_BUFFER_USAGE_RATIO}.
*
* @param bufferUsageRatio must greater between 0 and 2^31-1
* @return {@code this}
* @since 5.2
*/
public Builder bufferUsageRatio(int bufferUsageRatio) {
LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must grater than 0");
LettuceAssert.isTrue(bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must less than " + Integer.MAX_VALUE);
this.bufferUsageRatio = bufferUsageRatio;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -354,6 +376,18 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Buffer usage ratio for {@link io.lettuce.core.protocol.CommandHandler}. Discard operation for read bytes occur
* When buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1}. For example, sets 3 to bufferUsageRatio,
* means buffer usage reach 75 percentage, discard operation occur.
*
* @return the buffer usage ratio.
* @since 5.2
*/
public int getBufferUsageRatio() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose to make cleaning more customizable. This method could return a discard consumer (BiConsumer<ByteBuf, DecodeProgress>) instead of exposing details how discard of buffer bytes is configured.

See below for the full explanation.

return bufferUsageRatio;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

try {
if (!decode(ctx, buffer, command)) {
discardReadBytesIfNecessary(buffer);
return;
}
} catch (Exception e) {
Expand All @@ -614,9 +615,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
afterDecode(ctx, command);
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
discardReadBytesIfNecessary(buffer);
}

/**
Expand Down Expand Up @@ -840,6 +839,19 @@ private static long nanoTime() {
return System.nanoTime();
}

/**
* try discard read bytes when buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1}
* @param buffer
*/
private void discardReadBytesIfNecessary(ByteBuf buffer) {
int bufferUsageRatio = clientOptions.getBufferUsageRatio();
if ((float)buffer.writerIndex() / buffer.capacity() >= (float)bufferUsageRatio / (bufferUsageRatio + 1)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should rather check for readerIndex(). The written bytes don't express how much of the buffer was already consumed.

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
}

public enum LifecycleState {
NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED,
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/lettuce/core/ClientOptionsUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ void checkAssertions(ClientOptions sut) {
assertThat(sut.isPingBeforeActivateConnection()).isEqualTo(false);
assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false);
assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT);
assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,4 +430,30 @@ void shouldIgnoreNonReadableBuffers() throws Exception {

verify(byteBufMock, never()).release();
}

@Test
void shouldDiscardReadBytes() throws Exception {

ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
channelPromise.setSuccess();
sut.channelRegistered(context);
sut.channelActive(context);
//set the command handler buffer capacity to 1024, make it easy to test
ReflectionTestUtils.setField(sut, "buffer", context.alloc().buffer(1024));
ByteBuf buffer = (ByteBuf) ReflectionTestUtils.getField(sut, "buffer");

//mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(1024);
while ((float)msg.writerIndex() / msg.capacity() <= (float)ClientOptions.DEFAULT_BUFFER_USAGE_RATIO / (
ClientOptions.DEFAULT_BUFFER_USAGE_RATIO + 1)) {
sut.write(context, command, channelPromise);
msg.writeBytes("*1\r\n+OK\r\n".getBytes());
}

sut.channelRead(context, msg);

assertThat(buffer.readerIndex() == 0);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a valid assertion.

assertThat(buffer.writerIndex() == 0);
sut.channelUnregistered(context);
}
}