Skip to content

Commit

Permalink
Decouple Http2ServerStream from HttpServerRequest implementations for…
Browse files Browse the repository at this point in the history
… HTTP/2
  • Loading branch information
vietj committed Nov 7, 2022
1 parent ac486c3 commit 0de1c53
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 110 deletions.
50 changes: 28 additions & 22 deletions src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,15 @@ String determineContentEncoding(Http2Headers headers) {
return null;
}

private Http2ServerRequest createRequest(int streamId, Http2Headers headers, boolean streamEnded) {
private Http2ServerStream createRequest(int streamId, Http2Headers headers, boolean streamEnded) {
Http2Stream stream = handler.connection().stream(streamId);
String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(headers) : null;
Http2ServerRequest request = new Http2ServerRequest(this, options.getTracingPolicy(), streamContextSupplier.get(), serverOrigin, headers, contentEncoding, streamEnded);
request.isConnect = request.method() == HttpMethod.CONNECT;
request.init(stream);
return request;
Http2ServerStream vertxStream = new Http2ServerStream(this, streamContextSupplier.get(), headers, serverOrigin);
Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, options.getTracingPolicy(), headers, contentEncoding, streamEnded);
vertxStream.request = request;
vertxStream.isConnect = request.method() == HttpMethod.CONNECT;
vertxStream.init(stream);
return vertxStream;
}

@Override
Expand Down Expand Up @@ -188,9 +190,11 @@ private synchronized void doSendPush(int streamId, String host, HttpMethod metho
int promisedStreamId = future.getNow();
String contentEncoding = determineContentEncoding(headers_);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
Push push = new Push(context, contentEncoding, method, path, promise);
push.priority(streamPriority);
push.init(promisedStream);
Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path);
Push push = new Push(vertxStream, contentEncoding, promise);
vertxStream.request = push;
push.stream.priority(streamPriority);
push.stream.init(promisedStream);
int maxConcurrentStreams = handler.maxConcurrentStreams();
if (concurrentStreams < maxConcurrentStreams) {
concurrentStreams++;
Expand All @@ -210,46 +214,48 @@ protected void updateSettings(Http2Settings settingsUpdate, Handler<AsyncResult<
super.updateSettings(settingsUpdate, completionHandler);
}

private class Push extends Http2ServerStream {
private class Push implements Http2ServerStreamHandler {

protected final ContextInternal context;
protected final Http2ServerStream stream;
protected final Http2ServerResponse response;
private final Promise<HttpServerResponse> promise;

public Push(ContextInternal context,
public Push(Http2ServerStream stream,
String contentEncoding,
HttpMethod method,
String uri,
Promise<HttpServerResponse> promise) {
super(Http2ServerConnection.this, context, contentEncoding, method, uri);
this.context = stream.context;
this.stream = stream;
this.response = new Http2ServerResponse(stream.conn, stream, true, contentEncoding);
this.promise = promise;
}

@Override
void dispatch(Handler<HttpServerRequest> handler) {
throw new UnsupportedOperationException();
public Http2ServerResponse response() {
return response;
}

@Override
void handleWritabilityChanged(boolean writable) {
response.handlerWritabilityChanged(writable);
public void dispatch(Handler<HttpServerRequest> handler) {
throw new UnsupportedOperationException();
}

@Override
void handleReset(long errorCode) {
public void handleReset(long errorCode) {
if (!promise.tryFail(new StreamResetException(errorCode))) {
response.handleReset(errorCode);
}
}

@Override
void handleException(Throwable cause) {
public void handleException(Throwable cause) {
if (response != null) {
response.handleException(cause);
}
}

@Override
void handleClose(HttpClosedException ex) {
super.handleClose(ex);
public void handleClose(HttpClosedException ex) {
if (pendingPushes.remove(this)) {
promise.fail("Push reset by client");
} else {
Expand All @@ -265,7 +271,7 @@ void handleClose(HttpClosedException ex) {
}

void complete() {
registerMetrics();
stream.registerMetrics();
promise.complete(response);
}
}
Expand Down
Loading

0 comments on commit 0de1c53

Please sign in to comment.