Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for HTTP pipelining #2591

Merged
merged 10 commits into from
Dec 10, 2020
5 changes: 5 additions & 0 deletions webserver/webserver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,10 @@
<artifactId>helidon-config-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -68,38 +68,41 @@ class BareResponseImpl implements BareResponse {
private final CompletableFuture<BareResponse> responseFuture;
private final CompletableFuture<BareResponse> headersFuture;
private final BooleanSupplier requestContentConsumed;
private final Thread thread;
private final long requestId;
private final HttpHeaders requestHeaders;
private final ChannelFuture channelClosedFuture;
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;

private volatile Flow.Subscription subscription;
private volatile DataChunk firstChunk;
private volatile DefaultHttpResponse response;
// Accessed by Subscriber method threads
private Flow.Subscription subscription;
private DataChunk firstChunk;
private CompletableFuture<?> prevRequestChunk;

// Accessed by writeStatusHeaders(status, headers) method
private volatile boolean lengthOptimization;
private volatile boolean isWebSocketUpgrade = false;
private volatile DefaultHttpResponse response;

/**
* @param ctx the channel handler context
* @param request the request
* @param requestContentConsumed whether the request content is consumed
* @param thread the outbound event loop thread which will be used to write the response
* @param prevRequestChunk Future that represents previous request completion for HTTP pipelining
* @param requestId the correlation ID that is added to the log statements
*/
BareResponseImpl(ChannelHandlerContext ctx,
HttpRequest request,
BooleanSupplier requestContentConsumed,
Thread thread,
CompletableFuture<?> prevRequestChunk,
long requestId) {
this.requestContentConsumed = requestContentConsumed;
this.thread = thread;
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
this.ctx = ctx;
this.requestId = requestId;
this.keepAlive = HttpUtil.isKeepAlive(request);
this.requestHeaders = request.headers();
this.prevRequestChunk = prevRequestChunk;

// We need to keep this listener so we can remove it when this response completes. If we don't, we leak
// while the channel remains open since each response adds a new listener that references 'this'.
Expand All @@ -114,6 +117,12 @@ class BareResponseImpl implements BareResponse {
responseFuture.whenComplete(this::responseComplete);
}

/**
* Steps required for the completion of this response.
*
* @param self this instance
* @param throwable a throwable indicating unsuccessful completion
*/
private void responseComplete(BareResponse self, Throwable throwable) {
if (throwable == null) {
headersFuture.complete(this);
Expand All @@ -123,6 +132,11 @@ private void responseComplete(BareResponse self, Throwable throwable) {
channelClosedFuture.removeListener(channelClosedListener);
}

/**
* Called when a channel is closed programmatically.
*
* @param future a future
*/
private void channelClosed(Future<? super Void> future) {
responseFuture.completeExceptionally(CLOSED);
}
Expand Down Expand Up @@ -204,12 +218,23 @@ private void completeResponseFuture(Throwable throwable) {
}

/**
* Completes this response. No other data are send to the client when response is completed. All caches are flushed.
* Completes this response. No other data are send to the client when response is completed.
* All caches are flushed.
*
* @param throwable if {@code not-null} then this response is completed exceptionally.
*/
private void completeInternal(Throwable throwable) {
if (!internallyClosed.compareAndSet(false, true)) {
boolean wasClosed = !internallyClosed.compareAndSet(false, true);

if (prevRequestChunk == null) {
completeInternalPipe(wasClosed, throwable);
} else {
prevRequestChunk = prevRequestChunk.thenRun(() -> completeInternalPipe(wasClosed, throwable));
}
}

private void completeInternalPipe(boolean wasClosed, Throwable throwable) {
if (wasClosed) {
// if already closed, as the contract specifies, don't fail
completeResponseFuture(throwable);
return;
Expand All @@ -230,7 +255,6 @@ private void completeInternal(Throwable throwable) {
}

} else {

LOGGER.finest(() -> log("Closing with an empty buffer; keep-alive: " + keepAlive));

writeLastContent(throwable, ChannelFutureListener.CLOSE);
Expand Down Expand Up @@ -311,18 +335,32 @@ public void onNext(DataChunk data) {
}
if (data != null) {
if (data.isFlushChunk()) {
ctx.flush();
if (prevRequestChunk == null) {
ctx.flush();
} else {
prevRequestChunk = prevRequestChunk.thenRun(ctx::flush);
}
return;
}

if (lengthOptimization && firstChunk == null) {
firstChunk = data.isReadOnly() ? data : data.duplicate(); // cache first chunk
return;
}

if (prevRequestChunk == null) {
onNextPipe(data);
} else {
prevRequestChunk = prevRequestChunk.thenRun(() -> onNextPipe(data));
}
}
}

private void onNextPipe(DataChunk data) {
if (lengthOptimization) {
if (firstChunk == null) {
firstChunk = data.isReadOnly() ? data : data.duplicate(); // cache first chunk
return;
}
initWriteResponse();
}
sendData(data);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

Expand Down Expand Up @@ -79,6 +80,9 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
private long actualPayloadSize;
private boolean ignorePayload;

private CompletableFuture<?> prevRequestFuture;
private boolean lastContent;

ForwardingHandler(Routing routing,
NettyWebServer webServer,
SSLEngine sslEngine,
Expand Down Expand Up @@ -106,6 +110,13 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
if (requestContext == null) {
// there was no publisher associated with this connection
// this happens in case there was no http request made on this connection

// this also happens after LastHttpContent has been consumed by channelRead0
if (lastContent) {
// if the last thing that went through channelRead0 was LastHttpContent, then
// there is no request handler that should be enforcing backpressure
ctx.channel().config().setAutoRead(true);
}
return;
}

Expand All @@ -121,6 +132,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
System.identityHashCode(this), System.identityHashCode(ctx.channel()), msg.getClass()));

if (msg instanceof HttpRequest) {
lastContent = false;
// Turns off auto read
ctx.channel().config().setAutoRead(false);

Expand All @@ -144,7 +156,9 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
// Queue, context and publisher creation
ReferenceHoldingQueue<DataChunk> queue = new ReferenceHoldingQueue<>();
queues.add(queue);
requestContext = new RequestContext(new HttpRequestScopedPublisher(ctx, queue), request);
RequestContext requestContext = new RequestContext(new HttpRequestScopedPublisher(ctx, queue), request);
this.requestContext = requestContext;

danielkec marked this conversation as resolved.
Show resolved Hide resolved
// the only reason we have the 'ref' here is that the field might get assigned with null
final HttpRequestScopedPublisher publisherRef = requestContext.publisher();
long requestId = REQUEST_ID_GENERATOR.incrementAndGet();
Expand Down Expand Up @@ -180,12 +194,18 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
}

// If prev response is done, the next can start writing right away (HTTP pipelining)
if (prevRequestFuture != null && prevRequestFuture.isDone()) {
prevRequestFuture = null;
}

// Create response and handler for its completion
BareResponseImpl bareResponse =
new BareResponseImpl(ctx, request, publisherRef::isCompleted, Thread.currentThread(), requestId);
new BareResponseImpl(ctx, request, publisherRef::isCompleted, prevRequestFuture, requestId);
prevRequestFuture = new CompletableFuture<>();
CompletableFuture<?> thisResp = prevRequestFuture;
bareResponse.whenCompleted()
.thenRun(() -> {
RequestContext requestContext = this.requestContext;
if (requestContext != null) {
requestContext.responseCompleted(true);
}
Expand All @@ -198,9 +218,8 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
publisherRef.clearBuffer(DataChunk::release);

// Enable auto-read only after response has been completed
// to avoid a race condition with the next response
ctx.channel().config().setAutoRead(true);
// Enables next response to proceed (HTTP pipelining)
thisResp.complete(null);
});
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx);
Expand Down Expand Up @@ -230,6 +249,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
throw new IllegalStateException("There is no request context associated with this http content. "
+ "This is never expected to happen!");
}
lastContent = false;

HttpContent httpContent = (HttpContent) msg;

Expand Down Expand Up @@ -271,6 +291,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof LastHttpContent) {
if (!isWebSocketUpgrade) {
lastContent = true;
requestContext.publisher().complete();
requestContext = null; // just to be sure that current http req/res session doesn't interfere with other ones
}
Expand Down
Loading