From 91c90cb4935503a3e00224294b924d760977d12b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 18 Mar 2021 20:42:27 +0100 Subject: [PATCH] Make it more flexible in terms of supporting GSO (#222) --- .../codec/quic/QuicChannelOption.java | 6 +- .../codec/quic/QuicheQuicChannel.java | 25 ++++---- .../codec/quic/QuicheQuicChannelConfig.java | 23 ++++---- .../SegmentedDatagramPacketAllocator.java | 59 +++++++++++++++++++ ...EpollSegmentedDatagramPacketAllocator.java | 45 ++++++++++++++ .../incubator/codec/quic/QuicTestUtils.java | 11 +++- 6 files changed, 140 insertions(+), 29 deletions(-) create mode 100644 src/main/java/io/netty/incubator/codec/quic/SegmentedDatagramPacketAllocator.java create mode 100644 src/test/java/io/netty/incubator/codec/quic/EpollSegmentedDatagramPacketAllocator.java diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java b/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java index 633aa43a9..8d776cfde 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicChannelOption.java @@ -38,10 +38,10 @@ public final class QuicChannelOption extends ChannelOption { /** * Use GSO - * for QUIC packets if possible. If the number is bigger then 1 we will try to use segments. + * for QUIC packets if possible. */ - public static final ChannelOption UDP_SEGMENTS = - valueOf(QuicChannelOption.class, "QUIC_UDP_SEGMENTS"); + public static final ChannelOption SEGMENTED_DATAGRAM_PACKET_ALLOCATOR = + valueOf(QuicChannelOption.class, "SEGMENTED_DATAGRAM_PACKET_ALLOCATOR"); @SuppressWarnings({ "deprecation" }) private QuicChannelOption() { diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java index 5c3a23c9f..b56561f9c 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannel.java @@ -32,8 +32,6 @@ import io.netty.channel.DefaultChannelPipeline; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.epoll.EpollDatagramChannel; -import io.netty.channel.epoll.SegmentedDatagramPacket; import io.netty.channel.socket.DatagramPacket; import io.netty.util.AttributeKey; import io.netty.util.collection.LongObjectHashMap; @@ -111,7 +109,6 @@ public void operationComplete(ChannelFuture future) { private final Map.Entry, Object>[] streamAttrsArray; private final TimeoutHandler timeoutHandler = new TimeoutHandler(); private final InetSocketAddress remote; - private final boolean supportsUdpSegment; private QuicheQuicConnection connection; private boolean inFireChannelReadCompleteQueue; @@ -153,7 +150,6 @@ private QuicheQuicChannel(Channel parent, boolean server, ByteBuffer key, Map.Entry, Object>[] streamOptionsArray, Map.Entry, Object>[] streamAttrsArray) { super(parent); - this.supportsUdpSegment = SegmentedDatagramPacket.isSupported() && parent instanceof EpollDatagramChannel; config = new QuicheQuicChannelConfig(this); this.server = server; this.idGenerator = new QuicStreamIdGenerator(server); @@ -877,8 +873,8 @@ private boolean isConnDestroyed() { return connection == null; } - private boolean connectionSendSegments(int maxSegments) { - final int bufferSize = maxSegments * Quic.MAX_DATAGRAM_SIZE; + private boolean connectionSendSegments(SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) { + final int bufferSize = segmentedDatagramPacketAllocator.maxNumSegments() * Quic.MAX_DATAGRAM_SIZE; long connAddr = connection.address(); boolean packetWasWritten = false; @@ -908,7 +904,7 @@ private boolean connectionSendSegments(int maxSegments) { int readable = out.readableBytes(); if (readable != 0) { if (lastWritten != -1 && readable > lastWritten) { - parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote)); } else { parent().write(new DatagramPacket(out, remote)); } @@ -923,7 +919,7 @@ private boolean connectionSendSegments(int maxSegments) { // The write was smaller then the write before. This means we can write all together as the // last segment can be smaller then the other segments. out.writerIndex(writerIndex + written); - parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote)); packetWasWritten = true; out = alloc().directBuffer(bufferSize); @@ -939,7 +935,7 @@ private boolean connectionSendSegments(int maxSegments) { // As the last write was smaller then this write we first need to write what we had before as // a segment can never be bigger then the previous segment. After this we will try to build a new // chain of segments for the writes to follow. - parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote)); packetWasWritten = true; out = newOut; @@ -953,9 +949,9 @@ private boolean connectionSendSegments(int maxSegments) { // check if we either built the maximum number of segments for a write or if the ByteBuf is not writable // anymore. In this case lets write what we have and start a new chain of segments. - if (numSegments == maxSegments || + if (numSegments == segmentedDatagramPacketAllocator.maxNumSegments() || !out.isWritable()) { - parent().write(new SegmentedDatagramPacket(out, lastWritten, remote)); + parent().write(segmentedDatagramPacketAllocator.newPacket(out, lastWritten, remote)); packetWasWritten = true; out = alloc().directBuffer(bufferSize); @@ -1010,9 +1006,10 @@ private boolean connectionSend() { inConnectionSend = true; try { boolean packetWasWritten; - int segments = supportsUdpSegment ? config.getUdpSegments() : 0; - if (segments > 0) { - packetWasWritten = connectionSendSegments(segments); + SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator = + config.getSegmentedDatagramPacketAllocator(); + if (segmentedDatagramPacketAllocator.maxNumSegments() > 0) { + packetWasWritten = connectionSendSegments(segmentedDatagramPacketAllocator); } else { packetWasWritten = connectionSendSimple(); } diff --git a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java index 18758ab70..df2608b45 100644 --- a/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java +++ b/src/main/java/io/netty/incubator/codec/quic/QuicheQuicChannelConfig.java @@ -31,8 +31,8 @@ final class QuicheQuicChannelConfig extends DefaultChannelConfig implements QuicChannelConfig { private volatile QLogConfiguration qLogConfiguration; - // Try to use UDP_SEGMENT by default if possible - private volatile int udpSegment = 10; + private volatile SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator = + SegmentedDatagramPacketAllocator.NONE; QuicheQuicChannelConfig(Channel channel) { super(channel); @@ -41,7 +41,7 @@ final class QuicheQuicChannelConfig extends DefaultChannelConfig implements Quic @Override public Map, Object> getOptions() { return getOptions(super.getOptions(), - QuicChannelOption.QLOG, QuicChannelOption.UDP_SEGMENTS); + QuicChannelOption.QLOG, QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR); } @SuppressWarnings("unchecked") @@ -50,8 +50,8 @@ public T getOption(ChannelOption option) { if (option == QuicChannelOption.QLOG) { return (T) getQLogConfiguration(); } - if (option == QuicChannelOption.UDP_SEGMENTS) { - return (T) Integer.valueOf(getUdpSegments()); + if (option == QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR) { + return (T) getSegmentedDatagramPacketAllocator(); } return super.getOption(option); } @@ -62,8 +62,8 @@ public boolean setOption(ChannelOption option, T value) { setQLogConfiguration((QLogConfiguration) value); return true; } - if (option == QuicChannelOption.UDP_SEGMENTS) { - setUdpSegments((Integer) value); + if (option == QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR) { + setSegmentedDatagramPacketAllocator((SegmentedDatagramPacketAllocator) value); return true; } return super.setOption(option, value); @@ -147,11 +147,12 @@ private void setQLogConfiguration(QLogConfiguration qLogConfiguration) { this.qLogConfiguration = qLogConfiguration; } - int getUdpSegments() { - return udpSegment; + SegmentedDatagramPacketAllocator getSegmentedDatagramPacketAllocator() { + return segmentedDatagramPacketAllocator; } - private void setUdpSegments(int udpSegment) { - this.udpSegment = udpSegment; + private void setSegmentedDatagramPacketAllocator( + SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) { + this.segmentedDatagramPacketAllocator = segmentedDatagramPacketAllocator; } } diff --git a/src/main/java/io/netty/incubator/codec/quic/SegmentedDatagramPacketAllocator.java b/src/main/java/io/netty/incubator/codec/quic/SegmentedDatagramPacketAllocator.java new file mode 100644 index 000000000..020997e66 --- /dev/null +++ b/src/main/java/io/netty/incubator/codec/quic/SegmentedDatagramPacketAllocator.java @@ -0,0 +1,59 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.incubator.codec.quic; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.socket.DatagramPacket; + +import java.net.InetSocketAddress; + +/** + * Used to allocate datagram packets that use UDP_SEGMENT (GSO). + */ +public interface SegmentedDatagramPacketAllocator { + + /** + * {@link SegmentedDatagramPacketAllocator} which should be used if no UDP_SEGMENT is supported and used. + */ + SegmentedDatagramPacketAllocator NONE = new SegmentedDatagramPacketAllocator() { + @Override + public int maxNumSegments() { + return 0; + } + + @Override + public DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress) { + throw new UnsupportedOperationException(); + } + }; + + /** + * The maximum number of segments to use per packet. + * + * @return the segments. + */ + int maxNumSegments(); + + /** + * Return a new segmented {@link DatagramPacket}. + * + * @param buffer the {@link ByteBuf} that is used as content. + * @param segmentSize the size of each segment. + * @param remoteAddress the remote address to send to. + * @return the packet. + */ + DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress); +} diff --git a/src/test/java/io/netty/incubator/codec/quic/EpollSegmentedDatagramPacketAllocator.java b/src/test/java/io/netty/incubator/codec/quic/EpollSegmentedDatagramPacketAllocator.java new file mode 100644 index 000000000..9938fe772 --- /dev/null +++ b/src/test/java/io/netty/incubator/codec/quic/EpollSegmentedDatagramPacketAllocator.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.incubator.codec.quic; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.epoll.SegmentedDatagramPacket; +import io.netty.channel.socket.DatagramPacket; + +import java.net.InetSocketAddress; + +final class EpollSegmentedDatagramPacketAllocator implements SegmentedDatagramPacketAllocator { + + private final int maxNumSegments; + + EpollSegmentedDatagramPacketAllocator(int maxNumSegments) { + this.maxNumSegments = maxNumSegments; + } + + @Override + public int maxNumSegments() { + return maxNumSegments; + } + + @Override + public DatagramPacket newPacket(ByteBuf buffer, int segmentSize, InetSocketAddress remoteAddress) { + return new SegmentedDatagramPacket(buffer, segmentSize, remoteAddress); + } + + static boolean isSupported() { + return SegmentedDatagramPacket.isSupported(); + } +} diff --git a/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java b/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java index 6f51e793c..1b0b5c493 100644 --- a/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java +++ b/src/test/java/io/netty/incubator/codec/quic/QuicTestUtils.java @@ -16,13 +16,16 @@ package io.netty.incubator.codec.quic; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.SegmentedDatagramPacket; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.SelfSignedCertificate; @@ -98,7 +101,7 @@ static QuicServerCodecBuilder newQuicServerBuilder() { } static QuicServerCodecBuilder newQuicServerBuilder(QuicSslContext context) { - return new QuicServerCodecBuilder() + QuicServerCodecBuilder builder = new QuicServerCodecBuilder() .sslEngineProvider(q -> context.newEngine(q.alloc())) .maxIdleTimeout(5000, TimeUnit.MILLISECONDS) .initialMaxData(10000000) @@ -108,6 +111,11 @@ static QuicServerCodecBuilder newQuicServerBuilder(QuicSslContext context) { .initialMaxStreamsBidirectional(100) .initialMaxStreamsUnidirectional(100) .activeMigration(false); + if (GROUP instanceof EpollEventLoopGroup && EpollSegmentedDatagramPacketAllocator.isSupported()) { + builder.option(QuicChannelOption.SEGMENTED_DATAGRAM_PACKET_ALLOCATOR, + new EpollSegmentedDatagramPacketAllocator(10)); + } + return builder; } private static Bootstrap newServerBootstrap(QuicServerCodecBuilder serverBuilder, @@ -148,4 +156,5 @@ static void closeIfNotNull(Channel channel) throws Exception { channel.close().sync(); } } + }