diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 92282361cd..dc7081ddef 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -561,7 +561,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup } if (buffer.refCnt() != 0) { - buffer.discardReadBytes(); + buffer.discardSomeReadBytes(); } afterComplete(ctx, command); diff --git a/src/test/jmh/io/lettuce/core/JmhMain.java b/src/test/jmh/io/lettuce/core/JmhMain.java index 0e4b325c61..8e3542b51c 100644 --- a/src/test/jmh/io/lettuce/core/JmhMain.java +++ b/src/test/jmh/io/lettuce/core/JmhMain.java @@ -60,10 +60,15 @@ private static void runCommandBenchmark() throws RunnerException { private static void runCommandHandlerBenchmark() throws RunnerException { - new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*") - .build()).run(); - // new - // Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run(); +// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*") +// .build()).run(); + new Runner( + prepareOptions() + .mode(Mode.Throughput) + .timeUnit(TimeUnit.SECONDS) + .include(".*CommandHandlerBenchmark.*") + .build() + ).run(); } private static void runCommandEncoderBenchmark() throws RunnerException { diff --git a/src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java b/src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java index 99b2b89ffd..d908ebfdd8 100644 --- a/src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java +++ b/src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java @@ -16,7 +16,7 @@ package io.lettuce.core.protocol; import java.util.Arrays; -import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -25,10 +25,12 @@ import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import io.lettuce.core.ClientOptions; import io.lettuce.core.codec.ByteArrayCodec; import io.lettuce.core.output.ValueOutput; +import io.netty.buffer.ByteBuf; /** * Benchmark for {@link CommandHandler}. Test cases: @@ -46,81 +48,111 @@ public class CommandHandlerBenchmark { private final static ClientOptions CLIENT_OPTIONS = ClientOptions.create(); private final static EmptyContext CHANNEL_HANDLER_CONTEXT = new EmptyContext(); private final static byte[] KEY = "key".getBytes(); + private static final String VALUE = "value\r\n"; private final EmptyPromise PROMISE = new EmptyPromise(); private CommandHandler commandHandler; - private Command command; - private Command batchCommand; - private Collection commands1; - private List commands10; - private List commands100; - private List commands1000; + private ByteBuf reply1; + private ByteBuf reply10; + private ByteBuf reply100; + private ByteBuf reply1000; @Setup - public void setup() { - + public void setup() throws Exception { commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE, new DefaultEndpoint(CLIENT_OPTIONS)); - command = new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)); + commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT); + commandHandler.setState(CommandHandler.LifecycleState.CONNECTED); + reply1 = strToByteBuf(String.format("+%s", VALUE)); + reply10 = strToByteBuf(makeBulkReply(10)); + reply100 = strToByteBuf(makeBulkReply(100)); + reply1000 = strToByteBuf(makeBulkReply(1000)); + for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) { + buf.retain(); + } + } - commandHandler.setState(CommandHandler.LifecycleState.CONNECTED); + @TearDown + public void tearDown() throws Exception { + commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT); + for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) { + buf.release(2); + } + } - commands1 = Arrays.asList(command); - commands10 = IntStream.range(0, 10) - .mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY))) - .collect(Collectors.toList()); + private ByteBuf strToByteBuf(String str) { + ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer(); + buf.writeBytes(str.getBytes()); + return buf; + } - commands100 = IntStream.range(0, 100) - .mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY))) - .collect(Collectors.toList()); + private String makeBulkReply(int numOfReplies) { + String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE); + return String.join("", Collections.nCopies(numOfReplies, baseReply)); + } - commands1000 = IntStream.range(0, 1000) - .mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY))) - .collect(Collectors.toList()); + private Command makeCommand() { + return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)); } @Benchmark - public void measureNettyWrite() throws Exception { + public void measureNettyWriteAndRead() throws Exception { + Command command = makeCommand(); commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE); - // Prevent OOME - commandHandler.getStack().clear(); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); + reply1.resetReaderIndex(); + reply1.retain(); } @Benchmark - public void measureNettyWriteBatch1() throws Exception { + public void measureNettyWriteAndReadBatch1() throws Exception { + List commands = Collections.singletonList(makeCommand()); - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1, PROMISE); + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - // Prevent OOME - commandHandler.getStack().clear(); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); + reply1.resetReaderIndex(); + reply1.retain(); } @Benchmark - public void measureNettyWriteBatch10() throws Exception { + public void measureNettyWriteAndReadBatch10() throws Exception { + List commands = IntStream.range(0, 10) + .mapToObj(i -> makeCommand()) + .collect(Collectors.toList()); - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands10, PROMISE); + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - // Prevent OOME - commandHandler.getStack().clear(); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10); + reply10.resetReaderIndex(); + reply10.retain(); } @Benchmark - public void measureNettyWriteBatch100() throws Exception { + public void measureNettyWriteAndReadBatch100() throws Exception { + List commands = IntStream.range(0, 100) + .mapToObj(i -> makeCommand()) + .collect(Collectors.toList()); - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands100, PROMISE); + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - // Prevent OOME - commandHandler.getStack().clear(); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100); + reply100.resetReaderIndex(); + reply100.retain(); } @Benchmark - public void measureNettyWriteBatch1000() throws Exception { + public void measureNettyWriteAndReadBatch1000() throws Exception { + List commands = IntStream.range(0, 1000) + .mapToObj(i -> makeCommand()) + .collect(Collectors.toList()); - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1000, PROMISE); + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - // Prevent OOME - commandHandler.getStack().clear(); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000); + reply1000.resetReaderIndex(); + reply1000.retain(); } } diff --git a/src/test/jmh/io/lettuce/core/protocol/EmptyChannel.java b/src/test/jmh/io/lettuce/core/protocol/EmptyChannel.java new file mode 100644 index 0000000000..1e88550f5e --- /dev/null +++ b/src/test/jmh/io/lettuce/core/protocol/EmptyChannel.java @@ -0,0 +1,237 @@ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +import java.net.SocketAddress; + +public class EmptyChannel implements Channel { + + private final static ChannelConfig CONFIG = new EmptyConfig(); + + @Override + public ChannelId id() { + return null; + } + + @Override + public EventLoop eventLoop() { + return null; + } + + @Override + public Channel parent() { + return null; + } + + @Override + public ChannelConfig config() { + return CONFIG; + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + public boolean isRegistered() { + return false; + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public ChannelMetadata metadata() { + return null; + } + + @Override + public SocketAddress localAddress() { + return null; + } + + @Override + public SocketAddress remoteAddress() { + return null; + } + + @Override + public ChannelFuture closeFuture() { + return null; + } + + @Override + public boolean isWritable() { + return false; + } + + @Override + public long bytesBeforeUnwritable() { + return 0; + } + + @Override + public long bytesBeforeWritable() { + return 0; + } + + @Override + public Unsafe unsafe() { + return null; + } + + @Override + public ChannelPipeline pipeline() { + return null; + } + + @Override + public ByteBufAllocator alloc() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1) { + return null; + } + + @Override + public ChannelFuture disconnect() { + return null; + } + + @Override + public ChannelFuture close() { + return null; + } + + @Override + public ChannelFuture deregister() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1, + ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture disconnect(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture close(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture deregister(ChannelPromise channelPromise) { + return null; + } + + @Override + public Channel read() { + return null; + } + + @Override + public ChannelFuture write(Object o) { + return null; + } + + @Override + public ChannelFuture write(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public Channel flush() { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o) { + return null; + } + + @Override + public ChannelPromise newPromise() { + return null; + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return null; + } + + @Override + public ChannelFuture newSucceededFuture() { + return null; + } + + @Override + public ChannelFuture newFailedFuture(Throwable throwable) { + return null; + } + + @Override + public ChannelPromise voidPromise() { + return null; + } + + @Override + public Attribute attr(AttributeKey attributeKey) { + return null; + } + + @Override + public boolean hasAttr(AttributeKey attributeKey) { + return false; + } + + @Override + public int compareTo(Channel o) { + return 0; + } +} diff --git a/src/test/jmh/io/lettuce/core/protocol/EmptyConfig.java b/src/test/jmh/io/lettuce/core/protocol/EmptyConfig.java new file mode 100644 index 0000000000..07c52d7317 --- /dev/null +++ b/src/test/jmh/io/lettuce/core/protocol/EmptyConfig.java @@ -0,0 +1,142 @@ +package io.lettuce.core.protocol; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelOption; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; + +import java.util.Map; + +public class EmptyConfig implements ChannelConfig { + @Override + public Map, Object> getOptions() { + return null; + } + + @Override + public boolean setOptions(Map, ?> map) { + return false; + } + + @Override + public T getOption(ChannelOption channelOption) { + return null; + } + + @Override + public boolean setOption(ChannelOption channelOption, T t) { + return false; + } + + @Override + public int getConnectTimeoutMillis() { + return 0; + } + + @Override + public ChannelConfig setConnectTimeoutMillis(int i) { + return null; + } + + @Override + public int getMaxMessagesPerRead() { + return 0; + } + + @Override + public ChannelConfig setMaxMessagesPerRead(int i) { + return null; + } + + @Override + public int getWriteSpinCount() { + return 0; + } + + @Override + public ChannelConfig setWriteSpinCount(int i) { + return null; + } + + @Override + public ByteBufAllocator getAllocator() { + return null; + } + + @Override + public ChannelConfig setAllocator(ByteBufAllocator byteBufAllocator) { + return null; + } + + @Override + public T getRecvByteBufAllocator() { + return null; + } + + @Override + public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator recvByteBufAllocator) { + return null; + } + + @Override + public boolean isAutoRead() { + return false; + } + + @Override + public ChannelConfig setAutoRead(boolean b) { + return null; + } + + @Override + public boolean isAutoClose() { + return false; + } + + @Override + public ChannelConfig setAutoClose(boolean b) { + return null; + } + + @Override + public int getWriteBufferHighWaterMark() { + return 0; + } + + @Override + public ChannelConfig setWriteBufferHighWaterMark(int i) { + return null; + } + + @Override + public int getWriteBufferLowWaterMark() { + return 0; + } + + @Override + public ChannelConfig setWriteBufferLowWaterMark(int i) { + return null; + } + + @Override + public MessageSizeEstimator getMessageSizeEstimator() { + return null; + } + + @Override + public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator messageSizeEstimator) { + return null; + } + + @Override + public WriteBufferWaterMark getWriteBufferWaterMark() { + return null; + } + + @Override + public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + return null; + } +} diff --git a/src/test/jmh/io/lettuce/core/protocol/EmptyContext.java b/src/test/jmh/io/lettuce/core/protocol/EmptyContext.java index 8500feca29..e715171e0f 100644 --- a/src/test/jmh/io/lettuce/core/protocol/EmptyContext.java +++ b/src/test/jmh/io/lettuce/core/protocol/EmptyContext.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -29,9 +30,11 @@ */ class EmptyContext implements ChannelHandlerContext { + private final static Channel CHANNEL = new EmptyChannel(); + @Override public Channel channel() { - return null; + return CHANNEL; } @Override @@ -208,7 +211,7 @@ public ChannelPipeline pipeline() { @Override public ByteBufAllocator alloc() { - return null; + return PooledByteBufAllocator.DEFAULT; } @Override