Skip to content

Commit

Permalink
Using discardSomeReadBytes instead of discardReadBytes for CommandHan…
Browse files Browse the repository at this point in the history
…dler's buffer #725

Add benchmarks for channelRead added to CommandHandler JMH test suite.

CommandHandlerBenchmark tests the whole flow now - both writes and reads.
Also, creation of commands was moved to benchmark methods -
after one usage they become not writable which causes the benchmark
to give incorrect results.

Original pull request: #726
  • Loading branch information
gszpak authored and mp911de committed Mar 15, 2018
1 parent 33b35d0 commit cc14436
Show file tree
Hide file tree
Showing 6 changed files with 508 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
buffer.discardSomeReadBytes();
}

afterComplete(ctx, command);
Expand Down
13 changes: 9 additions & 4 deletions src/test/jmh/com/lambdaworks/redis/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ private static void runCommandBenchmark() throws RunnerException {

private static void runCommandHandlerBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
.build()).run();
// new
// Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run();
// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
// .build()).run();
new Runner(
prepareOptions()
.mode(Mode.Throughput)
.timeUnit(TimeUnit.SECONDS)
.include(".*CommandHandlerBenchmark.*")
.build()
).run();
}

private static void runCommandEncoderBenchmark() throws RunnerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@
*/
package com.lambdaworks.redis.protocol;

import java.util.Collection;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.openjdk.jmh.annotations.*;

import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.codec.ByteArrayCodec;
import com.lambdaworks.redis.output.ValueOutput;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.buffer.ByteBuf;

/**
* Benchmark for {@link CommandHandler}. Test cases:
Expand All @@ -40,79 +42,108 @@
public class CommandHandlerBenchmark {

private final static ByteArrayCodec CODEC = new ByteArrayCodec();
private final static ClientOptions CLIENT_OPTIONS = ClientOptions.builder().build();
private final static ClientOptions CLIENT_OPTIONS = ClientOptions.create();
private final static EmptyContext CHANNEL_HANDLER_CONTEXT = new EmptyContext();
private final static byte[] KEY = "key".getBytes();
private final static EmptyPromise EMPTY = new EmptyPromise();
private static final String VALUE = "value\r\n";
private final EmptyPromise PROMISE = new EmptyPromise();

private CommandHandler commandHandler;
private Collection<?> stack;
private Command command;
private ByteBuf reply1;
private ByteBuf reply10;
private ByteBuf reply100;
private ByteBuf reply1000;

@Setup
public void setup() {
public void setup() throws Exception {
commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE);
commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT);
commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);

commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE) {
@Override
protected void setState(LifecycleState lifecycleState) {
CommandHandlerBenchmark.this.stack = super.stack;
super.setState(lifecycleState);
}
};
reply1 = strToByteBuf(String.format("+%s", VALUE));
reply10 = strToByteBuf(makeBulkReply(10));
reply100 = strToByteBuf(makeBulkReply(100));
reply1000 = strToByteBuf(makeBulkReply(1000));
for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.retain();
}
}

command = new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
@TearDown
public void tearDown() throws Exception {
commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT);
for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.release(2);
}
}

commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);
private ByteBuf strToByteBuf(String str) {
ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer();
buf.writeBytes(str.getBytes());
return buf;
}

commandHandler.channel = new MyLocalChannel();
private String makeBulkReply(int numOfReplies) {
String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE);
return String.join("", Collections.nCopies(numOfReplies, baseReply));
}

@TearDown(Level.Iteration)
public void tearDown() {
commandHandler.reset();
stack.clear();
private Command makeCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
}

@Benchmark
public void measureUserWrite() {
commandHandler.write(command);
public void measureNettyWriteAndRead() throws Exception {
Command command = makeCommand();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
}

@Benchmark
public void measureNettyWrite() throws Exception {
commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, EMPTY);
stack.remove(command);
public void measureNettyWriteAndReadBatch1() throws Exception {
List<Command> commands = Collections.singletonList(makeCommand());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
}

private final static class MyLocalChannel extends EmbeddedChannel {
@Override
public boolean isActive() {
return true;
}
@Benchmark
public void measureNettyWriteAndReadBatch10() throws Exception {
List<Command> commands = IntStream.range(0, 10).mapToObj(i -> makeCommand()).collect(Collectors.toList());

@Override
public boolean isOpen() {
return true;
}
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

@Override
public ChannelFuture write(Object msg) {
return EMPTY;
}
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10);
reply10.resetReaderIndex();
reply10.retain();
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return promise;
}
@Benchmark
public void measureNettyWriteAndReadBatch100() throws Exception {
List<Command> commands = IntStream.range(0, 100).mapToObj(i -> makeCommand()).collect(Collectors.toList());

@Override
public ChannelFuture writeAndFlush(Object msg) {
return EMPTY;
}
commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return promise;
}
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100);
reply100.resetReaderIndex();
reply100.retain();
}

@Benchmark
public void measureNettyWriteAndReadBatch1000() throws Exception {
List<Command> commands = IntStream.range(0, 1000).mapToObj(i -> makeCommand()).collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000);
reply1000.resetReaderIndex();
reply1000.retain();
}
}
Loading

0 comments on commit cc14436

Please sign in to comment.