From a17e5f13a7f83fa1c403f437cee729ca2bba0920 Mon Sep 17 00:00:00 2001 From: booky10 Date: Fri, 20 Sep 2024 20:04:29 +0200 Subject: [PATCH 1/2] Rework bungee injector to allow for packet cancellation --- .../handlers/PacketEventsEncoder.java | 67 ++++++++++++++----- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java b/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java index 92630cce3..0420eb34b 100644 --- a/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java +++ b/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java @@ -27,16 +27,33 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.EncoderException; +import io.netty.handler.codec.MessageToByteEncoder; import io.netty.handler.codec.MessageToMessageEncoder; +import io.netty.util.Recycler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.PromiseCombiner; import net.md_5.bungee.api.connection.ProxiedPlayer; import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.List; // Thanks to ViaVersion for the compression method. @ChannelHandler.Sharable -public class PacketEventsEncoder extends MessageToMessageEncoder { +public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter { + + private static final Recycler> OUT_LIST_RECYCLER = new Recycler>() { + @Override + protected List newObject(Handle> handle) { + // the default bungee compressor only produces one output bytebuf + return new ArrayList<>(1); + } + }; + public ProxiedPlayer player; public User user; public boolean handledCompression; @@ -45,7 +62,7 @@ public PacketEventsEncoder(User user) { this.user = user; } - public void read(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) throws Exception { boolean doCompression = handleCompressionOrder(ctx, buffer); int firstReaderIndex = buffer.readerIndex(); PacketSendEvent packetSendEvent = EventCreationUtil.createSendEvent(ctx.channel(), user, player, @@ -62,12 +79,12 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List out) th buffer.readerIndex(firstReaderIndex); } if (doCompression) { - recompress(ctx, buffer, out); + this.recompress(ctx, buffer, promise); } else { - out.add(buffer.retain()); + ctx.write(buffer, promise); } } else { - ByteBufHelper.clear(packetSendEvent.getByteBuf()); + ReferenceCountUtil.release(packetSendEvent.getByteBuf()); } if (packetSendEvent.hasPostTasks()) { for (Runnable task : packetSendEvent.getPostTasks()) { @@ -77,16 +94,17 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, List out) th } @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { - if (!msg.isReadable()) { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (!(msg instanceof ByteBuf)) { + super.write(ctx, msg, promise); return; } - read(ctx, msg, out); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - super.exceptionCaught(ctx, cause); + ByteBuf buf = (ByteBuf) msg; + if (!buf.isReadable()) { + buf.release(); + } else { + this.read(ctx, buf, promise); + } } private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer) { @@ -125,12 +143,27 @@ private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer return false; } - private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, List out) { - ChannelHandler compressor = ctx.pipeline().get("compress"); + private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) { + List out = OUT_LIST_RECYCLER.get(); try { + ChannelHandler compressor = ctx.pipeline().get("compress"); CustomPipelineUtil.callPacketEncodeByteBuf(compressor, ctx, buffer, out); - } catch (InvocationTargetException e) { - e.printStackTrace(); + } catch (InvocationTargetException exception) { + throw new EncoderException("Error while recompressing bytebuf " + buffer.readableBytes(), exception); + } + + int len = out.size(); + if (len == 1) { + // should be the only case which + // happens on vanilla bungeecord + ctx.write(out.get(0), promise); + } else { + // copied from MessageToMessageEncoder#writePromiseCombiner + PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); + for (int i = 0; i < len; i++) { + combiner.add(ctx.write(out.get(i))); + } + combiner.finish(promise); } } } From 7a9f83129efb8df799de1053b22f1a397c7e8a30 Mon Sep 17 00:00:00 2001 From: booky10 Date: Fri, 20 Sep 2024 20:11:05 +0200 Subject: [PATCH 2/2] Fix incorrect list recycling usage --- .../handlers/PacketEventsEncoder.java | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java b/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java index 0420eb34b..39c1c1bc9 100644 --- a/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java +++ b/bungeecord/src/main/java/io/github/retrooper/packetevents/handlers/PacketEventsEncoder.java @@ -31,8 +31,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.EncoderException; -import io.netty.handler.codec.MessageToByteEncoder; -import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.PromiseCombiner; @@ -46,11 +44,10 @@ @ChannelHandler.Sharable public class PacketEventsEncoder extends ChannelOutboundHandlerAdapter { - private static final Recycler> OUT_LIST_RECYCLER = new Recycler>() { + private static final Recycler OUT_LIST_RECYCLER = new Recycler() { @Override - protected List newObject(Handle> handle) { - // the default bungee compressor only produces one output bytebuf - return new ArrayList<>(1); + protected OutList newObject(Handle handle) { + return new OutList(handle); } }; @@ -74,8 +71,7 @@ public void read(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promi ByteBufHelper.clear(packetSendEvent.getByteBuf()); packetSendEvent.getLastUsedWrapper().writeVarInt(packetSendEvent.getPacketId()); packetSendEvent.getLastUsedWrapper().write(); - } - else { + } else { buffer.readerIndex(firstReaderIndex); } if (doCompression) { @@ -144,26 +140,40 @@ private boolean handleCompressionOrder(ChannelHandlerContext ctx, ByteBuf buffer } private void recompress(ChannelHandlerContext ctx, ByteBuf buffer, ChannelPromise promise) { - List out = OUT_LIST_RECYCLER.get(); + OutList outWrapper = OUT_LIST_RECYCLER.get(); + List out = outWrapper.list; try { ChannelHandler compressor = ctx.pipeline().get("compress"); CustomPipelineUtil.callPacketEncodeByteBuf(compressor, ctx, buffer, out); + + int len = out.size(); + if (len == 1) { + // should be the only case which + // happens on vanilla bungeecord + ctx.write(out.get(0), promise); + } else { + // copied from MessageToMessageEncoder#writePromiseCombiner + PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); + for (int i = 0; i < len; i++) { + combiner.add(ctx.write(out.get(i))); + } + combiner.finish(promise); + } } catch (InvocationTargetException exception) { throw new EncoderException("Error while recompressing bytebuf " + buffer.readableBytes(), exception); + } finally { + outWrapper.handle.recycle(outWrapper); } + } - int len = out.size(); - if (len == 1) { - // should be the only case which - // happens on vanilla bungeecord - ctx.write(out.get(0), promise); - } else { - // copied from MessageToMessageEncoder#writePromiseCombiner - PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); - for (int i = 0; i < len; i++) { - combiner.add(ctx.write(out.get(i))); - } - combiner.finish(promise); + private static final class OutList { + + // the default bungee compressor only produces one output bytebuf + private final List list = new ArrayList<>(1); + private final Recycler.Handle handle; + + public OutList(Recycler.Handle handle) { + this.handle = handle; } } }