diff --git a/pom.xml b/pom.xml index 032ddce89..8b96b84a7 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ ${project.basedir}/src/main/c ${project.build.directory}/native-lib-only false - 4.1.59.Final + 4.1.60.Final 28 ${os.detected.name}-${os.detected.arch} netty_quiche_${os.detected.name}_${os.detected.arch} @@ -817,6 +817,18 @@ netty-transport ${netty.version} + + io.netty + netty-transport-native-epoll + ${netty.version} + + + io.netty + netty-transport-native-epoll + ${netty.version} + linux-x86_64 + test + junit junit diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java b/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java index 999ee4664..633aa43a9 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java @@ -36,6 +36,13 @@ public final class QuicChannelOption extends ChannelOption { */ public static final ChannelOption QLOG = valueOf(QuicChannelOption.class, "QLOG"); + /** + * Use GSO + * for QUIC packets if possible. If the number is bigger then 1 we will try to use segments. + */ + public static final ChannelOption UDP_SEGMENTS = + valueOf(QuicChannelOption.class, "QUIC_UDP_SEGMENTS"); + @SuppressWarnings({ "deprecation" }) private QuicChannelOption() { super(null); diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java index 20c023116..8a29aba0c 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java @@ -32,6 +32,8 @@ import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.SegmentedDatagramPacket; import io.netty.channel.socket.DatagramPacket; import io.netty.util.AttributeKey; import io.netty.util.collection.LongObjectHashMap; @@ -124,6 +126,7 @@ public void operationComplete(ChannelFuture future) { private final Map.Entry, Object>[] streamAttrsArray; private final TimeoutHandler timeoutHandler = new TimeoutHandler(); private final InetSocketAddress remote; + private final boolean supportsUdpSegment; private QuicheQuicConnection connection; private boolean inFireChannelReadCompleteQueue; @@ -165,6 +168,7 @@ private QuicheQuicChannel(Channel parent, boolean server, ByteBuffer key, Map.Entry, Object>[] streamOptionsArray, Map.Entry, Object>[] streamAttrsArray) { super(parent); + this.supportsUdpSegment = SegmentedDatagramPacket.isSupported() && parent instanceof EpollDatagramChannel; config = new QuicheQuicChannelConfig(this); this.server = server; this.idGenerator = new QuicStreamIdGenerator(server); @@ -888,6 +892,127 @@ private boolean isConnDestroyed() { return connection == null; } + private boolean connectionSendSegments(int maxSegments) { + final int bufferSize = maxSegments * Quic.MAX_DATAGRAM_SIZE; + + long connAddr = connection.address(); + boolean packetWasWritten = false; + int numSegments = 0; + + ByteBuf out = alloc().directBuffer(bufferSize); + int lastWritten = -1; + for (;;) { + boolean done; + int writerIndex = out.writerIndex(); + int written = Quiche.quiche_conn_send( + connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes()); + if (written == 0) { + // No need to create a new datagram packet. Just try again. + continue; + } + + try { + done = Quiche.throwIfError(written); + } catch (Exception e) { + done = true; + pipeline().fireExceptionCaught(e); + } + if (done) { + // We need to write what we have build up so far before we break out of the loop or release the buffer + // if nothing is contained in there. + int readable = out.readableBytes(); + if (readable != 0) { + if (lastWritten != -1 && readable > lastWritten) { + parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + } else { + parent().write(new DatagramPacket(out, remote)); + } + packetWasWritten = true; + } else { + out.release(); + } + break; + } + + if (written < lastWritten) { + // The write was smaller then the write before. This means we can write all together as the + // last segment can be smaller then the other segments. + out.writerIndex(writerIndex + written); + parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + packetWasWritten = true; + + out = alloc().directBuffer(bufferSize); + lastWritten = -1; + numSegments = 0; + continue; + } + + if (lastWritten != -1 && lastWritten != written) { + ByteBuf newOut = alloc().directBuffer(bufferSize); + newOut.writeBytes(out, out.writerIndex(), written); + + // As the last write was smaller then this write we first need to write what we had before as + // a segment can never be bigger then the previous segment. After this we will try to build a new + // chain of segments for the writes to follow. + parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + packetWasWritten = true; + + out = newOut; + lastWritten = written; + numSegments = 0; + } else { + out.writerIndex(writerIndex + written); + lastWritten = written; + numSegments++; + } + + // check if we either built the maximum number of segments for a write or if the ByteBuf is not writable + // anymore. In this case lets write what we have and start a new chain of segments. + if (numSegments == maxSegments || + !out.isWritable()) { + parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + packetWasWritten = true; + + out = alloc().directBuffer(bufferSize); + numSegments = 0; + lastWritten = -1; + } + } + return packetWasWritten; + } + + private boolean connectionSendSimple() { + long connAddr = connection.address(); + boolean packetWasWritten = false; + for (;;) { + ByteBuf out = alloc().directBuffer(Quic.MAX_DATAGRAM_SIZE); + int writerIndex = out.writerIndex(); + int written = Quiche.quiche_conn_send( + connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes()); + + try { + if (Quiche.throwIfError(written)) { + out.release(); + break; + } + } catch (Exception e) { + out.release(); + pipeline().fireExceptionCaught(e); + break; + } + + if (written == 0) { + // No need to create a new datagram packet. Just release and try again. + out.release(); + continue; + } + out.writerIndex(writerIndex + written); + parent().write(new DatagramPacket(out, remote)); + packetWasWritten = true; + } + return packetWasWritten; + } + /** * Write datagrams if needed and return {@code true} if something was written and we need to call * {@link Channel#flush()} at some point. @@ -898,41 +1023,18 @@ private boolean connectionSend() { } inConnectionSend = true; - try { - long connAddr = connection.address(); - boolean packetWasWritten = false; - for (;;) { - ByteBuf out = alloc().directBuffer(Quic.MAX_DATAGRAM_SIZE); - int writerIndex = out.writerIndex(); - int written = Quiche.quiche_conn_send( - connAddr, Quiche.memoryAddress(out) + writerIndex, out.writableBytes()); - - try { - if (Quiche.throwIfError(written)) { - out.release(); - break; - } - } catch (Exception e) { - out.release(); - pipeline().fireExceptionCaught(e); - break; - } - - if (written == 0) { - // No need to create a new datagram packet. Just release and try again. - out.release(); - continue; - } - out.writerIndex(writerIndex + written); - parent().write(new DatagramPacket(out, remote)); - packetWasWritten = true; + boolean packetWasWritten; + int segments = supportsUdpSegment ? config.getUdpSegments() : 0; + if (segments > 0) { + packetWasWritten = connectionSendSegments(segments); + } else { + packetWasWritten = connectionSendSimple(); } if (packetWasWritten) { timeoutHandler.scheduleTimeout(); - return true; } - return false; + return packetWasWritten; } finally { inConnectionSend = false; } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java index 3ec978f11..18758ab70 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java @@ -31,6 +31,8 @@ final class QuicheQuicChannelConfig extends DefaultChannelConfig implements QuicChannelConfig { private volatile QLogConfiguration qLogConfiguration; + // Try to use UDP_SEGMENT by default if possible + private volatile int udpSegment = 10; QuicheQuicChannelConfig(Channel channel) { super(channel); @@ -38,7 +40,8 @@ final class QuicheQuicChannelConfig extends DefaultChannelConfig implements Quic @Override public Map, Object> getOptions() { - return getOptions(super.getOptions(), QuicChannelOption.QLOG); + return getOptions(super.getOptions(), + QuicChannelOption.QLOG, QuicChannelOption.UDP_SEGMENTS); } @SuppressWarnings("unchecked") @@ -47,6 +50,9 @@ public T getOption(ChannelOption option) { if (option == QuicChannelOption.QLOG) { return (T) getQLogConfiguration(); } + if (option == QuicChannelOption.UDP_SEGMENTS) { + return (T) Integer.valueOf(getUdpSegments()); + } return super.getOption(option); } @@ -56,6 +62,10 @@ public boolean setOption(ChannelOption option, T value) { setQLogConfiguration((QLogConfiguration) value); return true; } + if (option == QuicChannelOption.UDP_SEGMENTS) { + setUdpSegments((Integer) value); + return true; + } return super.setOption(option, value); } @@ -136,4 +146,12 @@ private void setQLogConfiguration(QLogConfiguration qLogConfiguration) { } this.qLogConfiguration = qLogConfiguration; } + + int getUdpSegments() { + return udpSegment; + } + + private void setUdpSegments(int udpSegment) { + this.udpSegment = udpSegment; + } } diff --git a/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java b/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java index 63a52b7f8..6f51e793c 100644 --- a/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java +++ b/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java @@ -19,6 +19,9 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollDatagramChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; @@ -46,7 +49,8 @@ final class QuicTestUtils { private QuicTestUtils() { } - private static final EventLoopGroup GROUP = new NioEventLoopGroup(); + private static final EventLoopGroup GROUP = Epoll.isAvailable() ? new EpollEventLoopGroup() : + new NioEventLoopGroup(); static { Runtime.getRuntime().addShutdownHook(new Thread() { @@ -63,7 +67,7 @@ static Channel newClient() throws Exception { static Channel newClient(QuicClientCodecBuilder builder) throws Exception { return new Bootstrap().group(GROUP) - .channel(NioDatagramChannel.class) + .channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class) // We don't want any special handling of the channel so just use a dummy handler. .handler(builder.build()) .bind(new InetSocketAddress(NetUtil.LOCALHOST4, 0)).sync().channel(); @@ -117,7 +121,7 @@ private static Bootstrap newServerBootstrap(QuicServerCodecBuilder serverBuilder ChannelHandler codec = serverBuilder.build(); Bootstrap bs = new Bootstrap(); return bs.group(GROUP) - .channel(NioDatagramChannel.class) + .channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class) // We don't want any special handling of the channel so just use a dummy handler. .handler(codec) .localAddress(new InetSocketAddress(NetUtil.LOCALHOST4, 0));