From 3922b17067e3555bfffb83ade97affee082e55da Mon Sep 17 00:00:00 2001 From: HamsterReserved Date: Wed, 25 Sep 2024 21:28:30 +0800 Subject: [PATCH] chore: deliver UDP packets from same connection in receiving order (#1540) All UDP packets are queued into a single channel, and multiple workers are launched to poll the channel in current design. This introduces a problem where UDP packets from a single connection are delivered to different workers, thus forwarded in a random order if workers are on different CPU cores. Though UDP peers normally have their own logic to handle out-of-order packets, this behavior will inevitably cause significant variance in delay and harm connection quality. Furthermore, this out-of-order behavior is noticeable even if the underlying transport could provide guaranteed orderly delivery - this is unacceptable. This commit takes the idea of RSS in terms of NICs: it creates a distinct queue for each worker, hashes incoming packets, and distribute the packet to a worker by hash result. The tuple (SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always UDP so it's dropped from final implementation), thus packets from the same connection can be sent to the same worker, keeping the receiving order. Different connections can be hashed to different workers to maintain performance. Performance for single UDP connection is not affected, as there is already a lock in natTable that prevents multiple packets being processed in different workers, limiting single connection forwarding performance to 1 worker. The only performance penalty is the hashing code, which should be neglectable given the footprint of en/decryption work. Co-authored-by: Hamster Tian --- tunnel/tunnel.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 60ba03234d..5b55f07dad 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "hash/maphash" "net" "net/netip" "path/filepath" @@ -31,7 +32,8 @@ import ( var ( status = newAtomicStatus(Suspend) tcpQueue = make(chan C.ConnContext, 200) - udpQueue = make(chan C.PacketAdapter, 200) + udpQueues []chan C.PacketAdapter + udpHashSeed = maphash.MakeSeed() natTable = nat.New() rules []C.Rule listeners = make(map[string]C.InboundListener) @@ -70,8 +72,17 @@ func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) { func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) { packetAdapter := C.NewPacketAdapter(packet, metadata) + + var h maphash.Hash + + h.SetSeed(udpHashSeed) + h.WriteString(metadata.SourceAddress()) + h.WriteString(metadata.RemoteAddress()) + + queueNo := uint(h.Sum64()) % uint(len(udpQueues)) + select { - case udpQueue <- packetAdapter: + case udpQueues[queueNo] <- packetAdapter: default: } } @@ -141,7 +152,8 @@ func TCPIn() chan<- C.ConnContext { // UDPIn return fan-in udp queue // Deprecated: using Tunnel instead func UDPIn() chan<- C.PacketAdapter { - return udpQueue + // compatibility: first queue is always available for external callers + return udpQueues[0] } // NatTable return nat table @@ -243,8 +255,8 @@ func isHandle(t C.Type) bool { } // processUDP starts a loop to handle udp packet -func processUDP() { - queue := udpQueue +func processUDP(queueNo int) { + queue := udpQueues[queueNo] for conn := range queue { handleUDPConn(conn) } @@ -255,8 +267,11 @@ func process() { if num := runtime.GOMAXPROCS(0); num > numUDPWorkers { numUDPWorkers = num } + + udpQueues = make([]chan C.PacketAdapter, numUDPWorkers) for i := 0; i < numUDPWorkers; i++ { - go processUDP() + udpQueues[i] = make(chan C.PacketAdapter, 200) + go processUDP(i) } queue := tcpQueue