Skip to content

Commit

Permalink
Polishing #725
Browse files Browse the repository at this point in the history
Move back to discardReadBytes() but discard bytes outside the decoding loop to not enforce cleanup upon each decoded command. Tweak JMH benchmarks to not include command creation overhead caused by IntStream and element collection. Tweak commands in test to never return done state so commands can be reused for all benchmark runs.

Original pull request: #726
  • Loading branch information
mp911de committed Mar 15, 2018
1 parent cc14436 commit 41a7a53
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
* @author Will Glozer
* @author Mark Paluch
* @author Jongyeol Choi
* @author Grzegorz Szpak
*/
@ChannelHandler.Sharable
public class CommandHandler<K, V> extends ChannelDuplexHandler implements RedisChannelWriter<K, V> {
Expand Down Expand Up @@ -312,12 +313,12 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) {
}
}

if (buffer.refCnt() != 0) {
buffer.discardSomeReadBytes();
}

afterComplete(ctx, command);
}

if (buffer.refCnt() != 0) {
buffer.discardReadBytes();
}
}

/**
Expand Down
37 changes: 2 additions & 35 deletions src/test/jmh/com/lambdaworks/redis/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.lambdaworks.redis;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Mode;
Expand All @@ -32,9 +31,9 @@
*/
public class JmhMain {

public static void main(String... args) throws IOException, RunnerException {
public static void main(String... args) throws RunnerException {

runCommandHandlerBenchmark();
runCommandBenchmark();
}

private static void runBenchmarks() throws RunnerException {
Expand All @@ -45,38 +44,6 @@ private static void runCommandBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*CommandBenchmark.*")
.build()).run();

new Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandBenchmark.*").build())
.run();
}

private static void runCommandHandlerBenchmark() throws RunnerException {

// new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS).include(".*RedisClientBenchmark.*")
// .build()).run();
new Runner(
prepareOptions()
.mode(Mode.Throughput)
.timeUnit(TimeUnit.SECONDS)
.include(".*CommandHandlerBenchmark.*")
.build()
).run();
}

private static void runCommandEncoderBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS)
.include(".*CommandEncoderBenchmark.*").build()).run();
// new
// Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run();
}

private static void runRedisStateMachineBenchmark() throws RunnerException {

new Runner(prepareOptions().mode(Mode.AverageTime).timeUnit(TimeUnit.NANOSECONDS)
.include(".*RedisStateMachineBenchmark.*").build()).run();
// new
// Runner(prepareOptions().mode(Mode.Throughput).timeUnit(TimeUnit.SECONDS).include(".*CommandHandlerBenchmark.*").build()).run();
}

private static ChainedOptionsBuilder prepareOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import io.netty.buffer.ByteBuf;

/**
* Benchmark for {@link CommandHandler}. Test cases:
* Benchmark for {@link CommandHandler}.
* <p/>
* Test cases:
* <ul>
* <li>user command writes</li>
* <li>netty (in-eventloop) writes</li>
* <li>netty (in-eventloop) reads</li>
* </ul>
*
* @author Mark Paluch
* @author Grzegorz Szpak
*/
@State(Scope.Benchmark)
public class CommandHandlerBenchmark {
Expand All @@ -53,97 +57,108 @@ public class CommandHandlerBenchmark {
private ByteBuf reply10;
private ByteBuf reply100;
private ByteBuf reply1000;
private List<Command> commands1;
private List<Command> commands10;
private List<Command> commands100;
private List<Command> commands1000;

@Setup
public void setup() throws Exception {

commandHandler = new CommandHandler(CLIENT_OPTIONS, EmptyClientResources.INSTANCE);
commandHandler.channelRegistered(CHANNEL_HANDLER_CONTEXT);
commandHandler.setState(CommandHandler.LifecycleState.CONNECTED);

reply1 = strToByteBuf(String.format("+%s", VALUE));
reply10 = strToByteBuf(makeBulkReply(10));
reply100 = strToByteBuf(makeBulkReply(100));
reply1000 = strToByteBuf(makeBulkReply(1000));
for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.retain();
}
reply1 = createByteBuf(String.format("+%s", VALUE));
reply10 = createByteBuf(createBulkReply(10));
reply100 = createByteBuf(createBulkReply(100));
reply1000 = createByteBuf(createBulkReply(1000));
commands1 = createCommands(1);
commands10 = createCommands(10);
commands100 = createCommands(100);
commands1000 = createCommands(1000);
}

@TearDown
public void tearDown() throws Exception {

commandHandler.channelUnregistered(CHANNEL_HANDLER_CONTEXT);
for (ByteBuf buf : Arrays.asList(reply1, reply10, reply100, reply1000)) {
buf.release(2);
}
Arrays.asList(reply1, reply10, reply100, reply1000).forEach(ByteBuf::release);
}

private static List<Command> createCommands(int count) {
return IntStream.range(0, count).mapToObj(i -> createCommand()).collect(Collectors.toList());
}

private ByteBuf strToByteBuf(String str) {
private static ByteBuf createByteBuf(String str) {

ByteBuf buf = CHANNEL_HANDLER_CONTEXT.alloc().directBuffer();
buf.writeBytes(str.getBytes());
return buf;
}

private String makeBulkReply(int numOfReplies) {
private static String createBulkReply(int numOfReplies) {

String baseReply = String.format("$%d\r\n%s\r\n", VALUE.length(), VALUE);

return String.join("", Collections.nCopies(numOfReplies, baseReply));
}

private Command makeCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY));
@SuppressWarnings("unchecked")
private static Command createCommand() {
return new Command(CommandType.GET, new ValueOutput<>(CODEC), new CommandArgs(CODEC).addKey(KEY)) {
@Override
public boolean isDone() {
return false;
}
};
}

@Benchmark
public void measureNettyWriteAndRead() throws Exception {
Command command = makeCommand();

Command command = createCommand();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, command, PROMISE);
int index = reply1.readerIndex();
reply1.retain();

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();

// cleanup
reply1.readerIndex(index);
}

