diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java index aa092cc3ef2..e4552340f87 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java @@ -134,13 +134,15 @@ String determineContentEncoding(Http2Headers headers) { return null; } - private Http2ServerRequest createRequest(int streamId, Http2Headers headers, boolean streamEnded) { + private Http2ServerStream createRequest(int streamId, Http2Headers headers, boolean streamEnded) { Http2Stream stream = handler.connection().stream(streamId); String contentEncoding = options.isCompressionSupported() ? determineContentEncoding(headers) : null; - Http2ServerRequest request = new Http2ServerRequest(this, options.getTracingPolicy(), streamContextSupplier.get(), serverOrigin, headers, contentEncoding, streamEnded); - request.isConnect = request.method() == HttpMethod.CONNECT; - request.init(stream); - return request; + Http2ServerStream vertxStream = new Http2ServerStream(this, streamContextSupplier.get(), headers, serverOrigin); + Http2ServerRequest request = new Http2ServerRequest(vertxStream, serverOrigin, options.getTracingPolicy(), headers, contentEncoding, streamEnded); + vertxStream.request = request; + vertxStream.isConnect = request.method() == HttpMethod.CONNECT; + vertxStream.init(stream); + return vertxStream; } @Override @@ -188,9 +190,11 @@ private synchronized void doSendPush(int streamId, String host, HttpMethod metho int promisedStreamId = future.getNow(); String contentEncoding = determineContentEncoding(headers_); Http2Stream promisedStream = handler.connection().stream(promisedStreamId); - Push push = new Push(context, contentEncoding, method, path, promise); - push.priority(streamPriority); - push.init(promisedStream); + Http2ServerStream vertxStream = new Http2ServerStream(this, context, method, path); + Push push = new Push(vertxStream, contentEncoding, promise); + vertxStream.request = push; + push.stream.priority(streamPriority); + push.stream.init(promisedStream); int maxConcurrentStreams = handler.maxConcurrentStreams(); if (concurrentStreams < maxConcurrentStreams) { concurrentStreams++; @@ -210,46 +214,48 @@ protected void updateSettings(Http2Settings settingsUpdate, Handler promise; - public Push(ContextInternal context, + public Push(Http2ServerStream stream, String contentEncoding, - HttpMethod method, - String uri, Promise promise) { - super(Http2ServerConnection.this, context, contentEncoding, method, uri); + this.context = stream.context; + this.stream = stream; + this.response = new Http2ServerResponse(stream.conn, stream, true, contentEncoding); this.promise = promise; } @Override - void dispatch(Handler handler) { - throw new UnsupportedOperationException(); + public Http2ServerResponse response() { + return response; } @Override - void handleWritabilityChanged(boolean writable) { - response.handlerWritabilityChanged(writable); + public void dispatch(Handler handler) { + throw new UnsupportedOperationException(); } @Override - void handleReset(long errorCode) { + public void handleReset(long errorCode) { if (!promise.tryFail(new StreamResetException(errorCode))) { response.handleReset(errorCode); } } @Override - void handleException(Throwable cause) { + public void handleException(Throwable cause) { if (response != null) { response.handleException(cause); } } @Override - void handleClose(HttpClosedException ex) { - super.handleClose(ex); + public void handleClose(HttpClosedException ex) { if (pendingPushes.remove(this)) { promise.fail("Push reset by client"); } else { @@ -265,7 +271,7 @@ void handleClose(HttpClosedException ex) { } void complete() { - registerMetrics(); + stream.registerMetrics(); promise.complete(response); } } diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java b/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java index c5ffedd9a61..ecf42ddc1b2 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerRequest.java @@ -60,10 +60,13 @@ /** * @author Julien Viet */ -public class Http2ServerRequest extends Http2ServerStream implements HttpServerRequestInternal, io.vertx.core.spi.observability.HttpRequest { +public class Http2ServerRequest implements HttpServerRequestInternal, Http2ServerStreamHandler, io.vertx.core.spi.observability.HttpRequest { private static final Logger log = LoggerFactory.getLogger(Http1xServerRequest.class); + protected final ContextInternal context; + protected final Http2ServerStream stream; + protected final Http2ServerResponse response; private final String serverOrigin; private final MultiMap headersMap; private final String scheme; @@ -85,21 +88,21 @@ public class Http2ServerRequest extends Http2ServerStream implements HttpServerR private Handler customFrameHandler; private Handler streamPriorityHandler; - Http2ServerRequest(Http2ServerConnection conn, - TracingPolicy tracingPolicy, - ContextInternal context, + Http2ServerRequest(Http2ServerStream stream, String serverOrigin, + TracingPolicy tracingPolicy, Http2Headers headers, String contentEncoding, boolean streamEnded) { - super(conn, context, headers, contentEncoding, serverOrigin); - String scheme = headers.get(":scheme") != null ? headers.get(":scheme").toString() : null; headers.remove(":method"); headers.remove(":scheme"); headers.remove(":path"); headers.remove(":authority"); + this.context = stream.context; + this.stream = stream; + this.response = new Http2ServerResponse(stream.conn, stream, false, contentEncoding); this.serverOrigin = serverOrigin; this.streamEnded = streamEnded; this.scheme = scheme; @@ -114,7 +117,7 @@ private HttpEventHandler eventHandler(boolean create) { return eventHandler; } - void dispatch(Handler handler) { + public void dispatch(Handler handler) { VertxTracer tracer = context.tracer(); if (tracer != null) { trace = tracer.receiveRequest(context, SpanKind.RPC, tracingPolicy, this, method().name(), headers(), HttpUtils.SERVER_REQUEST_TAG_EXTRACTOR); @@ -123,14 +126,9 @@ void dispatch(Handler handler) { } @Override - void handleWritabilityChanged(boolean writable) { - response.handlerWritabilityChanged(writable); - } - - @Override - void handleException(Throwable cause) { + public void handleException(Throwable cause) { boolean notify; - synchronized (conn) { + synchronized (stream.conn) { notify = !ended; } if (notify) { @@ -142,7 +140,7 @@ void handleException(Throwable cause) { private void notifyException(Throwable failure) { InterfaceHttpData upload = null; HttpEventHandler handler; - synchronized (conn) { + synchronized (stream.conn) { if (postRequestDecoder != null) { upload = postRequestDecoder.currentPartialHttpData(); } @@ -157,12 +155,12 @@ private void notifyException(Throwable failure) { } @Override - void onClose(HttpClosedException ex) { + public void onClose(HttpClosedException ex) { VertxTracer tracer = context.tracer(); Object trace = this.trace; if (tracer != null && trace != null) { Throwable failure; - synchronized (conn) { + synchronized (stream.conn) { if (!streamEnded && (!ended || !response.ended())) { failure = ex; } else { @@ -171,14 +169,12 @@ void onClose(HttpClosedException ex) { } tracer.sendResponse(context, failure == null ? response : null, trace, failure, HttpUtils.SERVER_RESPONSE_TAG_EXTRACTOR); } - super.onClose(ex); } @Override - void handleClose(HttpClosedException ex) { - super.handleClose(ex); + public void handleClose(HttpClosedException ex) { boolean notify; - synchronized (conn) { + synchronized (stream.conn) { notify = !streamEnded; } if (notify) { @@ -188,13 +184,13 @@ void handleClose(HttpClosedException ex) { } @Override - void handleCustomFrame(HttpFrame frame) { + public void handleCustomFrame(HttpFrame frame) { if (customFrameHandler != null) { customFrameHandler.handle(frame); } } - void handleData(Buffer data) { + public void handleData(Buffer data) { if (postRequestDecoder != null) { try { postRequestDecoder.offer(new DefaultHttpContent(data.getByteBuf())); @@ -208,9 +204,9 @@ void handleData(Buffer data) { } } - void handleEnd(MultiMap trailers) { + public void handleEnd(MultiMap trailers) { HttpEventHandler handler; - synchronized (conn) { + synchronized (stream.conn) { streamEnded = true; ended = true; if (postRequestDecoder != null) { @@ -246,9 +242,9 @@ void handleEnd(MultiMap trailers) { } @Override - void handleReset(long errorCode) { + public void handleReset(long errorCode) { boolean notify; - synchronized (conn) { + synchronized (stream.conn) { notify = !ended; ended = true; } @@ -264,6 +260,21 @@ private void checkEnded() { } } + @Override + public HttpMethod method() { + return stream.method; + } + + @Override + public int id() { + return stream.id(); + } + + @Override + public Object metric() { + return stream.metric(); + } + @Override public Context context() { return context; @@ -271,7 +282,7 @@ public Context context() { @Override public HttpServerRequest exceptionHandler(Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { HttpEventHandler eventHandler = eventHandler(handler != null); if (eventHandler != null) { eventHandler.exceptionHandler(handler); @@ -282,7 +293,7 @@ public HttpServerRequest exceptionHandler(Handler handler) { @Override public HttpServerRequest handler(Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { if (handler != null) { checkEnded(); } @@ -296,9 +307,9 @@ public HttpServerRequest handler(Handler handler) { @Override public HttpServerRequest pause() { - synchronized (conn) { + synchronized (stream.conn) { checkEnded(); - doPause(); + stream.doPause(); } return this; } @@ -310,16 +321,16 @@ public HttpServerRequest resume() { @Override public HttpServerRequest fetch(long amount) { - synchronized (conn) { + synchronized (stream.conn) { checkEnded(); - doFetch(amount); + stream.doFetch(amount); } return this; } @Override public HttpServerRequest endHandler(Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { if (handler != null) { checkEnded(); } @@ -338,20 +349,20 @@ public HttpVersion version() { @Override public String uri() { - return uri; + return stream.uri; } @Override public String path() { - synchronized (conn) { - return uri != null ? HttpUtils.parsePath(uri) : null; + synchronized (stream.conn) { + return stream.uri != null ? HttpUtils.parsePath(stream.uri) : null; } } @Override public String query() { - synchronized (conn) { - return uri != null ? HttpUtils.parseQuery(uri) : null; + synchronized (stream.conn) { + return stream.uri != null ? HttpUtils.parseQuery(stream.uri) : null; } } @@ -362,12 +373,12 @@ public String scheme() { @Override public String host() { - return host; + return stream.host; } @Override public long bytesRead() { - return super.bytesRead(); + return stream.bytesRead(); } @Override @@ -397,7 +408,7 @@ public String getParamsCharset() { } @Override public MultiMap params() { - synchronized (conn) { + synchronized (stream.conn) { if (params == null) { params = HttpUtils.params(uri(), paramsCharset); } @@ -407,20 +418,20 @@ public MultiMap params() { @Override public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException { - return conn.peerCertificateChain(); + return stream.conn.peerCertificateChain(); } @Override public SocketAddress remoteAddress() { - return conn.remoteAddress(); + return stream.conn.remoteAddress(); } @Override public String absoluteURI() { - if (method == HttpMethod.CONNECT) { + if (stream.method == HttpMethod.CONNECT) { return null; } - synchronized (conn) { + synchronized (stream.conn) { if (absoluteURI == null) { try { absoluteURI = HttpUtils.absoluteURI(serverOrigin, this); @@ -439,7 +450,7 @@ public Future toNetSocket() { @Override public HttpServerRequest setExpectMultipart(boolean expect) { - synchronized (conn) { + synchronized (stream.conn) { checkEnded(); if (expect) { if (postRequestDecoder == null) { @@ -450,16 +461,16 @@ public HttpServerRequest setExpectMultipart(boolean expect) { if (!HttpUtils.isValidMultipartContentType(contentType)) { throw new IllegalStateException("Request must have a valid content-type header to decode a multipart request"); } - if (!HttpUtils.isValidMultipartMethod(method.toNetty())) { + if (!HttpUtils.isValidMultipartMethod(stream.method.toNetty())) { throw new IllegalStateException("Request method must be one of POST, PUT, PATCH or DELETE to decode a multipart request"); } HttpRequest req = new DefaultHttpRequest( io.netty.handler.codec.http.HttpVersion.HTTP_1_1, - method.toNetty(), - uri); + stream.method.toNetty(), + stream.uri); req.headers().add(HttpHeaderNames.CONTENT_TYPE, contentType); NettyFileUploadDataFactory factory = new NettyFileUploadDataFactory(context, this, () -> uploadHandler); - factory.setMaxLimit(conn.options.getMaxFormAttributeSize()); + factory.setMaxLimit(stream.conn.options.getMaxFormAttributeSize()); postRequestDecoder = new HttpPostRequestDecoder(factory, req); } } else { @@ -471,14 +482,14 @@ public HttpServerRequest setExpectMultipart(boolean expect) { @Override public boolean isExpectMultipart() { - synchronized (conn) { + synchronized (stream.conn) { return postRequestDecoder != null; } } @Override public HttpServerRequest uploadHandler(@Nullable Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { if (handler != null) { checkEnded(); } @@ -489,7 +500,7 @@ public HttpServerRequest uploadHandler(@Nullable Handler h @Override public MultiMap formAttributes() { - synchronized (conn) { + synchronized (stream.conn) { // Create it lazily if (attributes == null) { attributes = MultiMap.caseInsensitiveMultiMap(); @@ -515,14 +526,14 @@ public Future toWebSocket() { @Override public boolean isEnded() { - synchronized (conn) { + synchronized (stream.conn) { return ended; } } @Override public HttpServerRequest customFrameHandler(Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { customFrameHandler = handler; } return this; @@ -530,7 +541,7 @@ public HttpServerRequest customFrameHandler(Handler handler) { @Override public HttpConnection connection() { - return conn; + return stream.conn; } @Override @@ -546,12 +557,12 @@ public synchronized Future end() { } public StreamPriority streamPriority() { - return priority(); + return stream.priority(); } @Override public HttpServerRequest streamPriorityHandler(Handler handler) { - synchronized (conn) { + synchronized (stream.conn) { streamPriorityHandler = handler; } return this; @@ -563,9 +574,9 @@ public DecoderResult decoderResult() { } @Override - void handlePriorityChange(StreamPriority streamPriority) { + public void handlePriorityChange(StreamPriority streamPriority) { Handler handler; - synchronized (conn) { + synchronized (stream.conn) { handler = streamPriorityHandler; } if (handler != null) { @@ -597,7 +608,7 @@ public Cookie getCookie(String name, String domain, String path) { @Override public HttpServerRequest routed(String route) { - super.routed(route); + stream.routed(route); return this; } } diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java b/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java index b260997acd9..d4434ad8656 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerResponse.java @@ -53,7 +53,6 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse { private final ChannelHandlerContext ctx; private final Http2ServerConnection conn; private final boolean push; - private final String host; private final String contentEncoding; private final Http2Headers headers = new DefaultHttp2Headers(); private Http2HeadersAdaptor headersMap; @@ -77,13 +76,11 @@ public class Http2ServerResponse implements HttpServerResponse, HttpResponse { public Http2ServerResponse(Http2ServerConnection conn, Http2ServerStream stream, boolean push, - String contentEncoding, - String host) { + String contentEncoding) { this.stream = stream; this.ctx = conn.handlerContext; this.conn = conn; this.push = push; - this.host = host; this.contentEncoding = contentEncoding; } @@ -421,7 +418,7 @@ Future netSocket() { netSocket = stream.context.failedFuture("Response for CONNECT already sent"); } else { ctx.flush(); - HttpNetSocket ns = HttpNetSocket.netSocket(conn, stream.context, (ReadStream) stream, this); + HttpNetSocket ns = HttpNetSocket.netSocket(conn, stream.context, (ReadStream) stream.request, this); netSocket = Future.succeededFuture(ns); } } @@ -695,7 +692,7 @@ public Future push(HttpMethod method, String host, String pa throw new IllegalStateException("A push response cannot promise another push"); } if (host == null) { - host = this.host; + host = stream.host; } synchronized (conn) { checkValid(); diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java index 1d16c60f629..a005ac6a0c2 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java @@ -16,7 +16,9 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClosedException; +import io.vertx.core.http.HttpFrame; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.StreamPriority; @@ -27,20 +29,19 @@ import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED; -abstract class Http2ServerStream extends VertxHttp2Stream { +class Http2ServerStream extends VertxHttp2Stream { protected final Http2Headers headers; protected final HttpMethod method; protected final String uri; protected final String host; - protected final Http2ServerResponse response; private Object metric; private boolean requestEnded; private boolean responseEnded; + Http2ServerStreamHandler request; Http2ServerStream(Http2ServerConnection conn, ContextInternal context, - String contentEncoding, HttpMethod method, String uri) { super(conn, context); @@ -49,10 +50,9 @@ abstract class Http2ServerStream extends VertxHttp2Stream this.method = method; this.uri = uri; this.host = null; - this.response = new Http2ServerResponse(conn, this, true, contentEncoding, null); } - Http2ServerStream(Http2ServerConnection conn, ContextInternal context, Http2Headers headers, String contentEncoding, String serverOrigin) { + Http2ServerStream(Http2ServerConnection conn, ContextInternal context, Http2Headers headers, String serverOrigin) { super(conn, context); String host = headers.get(":authority") != null ? headers.get(":authority").toString() : null; @@ -65,17 +65,16 @@ abstract class Http2ServerStream extends VertxHttp2Stream this.host = host; this.uri = headers.get(":path") != null ? headers.get(":path").toString() : null; this.method = headers.get(":method") != null ? HttpMethod.valueOf(headers.get(":method").toString()) : null; - this.response = new Http2ServerResponse(conn, this, false, contentEncoding, host); } void registerMetrics() { if (METRICS_ENABLED) { HttpServerMetrics metrics = conn.metrics(); if (metrics != null) { - if (response.isPush()) { - metric = metrics.responsePushed(conn.metric(), method(), uri, response); + if (request.response().isPush()) { + metric = metrics.responsePushed(conn.metric(), method(), uri, request.response()); } else { - metric = metrics.requestBegin(conn.metric(), (HttpRequest) this); + metric = metrics.requestBegin(conn.metric(), (HttpRequest) request); } } } @@ -91,9 +90,9 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) { if (conn.options.isHandle100ContinueAutomatically() && ((value != null && HttpHeaderValues.CONTINUE.equals(value)) || headers.contains(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))) { - response.writeContinue(); + request.response().writeContinue(); } - dispatch(conn.requestHandler); + request.dispatch(conn.requestHandler); } @Override @@ -102,20 +101,18 @@ void onEnd(MultiMap trailers) { if (Metrics.METRICS_ENABLED) { HttpServerMetrics metrics = conn.metrics(); if (metrics != null) { - metrics.requestEnd(metric, (HttpRequest) this, bytesRead()); + metrics.requestEnd(metric, (HttpRequest) request, bytesRead()); } } super.onEnd(trailers); } - abstract void dispatch(Handler handler); - @Override void doWriteHeaders(Http2Headers headers, boolean end, Handler> handler) { if (Metrics.METRICS_ENABLED && !end) { HttpServerMetrics metrics = conn.metrics(); if (metrics != null) { - metrics.responseBegin(metric, response); + metrics.responseBegin(metric, request.response()); } } super.doWriteHeaders(headers, end, handler); @@ -123,9 +120,7 @@ void doWriteHeaders(Http2Headers headers, boolean end, Handler @Override void handleWritabilityChanged(boolean writable) { - if (response != null) { - response.handlerWritabilityChanged(writable); - } + request.response().handlerWritabilityChanged(writable); } public HttpMethod method() { @@ -138,7 +133,7 @@ protected void endWritten() { if (METRICS_ENABLED) { HttpServerMetrics metrics = conn.metrics(); if (metrics != null) { - metrics.responseEnd(metric, response, bytesWritten()); + metrics.responseEnd(metric, request.response(), bytesWritten()); } } } @@ -153,13 +148,50 @@ void handleClose(HttpClosedException ex) { metrics.requestReset(metric); } } + request.handleClose(ex); + } + + @Override + void handleReset(long errorCode) { + request.handleReset(errorCode); + } + + @Override + void handleException(Throwable cause) { + request.handleException(cause); + } + + @Override + void handleCustomFrame(HttpFrame frame) { + request.handleCustomFrame(frame); + } + + @Override + void handlePriorityChange(StreamPriority newPriority) { + request.handlePriorityChange(newPriority); + } + + @Override + void handleData(Buffer buf) { + request.handleData(buf); + } + + @Override + void handleEnd(MultiMap trailers) { + request.handleEnd(trailers); + } + + @Override + void onClose(HttpClosedException ex) { + request.onClose(ex); + super.onClose(ex); } public Object metric() { return metric; } - HttpServerRequest routed(String route) { + public HttpServerRequest routed(String route) { if (METRICS_ENABLED) { HttpServerMetrics metrics = conn.metrics(); if (metrics != null && !responseEnded) { diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java new file mode 100644 index 00000000000..c7c8dbb8685 --- /dev/null +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStreamHandler.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http.impl; + +import io.vertx.core.Handler; +import io.vertx.core.MultiMap; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClosedException; +import io.vertx.core.http.HttpFrame; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.http.StreamPriority; +import io.vertx.core.impl.ContextInternal; + +interface Http2ServerStreamHandler { + + Http2ServerResponse response(); + + void dispatch(Handler handler); + + void handleReset(long errorCode); + + void handleException(Throwable cause); + + void handleClose(HttpClosedException ex); + + default void handleData(Buffer data) { + } + + default void handleEnd(MultiMap trailers) { + } + + default void handleCustomFrame(HttpFrame frame) { + } + + default void handlePriorityChange(StreamPriority streamPriority) { + } + + default void onClose(HttpClosedException ex) { + } +}