Skip to content

Commit

Permalink
The H2C upgrading stream of a connection closes the stream prematurel…
Browse files Browse the repository at this point in the history
…y when the response is sent.

The HTTP/2 RFC specs mandates the following items
- Requests that contain a payload body MUST be sent in their entirety before the client can send HTTP/2 frames.
- The HTTP/1.1 request that is sent prior to upgrade is assigned a stream identifier of 1 with default priority values.  Stream 1 is implicitly "half-closed" from the client toward the server, since the request is completed as an HTTP/1.1 request.

- From the Netty perspective, the stream is seen as half closed remote, since the stream cannot receive anymore frames from the HTTP/2 upgraded channel.
- From the Vert.x perspective, the stream is seen as open since the header frame is processed but the stream is not ended e.g the stream can contain a body and we expect content messages.

When an application ends the HTTP response, the stream transitions to the close state while the stream can still process content messages.

The server should handle specifically the H2C upgrading stream to continue processing HTTP messages even though the stream is considered closed from the Netty perspective.
  • Loading branch information
vietj committed Mar 22, 2023
1 parent 69883be commit bb0c09d
Show file tree
Hide file tree
Showing 18 changed files with 173 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
handler = initializer.buildHttp2ConnectionHandler(initializer.context, initializer.connectionHandler);
pipeline.addLast("handler", handler);
handler.serverUpgrade(ctx, settings, request);
handler.serverUpgrade(ctx, settings);
DefaultHttp2Headers headers = new DefaultHttp2Headers();
headers.method(request.method().name());
headers.path(request.uri());
Expand Down
23 changes: 9 additions & 14 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,7 @@
import io.vertx.core.MultiMap;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.GoAway;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.headers.HeadersMultiMap;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
import io.vertx.core.impl.ContextInternal;
Expand Down Expand Up @@ -140,11 +133,12 @@ void upgradeStream(Object metric, Object trace, ContextInternal context, Handler
Future<HttpClientStream> fut;
synchronized (this) {
try {
StreamImpl stream = createStream(context);
Stream stream = createStream(context);
stream.init(handler.connection().stream(1));
stream.metric = metric;
stream.trace = trace;
fut = Future.succeededFuture(stream);
stream.requestEnded = true;
fut = Future.succeededFuture((HttpClientStream) stream);
} catch (Exception e) {
fut = Future.failedFuture(e);
}
Expand Down Expand Up @@ -366,7 +360,7 @@ private void removeStatusHeaders(Http2Headers headers) {
}

@Override
void onClose(HttpClosedException ex) {
void onClose() {
if (conn.metrics != null) {
if (!requestEnded || !responseEnded) {
conn.metrics.requestReset(metric);
Expand All @@ -378,14 +372,15 @@ void onClose(HttpClosedException ex) {
if (responseEnded && requestEnded) {
err = null;
} else {
err = ex;
err = HttpUtils.STREAM_CLOSED_EXCEPTION;
}
tracer.receiveResponse(context, response, trace, err, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
}
if (!responseEnded) {
onError(ex);
// NOT SURE OF THAT
onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
}
super.onClose(ex);
super.onClose();
// commented to be used later when we properly define the HTTP/2 connection expiration from the pool
// boolean disposable = conn.streams.isEmpty();
if (!push) {
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ VertxHttp2Stream<?> stream(int id) {
void onStreamError(int streamId, Throwable cause) {
VertxHttp2Stream stream = stream(streamId);
if (stream != null) {
stream.onError(cause);
stream.onException(cause);
}
}

Expand All @@ -147,11 +147,13 @@ void onStreamWritabilityChanged(Http2Stream s) {
void onStreamClosed(Http2Stream s) {
VertxHttp2Stream stream = s.getProperty(streamKey);
if (stream != null) {
boolean active = chctx.channel().isActive();
if (goAwayStatus != null) {
stream.onClose(new HttpClosedException(goAwayStatus));
} else {
stream.onClose(HttpUtils.STREAM_CLOSED_EXCEPTION);
stream.onException(new HttpClosedException(goAwayStatus));
} else if (!active) {
stream.onException(HttpUtils.STREAM_CLOSED_EXCEPTION);
}
stream.onClose();
}
checkShutdown();
}
Expand Down
28 changes: 21 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class Http2ServerConnection extends Http2ConnectionBase implements HttpSe
Handler<HttpServerRequest> requestHandler;
private int concurrentStreams;
private final ArrayDeque<Push> pendingPushes = new ArrayDeque<>(8);
private VertxHttp2Stream upgraded;

Http2ServerConnection(
EventLoopContext context,
Expand Down Expand Up @@ -134,17 +135,25 @@ String determineContentEncoding(Http2Headers headers) {
return null;
}

private Http2ServerStream createRequest(int streamId, Http2Headers headers, boolean streamEnded) {
private Http2ServerStream createStream(int streamId, Http2Headers headers, boolean streamEnded) {
Http2Stream stream = handler.connection().stream(streamId);
String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(headers) : null;
Http2ServerStream vertxStream = new Http2ServerStream(this, streamContextSupplier.get(), headers, serverOrigin);
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, options.getTracingPolicy(), headers, contentEncoding, streamEnded);
Http2ServerStream vertxStream = new Http2ServerStream(this, streamContextSupplier.get(), headers, serverOrigin, options.getTracingPolicy(), streamEnded);
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, headers, contentEncoding);
vertxStream.request = request;
vertxStream.isConnect = request.method() == HttpMethod.CONNECT;
vertxStream.init(stream);
return vertxStream;
}

VertxHttp2Stream<?> stream(int id) {
VertxHttp2Stream<?> stream = super.stream(id);
if (stream == null && id == 1 && handler.upgraded) {
return upgraded;
}
return stream;
}

@Override
protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriority streamPriority, boolean endOfStream) {
VertxHttp2Stream stream = stream(streamId);
Expand All @@ -153,7 +162,12 @@ protected synchronized void onHeadersRead(int streamId, Http2Headers headers, St
handler.writeReset(streamId, Http2Error.PROTOCOL_ERROR.code());
return;
}
stream = createRequest(streamId, headers, endOfStream);
if (streamId == 1 && handler.upgraded) {
stream = createStream(streamId, headers, true);
upgraded = stream;
} else {
stream = createStream(streamId, headers, endOfStream);
}
stream.onHeaders(headers, streamPriority);
} else {
// Http server request trailer - not implemented yet (in api)
Expand Down Expand Up @@ -190,7 +204,7 @@ private synchronized void doSendPush(int streamId, String host, HttpMethod metho
int promisedStreamId = future.getNow();
String contentEncoding = determineContentEncoding(headers_);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path, options.getTracingPolicy(), true);
Push push = new Push(vertxStream, contentEncoding, promise);
vertxStream.request = push;
push.stream.priority(streamPriority);
Expand Down Expand Up @@ -255,7 +269,7 @@ public void handleException(Throwable cause) {
}

@Override
public void handleClose(HttpClosedException ex) {
public void handleClose() {
if (pendingPushes.remove(this)) {
promise.fail("Push reset by client");
} else {
Expand All @@ -266,7 +280,7 @@ public void handleClose(HttpClosedException ex) {
concurrentStreams++;
push.complete();
}
response.handleClose(ex);
response.handleClose();
}
}

Expand Down
48 changes: 3 additions & 45 deletions src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerFileUpload;
Expand All @@ -44,14 +43,11 @@
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.net.URISyntaxException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
Expand All @@ -70,18 +66,13 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt
private final String serverOrigin;
private final MultiMap headersMap;
private final String scheme;
private final TracingPolicy tracingPolicy;

// Accessed on event loop
private Object trace;

// Accessed on context thread
private Charset paramsCharset = StandardCharsets.UTF_8;
private MultiMap params;
private String absoluteURI;
private MultiMap attributes;
private HttpEventHandler eventHandler;
private boolean streamEnded;
private boolean ended;
private Handler<HttpServerFileUpload> uploadHandler;
private HttpPostRequestDecoder postRequestDecoder;
Expand All @@ -90,10 +81,8 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt

Http2ServerRequest(Http2ServerStream stream,
String serverOrigin,
TracingPolicy tracingPolicy,
Http2Headers headers,
String contentEncoding,
boolean streamEnded) {
String contentEncoding) {
String scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null;
headers.remove(":method");
headers.remove(":scheme");
Expand All @@ -104,10 +93,8 @@ public class Http2ServerRequest extends HttpServerRequestInternal implements Htt
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, false, contentEncoding);
this.serverOrigin = serverOrigin;
this.streamEnded = streamEnded;
this.scheme = scheme;
this.headersMap = new Http2HeadersAdaptor(headers);
this.tracingPolicy = tracingPolicy;
}

private HttpEventHandler eventHandler(boolean create) {
Expand All @@ -118,10 +105,6 @@ private HttpEventHandler eventHandler(boolean create) {
}

public void dispatch(Handler<HttpServerRequest> handler) {
VertxTracer tracer = context.tracer();
if (tracer != null) {
trace = tracer.receiveRequest(context, SpanKind.RPC, tracingPolicy, this, method().name(), headers(), HttpUtils.SERVER_REQUEST_TAG_EXTRACTOR);
}
context.emit(this, handler);
}

Expand Down Expand Up @@ -155,32 +138,8 @@ private void notifyException(Throwable failure) {
}

@Override
public void onClose(HttpClosedException ex) {
VertxTracer tracer = context.tracer();
Object trace = this.trace;
if (tracer != null && trace != null) {
Throwable failure;
synchronized (stream.conn) {
if (!streamEnded && (!ended || !response.ended())) {
failure = ex;
} else {
failure = null;
}
}
tracer.sendResponse(context, failure == null ? response : null, trace, failure, HttpUtils.SERVER_RESPONSE_TAG_EXTRACTOR);
}
}

@Override
public void handleClose(HttpClosedException ex) {
boolean notify;
synchronized (stream.conn) {
notify = !streamEnded;
}
if (notify) {
notifyException(new ClosedChannelException());
}
response.handleClose(ex);
public void handleClose() {
response.handleClose();
}

@Override
Expand All @@ -207,7 +166,6 @@ public void handleData(Buffer data) {
public void handleEnd(MultiMap trailers) {
HttpEventHandler handler;
synchronized (stream.conn) {
streamEnded = true;
ended = true;
if (postRequestDecoder != null) {
try {
Expand Down
17 changes: 5 additions & 12 deletions src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
Expand Down Expand Up @@ -97,27 +91,26 @@ void handleReset(long code) {
void handleException(Throwable cause) {
Handler<Throwable> handler;
synchronized (conn) {
if (ended) {
return;
}
handler = exceptionHandler;
}
if (handler != null) {
handler.handle(cause);
}
}

void handleClose(HttpClosedException ex) {
void handleClose() {
Handler<Throwable> exceptionHandler;
Handler<Void> endHandler;
Handler<Void> closeHandler;
synchronized (conn) {
closed = true;
boolean failed = !ended;
exceptionHandler = failed ? this.exceptionHandler : null;
endHandler = failed ? this.endHandler : null;
closeHandler = this.closeHandler;
}
if (exceptionHandler != null) {
stream.context.emit(ex, exceptionHandler);
}
if (endHandler != null) {
stream.context.emit(null, endHandler);
}
Expand Down
Loading

0 comments on commit bb0c09d

Please sign in to comment.