Skip to content

Commit

Permalink
Split incoming/outgoing packet registry, transition protocol states c…
Browse files Browse the repository at this point in the history
…orrectly (#841)

* Initial code changes

* Make it compile

* Small inlining

* Make less detectable by anticheats and fix keepalive during configuration

* Fix keepalive edge case

* Properly switch inbound protocol in server listener

* Add flow control

* Make toggling automatic keepalive work in another way

* Remove ping pong packets again

* Address review

* Handle keepalive in configuration

* Only spawn keepalive after login is acknowledged

* Prevent very unlikely race conditions with keepalive being switched during a task

* Add debug log for packet serialization and state switching

* Add one more debug print

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/Session.java

Co-authored-by: chris <[email protected]>

* Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java

Co-authored-by: chris <[email protected]>

* Update protocol/src/main/java/org/geysermc/mcprotocollib/protocol/MinecraftProtocol.java

Co-authored-by: chris <[email protected]>

* Mark packet as nonnull

* Fix outbound writing race conditions

* Ensure packets are always sent on the event loop

This replicates the same approach Mojang uses in their networking code.

* Reduce log verbosity

* Put errors into debug

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpClientSession.java

Co-authored-by: chris <[email protected]>

* Add comment to always running in event loop

* Handle auto read earlier to prevent race conditions

* Make instance dynamic

* Revert "Make instance dynamic"

This reverts commit 7f8affb.

* Make flush packet priority

* Do not hide original line that is the cause of the exception

* Cancel packet using exception rather than return

* Properly iterate through parents

* Set log level to debug for unit tests

* Revert "Properly iterate through parents"

This reverts commit 4e2b64d.

* Revert "Cancel packet using exception rather than return"

This reverts commit 6507e77.

* Add write length filter

* Reuse bytebuf for fake flush to avoid unnecessary allocations

* Make tests happy

* Remake dropping packets

* Update protocol/src/main/java/org/geysermc/mcprotocollib/network/tcp/TcpServer.java

Co-authored-by: chris <[email protected]>

* Fix space

* Rename to flush packet

* Add mojmap reference

* Share keepalive code

* Fix compilation

* Revert a tiny bit closer to vanilla

* Inline lambda

* Inherit annotation

* Inherit annotation 2

* Use checkerframework annotation

* Fixup grammar slightly

* Add reset states method

* Add log marker for packet logging

---------

Co-authored-by: chris <[email protected]>
  • Loading branch information
AlexProgrammerDE and onebeastchris authored Oct 8, 2024
1 parent b2c9268 commit f846035
Show file tree
Hide file tree
Showing 22 changed files with 447 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ public void newServerSession(Server server, Session session) {
}

@Override
public PacketRegistry getPacketRegistry() {
public PacketRegistry getInboundPacketRegistry() {
return registry;
}

@Override
public PacketRegistry getOutboundPacketRegistry() {
return registry;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void packetReceived(Session session, Packet packet) {
@Override
public void sessionRemoved(SessionRemovedEvent event) {
MinecraftProtocol protocol = (MinecraftProtocol) event.getSession().getPacketProtocol();
if (protocol.getState() == ProtocolState.GAME) {
if (protocol.getOutboundState() == ProtocolState.GAME) {
log.info("Closing server.");
event.getServer().close(false);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.geysermc.mcprotocollib.network;

import io.netty.channel.Channel;
import net.kyori.adventure.text.Component;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -10,6 +11,7 @@
import org.geysermc.mcprotocollib.network.event.session.SessionListener;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.tcp.FlushHandler;

import java.net.SocketAddress;
import java.util.List;
Expand Down Expand Up @@ -212,7 +214,17 @@ public interface Session {
*
* @param packet Packet to send.
*/
void send(Packet packet);
default void send(@NonNull Packet packet) {
this.send(packet, null);
}

/**
* Sends a packet and runs the specified callback when the packet has been sent.
*
* @param packet Packet to send.
* @param onSent Callback to run when the packet has been sent.
*/
void send(@NonNull Packet packet, @Nullable Runnable onSent);

/**
* Disconnects the session.
Expand Down Expand Up @@ -255,4 +267,48 @@ default void disconnect(@NonNull Component reason) {
* @param cause Throwable responsible for disconnecting.
*/
void disconnect(@NonNull Component reason, @Nullable Throwable cause);

/**
* Auto read in netty means that the server is automatically reading from the channel.
* Turning it off means that we won't get more packets being decoded until we turn it back on.
* We use this to hold off on reading packets until we are ready to process them.
* For example this is used for switching inbound states with {@link #switchInboundState(Runnable)}.
*
* @param autoRead Whether to enable auto read.
* Default is true.
*/
void setAutoRead(boolean autoRead);

/**
* Returns the underlying netty channel of this session.
*
* @return The netty channel
*/
Channel getChannel();

/**
* Changes the inbound state of the session and then re-enables auto read.
* This is used after a terminal packet was handled and the session is ready to receive more packets in the new state.
*
* @param switcher The runnable that switches the inbound state.
*/
default void switchInboundState(Runnable switcher) {
switcher.run();

// We switched to the new inbound state
// we can start reading again
setAutoRead(true);
}

/**
* Flushes all packets that are due to be sent and changes the outbound state of the session.
* This makes sure no other threads have scheduled packets to be sent.
*
* @param switcher The runnable that switches the outbound state.
*/
default void switchOutboundState(Runnable switcher) {
getChannel().writeAndFlush(FlushHandler.FLUSH_PACKET).syncUninterruptibly();

switcher.run();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.geysermc.mcprotocollib.network.packet;

import io.netty.buffer.ByteBuf;
import org.geysermc.mcprotocollib.network.Session;

/**
* A network packet. Any given packet must have a constructor that takes in a {@link ByteBuf}.
Expand All @@ -17,4 +18,14 @@ public interface Packet {
default boolean isPriority() {
return false;
}

/**
* Returns whether the packet is terminal. If true, this should be the last packet sent inside a protocol state.
* Subsequently, {@link Session#setAutoRead(boolean)} should be disabled when a terminal packet is received, until the session has switched into a new state and is ready to receive more packets.
*
* @return Whether the packet is terminal.
*/
default boolean isTerminal() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,16 @@ public abstract class PacketProtocol {
public abstract void newServerSession(Server server, Session session);

/**
* Gets the packet registry for this protocol.
* Gets the inbound packet registry for this protocol.
*
* @return The protocol's packet registry.
* @return The protocol's inbound packet registry.
*/
public abstract PacketRegistry getPacketRegistry();
public abstract PacketRegistry getInboundPacketRegistry();

/**
* Gets the outbound packet registry for this protocol.
*
* @return The protocol's outbound packet registry.
*/
public abstract PacketRegistry getOutboundPacketRegistry();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.geysermc.mcprotocollib.network.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
* Sending a {@link FlushPacket} will ensure all before were sent.
* This handler makes sure it's dropped before it reaches the encoder.
* This logic is similar to the Minecraft UnconfiguredPipelineHandler.OutboundConfigurationTask.
*/
public class FlushHandler extends ChannelOutboundHandlerAdapter {
public static final FlushPacket FLUSH_PACKET = new FlushPacket();

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg == FLUSH_PACKET) {
promise.setSuccess();
} else {
super.write(ctx, msg, promise);
}
}

public static class FlushPacket {
private FlushPacket() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.ProxyInfo;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
Expand Down Expand Up @@ -100,7 +101,7 @@ public void connect(boolean wait, boolean transferring) {
.localAddress(bindAddress, bindPort)
.handler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
PacketProtocol protocol = getPacketProtocol();
protocol.newClientSession(TcpClientSession.this, transferring);

Expand All @@ -117,7 +118,9 @@ public void initChannel(Channel channel) {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(getCodecHelper()));

pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(TcpClientSession.this, true));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", TcpClientSession.this);
}
});
Expand Down Expand Up @@ -246,9 +249,7 @@ private void initializeHAProxySupport(Channel channel) {
HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, proxiedProtocol,
clientAddress.getAddress().getHostAddress(), remoteAddress.getAddress().getHostAddress(),
clientAddress.getPort(), remoteAddress.getPort()
)).addListener(future -> {
channel.pipeline().remove("proxy-protocol-encoder");
});
)).addListener(future -> channel.pipeline().remove("proxy-protocol-encoder"));
}

private static void createTcpEventLoopGroup() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package org.geysermc.mcprotocollib.network.tcp;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.flow.FlowControlHandler;

/**
* A flow control handler for TCP connections.
* When auto-read is disabled, this will halt decoding of packets until auto-read is re-enabled.
* This is needed because auto-read still allows packets to be decoded, even if the channel is not reading anymore from the network.
* This can happen when the channel already read a packet, but the packet is not yet decoded.
* This will halt all decoding until the channel is ready to process more packets.
*/
public class TcpFlowControlHandler extends FlowControlHandler {
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().config().isAutoRead()) {
super.read(ctx);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import org.geysermc.mcprotocollib.network.Session;
import org.geysermc.mcprotocollib.network.codec.PacketCodecHelper;
import org.geysermc.mcprotocollib.network.codec.PacketDefinition;
import org.geysermc.mcprotocollib.network.event.session.PacketErrorEvent;
import org.geysermc.mcprotocollib.network.packet.Packet;
import org.geysermc.mcprotocollib.network.packet.PacketProtocol;
import org.geysermc.mcprotocollib.network.packet.PacketRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import java.util.List;

public class TcpPacketCodec extends ByteToMessageCodec<Packet> {
public class TcpPacketCodec extends MessageToMessageCodec<ByteBuf, Packet> {
private static final Marker marker = MarkerFactory.getMarker("packet_logging");
private static final Logger log = LoggerFactory.getLogger(TcpPacketCodec.class);

private final Session session;
private final boolean client;

Expand All @@ -23,57 +33,87 @@ public TcpPacketCodec(Session session, boolean client) {

@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf buf) {
int initial = buf.writerIndex();
public void encode(ChannelHandlerContext ctx, Packet packet, List<Object> out) {
if (log.isTraceEnabled()) {
log.trace(marker, "Encoding packet: {}", packet.getClass().getSimpleName());
}

PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getOutboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
try {
int packetId = this.client ? packetProtocol.getPacketRegistry().getServerboundId(packet) : packetProtocol.getPacketRegistry().getClientboundId(packet);
PacketDefinition definition = this.client ? packetProtocol.getPacketRegistry().getServerboundDefinition(packetId) : packetProtocol.getPacketRegistry().getClientboundDefinition(packetId);
int packetId = this.client ? packetRegistry.getServerboundId(packet) : packetRegistry.getClientboundId(packet);
PacketDefinition definition = this.client ? packetRegistry.getServerboundDefinition(packetId) : packetRegistry.getClientboundDefinition(packetId);

ByteBuf buf = ctx.alloc().buffer();
packetProtocol.getPacketHeader().writePacketId(buf, codecHelper, packetId);
definition.getSerializer().serialize(buf, codecHelper, packet);

out.add(buf);

if (log.isDebugEnabled()) {
log.debug(marker, "Encoded packet {} ({})", packet.getClass().getSimpleName(), packetId);
}
} catch (Throwable t) {
// Reset writer index to make sure incomplete data is not written out.
buf.writerIndex(initial);
log.debug(marker, "Error encoding packet", t);

PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new EncoderException(t);
}
}
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
// Vanilla also checks for 0 length
if (buf.readableBytes() == 0) {
return;
}

int initial = buf.readerIndex();

PacketProtocol packetProtocol = this.session.getPacketProtocol();
PacketRegistry packetRegistry = packetProtocol.getInboundPacketRegistry();
PacketCodecHelper codecHelper = this.session.getCodecHelper();
Packet packet = null;
try {
int id = packetProtocol.getPacketHeader().readPacketId(buf, codecHelper);
if (id == -1) {
buf.readerIndex(initial);
return;
}

Packet packet = this.client ? packetProtocol.getPacketRegistry().createClientboundPacket(id, buf, codecHelper) : packetProtocol.getPacketRegistry().createServerboundPacket(id, buf, codecHelper);
log.trace(marker, "Decoding packet with id: {}", id);

packet = this.client ? packetRegistry.createClientboundPacket(id, buf, codecHelper) : packetRegistry.createServerboundPacket(id, buf, codecHelper);

if (buf.readableBytes() > 0) {
throw new IllegalStateException("Packet \"" + packet.getClass().getSimpleName() + "\" not fully read.");
}

out.add(packet);

if (log.isDebugEnabled()) {
log.debug(marker, "Decoded packet {} ({})", packet.getClass().getSimpleName(), id);
}
} catch (Throwable t) {
log.debug(marker, "Error decoding packet", t);

// Advance buffer to end to make sure remaining data in this packet is skipped.
buf.readerIndex(buf.readerIndex() + buf.readableBytes());

PacketErrorEvent e = new PacketErrorEvent(this.session, t);
this.session.callEvent(e);
if (!e.shouldSuppress()) {
throw t;
throw new DecoderException(t);
}
} finally {
if (packet != null && packet.isTerminal()) {
// Next packets are in a different protocol state, so we must
// disable auto-read to prevent reading wrong packets.
session.setAutoRead(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.channel.EventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.geysermc.mcprotocollib.network.AbstractServer;
import org.geysermc.mcprotocollib.network.BuiltinFlags;
import org.geysermc.mcprotocollib.network.helper.TransportHelper;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void bindImpl(boolean wait, final Runnable callback) {
.localAddress(this.getHost(), this.getPort())
.childHandler(new ChannelInitializer<>() {
@Override
public void initChannel(Channel channel) {
public void initChannel(@NonNull Channel channel) {
InetSocketAddress address = (InetSocketAddress) channel.remoteAddress();
PacketProtocol protocol = createPacketProtocol();

Expand All @@ -68,7 +69,9 @@ public void initChannel(Channel channel) {
pipeline.addLast("sizer", new TcpPacketSizer(protocol.getPacketHeader(), session.getCodecHelper()));
pipeline.addLast("compression", new TcpPacketCompression(session.getCodecHelper()));

pipeline.addLast("flow-control", new TcpFlowControlHandler());
pipeline.addLast("codec", new TcpPacketCodec(session, false));
pipeline.addLast("flush-handler", new FlushHandler());
pipeline.addLast("manager", session);
}
});
Expand Down
Loading

0 comments on commit f846035

Please sign in to comment.