Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embedded HTTP benchmark, various small perf improvements #8974

Merged
merged 13 commits into from
Mar 21, 2023
2 changes: 2 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dependencies {
api project(":inject")
api project(":inject-java-test")
api project(":http-server")
api project(":http-server-netty")
api project(":jackson-databind")
api project(":router")
api project(":runtime")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package io.micronaut.http.server.stack;

import io.micronaut.context.ApplicationContext;
import io.micronaut.http.server.netty.NettyHttpServer;
import io.micronaut.runtime.server.EmbeddedServer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.junit.jupiter.api.Assertions;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class FullHttpStackBenchmark {
@Benchmark
public void test(Holder holder) {
ByteBuf response = holder.exchange();
if (!holder.responseBytes.equals(response)) {
throw new AssertionError("Response did not match");
}
response.release();
}

public static void main(String[] args) throws Exception {
JmhFastThreadLocalExecutor exec = new JmhFastThreadLocalExecutor(1, "init-test");
exec.submit(() -> {
// simple test that everything works properly
for (StackFactory stack : StackFactory.values()) {
Holder holder = new Holder();
holder.stack = stack;
holder.setUp();
holder.tearDown();
}
return null;
}).get();
exec.shutdown();

Options opt = new OptionsBuilder()
.include(FullHttpStackBenchmark.class.getName() + ".*")
.warmupIterations(20)
.measurementIterations(30)
.mode(Mode.AverageTime)
.timeUnit(TimeUnit.NANOSECONDS)
.addProfiler(AsyncProfiler.class, "libPath=/home/yawkat/bin/async-profiler-2.9-linux-x64/build/libasyncProfiler.so;output=flamegraph")
.forks(1)
.jvmArgsAppend("-Djmh.executor=CUSTOM", "-Djmh.executor.class=" + JmhFastThreadLocalExecutor.class.getName())
.build();

new Runner(opt).run();
}

@State(Scope.Thread)
public static class Holder {
@Param({"MICRONAUT"/*, "PURE_NETTY"*/})
StackFactory stack = StackFactory.MICRONAUT;

AutoCloseable ctx;
EmbeddedChannel channel;
ByteBuf requestBytes;
ByteBuf responseBytes;

@Setup
public void setUp() {
if (!(Thread.currentThread() instanceof FastThreadLocalThread)) {
throw new IllegalStateException("Should run on a netty FTL thread");
}

Stack stack = this.stack.openChannel();
ctx = stack.closeable;
channel = stack.serverChannel;

channel.freezeTime();

EmbeddedChannel clientChannel = new EmbeddedChannel();
clientChannel.pipeline().addLast(new HttpClientCodec());
clientChannel.pipeline().addLast(new HttpObjectAggregator(1000));

FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.POST,
"/search/find",
Unpooled.wrappedBuffer("{\"haystack\": [\"xniomb\", \"seelzp\", \"nzogdq\", \"omblsg\", \"idgtlm\", \"ydonzo\"], \"needle\": \"idg\"}".getBytes(StandardCharsets.UTF_8))
);
request.headers().add(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
request.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
request.headers().add(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON);
clientChannel.writeOutbound(request);
clientChannel.flushOutbound();

requestBytes = PooledByteBufAllocator.DEFAULT.buffer();
while (true) {
ByteBuf part = clientChannel.readOutbound();
if (part == null) {
break;
}
requestBytes.writeBytes(part);
}

// sanity check: run req/resp once and see that the response is correct
responseBytes = exchange();
clientChannel.writeInbound(responseBytes.retainedDuplicate());
FullHttpResponse response = clientChannel.readInbound();
//System.out.println(response);
//System.out.println(response.content().toString(StandardCharsets.UTF_8));
Assertions.assertEquals(HttpResponseStatus.OK, response.status());
Assertions.assertEquals("application/json", response.headers().get(HttpHeaderNames.CONTENT_TYPE));
Assertions.assertEquals("keep-alive", response.headers().get(HttpHeaderNames.CONNECTION));
String expectedResponseBody = "{\"listIndex\":4,\"stringIndex\":0}";
Assertions.assertEquals(expectedResponseBody, response.content().toString(StandardCharsets.UTF_8));
Assertions.assertEquals(expectedResponseBody.length(), response.headers().getInt(HttpHeaderNames.CONTENT_LENGTH));
response.release();
}

private ByteBuf exchange() {
channel.writeInbound(requestBytes.retainedDuplicate());
channel.runPendingTasks();
CompositeByteBuf response = PooledByteBufAllocator.DEFAULT.compositeBuffer();
while (true) {
ByteBuf part = channel.readOutbound();
if (part == null) {
break;
}
response.addComponent(true, part);
}
return response;
}

@TearDown
public void tearDown() throws Exception {
ctx.close();
requestBytes.release();
responseBytes.release();
}
}

public enum StackFactory {
MICRONAUT {
@Override
Stack openChannel() {
ApplicationContext ctx = ApplicationContext.run(Map.of(
"spec.name", "FullHttpStackBenchmark",
"micronaut.server.date-header", false // disabling this makes the response identical each time
));
EmbeddedServer server = ctx.getBean(EmbeddedServer.class);
EmbeddedChannel channel = ((NettyHttpServer) server).buildEmbeddedChannel(false);
return new Stack(channel, ctx);
}
},
PURE_NETTY {
@Override
Stack openChannel() {
HttpObjectAggregator aggregator = new HttpObjectAggregator(10_000_000);
aggregator.setMaxCumulationBufferComponents(100000);
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(aggregator);
channel.pipeline().addLast(new RequestHandler());
return new Stack(channel, () -> {
});
}
};

abstract Stack openChannel();
}

private record Stack(EmbeddedChannel serverChannel, AutoCloseable closeable) {
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.micronaut.http.server.stack;

import io.micronaut.core.annotation.NonNull;
import io.netty.util.concurrent.FastThreadLocalThread;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public final class JmhFastThreadLocalExecutor extends ThreadPoolExecutor {
public JmhFastThreadLocalExecutor(int maxThreads, String prefix) {
super(maxThreads, maxThreads,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactory() {
final AtomicInteger counter = new AtomicInteger();

@Override
public Thread newThread(@NonNull Runnable r) {
return new FastThreadLocalThread(r, prefix + "-jmh-worker-ftl-" + counter.incrementAndGet());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.micronaut.http.server.stack;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslProvider;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;

@ChannelHandler.Sharable
final class RequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ObjectReader reader = objectMapper.readerFor(SearchController.Input.class);
private final ObjectWriter writerResult = objectMapper.writerFor(SearchController.Result.class);
private final ObjectWriter writerStatus = objectMapper.writerFor(Status.class);

@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
FullHttpResponse response = computeResponse(ctx, msg);
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
ctx.writeAndFlush(response, ctx.voidPromise());
ctx.read();
}

private FullHttpResponse computeResponse(ChannelHandlerContext ctx, FullHttpRequest msg) {
try {
String path = URI.create(msg.uri()).getPath();
if (path.equals("/search/find")) {
return computeResponseSearch(ctx, msg);
}
if (path.equals("/status")) {
return computeResponseStatus(ctx, msg);
}
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
} catch (Exception e) {
e.printStackTrace();
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}

private FullHttpResponse computeResponseSearch(ChannelHandlerContext ctx, FullHttpRequest msg) throws IOException {
if (!msg.method().equals(HttpMethod.POST)) {
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
}
if (!msg.headers().contains(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON, true)) {
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE);
}

ByteBuf content = msg.content();
SearchController.Input input;
if (content.hasArray()) {
input = reader.readValue(content.array(), content.readerIndex() + content.arrayOffset(), content.readableBytes());
} else {
input = reader.readValue((InputStream) new ByteBufInputStream(content));
}

SearchController.Result result = find(input.haystack(), input.needle());
if (result == null) {
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
} else {
return serialize(ctx, writerResult, result);
}
}

private FullHttpResponse serialize(ChannelHandlerContext ctx, ObjectWriter writer, Object result) throws IOException {
ByteBuf buffer = ctx.alloc().buffer();
writer.writeValue((OutputStream) new ByteBufOutputStream(buffer), result);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buffer);
response.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
return response;
}

private FullHttpResponse computeResponseStatus(ChannelHandlerContext ctx, FullHttpRequest msg) throws IOException {
if (!msg.method().equals(HttpMethod.GET)) {
return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.METHOD_NOT_ALLOWED);
}

Status status = new Status(
ctx.channel().getClass().getName(),
SslContext.defaultServerProvider()
);

return serialize(ctx, writerStatus, status);
}

private static SearchController.Result find(List<String> haystack, String needle) {
for (int listIndex = 0; listIndex < haystack.size(); listIndex++) {
String s = haystack.get(listIndex);
int stringIndex = s.indexOf(needle);
if (stringIndex != -1) {
return new SearchController.Result(listIndex, stringIndex);
}
}
return null;
}

record Status(String channelImplementation,
SslProvider sslProvider) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.micronaut.http.server.stack;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Introspected;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;

import java.util.List;

@Controller("/search")
@Requires(property = "spec.name", value = "FullHttpStackBenchmark")
public class SearchController {
@Post("find")
public HttpResponse<?> find(@Body Input input) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to test more variations, like returning a simple string, a completable future reactive response.
Right now the async flow (completable future) is executed using the reactor which can be improved

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this benchmark is from micronaut-benchmark, which does not use reactive / async code so that it's a more fair comparison between frameworks. benchmarking reactive code is also worthwhile but this simple controller is basically the best "bare-bones" test of the http stack

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, having some other benchmark testing overhead of async/reactive flow would be nice

return find(input.haystack, input.needle);
}

private static MutableHttpResponse<?> find(List<String> haystack, String needle) {
for (int listIndex = 0; listIndex < haystack.size(); listIndex++) {
String s = haystack.get(listIndex);
int stringIndex = s.indexOf(needle);
if (stringIndex != -1) {
return HttpResponse.ok(new Result(listIndex, stringIndex));
}
}
return HttpResponse.notFound();
}

@Introspected
record Input(List<String> haystack, String needle) {
}

@Introspected
record Result(int listIndex, int stringIndex) {
}
}
Loading