Skip to content

Commit

Permalink
Allow configuration of discardReadBytes through ClientOptions.bufferU…
Browse files Browse the repository at this point in the history
…sageRatio #906

We now allow configuration of when ByteBuf.discardReadBytes() gets called. Memory freeing can be useful during decoding of large responses instead of waiting until decoding is finished.

Original pull request: #916
  • Loading branch information
gavincook authored and mp911de committed Nov 23, 2018
1 parent ed8f349 commit 4f026e6
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 5 deletions.
38 changes: 36 additions & 2 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
*/
package io.lettuce.core;

import java.io.Serializable;

import io.lettuce.core.internal.LettuceAssert;

import java.io.Serializable;

/**
* Client Options to control the behavior of {@link RedisClient}.
*
Expand All @@ -36,6 +36,7 @@ public class ClientOptions implements Serializable {
public static final SocketOptions DEFAULT_SOCKET_OPTIONS = SocketOptions.create();
public static final SslOptions DEFAULT_SSL_OPTIONS = SslOptions.create();
public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();
public static final int DEFAULT_BUFFER_USAGE_RATIO = 3;

private final boolean pingBeforeActivateConnection;
private final boolean autoReconnect;
Expand All @@ -46,6 +47,7 @@ public class ClientOptions implements Serializable {
private final SocketOptions socketOptions;
private final SslOptions sslOptions;
private final TimeoutOptions timeoutOptions;
private final int bufferUsageRatio;
private final Builder builder;

protected ClientOptions(Builder builder) {
Expand All @@ -58,6 +60,7 @@ protected ClientOptions(Builder builder) {
this.socketOptions = builder.socketOptions;
this.sslOptions = builder.sslOptions;
this.timeoutOptions = builder.timeoutOptions;
this.bufferUsageRatio = builder.bufferUsageRatio;
this.builder = builder;
}

Expand All @@ -71,6 +74,7 @@ protected ClientOptions(ClientOptions original) {
this.socketOptions = original.getSocketOptions();
this.sslOptions = original.getSslOptions();
this.timeoutOptions = original.getTimeoutOptions();
this.bufferUsageRatio = original.getBufferUsageRatio();
this.builder = original.builder;
}

Expand Down Expand Up @@ -116,6 +120,7 @@ public static class Builder {
private SocketOptions socketOptions = DEFAULT_SOCKET_OPTIONS;
private SslOptions sslOptions = DEFAULT_SSL_OPTIONS;
private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;
private int bufferUsageRatio = DEFAULT_BUFFER_USAGE_RATIO;

protected Builder() {
}
Expand Down Expand Up @@ -238,6 +243,23 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Sets the buffer usage ratio to {@link io.lettuce.core.protocol.CommandHandler} for determine when to discard
* read bytes.See {@link #DEFAULT_BUFFER_USAGE_RATIO}.
*
* @param bufferUsageRatio must greater between 0 and 2^31-1
* @return {@code this}
* @since 5.2
*/
public Builder bufferUsageRatio(int bufferUsageRatio) {
LettuceAssert.isTrue(bufferUsageRatio > 0 && bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must grater than 0");
LettuceAssert.isTrue(bufferUsageRatio < Integer.MAX_VALUE,
"BufferUsageRatio must less than " + Integer.MAX_VALUE);
this.bufferUsageRatio = bufferUsageRatio;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand Down Expand Up @@ -354,6 +376,18 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Buffer usage ratio for {@link io.lettuce.core.protocol.CommandHandler}. Discard operation for read bytes occur
* When buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1}. For example, sets 3 to bufferUsageRatio,
* means buffer usage reach 75 percentage, discard operation occur.
*
* @return the buffer usage ratio.
* @since 5.2
*/
public int getBufferUsageRatio() {
return bufferUsageRatio;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup

try {
if (!decode(ctx, buffer, command)) {
discardReadBytesIfNecessary(buffer);
return;
}
} catch (Exception e) {
Expand All @@ -614,9 +615,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
afterDecode(ctx, command);
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
discardReadBytesIfNecessary(buffer);
}

/**
Expand Down Expand Up @@ -840,6 +839,19 @@ private static long nanoTime() {
return System.nanoTime();
}

/**
* try discard read bytes when buffer usage reach {@code bufferUsageRatio / bufferUsageRatio + 1}
* @param buffer
*/
private void discardReadBytesIfNecessary(ByteBuf buffer) {
int bufferUsageRatio = clientOptions.getBufferUsageRatio();
if ((float)buffer.writerIndex() / buffer.capacity() >= (float)bufferUsageRatio / (bufferUsageRatio + 1)) {
if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}
}

public enum LifecycleState {
NOT_CONNECTED, REGISTERED, CONNECTED, ACTIVATING, ACTIVE, DISCONNECTED, DEACTIVATING, DEACTIVATED, CLOSED,
}
Expand Down
1 change: 1 addition & 0 deletions src/test/java/io/lettuce/core/ClientOptionsUnitTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ void checkAssertions(ClientOptions sut) {
assertThat(sut.isPingBeforeActivateConnection()).isEqualTo(false);
assertThat(sut.isSuspendReconnectOnProtocolFailure()).isEqualTo(false);
assertThat(sut.getDisconnectedBehavior()).isEqualTo(ClientOptions.DisconnectedBehavior.DEFAULT);
assertThat(sut.getBufferUsageRatio()).isEqualTo(ClientOptions.DEFAULT_BUFFER_USAGE_RATIO);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,4 +430,30 @@ void shouldIgnoreNonReadableBuffers() throws Exception {

verify(byteBufMock, never()).release();
}

@Test
void shouldDiscardReadBytes() throws Exception {

ChannelPromise channelPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
channelPromise.setSuccess();
sut.channelRegistered(context);
sut.channelActive(context);
//set the command handler buffer capacity to 1024, make it easy to test
ReflectionTestUtils.setField(sut, "buffer", context.alloc().buffer(1024));
ByteBuf buffer = (ByteBuf) ReflectionTestUtils.getField(sut, "buffer");

//mock a multi reply, which will reach the buffer usage ratio
ByteBuf msg = context.alloc().buffer(1024);
while ((float)msg.writerIndex() / msg.capacity() <= (float)ClientOptions.DEFAULT_BUFFER_USAGE_RATIO / (
ClientOptions.DEFAULT_BUFFER_USAGE_RATIO + 1)) {
sut.write(context, command, channelPromise);
msg.writeBytes("*1\r\n+OK\r\n".getBytes());
}

sut.channelRead(context, msg);

assertThat(buffer.readerIndex() == 0);
assertThat(buffer.writerIndex() == 0);
sut.channelUnregistered(context);
}
}

0 comments on commit 4f026e6

Please sign in to comment.