From cc144361f0e4263cae54c0094f591e5cb2fa1d61 Mon Sep 17 00:00:00 2001 From: Grzegorz Szpak Date: Tue, 6 Mar 2018 18:11:54 -0800 Subject: [PATCH] Using discardSomeReadBytes instead of discardReadBytes for CommandHandler's buffer #725 Add benchmarks for channelRead added to CommandHandler JMH test suite. CommandHandlerBenchmark tests the whole flow now - both writes and reads. Also, creation of commands was moved to benchmark methods - after one usage they become not writable which causes the benchmark to give incorrect results. Original pull request: #726 --- .../redis/protocol/CommandHandler.java | 2 +- .../jmh/com/lambdaworks/redis/JmhMain.java | 13 +- .../protocol/CommandHandlerBenchmark.java | 137 ++++++---- .../redis/protocol/EmptyChannel.java | 252 ++++++++++++++++++ .../redis/protocol/EmptyConfig.java | 157 +++++++++++ .../redis/protocol/EmptyContext.java | 7 +- 6 files changed, 508 insertions(+), 60 deletions(-) create mode 100644 src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java create mode 100644 src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 511676fb6a..9d53821e93 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -313,7 +313,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) { } if (buffer.refCnt() != 0) { - buffer.discardReadBytes(); + buffer.discardSomeReadBytes(); } afterComplete(ctx, command); diff --git a/src/test/jmh/com/lambdaworks/redis/JmhMain.java b/src/test/jmh/com/lambdaworks/redis/JmhMain.java index cbc2743ca7..83e755d896 100644 --- a/src/test/jmh/com/lambdaworks/redis/JmhMain.java +++ b/src/test/jmh/com/lambdaworks/redis/JmhMain.java @@ -52,10 +52,15 @@ private static void runCommandBenchmark() throws RunnerException { private static void runCommandHandlerBenchmark() throws RunnerException { - new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*") - .build()).run(); - // new - // Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run(); +// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*") +// .build()).run(); + new Runner( + prepareOptions() + .mode(Mode.Throughput) + .timeUnit(TimeUnit.SECONDS) + .include(".*CommandHandlerBenchmark.*") + .build() + ).run(); } private static void runCommandEncoderBenchmark() throws RunnerException { diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java b/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java index 8a2549202d..d90e6206e7 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java @@ -15,7 +15,11 @@ */ package com.lambdaworks.redis.protocol; -import java.util.Collection; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.openjdk.jmh.annotations.*; @@ -23,9 +27,7 @@ import com.lambdaworks.redis.codec.ByteArrayCodec; import com.lambdaworks.redis.output.ValueOutput; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPromise; -import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.buffer.ByteBuf; /** * Benchmark for {@link CommandHandler}. Test cases: @@ -40,79 +42,108 @@ public class CommandHandlerBenchmark { private final static ByteArrayCodec CODEC = new ByteArrayCodec(); - private final static ClientOptions CLIENT_OPTIONS = ClientOptions.builder().build(); + private final static ClientOptions CLIENT_OPTIONS = ClientOptions.create(); private final static EmptyContext CHANNEL_HANDLER_CONTEXT = new EmptyContext(); private final static byte[] KEY = "key".getBytes(); - private final static EmptyPromise EMPTY = new EmptyPromise(); + private static final String VALUE = "value\r\n"; + private final EmptyPromise PROMISE = new EmptyPromise(); private CommandHandler commandHandler; - private Collection stack; - private Command command; + private ByteBuf reply1; + private ByteBuf reply10; + private ByteBuf reply100; + private ByteBuf reply1000; @Setup - public void setup() { + public void setup() throws Exception { + commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE); + commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT); + commandHandler.setState(CommandHandler.LifecycleState.CONNECTED); - commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE) { - @Override - protected void setState(LifecycleState lifecycleState) { - CommandHandlerBenchmark.this.stack = super.stack; - super.setState(lifecycleState); - } - }; + reply1 = strToByteBuf(String.format("+%s", VALUE)); + reply10 = strToByteBuf(makeBulkReply(10)); + reply100 = strToByteBuf(makeBulkReply(100)); + reply1000 = strToByteBuf(makeBulkReply(1000)); + for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) { + buf.retain(); + } + } - command = new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)); + @TearDown + public void tearDown() throws Exception { + commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT); + for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) { + buf.release(2); + } + } - commandHandler.setState(CommandHandler.LifecycleState.CONNECTED); + private ByteBuf strToByteBuf(String str) { + ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer(); + buf.writeBytes(str.getBytes()); + return buf; + } - commandHandler.channel = new MyLocalChannel(); + private String makeBulkReply(int numOfReplies) { + String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE); + return String.join("", Collections.nCopies(numOfReplies, baseReply)); } - @TearDown(Level.Iteration) - public void tearDown() { - commandHandler.reset(); - stack.clear(); + private Command makeCommand() { + return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)); } @Benchmark - public void measureUserWrite() { - commandHandler.write(command); + public void measureNettyWriteAndRead() throws Exception { + Command command = makeCommand(); + + commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE); + + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); + reply1.resetReaderIndex(); + reply1.retain(); } @Benchmark - public void measureNettyWrite() throws Exception { - commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, EMPTY); - stack.remove(command); + public void measureNettyWriteAndReadBatch1() throws Exception { + List commands = Collections.singletonList(makeCommand()); + + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); + + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); + reply1.resetReaderIndex(); + reply1.retain(); } - private final static class MyLocalChannel extends EmbeddedChannel { - @Override - public boolean isActive() { - return true; - } + @Benchmark + public void measureNettyWriteAndReadBatch10() throws Exception { + List commands = IntStream.range(0, 10).mapToObj(i -> makeCommand()).collect(Collectors.toList()); - @Override - public boolean isOpen() { - return true; - } + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - @Override - public ChannelFuture write(Object msg) { - return EMPTY; - } + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10); + reply10.resetReaderIndex(); + reply10.retain(); + } - @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { - return promise; - } + @Benchmark + public void measureNettyWriteAndReadBatch100() throws Exception { + List commands = IntStream.range(0, 100).mapToObj(i -> makeCommand()).collect(Collectors.toList()); - @Override - public ChannelFuture writeAndFlush(Object msg) { - return EMPTY; - } + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - return promise; - } + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100); + reply100.resetReaderIndex(); + reply100.retain(); + } + + @Benchmark + public void measureNettyWriteAndReadBatch1000() throws Exception { + List commands = IntStream.range(0, 1000).mapToObj(i -> makeCommand()).collect(Collectors.toList()); + + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); + + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000); + reply1000.resetReaderIndex(); + reply1000.retain(); } } diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java new file mode 100644 index 0000000000..e238f3e645 --- /dev/null +++ b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java @@ -0,0 +1,252 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lambdaworks.redis.protocol; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +import java.net.SocketAddress; + +public class EmptyChannel implements Channel { + + private final static ChannelConfig CONFIG = new EmptyConfig(); + + @Override + public ChannelId id() { + return null; + } + + @Override + public EventLoop eventLoop() { + return null; + } + + @Override + public Channel parent() { + return null; + } + + @Override + public ChannelConfig config() { + return CONFIG; + } + + @Override + public boolean isOpen() { + return false; + } + + @Override + public boolean isRegistered() { + return false; + } + + @Override + public boolean isActive() { + return false; + } + + @Override + public ChannelMetadata metadata() { + return null; + } + + @Override + public SocketAddress localAddress() { + return null; + } + + @Override + public SocketAddress remoteAddress() { + return null; + } + + @Override + public ChannelFuture closeFuture() { + return null; + } + + @Override + public boolean isWritable() { + return false; + } + + @Override + public long bytesBeforeUnwritable() { + return 0; + } + + @Override + public long bytesBeforeWritable() { + return 0; + } + + @Override + public Unsafe unsafe() { + return null; + } + + @Override + public ChannelPipeline pipeline() { + return null; + } + + @Override + public ByteBufAllocator alloc() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1) { + return null; + } + + @Override + public ChannelFuture disconnect() { + return null; + } + + @Override + public ChannelFuture close() { + return null; + } + + @Override + public ChannelFuture deregister() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1, + ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture disconnect(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture close(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture deregister(ChannelPromise channelPromise) { + return null; + } + + @Override + public Channel read() { + return null; + } + + @Override + public ChannelFuture write(Object o) { + return null; + } + + @Override + public ChannelFuture write(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public Channel flush() { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o) { + return null; + } + + @Override + public ChannelPromise newPromise() { + return null; + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return null; + } + + @Override + public ChannelFuture newSucceededFuture() { + return null; + } + + @Override + public ChannelFuture newFailedFuture(Throwable throwable) { + return null; + } + + @Override + public ChannelPromise voidPromise() { + return null; + } + + @Override + public Attribute attr(AttributeKey attributeKey) { + return null; + } + + @Override + public boolean hasAttr(AttributeKey attributeKey) { + return false; + } + + @Override + public int compareTo(Channel o) { + return 0; + } +} diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java new file mode 100644 index 0000000000..cfcb8df8d6 --- /dev/null +++ b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java @@ -0,0 +1,157 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.lambdaworks.redis.protocol; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelOption; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; + +import java.util.Map; + +public class EmptyConfig implements ChannelConfig { + @Override + public Map, Object> getOptions() { + return null; + } + + @Override + public boolean setOptions(Map, ?> map) { + return false; + } + + @Override + public T getOption(ChannelOption channelOption) { + return null; + } + + @Override + public boolean setOption(ChannelOption channelOption, T t) { + return false; + } + + @Override + public int getConnectTimeoutMillis() { + return 0; + } + + @Override + public ChannelConfig setConnectTimeoutMillis(int i) { + return null; + } + + @Override + public int getMaxMessagesPerRead() { + return 0; + } + + @Override + public ChannelConfig setMaxMessagesPerRead(int i) { + return null; + } + + @Override + public int getWriteSpinCount() { + return 0; + } + + @Override + public ChannelConfig setWriteSpinCount(int i) { + return null; + } + + @Override + public ByteBufAllocator getAllocator() { + return null; + } + + @Override + public ChannelConfig setAllocator(ByteBufAllocator byteBufAllocator) { + return null; + } + + @Override + public T getRecvByteBufAllocator() { + return null; + } + + @Override + public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator recvByteBufAllocator) { + return null; + } + + @Override + public boolean isAutoRead() { + return false; + } + + @Override + public ChannelConfig setAutoRead(boolean b) { + return null; + } + + @Override + public boolean isAutoClose() { + return false; + } + + @Override + public ChannelConfig setAutoClose(boolean b) { + return null; + } + + @Override + public int getWriteBufferHighWaterMark() { + return 0; + } + + @Override + public ChannelConfig setWriteBufferHighWaterMark(int i) { + return null; + } + + @Override + public int getWriteBufferLowWaterMark() { + return 0; + } + + @Override + public ChannelConfig setWriteBufferLowWaterMark(int i) { + return null; + } + + @Override + public MessageSizeEstimator getMessageSizeEstimator() { + return null; + } + + @Override + public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator messageSizeEstimator) { + return null; + } + + @Override + public WriteBufferWaterMark getWriteBufferWaterMark() { + return null; + } + + @Override + public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + return null; + } +} diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyContext.java b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyContext.java index 76b8984059..dd514c2275 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyContext.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyContext.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -29,9 +30,11 @@ */ class EmptyContext implements ChannelHandlerContext { + private final static Channel CHANNEL = new EmptyChannel(); + @Override public Channel channel() { - return null; + return CHANNEL; } @Override @@ -208,7 +211,7 @@ public ChannelPipeline pipeline() { @Override public ByteBufAllocator alloc() { - return null; + return PooledByteBufAllocator.DEFAULT; } @Override