Skip to content

Commit

Permalink
Using discardSomeReadBytes instead of discardReadBytes for CommandHan…
Browse files Browse the repository at this point in the history
…dler's buffer #725

Add benchmarks for channelRead added to CommandHandler JMH test suite.

CommandHandlerBenchmark tests the whole flow now - both writes and reads.
Also, creation of commands was moved to benchmark methods -
after one usage they become not writable which causes the benchmark
to give incorrect results.

Original pull request: #726
  • Loading branch information
gszpak authored and mp911de committed Mar 15, 2018
1 parent 82ccd90 commit 03b510c
Show file tree
Hide file tree
Showing 6 changed files with 466 additions and 47 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Interrup
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
buffer.discardSomeReadBytes();
}

afterComplete(ctx, command);
Expand Down
13 changes: 9 additions & 4 deletions src/test/jmh/io/lettuce/core/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,15 @@ private static void runCommandBenchmark() throws RunnerException {

private static void runCommandHandlerBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
.build()).run();
// new
// Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run();
// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
// .build()).run();
new Runner(
prepareOptions()
.mode(Mode.Throughput)
.timeUnit(TimeUnit.SECONDS)
.include(".*CommandHandlerBenchmark.*")
.build()
).run();
}

private static void runCommandEncoderBenchmark() throws RunnerException {
Expand Down
112 changes: 72 additions & 40 deletions src/test/jmh/io/lettuce/core/protocol/CommandHandlerBenchmark.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.lettuce.core.protocol;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -25,10 +25,12 @@
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.output.ValueOutput;
import io.netty.buffer.ByteBuf;

/**
* Benchmark for {@link CommandHandler}. Test cases:
Expand All @@ -46,81 +48,111 @@ public class CommandHandlerBenchmark {
private final static ClientOptions CLIENT_OPTIONS = ClientOptions.create();
private final static EmptyContext CHANNEL_HANDLER_CONTEXT = new EmptyContext();
private final static byte[] KEY = "key".getBytes();
private static final String VALUE = "value\r\n";
private final EmptyPromise PROMISE = new EmptyPromise();

private CommandHandler commandHandler;
private Command command;
private Command batchCommand;
private Collection<Command> commands1;
private List<Command> commands10;
private List<Command> commands100;
private List<Command> commands1000;
private ByteBuf reply1;
private ByteBuf reply10;
private ByteBuf reply100;
private ByteBuf reply1000;

@Setup
public void setup() {

public void setup() throws Exception {
commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE, new DefaultEndpoint(CLIENT_OPTIONS));
command = new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT);
commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);

reply1 = strToByteBuf(String.format("+%s", VALUE));
reply10 = strToByteBuf(makeBulkReply(10));
reply100 = strToByteBuf(makeBulkReply(100));
reply1000 = strToByteBuf(makeBulkReply(1000));
for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.retain();
}
}

commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);
@TearDown
public void tearDown() throws Exception {
commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT);
for (ByteBuf buf: Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.release(2);
}
}

commands1 = Arrays.asList(command);
commands10 = IntStream.range(0, 10)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());
private ByteBuf strToByteBuf(String str) {
ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer();
buf.writeBytes(str.getBytes());
return buf;
}

commands100 = IntStream.range(0, 100)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());
private String makeBulkReply(int numOfReplies) {
String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE);
return String.join("", Collections.nCopies(numOfReplies, baseReply));
}

commands1000 = IntStream.range(0, 1000)
.mapToObj(i -> new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)))
.collect(Collectors.toList());
private Command makeCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
}

@Benchmark
public void measureNettyWrite() throws Exception {
public void measureNettyWriteAndRead() throws Exception {
Command command = makeCommand();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
}

@Benchmark
public void measureNettyWriteBatch1() throws Exception {
public void measureNettyWriteAndReadBatch1() throws Exception {
List<Command> commands = Collections.singletonList(makeCommand());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1, PROMISE);
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
}

@Benchmark
public void measureNettyWriteBatch10() throws Exception {
public void measureNettyWriteAndReadBatch10() throws Exception {
List<Command> commands = IntStream.range(0, 10)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands10, PROMISE);
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10);
reply10.resetReaderIndex();
reply10.retain();
}

@Benchmark
public void measureNettyWriteBatch100() throws Exception {
public void measureNettyWriteAndReadBatch100() throws Exception {
List<Command> commands = IntStream.range(0, 100)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands100, PROMISE);
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100);
reply100.resetReaderIndex();
reply100.retain();
}

@Benchmark
public void measureNettyWriteBatch1000() throws Exception {
public void measureNettyWriteAndReadBatch1000() throws Exception {
List<Command> commands = IntStream.range(0, 1000)
.mapToObj(i -> makeCommand())
.collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands1000, PROMISE);
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

// Prevent OOME
commandHandler.getStack().clear();
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000);
reply1000.resetReaderIndex();
reply1000.retain();
}
}
Loading

0 comments on commit 03b510c

Please sign in to comment.