From 41a7a53ffa41a7c9f5d4b4c27eee06a2a70866c4 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 15 Mar 2018 08:55:58 +0100 Subject: [PATCH] Polishing #725 Move back to discardReadBytes() but discard bytes outside the decoding loop to not enforce cleanup upon each decoded command. Tweak JMH benchmarks to not include command creation overhead caused by IntStream and element collection. Tweak commands in test to never return done state so commands can be reused for all benchmark runs. Original pull request: #726 --- .../redis/protocol/CommandHandler.java | 9 +- .../jmh/com/lambdaworks/redis/JmhMain.java | 37 +------ .../protocol/CommandHandlerBenchmark.java | 103 ++++++++++-------- .../redis/protocol/EmptyChannel.java | 17 +-- .../redis/protocol/EmptyConfig.java | 14 +-- .../lambdaworks/redis/protocol/JmhMain.java | 3 +- 6 files changed, 80 insertions(+), 103 deletions(-) diff --git a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java index 9d53821e93..767f5a6fbb 100644 --- a/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java +++ b/src/main/java/com/lambdaworks/redis/protocol/CommandHandler.java @@ -50,6 +50,7 @@ * @author Will Glozer * @author Mark Paluch * @author Jongyeol Choi + * @author Grzegorz Szpak */ @ChannelHandler.Sharable public class CommandHandler extends ChannelDuplexHandler implements RedisChannelWriter { @@ -312,12 +313,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) { } } - if (buffer.refCnt() != 0) { - buffer.discardSomeReadBytes(); - } - afterComplete(ctx, command); } + + if (buffer.refCnt() != 0) { + buffer.discardReadBytes(); + } } /** diff --git a/src/test/jmh/com/lambdaworks/redis/JmhMain.java b/src/test/jmh/com/lambdaworks/redis/JmhMain.java index 83e755d896..ba834dd04d 100644 --- a/src/test/jmh/com/lambdaworks/redis/JmhMain.java +++ b/src/test/jmh/com/lambdaworks/redis/JmhMain.java @@ -15,7 +15,6 @@ */ package com.lambdaworks.redis; -import java.io.IOException; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Mode; @@ -32,9 +31,9 @@ */ public class JmhMain { - public static void main(String... args) throws IOException, RunnerException { + public static void main(String... args) throws RunnerException { - runCommandHandlerBenchmark(); + runCommandBenchmark(); } private static void runBenchmarks() throws RunnerException { @@ -45,38 +44,6 @@ private static void runCommandBenchmark() throws RunnerException { new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*CommandBenchmark.*") .build()).run(); - - new Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandBenchmark.*").build()) - .run(); - } - - private static void runCommandHandlerBenchmark() throws RunnerException { - -// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*") -// .build()).run(); - new Runner( - prepareOptions() - .mode(Mode.Throughput) - .timeUnit(TimeUnit.SECONDS) - .include(".*CommandHandlerBenchmark.*") - .build() - ).run(); - } - - private static void runCommandEncoderBenchmark() throws RunnerException { - - new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS) - .include(".*CommandEncoderBenchmark.*").build()).run(); - // new - // Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run(); - } - - private static void runRedisStateMachineBenchmark() throws RunnerException { - - new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS) - .include(".*RedisStateMachineBenchmark.*").build()).run(); - // new - // Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run(); } private static ChainedOptionsBuilder prepareOptions() { diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java b/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java index d90e6206e7..6e5e741fd0 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/CommandHandlerBenchmark.java @@ -30,13 +30,17 @@ import io.netty.buffer.ByteBuf; /** - * Benchmark for {@link CommandHandler}. Test cases: + * Benchmark for {@link CommandHandler}. + *

+ * Test cases: *

    *
  • user command writes
  • *
  • netty (in-eventloop) writes
  • + *
  • netty (in-eventloop) reads
  • *