@Benchmark
public void measureNettyWriteAndReadBatch1() throws Exception {
List<Command> commands = Collections.singletonList(makeCommand());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1);
reply1.resetReaderIndex();
reply1.retain();
doBenchmark(commands1, reply1);
}

@Benchmark
public void measureNettyWriteAndReadBatch10() throws Exception {
List<Command> commands = IntStream.range(0, 10).mapToObj(i -> makeCommand()).collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply10);
reply10.resetReaderIndex();
reply10.retain();
doBenchmark(commands10, reply10);
}

@Benchmark
public void measureNettyWriteAndReadBatch100() throws Exception {
List<Command> commands = IntStream.range(0, 100).mapToObj(i -> makeCommand()).collect(Collectors.toList());

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply100);
reply100.resetReaderIndex();
reply100.retain();
doBenchmark(commands100, reply100);
}

@Benchmark
public void measureNettyWriteAndReadBatch1000() throws Exception {
List<Command> commands = IntStream.range(0, 1000).mapToObj(i -> makeCommand()).collect(Collectors.toList());
doBenchmark(commands1000, reply1000);
}

private void doBenchmark(List<Command> commandStack, ByteBuf response) throws Exception {

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commandStack, PROMISE);

int index = response.readerIndex();
response.retain();

commandHandler.write(CHANNEL_HANDLER_CONTEXT, commands, PROMISE);
commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, response);

commandHandler.channelRead(CHANNEL_HANDLER_CONTEXT, reply1000);
reply1000.resetReaderIndex();
reply1000.retain();
// cleanup
response.readerIndex(index);
}
}
17 changes: 6 additions & 11 deletions src/test/jmh/com/lambdaworks/redis/protocol/EmptyChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,16 @@
*/
package com.lambdaworks.redis.protocol;

import java.net.SocketAddress;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.*;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.net.SocketAddress;

/**
* @author Grzegorz Szpak
*/
public class EmptyChannel implements Channel {

private final static ChannelConfig CONFIG = new EmptyConfig();
Expand Down
14 changes: 7 additions & 7 deletions src/test/jmh/com/lambdaworks/redis/protocol/EmptyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
*/
package com.lambdaworks.redis.protocol;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;

import java.util.Map;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;

/**
* @author Grzegorz Szpak
*/
public class EmptyConfig implements ChannelConfig {

@Override
public Map<ChannelOption<?>, Object> getOptions() {
return null;
Expand Down
3 changes: 1 addition & 2 deletions src/test/jmh/com/lambdaworks/redis/protocol/JmhMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.lambdaworks.redis.protocol;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Mode;
Expand All @@ -32,7 +31,7 @@
*/
public class JmhMain {

public static void main(String... args) throws IOException, RunnerException {
public static void main(String... args) throws RunnerException {

// run selectively
// runCommandBenchmark();
Expand Down

0 comments on commit 41a7a53

Please sign in to comment.