diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 169dc3090b85..317538d6cce5 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http3.client.internal; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -44,6 +45,7 @@ public class ClientHTTP3Session extends ClientProtocolSession { private static final Logger LOG = LoggerFactory.getLogger(ClientHTTP3Session.class); + private final HTTP3Configuration configuration; private final HTTP3SessionClient session; private final QpackEncoder encoder; private final QpackDecoder decoder; @@ -53,7 +55,8 @@ public class ClientHTTP3Session extends ClientProtocolSession public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession quicSession, Session.Client.Listener listener, Promise promise) { super(quicSession); - this.session = new HTTP3SessionClient(this, listener, promise); + this.configuration = configuration; + session = new HTTP3SessionClient(this, listener, promise); addBean(session); session.setStreamIdleTimeout(configuration.getStreamIdleTimeout()); @@ -63,7 +66,8 @@ public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession qu long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId); InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); - this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams()); + encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher)); + encoder.setMaxHeadersSize(configuration.getMaxRequestHeadersSize()); addBean(encoder); if (LOG.isDebugEnabled()) LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); @@ -71,19 +75,19 @@ public ClientHTTP3Session(HTTP3Configuration configuration, ClientQuicSession qu long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId); InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); - this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxResponseHeadersSize()); + decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher)); addBean(decoder); if (LOG.isDebugEnabled()) LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL); QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId); - this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true); + controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true); addBean(controlFlusher); if (LOG.isDebugEnabled()) LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); - this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxRequestHeadersSize(), configuration.isUseOutputDirectByteBuffers()); + messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers()); addBean(messageFlusher); } @@ -105,16 +109,86 @@ public HTTP3SessionClient getSessionClient() @Override protected void onStart() { - // Queue the mandatory SETTINGS frame. Map settings = session.onPreface(); - if (settings == null) - settings = Map.of(); - // TODO: add default settings. + settings = settings != null ? new HashMap<>(settings) : new HashMap<>(); + + settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxDecoderTableCapacity(); + if (v == 0) + v = null; + } + return v; + }); + settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxResponseHeadersSize(); + if (v <= 0) + v = null; + } + return v; + }); + settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxBlockedStreams(); + if (v == 0) + v = null; + } + return v; + }); + + if (LOG.isDebugEnabled()) + LOG.debug("configuring local {} on {}", settings, this); + + settings.forEach((key, value) -> + { + if (key == SettingsFrame.MAX_TABLE_CAPACITY) + decoder.setMaxTableCapacity(value.intValue()); + else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE) + decoder.setMaxHeadersSize(value.intValue()); + else if (key == SettingsFrame.MAX_BLOCKED_STREAMS) + decoder.setMaxBlockedStreams(value.intValue()); + }); + + // Queue the mandatory SETTINGS frame. SettingsFrame frame = new SettingsFrame(settings); if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream))) controlFlusher.iterate(); } + public void onSettings(SettingsFrame frame) + { + Map settings = frame.getSettings(); + if (LOG.isDebugEnabled()) + LOG.debug("configuring encoder {} on {}", settings, this); + settings.forEach((key, value) -> + { + if (key == SettingsFrame.MAX_TABLE_CAPACITY) + { + int maxTableCapacity = value.intValue(); + encoder.setMaxTableCapacity(maxTableCapacity); + encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity())); + } + else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE) + { + // Must cap the maxHeaderSize to avoid large allocations. + int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxRequestHeadersSize()); + encoder.setMaxHeadersSize(maxHeadersSize); + } + else if (key == SettingsFrame.MAX_BLOCKED_STREAMS) + { + int maxBlockedStreams = value.intValue(); + encoder.setMaxBlockedStreams(maxBlockedStreams); + } + }); + } + private void failControlStream(Throwable failure) { long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code(); diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index f701e2dde20b..a71940360462 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -20,6 +20,7 @@ import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.GoAwayFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.HTTP3ErrorCode; import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.quic.common.ProtocolSession; @@ -63,7 +64,7 @@ protected HTTP3StreamClient newHTTP3Stream(QuicStreamEndPoint endPoint, boolean } @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { if (frame.getMetaData().isResponse()) { @@ -76,10 +77,19 @@ public void onHeaders(long streamId, HeadersFrame frame) } else { - super.onHeaders(streamId, frame); + super.onHeaders(streamId, frame, wasBlocked); } } + @Override + public void onSettings(SettingsFrame frame) + { + if (LOG.isDebugEnabled()) + LOG.debug("received {} on {}", frame, this); + getProtocolSession().onSettings(frame); + super.onSettings(frame); + } + @Override public CompletableFuture newRequest(HeadersFrame frame, Stream.Client.Listener listener) { @@ -147,24 +157,4 @@ protected GoAwayFrame newGoAwayFrame(boolean graceful) return GoAwayFrame.CLIENT_GRACEFUL; return super.newGoAwayFrame(graceful); } - - @Override - protected void onSettingMaxTableCapacity(long value) - { - getProtocolSession().getQpackEncoder().setCapacity((int)value); - } - - @Override - protected void onSettingMaxFieldSectionSize(long value) - { - getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value); - } - - @Override - protected void onSettingMaxBlockedStreams(long value) - { - ClientHTTP3Session session = getProtocolSession(); - session.getQpackDecoder().setMaxBlockedStreams((int)value); - session.getQpackEncoder().setMaxBlockedStreams((int)value); - } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Configuration.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Configuration.java index b6d1d937006e..d2deaa6ec671 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Configuration.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3Configuration.java @@ -17,7 +17,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; /** - *

A record that captures HTTP/3 configuration parameters.

+ *

The HTTP/3 configuration parameters.

*/ @ManagedObject public class HTTP3Configuration @@ -27,9 +27,11 @@ public class HTTP3Configuration private int outputBufferSize = 2048; private boolean useInputDirectByteBuffers = true; private boolean useOutputDirectByteBuffers = true; - private int maxBlockedStreams = 0; - private int maxRequestHeadersSize = 8192; - private int maxResponseHeadersSize = 8192; + private int maxBlockedStreams = 64; + private int maxTableCapacity = 64 * 1024; + private int initialTableCapacity = 64 * 1024; + private int maxRequestHeadersSize = 8 * 1024; + private int maxResponseHeadersSize = 8 * 1024; @ManagedAttribute("The stream idle timeout in milliseconds") public long getStreamIdleTimeout() @@ -37,6 +39,13 @@ public long getStreamIdleTimeout() return streamIdleTimeout; } + /** + *

Sets the stream idle timeout in milliseconds.

+ *

Negative values and zero mean that the stream never times out.

+ *

Default value is {@code 30} seconds.

+ * + * @param streamIdleTimeout the stream idle timeout in milliseconds + */ public void setStreamIdleTimeout(long streamIdleTimeout) { this.streamIdleTimeout = streamIdleTimeout; @@ -48,6 +57,12 @@ public int getInputBufferSize() return inputBufferSize; } + /** + *

Sets the size of the buffer used for QUIC network reads.

+ *

Default value is {@code 2048} bytes.

+ * + * @param inputBufferSize the buffer size in bytes + */ public void setInputBufferSize(int inputBufferSize) { this.inputBufferSize = inputBufferSize; @@ -59,39 +74,105 @@ public int getOutputBufferSize() return outputBufferSize; } + /** + *

Sets the size of the buffer used for QUIC network writes.

+ *

Default value is {@code 2048} bytes.

+ * + * @param outputBufferSize the buffer size in bytes + */ public void setOutputBufferSize(int outputBufferSize) { this.outputBufferSize = outputBufferSize; } - @ManagedAttribute("Whether to use direct buffers for input") + @ManagedAttribute("Whether to use direct buffers for network reads") public boolean isUseInputDirectByteBuffers() { return useInputDirectByteBuffers; } + /** + *

Sets whether to use direct buffers for QUIC network reads.

+ *

Default value is {@code true}.

+ * + * @param useInputDirectByteBuffers whether to use direct buffers for network reads + */ public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) { this.useInputDirectByteBuffers = useInputDirectByteBuffers; } - @ManagedAttribute("Whether to use direct buffers for output") + @ManagedAttribute("Whether to use direct buffers for network writes") public boolean isUseOutputDirectByteBuffers() { return useOutputDirectByteBuffers; } + /** + *

Sets whether to use direct buffers for QUIC network writes.

+ *

Default value is {@code true}.

+ * + * @param useOutputDirectByteBuffers whether to use direct buffers for network writes + */ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers) { this.useOutputDirectByteBuffers = useOutputDirectByteBuffers; } + @ManagedAttribute("The local QPACK max decoder dynamic table capacity") + public int getMaxDecoderTableCapacity() + { + return maxTableCapacity; + } + + /** + *

Sets the local QPACK decoder max dynamic table capacity.

+ *

The default value is {@code 65536} bytes.

+ *

This value is configured on the local QPACK decoder, and then + * communicated to the remote QPACK encoder via the SETTINGS frame.

+ * + * @param maxTableCapacity the QPACK decoder dynamic table max capacity + * @see #setInitialEncoderTableCapacity(int) + */ + public void setMaxDecoderTableCapacity(int maxTableCapacity) + { + this.maxTableCapacity = maxTableCapacity; + } + + @ManagedAttribute("The local QPACK initial encoder dynamic table capacity") + public int getInitialEncoderTableCapacity() + { + return initialTableCapacity; + } + + /** + *

Sets the local QPACK encoder initial dynamic table capacity.

+ *

The default value is {@code 65536} bytes.

+ *

This value is configured in the local QPACK encoder, and may be + * overwritten by a smaller value received via the SETTINGS frame.

+ * + * @param initialTableCapacity the QPACK encoder dynamic table initial capacity + * @see #setMaxDecoderTableCapacity(int) + */ + public void setInitialEncoderTableCapacity(int initialTableCapacity) + { + this.initialTableCapacity = initialTableCapacity; + } + @ManagedAttribute("The max number of QPACK blocked streams") public int getMaxBlockedStreams() { return maxBlockedStreams; } + /** + *

Sets the local QPACK decoder max number of blocked streams.

+ *

The default value is {@code 64}.

+ *

This value is configured in the local QPACK decoder, and then + * communicated to the remote QPACK encoder via the SETTINGS frame.

+ * + * @param maxBlockedStreams the QPACK decoder max blocked streams + */ public void setMaxBlockedStreams(int maxBlockedStreams) { this.maxBlockedStreams = maxBlockedStreams; @@ -103,6 +184,17 @@ public int getMaxRequestHeadersSize() return maxRequestHeadersSize; } + /** + *

Sets max request headers size.

+ *

The default value is {@code 8192} bytes.

+ *

This value is configured in the server-side QPACK decoder, and + * then communicated to the client-side QPACK encoder via the SETTINGS + * frame.

+ *

The client-side QPACK encoder uses this value to cap, if necessary, + * the value sent by the server-side QPACK decoder.

+ * + * @param maxRequestHeadersSize the max request headers size in bytes + */ public void setMaxRequestHeadersSize(int maxRequestHeadersSize) { this.maxRequestHeadersSize = maxRequestHeadersSize; @@ -114,6 +206,17 @@ public int getMaxResponseHeadersSize() return maxResponseHeadersSize; } + /** + *

Sets max response headers size.

+ *

The default value is {@code 8192} bytes.

+ *

This value is configured in the client-side QPACK decoder, and + * then communicated to the server-side QPACK encoder via the SETTINGS + * frame.

+ *

The server-side QPACK encoder uses this value to cap, if necessary, + * the value sent by the client-side QPACK decoder.

+ * + * @param maxResponseHeadersSize the max response headers size + */ public void setMaxResponseHeadersSize(int maxResponseHeadersSize) { this.maxResponseHeadersSize = maxResponseHeadersSize; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java index 31ee1b56fc2e..80903d9bff8c 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/DecoderStreamConnection.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import org.eclipse.jetty.http3.internal.parser.ParserListener; import org.eclipse.jetty.http3.qpack.QpackEncoder; import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.ByteBufferPool; @@ -28,9 +29,9 @@ public class DecoderStreamConnection extends InstructionStreamConnection private final QpackEncoder encoder; - public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder) + public DecoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackEncoder encoder, ParserListener listener) { - super(endPoint, executor, byteBufferPool); + super(endPoint, executor, byteBufferPool, listener); this.encoder = encoder; } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java index a8d53825ad12..3c33b6598dec 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/EncoderStreamConnection.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import org.eclipse.jetty.http3.internal.parser.ParserListener; import org.eclipse.jetty.http3.qpack.QpackDecoder; import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.ByteBufferPool; @@ -28,9 +29,9 @@ public class EncoderStreamConnection extends InstructionStreamConnection private final QpackDecoder decoder; - public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder) + public EncoderStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, QpackDecoder decoder, ParserListener listener) { - super(endPoint, executor, byteBufferPool); + super(endPoint, executor, byteBufferPool, listener); this.decoder = decoder; } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index 227a2b8b9e9d..d6b04c4b5772 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -125,7 +125,7 @@ private CompletableFuture goAway(GoAwayFrame frame) boolean failStreams = false; boolean sendGoAway = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { switch (closeState) { @@ -231,7 +231,7 @@ protected GoAwayFrame newGoAwayFrame(boolean graceful) public CompletableFuture shutdown() { CompletableFuture result; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (shutdown != null) return shutdown; @@ -287,7 +287,7 @@ protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint) private HTTP3Stream newHTTP3Stream(QuicStreamEndPoint endPoint, Consumer fail, boolean local) { Throwable failure = null; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (closeState == CloseState.NOT_CLOSED) streamCount.incrementAndGet(); @@ -349,7 +349,7 @@ public Map onPreface() { Map settings = notifyPreface(); if (LOG.isDebugEnabled()) - LOG.debug("produced settings {} on {}", settings, this); + LOG.debug("application produced settings {} on {}", settings, this); return settings; } @@ -369,34 +369,9 @@ private Map notifyPreface() @Override public void onSettings(SettingsFrame frame) { - if (LOG.isDebugEnabled()) - LOG.debug("received {} on {}", frame, this); - - frame.getSettings().forEach((key, value) -> - { - if (key == SettingsFrame.MAX_TABLE_CAPACITY) - onSettingMaxTableCapacity(value); - else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE) - onSettingMaxFieldSectionSize(value); - else if (key == SettingsFrame.MAX_BLOCKED_STREAMS) - onSettingMaxBlockedStreams(value); - }); - notifySettings(frame); } - protected void onSettingMaxTableCapacity(long value) - { - } - - protected void onSettingMaxFieldSectionSize(long value) - { - } - - protected void onSettingMaxBlockedStreams(long value) - { - } - private void notifySettings(SettingsFrame frame) { try @@ -435,7 +410,7 @@ private boolean notifyIdleTimeout() } @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { MetaData metaData = frame.getMetaData(); if (metaData.isRequest() || metaData.isResponse()) @@ -480,7 +455,7 @@ public void onGoAway(GoAwayFrame frame) LOG.debug("received {} on {}", frame, this); boolean failStreams = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { switch (closeState) { @@ -591,7 +566,7 @@ public void onGoAway(GoAwayFrame frame) public boolean onIdleTimeout() { boolean notify = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { switch (closeState) { @@ -647,7 +622,7 @@ public boolean onIdleTimeout() public void inwardClose(long error, String reason) { GoAwayFrame goAwayFrame = null; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { switch (closeState) { @@ -751,7 +726,7 @@ private void terminate() streamTimeouts.destroy(); // Notify the shutdown completable. CompletableFuture shutdown; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { shutdown = this.shutdown; } @@ -762,7 +737,7 @@ private void terminate() private void tryRunZeroStreamsAction() { Runnable action = null; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { long count = streamCount.get(); if (count > 0) @@ -835,7 +810,7 @@ public void onClose(long error, String reason) // A close at the QUIC level does not allow any // data to be sent, update the state and notify. boolean notifyFailure; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { notifyFailure = closeState == CloseState.NOT_CLOSED; closeState = CloseState.CLOSED; diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 73d840160866..b8dca9679089 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -17,6 +17,7 @@ import java.io.UncheckedIOException; import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; @@ -41,17 +42,16 @@ public abstract class HTTP3StreamConnection extends AbstractConnection private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2); private final AutoLock lock = new AutoLock(); + private final AtomicReference event = new AtomicReference<>(); private final RetainableByteBufferPool buffers; private final MessageParser parser; private boolean useInputDirectByteBuffers = true; private RetainableByteBuffer buffer; - private boolean applicationMode; - private boolean parserDataMode; private boolean dataDemand; private boolean dataStalled; private DataFrame dataFrame; private boolean dataLast; - private boolean noData; + private boolean hasNetworkData; private boolean remotelyClosed; public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser) @@ -78,11 +78,6 @@ public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) this.useInputDirectByteBuffers = useInputDirectByteBuffers; } - public void setApplicationMode(boolean mode) - { - this.applicationMode = mode; - } - @Override public void onOpen() { @@ -108,8 +103,8 @@ protected boolean onReadTimeout(Throwable timeout) public void onFillable() { if (LOG.isDebugEnabled()) - LOG.debug("processing parserDataMode={} on {}", parserDataMode, this); - if (parserDataMode) + LOG.debug("processing dataMode={} on {}", parser.isDataMode(), this); + if (parser.isDataMode()) processDataFrames(); else processNonDataFrames(); @@ -118,7 +113,7 @@ public void onFillable() private void processDataFrames() { processDataDemand(); - if (!parserDataMode) + if (!parser.isDataMode()) { if (hasBuffer() && buffer.hasRemaining()) processNonDataFrames(); @@ -133,50 +128,51 @@ private void processNonDataFrames() { tryAcquireBuffer(); - while (true) + MessageParser.Result result = parseAndFill(true); + if (result == MessageParser.Result.NO_FRAME) { - if (parseAndFill(true) == MessageParser.Result.NO_FRAME) - { - tryReleaseBuffer(false); - return; - } + tryReleaseBuffer(false); + return; + } - // TODO: we should also exit if the connection was closed due to errors. - // There is not yet a isClosed() primitive though. - if (remotelyClosed) - { - // We have detected the end of the stream, - // do not loop around to fill & parse again. - // However, the last frame may have - // caused a write that we need to flush. - getEndPoint().getQuicSession().flush(); - tryReleaseBuffer(false); - return; - } + if (result == MessageParser.Result.BLOCKED_FRAME) + { + // Return immediately because another thread may + // resume the processing as the stream is unblocked. + tryReleaseBuffer(false); + return; + } - if (parserDataMode) - { - if (buffer.hasRemaining()) - { - processDataFrames(); - } - else - { - if (applicationMode) - { - if (LOG.isDebugEnabled()) - LOG.debug("skipping fill interest on {}", this); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("setting fill interest on {}", this); - fillInterested(); - } - tryReleaseBuffer(false); - } - return; - } + Runnable action = event.getAndSet(null); + if (action == null) + throw new IllegalStateException(); + action.run(); + + // TODO: we should also exit if the connection was closed due to errors. + // This can be done by overriding relevant methods in MessageListener. + + if (remotelyClosed) + { + // We have detected the end of the stream, do not try to fill & parse again. + // However, the last frame may have caused a write that needs to be flushed. + getEndPoint().getQuicSession().flush(); + tryReleaseBuffer(false); + return; + } + + if (!parser.isDataMode()) + throw new IllegalStateException(); + + if (hasBuffer() && buffer.hasRemaining()) + { + processDataFrames(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("setting fill interest on {}", this); + tryReleaseBuffer(false); + fillInterested(); } } catch (Throwable x) @@ -204,7 +200,12 @@ public Stream.Data readData() { case FRAME: { - if (parserDataMode) + Runnable action = event.getAndSet(null); + if (action == null) + throw new IllegalStateException(); + action.run(); + + if (parser.isDataMode()) { DataFrame frame = dataFrame; dataFrame = null; @@ -225,12 +226,11 @@ public Stream.Data readData() return null; } } - case MODE_SWITCH: + case SWITCH_MODE: { if (LOG.isDebugEnabled()) - LOG.debug("switching to parserDataMode=false on {}", this); + LOG.debug("switching to dataMode=false on {}", this); dataLast = true; - parserDataMode = false; parser.setDataMode(false); tryReleaseBuffer(false); return null; @@ -269,9 +269,9 @@ public void demand() { boolean hasData; boolean process = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { - hasData = !noData; + hasData = hasNetworkData; dataDemand = true; if (dataStalled && hasData) { @@ -289,7 +289,7 @@ else if (!hasData) public boolean hasDemand() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return dataDemand; } @@ -297,7 +297,7 @@ public boolean hasDemand() private void cancelDemand() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { dataDemand = false; } @@ -305,17 +305,17 @@ private void cancelDemand() private boolean isStalled() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return dataStalled; } } - private void setNoData(boolean noData) + private void setHasNetworkData(boolean noData) { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { - this.noData = noData; + this.hasNetworkData = noData; } } @@ -324,7 +324,7 @@ private void processDataDemand() while (true) { boolean process = true; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (LOG.isDebugEnabled()) LOG.debug("processing demand={}, last={} fillInterested={} on {}", dataDemand, dataLast, isFillInterested(), this); @@ -389,7 +389,7 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) if (LOG.isDebugEnabled()) LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, buffer); - setNoData(false); + setHasNetworkData(true); while (true) { @@ -397,7 +397,7 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) MessageParser.Result result = parser.parse(byteBuffer); if (LOG.isDebugEnabled()) LOG.debug("parsed {} on {} with buffer {}", result, this, buffer); - if (result == MessageParser.Result.FRAME || result == MessageParser.Result.MODE_SWITCH) + if (result != MessageParser.Result.NO_FRAME) return result; if (buffer.isRetained()) @@ -430,7 +430,7 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) return MessageParser.Result.FRAME; } - setNoData(true); + setHasNetworkData(false); if (setFillInterest) fillInterested(); } @@ -458,10 +458,74 @@ private int fill(ByteBuffer byteBuffer) } } + private void processHeaders(HeadersFrame frame, boolean wasBlocked, Runnable delegate) + { + MetaData metaData = frame.getMetaData(); + if (metaData.isRequest()) + { + // Expect DATA frames now. + parser.setDataMode(true); + if (LOG.isDebugEnabled()) + LOG.debug("switching to dataMode=true for request {} on {}", metaData, this); + } + else if (metaData.isResponse()) + { + MetaData.Response response = (MetaData.Response)metaData; + if (HttpStatus.isInformational(response.getStatus())) + { + if (LOG.isDebugEnabled()) + LOG.debug("staying in dataMode=false for response {} on {}", metaData, this); + } + else + { + // Expect DATA frames now. + parser.setDataMode(true); + if (LOG.isDebugEnabled()) + LOG.debug("switching to dataMode=true for response {} on {}", metaData, this); + } + } + else + { + // Trailer. + if (!frame.isLast()) + frame = new HeadersFrame(metaData, true); + } + + if (frame.isLast()) + shutdownInput(); + + delegate.run(); + + if (wasBlocked) + onFillable(); + } + + private void processData(DataFrame frame, Runnable delegate) + { + if (dataFrame != null) + throw new IllegalStateException(); + dataFrame = frame; + if (frame.isLast()) + { + dataLast = true; + shutdownInput(); + } + delegate.run(); + } + + private void shutdownInput() + { + remotelyClosed = true; + // We want to shutdown the input to avoid "spurious" wakeups where + // zero bytes could be spuriously read from the EndPoint after the + // stream is remotely closed by receiving a frame with last=true. + getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code()); + } + @Override public String toConnectionString() { - return String.format("%s[demand=%b,stalled=%b,parserDataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parserDataMode); + return String.format("%s[demand=%b,stalled=%b,dataMode=%b]", super.toConnectionString(), hasDemand(), isStalled(), parser.isDataMode()); } private class MessageListener extends ParserListener.Wrapper @@ -472,66 +536,26 @@ private MessageListener(ParserListener listener) } @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { - MetaData metaData = frame.getMetaData(); - if (metaData.isRequest()) - { - // Expect DATA frames now. - parserDataMode = true; - parser.setDataMode(true); - if (LOG.isDebugEnabled()) - LOG.debug("switching to parserDataMode=true for request {} on {}", metaData, this); - } - else if (metaData.isResponse()) - { - MetaData.Response response = (MetaData.Response)metaData; - if (HttpStatus.isInformational(response.getStatus())) - { - if (LOG.isDebugEnabled()) - LOG.debug("staying in parserDataMode=false for response {} on {}", metaData, this); - } - else - { - // Expect DATA frames now. - parserDataMode = true; - parser.setDataMode(true); - if (LOG.isDebugEnabled()) - LOG.debug("switching to parserDataMode=true for response {} on {}", metaData, this); - } - } - else - { - // Trailer. - if (!frame.isLast()) - frame = new HeadersFrame(metaData, true); - } - if (frame.isLast()) - shutdownInput(); - super.onHeaders(streamId, frame); + if (LOG.isDebugEnabled()) + LOG.debug("received {}#{} wasBlocked={}", frame, streamId, wasBlocked); + Runnable delegate = () -> super.onHeaders(streamId, frame, wasBlocked); + Runnable action = () -> processHeaders(frame, wasBlocked, delegate); + if (wasBlocked) + action.run(); + else if (!event.compareAndSet(null, action)) + throw new IllegalStateException(); } @Override public void onData(long streamId, DataFrame frame) { - if (dataFrame != null) + if (LOG.isDebugEnabled()) + LOG.debug("received {}#{}", frame, streamId); + Runnable delegate = () -> super.onData(streamId, frame); + if (!event.compareAndSet(null, () -> processData(frame, delegate))) throw new IllegalStateException(); - dataFrame = frame; - if (frame.isLast()) - { - dataLast = true; - shutdownInput(); - } - super.onData(streamId, frame); - } - - private void shutdownInput() - { - remotelyClosed = true; - // We want to shutdown the input to avoid "spurious" wakeups where - // zero bytes could be spuriously read from the EndPoint after the - // stream is remotely closed by receiving a frame with last=true. - getEndPoint().shutdownInput(HTTP3ErrorCode.NO_ERROR.code()); } } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java index 09e56d9c06b2..40f71a5751d2 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/InstructionStreamConnection.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.Executor; +import org.eclipse.jetty.http3.internal.parser.ParserListener; import org.eclipse.jetty.http3.qpack.QpackException; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; @@ -29,13 +30,15 @@ public abstract class InstructionStreamConnection extends AbstractConnection imp { private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class); private final ByteBufferPool byteBufferPool; + private final ParserListener listener; private boolean useInputDirectByteBuffers = true; private ByteBuffer buffer; - public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool) + public InstructionStreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, ParserListener listener) { super(endPoint, executor); this.byteBufferPool = byteBufferPool; + this.listener = listener; } public boolean isUseInputDirectByteBuffers() @@ -102,13 +105,34 @@ else if (filled < 0) } } } + catch (QpackException.SessionException x) + { + fail(x.getErrorCode(), x.getMessage(), x); + } + catch (Throwable x) + { + fail(HTTP3ErrorCode.INTERNAL_ERROR.code(), "internal_error", x); + } + } + + private void fail(long errorCode, String message, Throwable failure) + { + byteBufferPool.release(buffer); + buffer = null; + if (LOG.isDebugEnabled()) + LOG.debug("could not process instruction stream {}", getEndPoint(), failure); + notifySessionFailure(errorCode, message, failure); + } + + protected void notifySessionFailure(long error, String reason, Throwable failure) + { + try + { + listener.onSessionFailure(error, reason, failure); + } catch (Throwable x) { - if (LOG.isDebugEnabled()) - LOG.debug("could not process decoder stream {}", getEndPoint(), x); - byteBufferPool.release(buffer); - buffer = null; - getEndPoint().close(x); + LOG.info("failure while notifying listener {}", listener, x); } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java index 164218829fdd..d0388e1ec3b5 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/MessageFlusher.java @@ -39,10 +39,10 @@ public class MessageFlusher extends IteratingCallback private final MessageGenerator generator; private Entry entry; - public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) + public MessageFlusher(ByteBufferPool byteBufferPool, QpackEncoder encoder, boolean useDirectByteBuffers) { this.lease = new ByteBufferPool.Lease(byteBufferPool); - this.generator = new MessageGenerator(encoder, maxHeadersLength, useDirectByteBuffers); + this.generator = new MessageGenerator(encoder, useDirectByteBuffers); } public boolean offer(QuicStreamEndPoint endPoint, Frame frame, Callback callback) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java index 398864f2e8e7..f3a71ecfa53c 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/UnidirectionalStreamConnection.java @@ -143,7 +143,7 @@ private void detectAndUpgrade(long streamType) } else if (streamType == EncoderStreamConnection.STREAM_TYPE) { - EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder); + EncoderStreamConnection newConnection = new EncoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, decoder, listener); newConnection.setInputBufferSize(getInputBufferSize()); newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) @@ -152,7 +152,7 @@ else if (streamType == EncoderStreamConnection.STREAM_TYPE) } else if (streamType == DecoderStreamConnection.STREAM_TYPE) { - DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder); + DecoderStreamConnection newConnection = new DecoderStreamConnection(getEndPoint(), getExecutor(), byteBufferPool, encoder, listener); newConnection.setInputBufferSize(getInputBufferSize()); newConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java index 9a09fddc61be..847aa0b20d79 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/HeadersGenerator.java @@ -27,13 +27,11 @@ public class HeadersGenerator extends FrameGenerator { private final QpackEncoder encoder; - private final int maxLength; private final boolean useDirectByteBuffers; - public HeadersGenerator(QpackEncoder encoder, int maxLength, boolean useDirectByteBuffers) + public HeadersGenerator(QpackEncoder encoder, boolean useDirectByteBuffers) { this.encoder = encoder; - this.maxLength = maxLength; this.useDirectByteBuffers = useDirectByteBuffers; } @@ -52,6 +50,7 @@ private int generateHeadersFrame(ByteBufferPool.Lease lease, long streamId, Head int frameTypeLength = VarLenInt.length(FrameType.HEADERS.type()); int maxHeaderLength = frameTypeLength + VarLenInt.MAX_LENGTH; // The capacity of the buffer is larger than maxLength, but we need to enforce at most maxLength. + int maxLength = encoder.getMaxHeadersSize(); ByteBuffer buffer = lease.acquire(maxHeaderLength + maxLength, useDirectByteBuffers); buffer.position(maxHeaderLength); buffer.limit(buffer.position() + maxLength); diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/MessageGenerator.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/MessageGenerator.java index 24dba7ce9029..09bce25ef288 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/MessageGenerator.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/generator/MessageGenerator.java @@ -24,10 +24,10 @@ public class MessageGenerator { private final FrameGenerator[] generators = new FrameGenerator[FrameType.maxType() + 1]; - public MessageGenerator(QpackEncoder encoder, int maxHeadersLength, boolean useDirectByteBuffers) + public MessageGenerator(QpackEncoder encoder, boolean useDirectByteBuffers) { generators[FrameType.DATA.type()] = new DataGenerator(useDirectByteBuffers); - generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, maxHeadersLength, useDirectByteBuffers); + generators[FrameType.HEADERS.type()] = new HeadersGenerator(encoder, useDirectByteBuffers); generators[FrameType.PUSH_PROMISE.type()] = new PushPromiseGenerator(); } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java index d8db95c132c4..0c957dd59f40 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/BodyParser.java @@ -125,6 +125,6 @@ protected void notifyGoAway(GoAwayFrame frame) public enum Result { - NO_FRAME, FRAGMENT_FRAME, WHOLE_FRAME + NO_FRAME, BLOCKED_FRAME, FRAGMENT_FRAME, WHOLE_FRAME } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java index fa4c06618acf..285efb0d52c4 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/HeadersBodyParser.java @@ -105,7 +105,7 @@ public Result parse(ByteBuffer buffer) // needs to be parsed, then it's not the last frame. boolean last = isLast.getAsBoolean() && !buffer.hasRemaining(); - return decode(encoded, last) ? Result.WHOLE_FRAME : Result.NO_FRAME; + return decode(encoded, last) ? Result.WHOLE_FRAME : Result.BLOCKED_FRAME; } } default: @@ -121,7 +121,7 @@ private boolean decode(ByteBuffer encoded, boolean last) { try { - return decoder.decode(streamId, encoded, (streamId, metaData) -> onHeaders(metaData, last)); + return decoder.decode(streamId, encoded, (streamId, metaData, wasBlocked) -> onHeaders(metaData, last, wasBlocked)); } catch (QpackException.StreamException x) { @@ -144,19 +144,19 @@ private boolean decode(ByteBuffer encoded, boolean last) return false; } - private void onHeaders(MetaData metaData, boolean last) + private void onHeaders(MetaData metaData, boolean last, boolean wasBlocked) { HeadersFrame frame = new HeadersFrame(metaData, last); reset(); - notifyHeaders(frame); + notifyHeaders(frame, wasBlocked); } - protected void notifyHeaders(HeadersFrame frame) + protected void notifyHeaders(HeadersFrame frame, boolean wasBlocked) { ParserListener listener = getParserListener(); try { - listener.onHeaders(streamId, frame); + listener.onHeaders(streamId, frame, wasBlocked); } catch (Throwable x) { diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java index 1e654dba2d66..113b2000b23d 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/MessageParser.java @@ -72,6 +72,11 @@ public ParserListener getListener() return listener; } + public boolean isDataMode() + { + return dataMode; + } + public void setDataMode(boolean enable) { this.dataMode = enable; @@ -100,7 +105,7 @@ public Result parse(ByteBuffer buffer) state = State.BODY; // If we are in data mode, but we did not parse a DATA frame, bail out. if (dataMode && headerParser.getFrameType() != FrameType.DATA.type()) - return Result.MODE_SWITCH; + return Result.SWITCH_MODE; break; } return Result.NO_FRAME; @@ -143,18 +148,32 @@ public Result parse(ByteBuffer buffer) if (LOG.isDebugEnabled()) LOG.debug("parsed {} empty frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); reset(); + return Result.FRAME; } else { BodyParser.Result result = bodyParser.parse(buffer); + if (LOG.isDebugEnabled()) + LOG.debug("parsed {} {} body from {}", result, FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + + // Not enough bytes, there is no frame. if (result == BodyParser.Result.NO_FRAME) return Result.NO_FRAME; - if (LOG.isDebugEnabled()) - LOG.debug("parsed {} frame body from {}", FrameType.from(frameType), BufferUtil.toDetailString(buffer)); + + // Do not reset() if it is a fragment frame. + if (result == BodyParser.Result.FRAGMENT_FRAME) + return Result.FRAME; + + reset(); + + if (result == BodyParser.Result.BLOCKED_FRAME) + return Result.BLOCKED_FRAME; + if (result == BodyParser.Result.WHOLE_FRAME) - reset(); + return Result.FRAME; + + throw new IllegalStateException(); } - return Result.FRAME; } } default: @@ -180,7 +199,23 @@ private void sessionFailure(ByteBuffer buffer, long error, String reason, Throwa public enum Result { - NO_FRAME, FRAME, MODE_SWITCH + /** + * Indicates that no frame was parsed, either for lack of bytes, or because or errors. + */ + NO_FRAME, + /** + * Indicates that a frame was parsed. + */ + FRAME, + /** + * Indicates that a frame was parsed but its notification was deferred. + * This is the case of HEADERS frames that are waiting to be unblocked. + */ + BLOCKED_FRAME, + /** + * Indicates that a DATA frame was expected, but a HEADERS was found instead. + */ + SWITCH_MODE } private enum State diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java index 8c755a8f1f0a..6aff792a7b82 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/parser/ParserListener.java @@ -20,7 +20,7 @@ public interface ParserListener { - public default void onHeaders(long streamId, HeadersFrame frame) + public default void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { } @@ -54,9 +54,9 @@ public Wrapper(ParserListener listener) } @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { - listener.onHeaders(streamId, frame); + listener.onHeaders(streamId, frame, wasBlocked); } @Override diff --git a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java index d9c441777119..6114db0bd3f5 100644 --- a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java +++ b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/DataGenerateParseTest.java @@ -55,7 +55,7 @@ private void testGenerateParse(ByteBuffer byteBuffer) DataFrame input = new DataFrame(ByteBuffer.wrap(inputBytes), true); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); - new MessageGenerator(null, 8192, true).generate(lease, 0, input, null); + new MessageGenerator(null, true).generate(lease, 0, input, null); List frames = new ArrayList<>(); MessageParser parser = new MessageParser(new ParserListener() diff --git a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java index 8025672e8522..abe1637c11f6 100644 --- a/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java +++ b/jetty-http3/http3-common/src/test/java/org/eclipse/jetty/http3/internal/HeadersGenerateParseTest.java @@ -47,16 +47,18 @@ public void testGenerateParse() .put("Cookie", "c=d"); HeadersFrame input = new HeadersFrame(new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, fields), true); - QpackEncoder encoder = new QpackEncoder(instructions -> {}, 100); + QpackEncoder encoder = new QpackEncoder(instructions -> {}); + encoder.setMaxHeadersSize(4 * 1024); ByteBufferPool.Lease lease = new ByteBufferPool.Lease(new NullByteBufferPool()); - new MessageGenerator(encoder, 8192, true).generate(lease, 0, input, null); + new MessageGenerator(encoder, true).generate(lease, 0, input, null); - QpackDecoder decoder = new QpackDecoder(instructions -> {}, 8192); + QpackDecoder decoder = new QpackDecoder(instructions -> {}); + decoder.setMaxHeadersSize(4 * 1024); List frames = new ArrayList<>(); MessageParser parser = new MessageParser(new ParserListener() { @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { frames.add(frame); } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java index d01d59bf6b79..ebed4579154f 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackDecoder.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.eclipse.jetty.http3.qpack.QpackException.H3_GENERAL_PROTOCOL_ERROR; import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_DECOMPRESSION_FAILED; import static org.eclipse.jetty.http3.qpack.QpackException.QPACK_ENCODER_STREAM_ERROR; @@ -55,8 +56,9 @@ public class QpackDecoder implements Dumpable private final NBitIntegerDecoder _integerDecoder = new NBitIntegerDecoder(); private final InstructionHandler _instructionHandler = new InstructionHandler(); private final Map _blockedStreams = new HashMap<>(); - private int _maxHeaderSize; + private int _maxHeadersSize; private int _maxBlockedStreams; + private int _maxTableCapacity; private static class MetaDataNotification { @@ -71,21 +73,24 @@ public MetaDataNotification(long streamId, MetaData metaData, Handler handler) _handler = handler; } - public void notifyHandler() + public void notifyHandler(boolean wasBlocked) { - _handler.onMetaData(_streamId, _metaData); + _handler.onMetaData(_streamId, _metaData, wasBlocked); } } - /** - * @param maxHeaderSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field - */ - public QpackDecoder(Instruction.Handler handler, int maxHeaderSize) + public QpackDecoder(Instruction.Handler handler) { _context = new QpackContext(); _handler = handler; _parser = new DecoderInstructionParser(_instructionHandler); - _maxHeaderSize = maxHeaderSize; + } + + @Deprecated + public QpackDecoder(Instruction.Handler handler, int maxHeaderSize) + { + this(handler); + setMaxHeadersSize(maxHeaderSize); } QpackContext getQpackContext() @@ -93,14 +98,17 @@ QpackContext getQpackContext() return _context; } - public int getMaxHeaderSize() + public int getMaxHeadersSize() { - return _maxHeaderSize; + return _maxHeadersSize; } - public void setMaxHeaderSize(int maxHeaderSize) + /** + * @param maxHeadersSize The maximum allowed size of a headers block, expressed as total of all name and value characters, plus 32 per field + */ + public void setMaxHeadersSize(int maxHeadersSize) { - _maxHeaderSize = maxHeaderSize; + _maxHeadersSize = maxHeadersSize; } public int getMaxBlockedStreams() @@ -113,9 +121,19 @@ public void setMaxBlockedStreams(int maxBlockedStreams) _maxBlockedStreams = maxBlockedStreams; } + public int getMaxTableCapacity() + { + return _maxTableCapacity; + } + + public void setMaxTableCapacity(int maxTableCapacity) + { + _maxTableCapacity = maxTableCapacity; + } + public interface Handler { - void onMetaData(long streamId, MetaData metadata); + void onMetaData(long streamId, MetaData metadata, boolean wasBlocked); } /** @@ -137,8 +155,8 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws // If the buffer is big, don't even think about decoding it // Huffman may double the size, but it will only be a temporary allocation until detected in MetaDataBuilder.emit(). - int maxHeaderSize = getMaxHeaderSize(); - if (buffer.remaining() > maxHeaderSize) + int maxHeaderSize = getMaxHeadersSize(); + if (maxHeaderSize > 0 && buffer.remaining() > maxHeaderSize) throw new QpackException.SessionException(QPACK_DECOMPRESSION_FAILED, "header_too_large"); _integerDecoder.setPrefix(8); @@ -155,7 +173,7 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws // Decode the Required Insert Count using the DynamicTable state. DynamicTable dynamicTable = _context.getDynamicTable(); int insertCount = dynamicTable.getInsertCount(); - int maxDynamicTableSize = dynamicTable.getCapacity(); + int maxDynamicTableSize = getMaxTableCapacity(); int requiredInsertCount = decodeInsertCount(encodedInsertCount, insertCount, maxDynamicTableSize); try @@ -177,7 +195,7 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws else { if (LOG.isDebugEnabled()) - LOG.debug("Deferred Decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection); + LOG.debug("Deferred decoding: streamId={}, encodedFieldSection={}", streamId, encodedFieldSection); AtomicInteger blockedFields = _blockedStreams.computeIfAbsent(streamId, id -> new AtomicInteger(0)); blockedFields.incrementAndGet(); if (_blockedStreams.size() > _maxBlockedStreams) @@ -187,7 +205,7 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws boolean hadMetaData = !_metaDataNotifications.isEmpty(); notifyInstructionHandler(); - notifyMetaDataHandler(); + notifyMetaDataHandler(false); return hadMetaData; } catch (QpackException.SessionException e) @@ -205,10 +223,13 @@ public boolean decode(long streamId, ByteBuffer buffer, Handler handler) throws * the Encoder to the Decoder. This method will fully consume the supplied {@link ByteBuffer} and produce instructions * to update the state of the Decoder and its Dynamic Table. * @param buffer a buffer containing bytes from the Encoder stream. - * @throws QpackException if there was an error parsing or handling the instructions. + * @throws QpackException.SessionException if there was an error parsing or handling the instructions. */ - public void parseInstructions(ByteBuffer buffer) throws QpackException + public void parseInstructions(ByteBuffer buffer) throws QpackException.SessionException { + if (LOG.isDebugEnabled()) + LOG.debug("Parsing Instructions {}", BufferUtil.toDetailString(buffer)); + try { while (BufferUtil.hasContent(buffer)) @@ -216,7 +237,7 @@ public void parseInstructions(ByteBuffer buffer) throws QpackException _parser.parse(buffer); } notifyInstructionHandler(); - notifyMetaDataHandler(); + notifyMetaDataHandler(true); } catch (QpackException.SessionException e) { @@ -254,7 +275,7 @@ private void checkEncodedFieldSections() throws QpackException { iterator.remove(); long streamId = encodedFieldSection.getStreamId(); - MetaData metaData = encodedFieldSection.decode(_context, _maxHeaderSize); + MetaData metaData = encodedFieldSection.decode(_context, getMaxHeadersSize()); if (_blockedStreams.get(streamId).decrementAndGet() <= 0) _blockedStreams.remove(streamId); if (LOG.isDebugEnabled()) @@ -316,13 +337,16 @@ private void notifyInstructionHandler() _instructions.clear(); } - private void notifyMetaDataHandler() + private void notifyMetaDataHandler(boolean wasBlocked) { - for (MetaDataNotification notification : _metaDataNotifications) + // Copy the list to avoid re-entrance, where the call to + // notifyHandler() may end up calling again this method. + List notifications = new ArrayList<>(_metaDataNotifications); + _metaDataNotifications.clear(); + for (MetaDataNotification notification : notifications) { - notification.notifyHandler(); + notification.notifyHandler(wasBlocked); } - _metaDataNotifications.clear(); } InstructionHandler getInstructionHandler() @@ -336,8 +360,10 @@ InstructionHandler getInstructionHandler() class InstructionHandler implements DecoderInstructionParser.Handler { @Override - public void onSetDynamicTableCapacity(int capacity) + public void onSetDynamicTableCapacity(int capacity) throws QpackException { + if (capacity > getMaxTableCapacity()) + throw new QpackException.StreamException(H3_GENERAL_PROTOCOL_ERROR, "DynamicTable capacity exceeds max capacity"); _context.getDynamicTable().setCapacity(capacity); } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java index 8ce7cc5c32bd..7767cb4916c9 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/QpackEncoder.java @@ -95,17 +95,25 @@ public class QpackEncoder implements Dumpable private final Map _streamInfoMap = new HashMap<>(); private final EncoderInstructionParser _parser; private final InstructionHandler _instructionHandler = new InstructionHandler(); - private int _knownInsertCount = 0; - private int _blockedStreams = 0; + private int _knownInsertCount; + private int _blockedStreams; + private int _maxHeadersSize; + private int _maxTableCapacity; - public QpackEncoder(Instruction.Handler handler, int maxBlockedStreams) + public QpackEncoder(Instruction.Handler handler) { _handler = handler; _context = new QpackContext(); - _maxBlockedStreams = maxBlockedStreams; _parser = new EncoderInstructionParser(_instructionHandler); } + @Deprecated + public QpackEncoder(Instruction.Handler handler, int maxBlockedStreams) + { + this(handler); + setMaxBlockedStreams(maxBlockedStreams); + } + Map getStreamInfoMap() { return _streamInfoMap; @@ -121,7 +129,30 @@ public void setMaxBlockedStreams(int maxBlockedStreams) _maxBlockedStreams = maxBlockedStreams; } - public int getCapacity() + public int getMaxHeadersSize() + { + return _maxHeadersSize; + } + + public void setMaxHeadersSize(int maxHeadersSize) + { + _maxHeadersSize = maxHeadersSize; + } + + public int getMaxTableCapacity() + { + return _maxTableCapacity; + } + + public void setMaxTableCapacity(int maxTableCapacity) + { + _maxTableCapacity = maxTableCapacity; + int capacity = getTableCapacity(); + if (capacity > maxTableCapacity) + setTableCapacity(maxTableCapacity); + } + + public int getTableCapacity() { return _context.getDynamicTable().getCapacity(); } @@ -131,11 +162,16 @@ public int getCapacity() * * @param capacity the new capacity. */ - public void setCapacity(int capacity) + public void setTableCapacity(int capacity) { - _context.getDynamicTable().setCapacity(capacity); - _handler.onInstructions(List.of(new SetCapacityInstruction(capacity))); - notifyInstructionHandler(); + try (AutoLock ignored = lock.lock()) + { + if (capacity > getMaxTableCapacity()) + throw new IllegalArgumentException("DynamicTable capacity exceeds max capacity"); + _context.getDynamicTable().setCapacity(capacity); + _handler.onInstructions(List.of(new SetCapacityInstruction(capacity))); + notifyInstructionHandler(); + } } /** @@ -151,7 +187,7 @@ public void setCapacity(int capacity) */ public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws QpackException { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (LOG.isDebugEnabled()) LOG.debug("Encoding: streamId={}, metadata={}", streamId, metadata); @@ -159,6 +195,9 @@ public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws Q // Verify that we can encode without errors. if (metadata.getFields() != null) { + // TODO: enforce that the length of the header is less than maxHeadersSize. + // See RFC 9114, section 4.2.2. + for (HttpField field : metadata.getFields()) { String name = field.getName(); @@ -252,7 +291,7 @@ public void encode(ByteBuffer buffer, long streamId, MetaData metadata) throws Q */ public void parseInstructions(ByteBuffer buffer) throws QpackException { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { while (BufferUtil.hasContent(buffer)) { @@ -279,43 +318,46 @@ public void parseInstructions(ByteBuffer buffer) throws QpackException */ public boolean insert(HttpField field) { - DynamicTable dynamicTable = _context.getDynamicTable(); - if (field.getValue() == null) - field = new HttpField(field.getHeader(), field.getName(), ""); + try (AutoLock ignored = lock.lock()) + { + DynamicTable dynamicTable = _context.getDynamicTable(); + if (field.getValue() == null) + field = new HttpField(field.getHeader(), field.getName(), ""); - // If we should not index this entry or there is no room to insert it, then just return false. - boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field); - if (!canCreateEntry) - return false; + // If we should not index this entry or there is no room to insert it, then just return false. + boolean canCreateEntry = shouldIndex(field) && dynamicTable.canInsert(field); + if (!canCreateEntry) + return false; - // Can we insert by duplicating an existing entry? - Entry entry = _context.get(field); - if (entry != null) - { - int index = _context.indexOf(entry); - dynamicTable.add(new Entry(field)); - _instructions.add(new DuplicateInstruction(index)); - notifyInstructionHandler(); - return true; - } + // Can we insert by duplicating an existing entry? + Entry entry = _context.get(field); + if (entry != null) + { + int index = _context.indexOf(entry); + dynamicTable.add(new Entry(field)); + _instructions.add(new DuplicateInstruction(index)); + notifyInstructionHandler(); + return true; + } - // Can we insert by referencing a name? - boolean huffman = shouldHuffmanEncode(field); - Entry nameEntry = _context.get(field.getName()); - if (nameEntry != null) - { - int index = _context.indexOf(nameEntry); + // Can we insert by referencing a name? + boolean huffman = shouldHuffmanEncode(field); + Entry nameEntry = _context.get(field.getName()); + if (nameEntry != null) + { + int index = _context.indexOf(nameEntry); + dynamicTable.add(new Entry(field)); + _instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue())); + notifyInstructionHandler(); + return true; + } + + // Add the entry without referencing an existing entry. dynamicTable.add(new Entry(field)); - _instructions.add(new IndexedNameEntryInstruction(!nameEntry.isStatic(), index, huffman, field.getValue())); + _instructions.add(new LiteralNameEntryInstruction(field, huffman)); notifyInstructionHandler(); return true; } - - // Add the entry without referencing an existing entry. - dynamicTable.add(new Entry(field)); - _instructions.add(new LiteralNameEntryInstruction(field, huffman)); - notifyInstructionHandler(); - return true; } /** @@ -327,8 +369,11 @@ public boolean insert(HttpField field) */ public void streamCancellation(long streamId) { - _instructionHandler.onStreamCancellation(streamId); - notifyInstructionHandler(); + try (AutoLock ignored = lock.lock()) + { + _instructionHandler.onStreamCancellation(streamId); + notifyInstructionHandler(); + } } protected boolean shouldIndex(HttpField httpField) diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/DuplicateInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/DuplicateInstruction.java index 8ec1cb827e8d..6523b923bdea 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/DuplicateInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/DuplicateInstruction.java @@ -48,6 +48,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[index=%d]", getClass().getSimpleName(), hashCode(), getIndex()); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/IndexedNameEntryInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/IndexedNameEntryInstruction.java index 555703192b1b..d8cf7adea839 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/IndexedNameEntryInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/IndexedNameEntryInstruction.java @@ -83,6 +83,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[index=%d,name=%s]", getClass().getSimpleName(), hashCode(), getIndex(), getValue()); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java index 5d883a8572ed..751750798af0 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/InsertCountIncrementInstruction.java @@ -48,6 +48,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[increment=%d]", getClass().getSimpleName(), hashCode(), getIncrement()); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/LiteralNameEntryInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/LiteralNameEntryInstruction.java index 79947c0a43fc..74cef4be33cc 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/LiteralNameEntryInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/LiteralNameEntryInstruction.java @@ -93,6 +93,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[name=%s,value=%s]", getClass().getSimpleName(), hashCode(), getName(), getValue()); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java index efb107f32c62..ae3b69859b8b 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SectionAcknowledgmentInstruction.java @@ -48,6 +48,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SetCapacityInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SetCapacityInstruction.java index 24eb0f44611d..69878108afdc 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SetCapacityInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/SetCapacityInstruction.java @@ -48,6 +48,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[capacity=%d]", getClass().getSimpleName(), hashCode(), getCapacity()); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/StreamCancellationInstruction.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/StreamCancellationInstruction.java index 5dcf89f2bfd0..f460e9edcdb9 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/StreamCancellationInstruction.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/instruction/StreamCancellationInstruction.java @@ -43,6 +43,6 @@ public void encode(ByteBufferPool.Lease lease) @Override public String toString() { - return String.format("%s@%x", getClass().getSimpleName(), hashCode()); + return String.format("%s@%x[stream=%d]", getClass().getSimpleName(), hashCode(), _streamId); } } diff --git a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/metadata/MetaDataBuilder.java b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/metadata/MetaDataBuilder.java index aa04bce1a3ee..014aa749d6d5 100644 --- a/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/metadata/MetaDataBuilder.java +++ b/jetty-http3/http3-qpack/src/main/java/org/eclipse/jetty/http3/qpack/internal/metadata/MetaDataBuilder.java @@ -78,7 +78,7 @@ public void emit(HttpField field) throws QpackException.SessionException String value = field.getValue(); int fieldSize = name.length() + (value == null ? 0 : value.length()); _size += fieldSize + 32; - if (_size > _maxSize) + if (_maxSize > 0 && _size > _maxSize) throw new QpackException.SessionException(QpackException.QPACK_DECOMPRESSION_FAILED, String.format("Header size %d > %d", _size, _maxSize)); if (field instanceof StaticTableHttpField) diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java index 5fc9157943e4..e32a98778b52 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/BlockedStreamsTest.java @@ -67,7 +67,10 @@ public void testBlockedStreams() throws Exception // Set capacity of the encoder & decoder to allow entries to be added to the table. int capacity = 1024; - _encoder.setCapacity(capacity); + _encoder.setMaxTableCapacity(capacity); + _encoder.setTableCapacity(capacity); + _decoder.setMaxTableCapacity(capacity); + Instruction instruction = _encoderHandler.getInstruction(); assertThat(instruction, instanceOf(SetCapacityInstruction.class)); _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction)); @@ -178,7 +181,10 @@ public void testMaxBlockedStreams() throws Exception // Set capacity of the encoder & decoder to allow entries to be added to the table. int capacity = 1024; - _encoder.setCapacity(capacity); + _encoder.setMaxTableCapacity(capacity); + _encoder.setTableCapacity(capacity); + _decoder.setMaxTableCapacity(capacity); + Instruction instruction = _encoderHandler.getInstruction(); assertThat(instruction, instanceOf(SetCapacityInstruction.class)); _decoder.parseInstructions(QpackTestUtil.toBuffer(instruction)); diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EncodeDecodeTest.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EncodeDecodeTest.java index 4c5f0cda2792..1bebb6bb7589 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EncodeDecodeTest.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EncodeDecodeTest.java @@ -96,7 +96,7 @@ public void test() throws Exception // B.2. Dynamic Table. // Set capacity to 220. - _encoder.setCapacity(220); + _encoder.setTableCapacity(220); Instruction instruction = _encoderHandler.getInstruction(); assertThat(instruction, instanceOf(SetCapacityInstruction.class)); assertThat(((SetCapacityInstruction)instruction).getCapacity(), is(220)); diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EvictionTest.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EvictionTest.java index edffbc5ab036..14f566cd438e 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EvictionTest.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/EvictionTest.java @@ -34,14 +34,14 @@ public class EvictionTest private final TestEncoderHandler _encoderHandler = new TestEncoderHandler(); private final Random random = new Random(); - private static final int MAX_BLOCKED_STREAMS = 5; - private static final int MAX_HEADER_SIZE = 1024; - @BeforeEach public void before() { - _decoder = new QpackDecoder(_decoderHandler, MAX_HEADER_SIZE); - _encoder = new QpackEncoder(_encoderHandler, MAX_BLOCKED_STREAMS) + _decoder = new QpackDecoder(_decoderHandler); + _decoder.setMaxHeadersSize(1024); + _decoder.setMaxTableCapacity(4 * 1024); + + _encoder = new QpackEncoder(_encoderHandler) { @Override protected boolean shouldHuffmanEncode(HttpField httpField) @@ -49,12 +49,14 @@ protected boolean shouldHuffmanEncode(HttpField httpField) return false; } }; + _encoder.setMaxTableCapacity(4 * 1024); + _encoder.setTableCapacity(4 * 1024); + _encoder.setMaxBlockedStreams(5); } @Test public void test() throws Exception { - _encoder.setCapacity(1024); ByteBuffer encodedFields = ByteBuffer.allocate(1024); for (int i = 0; i < 10000; i++) diff --git a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/TestDecoderHandler.java b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/TestDecoderHandler.java index 20355a50925a..c5a7c2361e88 100644 --- a/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/TestDecoderHandler.java +++ b/jetty-http3/http3-qpack/src/test/java/org/eclipse/jetty/http3/qpack/TestDecoderHandler.java @@ -25,7 +25,7 @@ public class TestDecoderHandler implements QpackDecoder.Handler, Instruction.Han private final LinkedList _instructionList = new LinkedList<>(); @Override - public void onMetaData(long streamId, MetaData metadata) + public void onMetaData(long streamId, MetaData metadata, boolean wasBlocked) { _metadataList.add(metadata); } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java index c834bd7abab6..7fbbdba158ef 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java @@ -17,6 +17,7 @@ import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.GoAwayFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.util.Callback; @@ -27,7 +28,7 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server { private static final Logger LOG = LoggerFactory.getLogger(HTTP3SessionServer.class); - public HTTP3SessionServer(ServerHTTP3Session session, Session.Server.Listener listener) + public HTTP3SessionServer(ServerHTTP3Session session, Server.Listener listener) { super(session, listener); } @@ -58,7 +59,7 @@ protected HTTP3StreamServer newHTTP3Stream(QuicStreamEndPoint endPoint, boolean } @Override - public void onHeaders(long streamId, HeadersFrame frame) + public void onHeaders(long streamId, HeadersFrame frame, boolean wasBlocked) { if (frame.getMetaData().isRequest()) { @@ -71,10 +72,19 @@ public void onHeaders(long streamId, HeadersFrame frame) } else { - super.onHeaders(streamId, frame); + super.onHeaders(streamId, frame, wasBlocked); } } + @Override + public void onSettings(SettingsFrame frame) + { + if (LOG.isDebugEnabled()) + LOG.debug("received {} on {}", frame, this); + getProtocolSession().onSettings(frame); + super.onSettings(frame); + } + @Override public void writeControlFrame(Frame frame, Callback callback) { @@ -95,26 +105,6 @@ protected GoAwayFrame newGoAwayFrame(boolean graceful) return super.newGoAwayFrame(graceful); } - @Override - protected void onSettingMaxTableCapacity(long value) - { - getProtocolSession().getQpackEncoder().setCapacity((int)value); - } - - @Override - protected void onSettingMaxFieldSectionSize(long value) - { - getProtocolSession().getQpackDecoder().setMaxHeaderSize((int)value); - } - - @Override - protected void onSettingMaxBlockedStreams(long value) - { - ServerHTTP3Session session = getProtocolSession(); - session.getQpackDecoder().setMaxBlockedStreams((int)value); - session.getQpackEncoder().setMaxBlockedStreams((int)value); - } - private void notifyAccept() { Server.Listener listener = getListener(); diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java index 1811d37494a1..c04a2e4333ce 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java @@ -46,16 +46,14 @@ public class HttpChannelOverHTTP3 extends HttpChannel private final AutoLock lock = new AutoLock(); private final HTTP3Stream stream; - private final ServerHTTP3StreamConnection connection; private HttpInput.Content content; private boolean expect100Continue; private boolean delayedUntilContent; - public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP3 transport, HTTP3Stream stream, ServerHTTP3StreamConnection connection) + public HttpChannelOverHTTP3(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP3 transport, HTTP3Stream stream) { super(connector, configuration, endPoint, transport); this.stream = stream; - this.connection = connection; } @Override @@ -142,8 +140,6 @@ public Runnable onRequest(HeadersFrame frame) // demand for content, so when it arrives we can dispatch. if (delayedUntilContent) stream.demand(); - else - connection.setApplicationMode(true); } if (LOG.isDebugEnabled()) @@ -195,9 +191,6 @@ public Runnable onDataAvailable() boolean wasDelayed = delayedUntilContent; delayedUntilContent = false; - if (wasDelayed) - connection.setApplicationMode(true); - return wasDelayed || woken ? this : null; } @@ -222,9 +215,6 @@ public Runnable onTrailer(HeadersFrame frame) boolean wasDelayed = delayedUntilContent; delayedUntilContent = false; - if (wasDelayed) - connection.setApplicationMode(true); - return wasDelayed || handle ? this : null; } @@ -233,9 +223,6 @@ public boolean onIdleTimeout(Throwable failure, Consumer consumer) boolean wasDelayed = delayedUntilContent; delayedUntilContent = false; - if (wasDelayed) - connection.setApplicationMode(true); - getHttpTransport().onIdleTimeout(failure); boolean neverDispatched = getState().isIdle(); diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 95f424492a29..bf6c4ac98a40 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.http3.server.internal; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -43,6 +44,7 @@ public class ServerHTTP3Session extends ServerProtocolSession { private static final Logger LOG = LoggerFactory.getLogger(ServerHTTP3Session.class); + private final HTTP3Configuration configuration; private final HTTP3SessionServer session; private final QpackEncoder encoder; private final QpackDecoder decoder; @@ -52,7 +54,8 @@ public class ServerHTTP3Session extends ServerProtocolSession public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession quicSession, Session.Server.Listener listener) { super(quicSession); - this.session = new HTTP3SessionServer(this, listener); + this.configuration = configuration; + session = new HTTP3SessionServer(this, listener); addBean(session); session.setStreamIdleTimeout(configuration.getStreamIdleTimeout()); @@ -62,7 +65,8 @@ public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession qu long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint encoderEndPoint = openInstructionEndPoint(encoderStreamId); InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE); - this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), configuration.getMaxBlockedStreams()); + encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher)); + encoder.setMaxHeadersSize(configuration.getMaxResponseHeadersSize()); addBean(encoder); if (LOG.isDebugEnabled()) LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint); @@ -70,19 +74,19 @@ public ServerHTTP3Session(HTTP3Configuration configuration, ServerQuicSession qu long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint decoderEndPoint = openInstructionEndPoint(decoderStreamId); InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE); - this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), configuration.getMaxRequestHeadersSize()); + decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher)); addBean(decoder); if (LOG.isDebugEnabled()) LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint); long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL); QuicStreamEndPoint controlEndPoint = openControlEndPoint(controlStreamId); - this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers()); + controlFlusher = new ControlFlusher(quicSession, controlEndPoint, configuration.isUseOutputDirectByteBuffers()); addBean(controlFlusher); if (LOG.isDebugEnabled()) LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint); - this.messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.getMaxResponseHeadersSize(), configuration.isUseOutputDirectByteBuffers()); + messageFlusher = new MessageFlusher(quicSession.getByteBufferPool(), encoder, configuration.isUseOutputDirectByteBuffers()); addBean(messageFlusher); } @@ -104,16 +108,86 @@ public HTTP3SessionServer getSessionServer() @Override protected void onStart() { - // Queue the mandatory SETTINGS frame. Map settings = session.onPreface(); - if (settings == null) - settings = Map.of(); - // TODO: add default settings. + settings = settings != null ? new HashMap<>(settings) : new HashMap<>(); + + settings.compute(SettingsFrame.MAX_TABLE_CAPACITY, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxDecoderTableCapacity(); + if (v == 0) + v = null; + } + return v; + }); + settings.compute(SettingsFrame.MAX_FIELD_SECTION_SIZE, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxRequestHeadersSize(); + if (v <= 0) + v = null; + } + return v; + }); + settings.compute(SettingsFrame.MAX_BLOCKED_STREAMS, (k, v) -> + { + if (v == null) + { + v = (long)configuration.getMaxBlockedStreams(); + if (v == 0) + v = null; + } + return v; + }); + + if (LOG.isDebugEnabled()) + LOG.debug("configuring decoder {} on {}", settings, this); + + settings.forEach((key, value) -> + { + if (key == SettingsFrame.MAX_TABLE_CAPACITY) + decoder.setMaxTableCapacity(value.intValue()); + else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE) + decoder.setMaxHeadersSize(value.intValue()); + else if (key == SettingsFrame.MAX_BLOCKED_STREAMS) + decoder.setMaxBlockedStreams(value.intValue()); + }); + + // Queue the mandatory SETTINGS frame. SettingsFrame frame = new SettingsFrame(settings); if (controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::failControlStream))) controlFlusher.iterate(); } + public void onSettings(SettingsFrame frame) + { + Map settings = frame.getSettings(); + if (LOG.isDebugEnabled()) + LOG.debug("configuring encoder {} on {}", settings, this); + settings.forEach((key, value) -> + { + if (key == SettingsFrame.MAX_TABLE_CAPACITY) + { + int maxTableCapacity = value.intValue(); + encoder.setMaxTableCapacity(maxTableCapacity); + encoder.setTableCapacity(Math.min(maxTableCapacity, configuration.getInitialEncoderTableCapacity())); + } + else if (key == SettingsFrame.MAX_FIELD_SECTION_SIZE) + { + // Must cap the maxHeaderSize to avoid large allocations. + int maxHeadersSize = Math.min(value.intValue(), configuration.getMaxResponseHeadersSize()); + encoder.setMaxHeadersSize(maxHeadersSize); + } + else if (key == SettingsFrame.MAX_BLOCKED_STREAMS) + { + int maxBlockedStreams = value.intValue(); + encoder.setMaxBlockedStreams(maxBlockedStreams); + } + }); + } + private void failControlStream(Throwable failure) { long error = HTTP3ErrorCode.CLOSED_CRITICAL_STREAM_ERROR.code(); diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java index 9770bcc08e99..675b09dc9477 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3StreamConnection.java @@ -46,7 +46,7 @@ protected void onDataAvailable(long streamId) public Runnable onRequest(HTTP3StreamServer stream, HeadersFrame frame) { HttpTransportOverHTTP3 transport = new HttpTransportOverHTTP3(stream); - HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream, this); + HttpChannelOverHTTP3 channel = new HttpChannelOverHTTP3(connector, httpConfiguration, getEndPoint(), transport, stream); stream.setAttachment(channel); return channel.onRequest(frame); } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java index ae02335c95e9..ad28e45cc138 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/ClientServerTest.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http3.HTTP3Configuration; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.client.internal.HTTP3SessionClient; @@ -42,9 +43,12 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class ClientServerTest extends AbstractClientServerTest @@ -135,15 +139,15 @@ public void onSettings(Session session, SettingsFrame frame) assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); HTTP3SessionServer serverSession = serverSessionRef.get(); - assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getCapacity()); + assertEquals(maxTableCapacity.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity()); assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams()); assertEquals(maxBlockedStreams.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams()); - assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize()); + assertEquals(maxHeaderSize.getValue(), serverSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize()); - assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getCapacity()); + assertEquals(maxTableCapacity.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxTableCapacity()); assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackEncoder().getMaxBlockedStreams()); assertEquals(maxBlockedStreams.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxBlockedStreams()); - assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeaderSize()); + assertEquals(maxHeaderSize.getValue(), clientSession.getProtocolSession().getQpackDecoder().getMaxHeadersSize()); } @Test @@ -370,7 +374,11 @@ public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame }); int maxRequestHeadersSize = 128; - http3Client.getHTTP3Configuration().setMaxRequestHeadersSize(maxRequestHeadersSize); + HTTP3Configuration http3Configuration = http3Client.getHTTP3Configuration(); + http3Configuration.setMaxRequestHeadersSize(maxRequestHeadersSize); + // Disable the dynamic table, otherwise the large header + // is sent as string literal on the encoder stream. + http3Configuration.setInitialEncoderTableCapacity(0); Session.Client clientSession = newSession(new Session.Client.Listener() {}); CountDownLatch requestFailureLatch = new CountDownLatch(1); @@ -406,10 +414,17 @@ public void onResponse(Stream.Client stream, HeadersFrame frame) public void testResponseHeadersTooLarge() throws Exception { int maxResponseHeadersSize = 128; + CountDownLatch settingsLatch = new CountDownLatch(2); AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch responseFailureLatch = new CountDownLatch(1); start(new Session.Server.Listener() { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + @Override public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame) { @@ -441,9 +456,22 @@ public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame }); AbstractHTTP3ServerConnectionFactory h3 = connector.getConnectionFactory(AbstractHTTP3ServerConnectionFactory.class); assertNotNull(h3); - h3.getHTTP3Configuration().setMaxResponseHeadersSize(maxResponseHeadersSize); + HTTP3Configuration http3Configuration = h3.getHTTP3Configuration(); + // Disable the dynamic table, otherwise the large header + // is sent as string literal on the encoder stream. + http3Configuration.setInitialEncoderTableCapacity(0); + http3Configuration.setMaxResponseHeadersSize(maxResponseHeadersSize); - Session.Client clientSession = newSession(new Session.Client.Listener() {}); + Session.Client clientSession = newSession(new Session.Client.Listener() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + }); + + assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); CountDownLatch streamFailureLatch = new CountDownLatch(1); clientSession.newRequest(new HeadersFrame(newRequest("/large"), true), new Stream.Client.Listener() @@ -474,5 +502,118 @@ public void onResponse(Stream.Client stream, HeadersFrame frame) assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); } - // TODO: write a test calling readData() from onRequest() (not from onDataAvailable()). + @Test + public void testHeadersThenTrailers() throws Exception + { + CountDownLatch requestLatch = new CountDownLatch(1); + CountDownLatch trailerLatch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame) + { + stream.demand(); + requestLatch.countDown(); + return new Stream.Server.Listener() + { + @Override + public void onDataAvailable(Stream.Server stream) + { + // TODO: we should not be needing this!!! + Stream.Data data = stream.readData(); + assertNull(data); + stream.demand(); + } + + @Override + public void onTrailer(Stream.Server stream, HeadersFrame frame) + { + trailerLatch.countDown(); + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true)); + } + }; + } + }); + + Session.Client clientSession = newSession(new Session.Client.Listener() {}); + + CountDownLatch responseLatch = new CountDownLatch(1); + Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener() + { + @Override + public void onResponse(Stream.Client stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + responseLatch.countDown(); + } + }) + .get(5, TimeUnit.SECONDS); + + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + + clientStream.trailer(new HeadersFrame(new MetaData(HttpVersion.HTTP_3, HttpFields.EMPTY), true)); + + assertTrue(trailerLatch.await(5, TimeUnit.SECONDS)); + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testReadDataFromOnRequest() throws Exception + { + CountDownLatch requestLatch = new CountDownLatch(1); + CountDownLatch data1Latch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame) + { + requestLatch.countDown(); + Stream.Data data = await().atMost(5, TimeUnit.SECONDS).until(stream::readData, notNullValue()); + data.complete(); + stream.demand(); + data1Latch.countDown(); + return new Stream.Server.Listener() + { + @Override + public void onDataAvailable(Stream.Server stream) + { + Stream.Data data = stream.readData(); + if (data == null) + { + stream.demand(); + return; + } + data.complete(); + if (!data.isLast()) + { + stream.demand(); + return; + } + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), true)); + } + }; + } + }); + + Session.Client clientSession = newSession(new Session.Client.Listener() {}); + + CountDownLatch responseLatch = new CountDownLatch(1); + Stream clientStream = clientSession.newRequest(new HeadersFrame(newRequest("/large"), false), new Stream.Client.Listener() + { + @Override + public void onResponse(Stream.Client stream, HeadersFrame frame) + { + responseLatch.countDown(); + } + }) + .get(5, TimeUnit.SECONDS); + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + + clientStream.data(new DataFrame(ByteBuffer.allocate(1024), false)); + assertTrue(data1Latch.await(555, TimeUnit.SECONDS)); + + clientStream.data(new DataFrame(ByteBuffer.allocate(512), true)); + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java index db5c1c8b4a6d..29d474132e82 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java @@ -1177,6 +1177,9 @@ public void onDisconnect(Session session, long error, String reason) assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + // Wait a bit more to allow the unidirectional streams to be setup. + Thread.sleep(1000); + // Stopping the HttpClient will also stop the HTTP3Client. httpClient.stop(); @@ -1234,6 +1237,9 @@ public void onDisconnect(Session session, long error, String reason) assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + // Wait a bit more to allow the unidirectional streams to be setup. + Thread.sleep(1000); + server.stop(); assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));