* * @author Mark Paluch + * @author Grzegorz Szpak */ @State(Scope.Benchmark) public class CommandHandlerBenchmark { @@ -53,97 +57,108 @@ public class CommandHandlerBenchmark { private ByteBuf reply10; private ByteBuf reply100; private ByteBuf reply1000; + private List commands1; + private List commands10; + private List commands100; + private List commands1000; @Setup public void setup() throws Exception { + commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE); commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT); commandHandler.setState(CommandHandler.LifecycleState.CONNECTED); - reply1 = strToByteBuf(String.format("+%s", VALUE)); - reply10 = strToByteBuf(makeBulkReply(10)); - reply100 = strToByteBuf(makeBulkReply(100)); - reply1000 = strToByteBuf(makeBulkReply(1000)); - for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) { - buf.retain(); - } + reply1 = createByteBuf(String.format("+%s", VALUE)); + reply10 = createByteBuf(createBulkReply(10)); + reply100 = createByteBuf(createBulkReply(100)); + reply1000 = createByteBuf(createBulkReply(1000)); + commands1 = createCommands(1); + commands10 = createCommands(10); + commands100 = createCommands(100); + commands1000 = createCommands(1000); } @TearDown public void tearDown() throws Exception { + commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT); - for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) { - buf.release(2); - } + Arrays.asList(reply1, reply10, reply100, reply1000).forEach(ByteBuf::release); + } + + private static List createCommands(int count) { + return IntStream.range(0, count).mapToObj(i -> createCommand()).collect(Collectors.toList()); } - private ByteBuf strToByteBuf(String str) { + private static ByteBuf createByteBuf(String str) { + ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer(); buf.writeBytes(str.getBytes()); return buf; } - private String makeBulkReply(int numOfReplies) { + private static String createBulkReply(int numOfReplies) { + String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE); + return String.join("", Collections.nCopies(numOfReplies, baseReply)); } - private Command makeCommand() { - return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)); + @SuppressWarnings("unchecked") + private static Command createCommand() { + return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)) { + @Override + public boolean isDone() { + return false; + } + }; } @Benchmark public void measureNettyWriteAndRead() throws Exception { - Command command = makeCommand(); + + Command command = createCommand(); commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE); + int index = reply1.readerIndex(); + reply1.retain(); commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); - reply1.resetReaderIndex(); - reply1.retain(); + + // cleanup + reply1.readerIndex(index); } @Benchmark public void measureNettyWriteAndReadBatch1() throws Exception { - List commands = Collections.singletonList(makeCommand()); - - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - - commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1); - reply1.resetReaderIndex(); - reply1.retain(); + doBenchmark(commands1, reply1); } @Benchmark public void measureNettyWriteAndReadBatch10() throws Exception { - List commands = IntStream.range(0, 10).mapToObj(i -> makeCommand()).collect(Collectors.toList()); - - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - - commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10); - reply10.resetReaderIndex(); - reply10.retain(); + doBenchmark(commands10, reply10); } @Benchmark public void measureNettyWriteAndReadBatch100() throws Exception { - List commands = IntStream.range(0, 100).mapToObj(i -> makeCommand()).collect(Collectors.toList()); - - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); - - commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100); - reply100.resetReaderIndex(); - reply100.retain(); + doBenchmark(commands100, reply100); } @Benchmark public void measureNettyWriteAndReadBatch1000() throws Exception { - List commands = IntStream.range(0, 1000).mapToObj(i -> makeCommand()).collect(Collectors.toList()); + doBenchmark(commands1000, reply1000); + } + + private void doBenchmark(List commandStack, ByteBuf response) throws Exception { + + commandHandler.write(CHANNEL_HANDLER_CONTEXT, commandStack, PROMISE); + + int index = response.readerIndex(); + response.retain(); - commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE); + commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, response); - commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000); - reply1000.resetReaderIndex(); - reply1000.retain(); + // cleanup + response.readerIndex(index); } } diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java index e238f3e645..358a45a780 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java @@ -15,21 +15,16 @@ */ package com.lambdaworks.redis.protocol; +import java.net.SocketAddress; + import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelId; -import io.netty.channel.ChannelMetadata; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelProgressivePromise; -import io.netty.channel.ChannelPromise; -import io.netty.channel.EventLoop; +import io.netty.channel.*; import io.netty.util.Attribute; import io.netty.util.AttributeKey; -import java.net.SocketAddress; - +/** + * @author Grzegorz Szpak + */ public class EmptyChannel implements Channel { private final static ChannelConfig CONFIG = new EmptyConfig(); diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java index cfcb8df8d6..b315caf72a 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java @@ -15,16 +15,16 @@ */ package com.lambdaworks.redis.protocol; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelOption; -import io.netty.channel.MessageSizeEstimator; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.WriteBufferWaterMark; - import java.util.Map; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.*; + +/** + * @author Grzegorz Szpak + */ public class EmptyConfig implements ChannelConfig { + @Override public Map, Object> getOptions() { return null; diff --git a/src/test/jmh/com/lambdaworks/redis/protocol/JmhMain.java b/src/test/jmh/com/lambdaworks/redis/protocol/JmhMain.java index 7a13ec2e66..e476fbac2e 100644 --- a/src/test/jmh/com/lambdaworks/redis/protocol/JmhMain.java +++ b/src/test/jmh/com/lambdaworks/redis/protocol/JmhMain.java @@ -15,7 +15,6 @@ */ package com.lambdaworks.redis.protocol; -import java.io.IOException; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Mode; @@ -32,7 +31,7 @@ */ public class JmhMain { - public static void main(String... args) throws IOException, RunnerException { + public static void main(String... args) throws RunnerException { // run selectively // runCommandBenchmark();