chore: deliver UDP packets from same connection in receiving order (#1540)

In the current design, all UDP packets are queued into a single channel, and
multiple workers are launched to poll that channel.

This introduces a problem: UDP packets from a single connection are delivered
to different workers and are therefore forwarded in a random order when the
workers run on different CPU cores. Although UDP peers normally have their own
logic to handle out-of-order packets, this behavior inevitably causes
significant variance in delay and harms connection quality. Furthermore, the
reordering is noticeable even when the underlying transport provides
guaranteed in-order delivery, which is unacceptable.

This commit borrows the idea of RSS (Receive Side Scaling) from NICs: it
creates a distinct queue for each worker, hashes incoming packets, and
distributes each packet to a worker based on the hash result. The tuple
(SrcIP, SrcPort, DstIP, DstPort, Proto) is used for hashing (Proto is always
UDP, so it is dropped from the final implementation). Packets from the same
connection are thus sent to the same worker, preserving the receiving order,
while different connections can be hashed to different workers to maintain
performance.

Performance for a single UDP connection is not affected: natTable already
holds a lock that prevents packets of one connection from being processed by
multiple workers concurrently, so single-connection forwarding was already
limited to one worker. The only penalty is the hashing itself, which should be
negligible compared with the en/decryption work.
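The dispatch scheme described above can be sketched as follows. This is a
minimal standalone illustration, not the patch itself: `queueFor` and the
example addresses are made up for the demo, while the real patch hashes
`metadata.SourceAddress()` and `metadata.RemoteAddress()` inside
`HandleUDPPacket`.

```go
package main

import (
	"fmt"
	"hash/maphash"
)

// A process-wide seed keeps the hash stable for the lifetime of the process,
// so a given connection always maps to the same worker queue.
var seed = maphash.MakeSeed()

// queueFor (hypothetical helper) maps a (source, destination) address pair
// to a worker queue index in [0, numQueues).
func queueFor(src, dst string, numQueues int) uint {
	var h maphash.Hash
	h.SetSeed(seed)
	h.WriteString(src)
	h.WriteString(dst)
	return uint(h.Sum64()) % uint(numQueues)
}

func main() {
	// Packets of the same connection always pick the same queue,
	// so their relative order is preserved within that worker.
	a := queueFor("10.0.0.1:5000", "1.1.1.1:53", 8)
	b := queueFor("10.0.0.1:5000", "1.1.1.1:53", 8)
	fmt.Println(a == b) // prints "true"
}
```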

Co-authored-by: Hamster Tian <[email protected]>
HamsterReserved and updateing authored Sep 25, 2024
1 parent a4e84f0 commit 3922b17
Showing 1 changed file with 21 additions and 6 deletions.

tunnel/tunnel.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"hash/maphash"
 	"net"
 	"net/netip"
 	"path/filepath"
@@ -31,7 +32,8 @@ import (
 var (
 	status      = newAtomicStatus(Suspend)
 	tcpQueue    = make(chan C.ConnContext, 200)
-	udpQueue    = make(chan C.PacketAdapter, 200)
+	udpQueues   []chan C.PacketAdapter
+	udpHashSeed = maphash.MakeSeed()
 	natTable    = nat.New()
 	rules       []C.Rule
 	listeners   = make(map[string]C.InboundListener)
@@ -70,8 +72,17 @@ func (t tunnel) HandleTCPConn(conn net.Conn, metadata *C.Metadata) {
 
 func (t tunnel) HandleUDPPacket(packet C.UDPPacket, metadata *C.Metadata) {
 	packetAdapter := C.NewPacketAdapter(packet, metadata)
+
+	var h maphash.Hash
+
+	h.SetSeed(udpHashSeed)
+	h.WriteString(metadata.SourceAddress())
+	h.WriteString(metadata.RemoteAddress())
+
+	queueNo := uint(h.Sum64()) % uint(len(udpQueues))
+
 	select {
-	case udpQueue <- packetAdapter:
+	case udpQueues[queueNo] <- packetAdapter:
 	default:
 	}
 }
@@ -141,7 +152,8 @@ func TCPIn() chan<- C.ConnContext {
 // UDPIn return fan-in udp queue
 // Deprecated: using Tunnel instead
 func UDPIn() chan<- C.PacketAdapter {
-	return udpQueue
+	// compatibility: first queue is always available for external callers
+	return udpQueues[0]
 }
 
 // NatTable return nat table
@@ -243,8 +255,8 @@ func isHandle(t C.Type) bool {
 }
 
 // processUDP starts a loop to handle udp packet
-func processUDP() {
-	queue := udpQueue
+func processUDP(queueNo int) {
+	queue := udpQueues[queueNo]
 	for conn := range queue {
 		handleUDPConn(conn)
 	}
@@ -255,8 +267,11 @@ func process() {
 	if num := runtime.GOMAXPROCS(0); num > numUDPWorkers {
 		numUDPWorkers = num
 	}
+
+	udpQueues = make([]chan C.PacketAdapter, numUDPWorkers)
 	for i := 0; i < numUDPWorkers; i++ {
-		go processUDP()
+		udpQueues[i] = make(chan C.PacketAdapter, 200)
+		go processUDP(i)
 	}
 
 	queue := tcpQueue
