Skip to content

Commit

Permalink
Add FlushStrategy to be even more flexible (java-native-access#228)
Browse files Browse the repository at this point in the history
Motivation:

85d313b added some sort of auto flushing after X bytes. While this works very well in practice some users may need something more flexible.

Modifications:

Add FlushStrategy as an API that can be used to influence how flushes are executed.
Add a default implementation which flushes after X bytes (just as we did before)

Result:

More flexible way of influence when flushes happen
  • Loading branch information
normanmaurer authored Mar 20, 2021
1 parent 85d313b commit 2423c2f
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 28 deletions.
60 changes: 60 additions & 0 deletions src/main/java/io/netty/incubator/codec/quic/FlushStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.incubator.codec.quic;

import io.netty.util.internal.ObjectUtil;

/**
* Allows to configure a strategy for when flushes should be happening.
*/
public interface FlushStrategy {

/**
* Default {@link FlushStrategy} implementation.
*/
FlushStrategy DEFAULT = afterNumBytes(20 * Quic.MAX_DATAGRAM_SIZE);

/**
* Returns {@code true} if a flush should happen now, {@code false} otherwise.
*
* @param numPackets the number of packets that were written since the last flush.
* @param numBytes the number of bytes that were written since the last flush.
* @return {@code true} if a flush should be done now, {@code false} otherwise.
*/
boolean shouldFlushNow(int numPackets, int numBytes);

/**
* Implementation that flushes after a number of bytes.
*
* @param bytes the number of bytes after which we should issue a flush.
* @return the {@link FlushStrategy}.
*/
static FlushStrategy afterNumBytes(int bytes) {
ObjectUtil.checkPositive(bytes, "bytes");
return (numPackets, numBytes) -> numBytes > bytes;
}

/**
* Implementation that flushes after a number of packets.
*
* @param packets the number of packets after which we should issue a flush.
* @return the {@link FlushStrategy}.
*/
static FlushStrategy afterNumPackets(int packets) {
ObjectUtil.checkPositive(packets, "packets");
return (numPackets, numBytes) -> numPackets > packets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public QuicClientCodecBuilder clone() {
@Override
protected ChannelHandler build(QuicheConfig config,
Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider,
int localConnIdLength, int maxBytesBeforeFlush) {
return new QuicheQuicClientCodec(config, sslEngineProvider, localConnIdLength, maxBytesBeforeFlush);
int localConnIdLength, FlushStrategy flushStrategy) {
return new QuicheQuicClientCodec(config, sslEngineProvider, localConnIdLength, flushStrategy);
}
}
24 changes: 12 additions & 12 deletions src/main/java/io/netty/incubator/codec/quic/QuicCodecBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.channel.ChannelHandler;
import io.netty.util.internal.ObjectUtil;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand All @@ -31,7 +32,6 @@
* @param <B> the type of the {@link QuicCodecBuilder}.
*/
public abstract class QuicCodecBuilder<B extends QuicCodecBuilder<B>> {
private static final int DEFAULT_MAX_BYTES_BEFORE_FLUSH = 20 * Quic.MAX_DATAGRAM_SIZE;
private final boolean server;
private Boolean grease;
private Long maxIdleTimeout;
Expand All @@ -50,7 +50,7 @@ public abstract class QuicCodecBuilder<B extends QuicCodecBuilder<B>> {
private QuicCongestionControlAlgorithm congestionControlAlgorithm;
private int localConnIdLength = Quiche.QUICHE_MAX_CONN_ID_LEN;
private Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider;
private int maxBytesBeforeFlush = DEFAULT_MAX_BYTES_BEFORE_FLUSH;
private FlushStrategy flushStrategy = FlushStrategy.DEFAULT;

QuicCodecBuilder(boolean server) {
Quic.ensureAvailability();
Expand All @@ -77,7 +77,7 @@ public abstract class QuicCodecBuilder<B extends QuicCodecBuilder<B>> {
this.congestionControlAlgorithm = builder.congestionControlAlgorithm;
this.localConnIdLength = builder.localConnIdLength;
this.sslEngineProvider = builder.sslEngineProvider;
this.maxBytesBeforeFlush = builder.maxBytesBeforeFlush;
this.flushStrategy = builder.flushStrategy;
}

/**
Expand All @@ -91,15 +91,14 @@ protected final B self() {
}

/**
* Sets the max number of bytes that were written before we force a flush of the data. While batching more bytes may
* help to reduce syscalls it may also make the latency worse. Only adjust the setting if you really know what you
* are doing.
* Sets the {@link FlushStrategy} that will be used to detect when an automatic flush
* should happen.
*
* @param maxBytesBeforeFlush the maximum number of bytes before a flush will be forced.
* @return the instance itself.
* @param flushStrategy the strategy to use.
* @return the instance itself.
*/
public final B maxBytesBeforeFlush(int maxBytesBeforeFlush) {
this.maxBytesBeforeFlush = ObjectUtil.checkPositive(maxBytesBeforeFlush, "maxBytesBeforeFlush");
public final B flushStrategy(FlushStrategy flushStrategy) {
this.flushStrategy = Objects.requireNonNull(flushStrategy, "flushStrategy");
return self();
}

Expand Down Expand Up @@ -411,7 +410,7 @@ public final ChannelHandler build() {
validate();
QuicheConfig config = createConfig();
try {
return build(config, sslEngineProvider, localConnIdLength, maxBytesBeforeFlush);
return build(config, sslEngineProvider, localConnIdLength, flushStrategy);
} catch (Throwable cause) {
config.free();
throw cause;
Expand All @@ -431,9 +430,10 @@ public final ChannelHandler build() {
* @param config the {@link QuicheConfig} that should be used.
* @param sslContextProvider the context provider
* @param localConnIdLength the local connection id length.
* @param flushStrategy the {@link FlushStrategy} that should be used.
* @return the {@link ChannelHandler} which acts as codec.
*/
protected abstract ChannelHandler build(QuicheConfig config,
Function<QuicChannel, ? extends QuicSslEngine> sslContextProvider,
int localConnIdLength, int maxBytesBeforeFlush);
int localConnIdLength, FlushStrategy flushStrategy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ protected void validate() {
@Override
protected ChannelHandler build(QuicheConfig config,
Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider,
int localConnIdLength, int maxBytesBeforeFlush) {
int localConnIdLength, FlushStrategy flushStrategy) {
validate();
QuicTokenHandler tokenHandler = this.tokenHandler;
QuicConnectionIdGenerator generator = connectionIdAddressGenerator;
Expand All @@ -193,7 +193,7 @@ protected ChannelHandler build(QuicheConfig config,
}
ChannelHandler handler = this.handler;
ChannelHandler streamHandler = this.streamHandler;
return new QuicheQuicServerCodec(config, localConnIdLength, tokenHandler, generator, maxBytesBeforeFlush,
return new QuicheQuicServerCodec(config, localConnIdLength, tokenHandler, generator, flushStrategy,
sslEngineProvider, handler, Quic.toOptionsArray(options), Quic.toAttributesArray(attrs),
streamHandler, Quic.toOptionsArray(streamOptions), Quic.toAttributesArray(streamAttrs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.ssl.SslContext;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -33,10 +32,10 @@ final class QuicheQuicClientCodec extends QuicheQuicCodec {
private final Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider;

QuicheQuicClientCodec(QuicheConfig config, Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider,
int localConnIdLength, int maxBytesBeforeFlush) {
int localConnIdLength, FlushStrategy flushStrategy) {
// Let's just use Quic.MAX_DATAGRAM_SIZE as the maximum size for a token on the client side. This should be
// safe enough and as we not have too many codecs at the same time this should be ok.
super(config, localConnIdLength, Quic.MAX_DATAGRAM_SIZE, maxBytesBeforeFlush);
super(config, localConnIdLength, Quic.MAX_DATAGRAM_SIZE, flushStrategy);
this.sslEngineProvider = sslEngineProvider;
}

Expand Down
16 changes: 9 additions & 7 deletions src/main/java/io/netty/incubator/codec/quic/QuicheQuicCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,22 @@ abstract class QuicheQuicCodec extends ChannelDuplexHandler {
private final Map<ByteBuffer, QuicheQuicChannel> connections = new HashMap<>();
private final Queue<QuicheQuicChannel> needsFireChannelReadComplete = new ArrayDeque<>();
private final int maxTokenLength;
private final int maxBytesBeforeFlush;
private final FlushStrategy flushStrategy;

private MessageSizeEstimator.Handle estimatorHandle;
private QuicHeaderParser headerParser;
private QuicHeaderParser.QuicHeaderProcessor parserCallback;
private int pendingBytes;
private int pendingPackets;

protected final QuicheConfig config;
protected final int localConnIdLength;

QuicheQuicCodec(QuicheConfig config, int localConnIdLength, int maxTokenLength, int maxBytesBeforeFlush) {
QuicheQuicCodec(QuicheConfig config, int localConnIdLength, int maxTokenLength, FlushStrategy flushStrategy) {
this.config = config;
this.localConnIdLength = localConnIdLength;
this.maxTokenLength = maxTokenLength;
this.maxBytesBeforeFlush = maxBytesBeforeFlush;
this.flushStrategy = flushStrategy;
}

protected QuicheQuicChannel getChannel(ByteBuffer key) {
Expand Down Expand Up @@ -191,14 +192,14 @@ public final void write(ChannelHandlerContext ctx, Object msg, ChannelPromise pr
int size = estimatorHandle.size(msg);
if (size > 0) {
pendingBytes += size;
pendingPackets ++;
}
try {
ctx.write(msg, promise);
} finally {
// If the number of bytes pending exceeds the max batch size we should force a flush() and so ensure
// these are delivered in a timely manner and also make room in the outboundbuffer again that belongs
// to the underlying channel.
if (pendingBytes > maxBytesBeforeFlush) {
// Check if we should force a flush() and so ensure the packets are delivered in a timely
// manner and also make room in the outboundbuffer again that belongs to the underlying channel.
if (flushStrategy.shouldFlushNow(pendingPackets, pendingBytes)) {
flushNow(ctx);
}
}
Expand All @@ -213,6 +214,7 @@ public final void flush(ChannelHandlerContext ctx) {

private void flushNow(ChannelHandlerContext ctx) {
pendingBytes = 0;
pendingPackets = 0;
ctx.flush();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ final class QuicheQuicServerCodec extends QuicheQuicCodec {
int localConnIdLength,
QuicTokenHandler tokenHandler,
QuicConnectionIdGenerator connectionIdAddressGenerator,
int maxBytesBeforeFlush,
FlushStrategy flushStrategy,
Function<QuicChannel, ? extends QuicSslEngine> sslEngineProvider,
ChannelHandler handler,
Map.Entry<ChannelOption<?>, Object>[] optionsArray,
Map.Entry<AttributeKey<?>, Object>[] attrsArray,
ChannelHandler streamHandler,
Map.Entry<ChannelOption<?>, Object>[] streamOptionsArray,
Map.Entry<AttributeKey<?>, Object>[] streamAttrsArray) {
super(config, localConnIdLength, tokenHandler.maxTokenLength(), maxBytesBeforeFlush);
super(config, localConnIdLength, tokenHandler.maxTokenLength(), flushStrategy);
this.tokenHandler = tokenHandler;
this.connectionIdAddressGenerator = connectionIdAddressGenerator;
this.sslEngineProvider = sslEngineProvider;
Expand Down

0 comments on commit 2423c2f

Please sign in to comment.