diff --git a/src/main/java/io/netty/incubator/codec/quic/FlushStrategy.java b/src/main/java/io/netty/incubator/codec/quic/FlushStrategy.java new file mode 100644 index 000000000..9f6fb362c --- /dev/null +++ b/src/main/java/io/netty/incubator/codec/quic/FlushStrategy.java @@ -0,0 +1,60 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.incubator.codec.quic; + +import io.netty.util.internal.ObjectUtil; + +/** + * Allows to configure a strategy for when flushes should be happening. + */ +public interface FlushStrategy { + + /** + * Default {@link FlushStrategy} implementation. + */ + FlushStrategy DEFAULT = afterNumBytes(20 * Quic.MAX_DATAGRAM_SIZE); + + /** + * Returns {@code true} if a flush should happen now, {@code false} otherwise. + * + * @param numPackets the number of packets that were written since the last flush. + * @param numBytes the number of bytes that were written since the last flush. + * @return {@code true} if a flush should be done now, {@code false} otherwise. + */ + boolean shouldFlushNow(int numPackets, int numBytes); + + /** + * Implementation that flushes after a number of bytes. + * + * @param bytes the number of bytes after which we should issue a flush. + * @return the {@link FlushStrategy}. + */ + static FlushStrategy afterNumBytes(int bytes) { + ObjectUtil.checkPositive(bytes, "bytes"); + return (numPackets, numBytes) -> numBytes > bytes; + } + + /** + * Implementation that flushes after a number of packets. + * + * @param packets the number of packets after which we should issue a flush. + * @return the {@link FlushStrategy}. + */ + static FlushStrategy afterNumPackets(int packets) { + ObjectUtil.checkPositive(packets, "packets"); + return (numPackets, numBytes) -> numPackets > packets; + } +} diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicClientCodecBuilder.java b/src/main/java/io/netty/incubator/codec/quic/QuicClientCodecBuilder.java index 07b95e326..2eb5ac48b 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicClientCodecBuilder.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicClientCodecBuilder.java @@ -44,7 +44,7 @@ public QuicClientCodecBuilder clone() { @Override protected ChannelHandler build(QuicheConfig config, Function sslEngineProvider, - int localConnIdLength, int maxBytesBeforeFlush) { - return new QuicheQuicClientCodec(config, sslEngineProvider, localConnIdLength, maxBytesBeforeFlush); + int localConnIdLength, FlushStrategy flushStrategy) { + return new QuicheQuicClientCodec(config, sslEngineProvider, localConnIdLength, flushStrategy); } } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicCodecBuilder.java b/src/main/java/io/netty/incubator/codec/quic/QuicCodecBuilder.java index 883ae0167..12b0f3e4d 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicCodecBuilder.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicCodecBuilder.java @@ -18,6 +18,7 @@ import io.netty.channel.ChannelHandler; import io.netty.util.internal.ObjectUtil; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -31,7 +32,6 @@ * @param the type of the {@link QuicCodecBuilder}. */ public abstract class QuicCodecBuilder> { - private static final int DEFAULT_MAX_BYTES_BEFORE_FLUSH = 20 * Quic.MAX_DATAGRAM_SIZE; private final boolean server; private Boolean grease; private Long maxIdleTimeout; @@ -50,7 +50,7 @@ public abstract class QuicCodecBuilder> { private QuicCongestionControlAlgorithm congestionControlAlgorithm; private int localConnIdLength = Quiche.QUICHE_MAX_CONN_ID_LEN; private Function sslEngineProvider; - private int maxBytesBeforeFlush = DEFAULT_MAX_BYTES_BEFORE_FLUSH; + private FlushStrategy flushStrategy = FlushStrategy.DEFAULT; QuicCodecBuilder(boolean server) { Quic.ensureAvailability(); @@ -77,7 +77,7 @@ public abstract class QuicCodecBuilder> { this.congestionControlAlgorithm = builder.congestionControlAlgorithm; this.localConnIdLength = builder.localConnIdLength; this.sslEngineProvider = builder.sslEngineProvider; - this.maxBytesBeforeFlush = builder.maxBytesBeforeFlush; + this.flushStrategy = builder.flushStrategy; } /** @@ -91,15 +91,14 @@ protected final B self() { } /** - * Sets the max number of bytes that were written before we force a flush of the data. While batching more bytes may - * help to reduce syscalls it may also make the latency worse. Only adjust the setting if you really know what you - * are doing. + * Sets the {@link FlushStrategy} that will be used to detect when an automatic flush + * should happen. * - * @param maxBytesBeforeFlush the maximum number of bytes before a flush will be forced. - * @return the instance itself. + * @param flushStrategy the strategy to use. + * @return the instance itself. */ - public final B maxBytesBeforeFlush(int maxBytesBeforeFlush) { - this.maxBytesBeforeFlush = ObjectUtil.checkPositive(maxBytesBeforeFlush, "maxBytesBeforeFlush"); + public final B flushStrategy(FlushStrategy flushStrategy) { + this.flushStrategy = Objects.requireNonNull(flushStrategy, "flushStrategy"); return self(); } @@ -411,7 +410,7 @@ public final ChannelHandler build() { validate(); QuicheConfig config = createConfig(); try { - return build(config, sslEngineProvider, localConnIdLength, maxBytesBeforeFlush); + return build(config, sslEngineProvider, localConnIdLength, flushStrategy); } catch (Throwable cause) { config.free(); throw cause; @@ -431,9 +430,10 @@ public final ChannelHandler build() { * @param config the {@link QuicheConfig} that should be used. * @param sslContextProvider the context provider * @param localConnIdLength the local connection id length. + * @param flushStrategy the {@link FlushStrategy} that should be used. * @return the {@link ChannelHandler} which acts as codec. */ protected abstract ChannelHandler build(QuicheConfig config, Function sslContextProvider, - int localConnIdLength, int maxBytesBeforeFlush); + int localConnIdLength, FlushStrategy flushStrategy); } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicServerCodecBuilder.java b/src/main/java/io/netty/incubator/codec/quic/QuicServerCodecBuilder.java index e02f06826..8ce1f811b 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicServerCodecBuilder.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicServerCodecBuilder.java @@ -184,7 +184,7 @@ protected void validate() { @Override protected ChannelHandler build(QuicheConfig config, Function sslEngineProvider, - int localConnIdLength, int maxBytesBeforeFlush) { + int localConnIdLength, FlushStrategy flushStrategy) { validate(); QuicTokenHandler tokenHandler = this.tokenHandler; QuicConnectionIdGenerator generator = connectionIdAddressGenerator; @@ -193,7 +193,7 @@ protected ChannelHandler build(QuicheConfig config, } ChannelHandler handler = this.handler; ChannelHandler streamHandler = this.streamHandler; - return new QuicheQuicServerCodec(config, localConnIdLength, tokenHandler, generator, maxBytesBeforeFlush, + return new QuicheQuicServerCodec(config, localConnIdLength, tokenHandler, generator, flushStrategy, sslEngineProvider, handler, Quic.toOptionsArray(options), Quic.toAttributesArray(attrs), streamHandler, Quic.toOptionsArray(streamOptions), Quic.toAttributesArray(streamAttrs)); } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicClientCodec.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicClientCodec.java index 266c461a9..364d2b69d 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicClientCodec.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicClientCodec.java @@ -18,7 +18,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.ssl.SslContext; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -33,10 +32,10 @@ final class QuicheQuicClientCodec extends QuicheQuicCodec { private final Function sslEngineProvider; QuicheQuicClientCodec(QuicheConfig config, Function sslEngineProvider, - int localConnIdLength, int maxBytesBeforeFlush) { + int localConnIdLength, FlushStrategy flushStrategy) { // Let's just use Quic.MAX_DATAGRAM_SIZE as the maximum size for a token on the client side. This should be // safe enough and as we not have too many codecs at the same time this should be ok. - super(config, localConnIdLength, Quic.MAX_DATAGRAM_SIZE, maxBytesBeforeFlush); + super(config, localConnIdLength, Quic.MAX_DATAGRAM_SIZE, flushStrategy); this.sslEngineProvider = sslEngineProvider; } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicCodec.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicCodec.java index b1d2f9a40..7e146fc00 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicCodec.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicCodec.java @@ -41,21 +41,22 @@ abstract class QuicheQuicCodec extends ChannelDuplexHandler { private final Map connections = new HashMap<>(); private final Queue needsFireChannelReadComplete = new ArrayDeque<>(); private final int maxTokenLength; - private final int maxBytesBeforeFlush; + private final FlushStrategy flushStrategy; private MessageSizeEstimator.Handle estimatorHandle; private QuicHeaderParser headerParser; private QuicHeaderParser.QuicHeaderProcessor parserCallback; private int pendingBytes; + private int pendingPackets; protected final QuicheConfig config; protected final int localConnIdLength; - QuicheQuicCodec(QuicheConfig config, int localConnIdLength, int maxTokenLength, int maxBytesBeforeFlush) { + QuicheQuicCodec(QuicheConfig config, int localConnIdLength, int maxTokenLength, FlushStrategy flushStrategy) { this.config = config; this.localConnIdLength = localConnIdLength; this.maxTokenLength = maxTokenLength; - this.maxBytesBeforeFlush = maxBytesBeforeFlush; + this.flushStrategy = flushStrategy; } protected QuicheQuicChannel getChannel(ByteBuffer key) { @@ -191,14 +192,14 @@ public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise pr int size = estimatorHandle.size(msg); if (size > 0) { pendingBytes += size; + pendingPackets ++; } try { ctx.write(msg, promise); } finally { - // If the number of bytes pending exceeds the max batch size we should force a flush() and so ensure - // these are delivered in a timely manner and also make room in the outboundbuffer again that belongs - // to the underlying channel. - if (pendingBytes > maxBytesBeforeFlush) { + // Check if we should force a flush() and so ensure the packets are delivered in a timely + // manner and also make room in the outboundbuffer again that belongs to the underlying channel. + if (flushStrategy.shouldFlushNow(pendingPackets, pendingBytes)) { flushNow(ctx); } } @@ -213,6 +214,7 @@ public final void flush(ChannelHandlerContext ctx) { private void flushNow(ChannelHandlerContext ctx) { pendingBytes = 0; + pendingPackets = 0; ctx.flush(); } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicServerCodec.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicServerCodec.java index f794ef9c8..0317faec9 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicServerCodec.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicServerCodec.java @@ -54,7 +54,7 @@ final class QuicheQuicServerCodec extends QuicheQuicCodec { int localConnIdLength, QuicTokenHandler tokenHandler, QuicConnectionIdGenerator connectionIdAddressGenerator, - int maxBytesBeforeFlush, + FlushStrategy flushStrategy, Function sslEngineProvider, ChannelHandler handler, Map.Entry, Object>[] optionsArray, @@ -62,7 +62,7 @@ final class QuicheQuicServerCodec extends QuicheQuicCodec { ChannelHandler streamHandler, Map.Entry, Object>[] streamOptionsArray, Map.Entry, Object>[] streamAttrsArray) { - super(config, localConnIdLength, tokenHandler.maxTokenLength(), maxBytesBeforeFlush); + super(config, localConnIdLength, tokenHandler.maxTokenLength(), flushStrategy); this.tokenHandler = tokenHandler; this.connectionIdAddressGenerator = connectionIdAddressGenerator; this.sslEngineProvider = sslEngineProvider;