Skip to content

Commit

Permalink
Polishing #725
Browse files Browse the repository at this point in the history
Move back to discardReadBytes() but discard bytes outside the decoding loop to not enforce cleanup upon each decoded command. Tweak JMH benchmarks to not include command creation overhead caused by IntStream and element collection. Tweak commands in test to never return done state so commands can be reused for all benchmark runs.

Original pull request: #726
  • Loading branch information
mp911de committed Mar 15, 2018
1 parent 230d1a2 commit bcb2783
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 92 deletions.
9 changes: 5 additions & 4 deletions src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* @author Will Glozer
* @author Mark Paluch
* @author Jongyeol Choi
* @author Grzegorz Szpak
*/
public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCommands {

Expand Down Expand Up @@ -560,12 +561,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
}
}

if (buffer.refCnt() != 0) {
buffer.discardSomeReadBytes();
}

afterComplete(ctx, command);
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}

protected boolean canDecode(ByteBuf buffer) {
Expand Down
13 changes: 1 addition & 12 deletions src/test/jmh/io/lettuce/core/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,16 @@
public class JmhMain {

public static void main(String... args) throws RunnerException {

// runCommandHandlerBenchmark();
runRedisClientBenchmark();
}

private static void runBenchmarks() throws RunnerException {
new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).build()).run();
}

private static void runCommandBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*CommandBenchmark.*")
.build()).run();

new Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandBenchmark.*").build())
.run();
}

private static void runRedisClientBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".RedisClientBenchmark.*")
.build()).run();
}

Expand Down
119 changes: 63 additions & 56 deletions src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,25 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.*;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.output.ValueOutput;
import io.netty.buffer.ByteBuf;

/**
* Benchmark for {@link CommandHandler}. Test cases:
* Benchmark for {@link CommandHandler}.
* <p/>
* Test cases:
* <ul>
* <li>user command writes</li>
* <li>netty (in-eventloop) writes</li>
* <li>netty (in-eventloop) batch writes</li>
* <li>netty (in-eventloop) reads</li>
* </ul>
*
* @author Mark Paluch
* @author Grzegorz Szpak
*/
@State(Scope.Benchmark)
public class CommandHandlerBenchmark {
Expand All @@ -56,103 +56,110 @@ public class CommandHandlerBenchmark {
private ByteBuf reply10;
private ByteBuf reply100;
private ByteBuf reply1000;
private List<Command> commands1;
private List<Command> commands10;
private List<Command> commands100;
private List<Command> commands1000;

@Setup
public void setup() throws Exception {

commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE, new DefaultEndpoint(CLIENT_OPTIONS));
commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT);
commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);

reply1 = strToByteBuf(String.format("+%s", VALUE));
reply10 = strToByteBuf(makeBulkReply(10));
reply100 = strToByteBuf(makeBulkReply(100));
reply1000 = strToByteBuf(makeBulkReply(1000));
for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.retain();
}
reply1 = createByteBuf(String.format("+%s", VALUE));
reply10 = createByteBuf(createBulkReply(10));
reply100 = createByteBuf(createBulkReply(100));
reply1000 = createByteBuf(createBulkReply(1000));

commands1 = createCommands(1);
commands10 = createCommands(10);
commands100 = createCommands(100);
commands1000 = createCommands(1000);
}

@TearDown
public void tearDown() throws Exception {

commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT);
for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.release(2);
}

Arrays.asList(reply1, reply10, reply100, reply1000).forEach(ByteBuf::release);
}

private ByteBuf strToByteBuf(String str) {
private static List<Command> createCommands(int count) {
return IntStream.range(0, count).mapToObj(i -> createCommand()).collect(Collectors.toList());
}

private static ByteBuf createByteBuf(String str) {

ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer();
buf.writeBytes(str.getBytes());
return buf;
}

private String makeBulkReply(int numOfReplies) {
private static String createBulkReply(int numOfReplies) {

String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE);

return String.join("", Collections.nCopies(numOfReplies, baseReply));
}

private Command makeCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
@SuppressWarnings("unchecked")
private static Command createCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)) {
@Override
public boolean isDone() {
return false;
}
};
}

@Benchmark
public void measureNettyWriteAndRead() throws Exception {
Command command = makeCommand();

Command command = createCommand();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE);
int index = reply1.readerIndex();
reply1.retain();

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();

// cleanup
reply1.readerIndex(index);
}

@Benchmark
public void measureNettyWriteAndReadBatch1() throws Exception {
List<Command> commands = Collections.singletonList(makeCommand());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
doBenchmark(commands1, reply1);
}

@Benchmark
public void measureNettyWriteAndReadBatch10() throws Exception {
List<Command> commands = IntStream.range(0, 10)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10);
reply10.resetReaderIndex();
reply10.retain();
doBenchmark(commands10, reply10);
}

@Benchmark
public void measureNettyWriteAndReadBatch100() throws Exception {
List<Command> commands = IntStream.range(0, 100)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100);
reply100.resetReaderIndex();
reply100.retain();
doBenchmark(commands100, reply100);
}

@Benchmark
public void measureNettyWriteAndReadBatch1000() throws Exception {
List<Command> commands = IntStream.range(0, 1000)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());
doBenchmark(commands1000, reply1000);
}

private void doBenchmark(List<Command> commandStack, ByteBuf response) throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commandStack, PROMISE);

int index = response.readerIndex();
response.retain();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, response);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000);
reply1000.resetReaderIndex();
reply1000.retain();
// cleanup
response.readerIndex(index);
}
}
17 changes: 6 additions & 11 deletions src/test/jmh/io/lettuce/core/protocol/EmptyChannel.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package io.lettuce.core.protocol;

import java.net.SocketAddress;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.*;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.net.SocketAddress;

/**
* @author Grzegorz Szpak
*/
public class EmptyChannel implements Channel {

private final static ChannelConfig CONFIG = new EmptyConfig();
Expand Down
14 changes: 7 additions & 7 deletions src/test/jmh/io/lettuce/core/protocol/EmptyConfig.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.lettuce.core.protocol;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;

import java.util.Map;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;

/**
* @author Grzegorz Szpak
*/
public class EmptyConfig implements ChannelConfig {

@Override
public Map<ChannelOption<?>, Object> getOptions() {
return null;
Expand Down
3 changes: 1 addition & 2 deletions src/test/jmh/io/lettuce/core/protocol/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.lettuce.core.protocol;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Mode;
Expand All @@ -32,7 +31,7 @@
*/
public class JmhMain {

public static void main(String... args) throws IOException, RunnerException {
public static void main(String... args) throws RunnerException {

// run selectively
// runCommandBenchmark();
Expand Down

0 comments on commit bcb2783

Please sign in to comment.