Skip to content

Commit

Permalink
🧵 service: join multiple wg messages into coalesced segments
Browse files Browse the repository at this point in the history
As of Linux 6.11, the in-kernel wg implementation does not send messages with GSO. A proxy client or server receiving from lo on the same system won't get any coalesced messages.

This commit updates the mmsg relay routines to attempt to join as many received uncoalesced messages as possible after encrypting, so sending them can take advantage of GSO.
  • Loading branch information
database64128 committed Oct 19, 2024
1 parent 79a89f0 commit 62b37de
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 36 deletions.
4 changes: 1 addition & 3 deletions service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,16 +544,14 @@ func (c *client) relayWgToProxyGeneric(uplink clientNatUplinkGeneric) {
)

for rqp := range uplink.proxyConnSendCh {
wgPacketBuf := rqp.buf

var (
isHandshake bool
sqpLength uint32
sqpSegmentSize uint32
sqpSegmentCount uint32
)

for len(wgPacketBuf) > 0 {
for wgPacketBuf := rqp.buf; len(wgPacketBuf) > 0; {
wgPacketLength := min(len(wgPacketBuf), int(rqp.segmentSize))
wgPacket := wgPacketBuf[:wgPacketLength]
wgPacketBuf = wgPacketBuf[wgPacketLength:]
Expand Down
37 changes: 17 additions & 20 deletions service/client_mmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,25 +422,22 @@ func (c *client) relayWgToProxySendmmsg(uplink clientNatUplinkMmsg) {

main:
for {
var isHandshake bool

// Block on first dequeue op.
rqp, ok := <-uplink.proxyConnSendCh
if !ok {
break
}

var (
isHandshake bool
sqpLength uint32
sqpSegmentSize uint32
sqpSegmentCount uint32
)

dequeue:
for {
wgPacketBuf := rqp.buf

var (
sqpLength uint32
sqpSegmentSize uint32
sqpSegmentCount uint32
)

for len(wgPacketBuf) > 0 {
for wgPacketBuf := rqp.buf; len(wgPacketBuf) > 0; {
wgPacketLength := min(len(wgPacketBuf), int(rqp.segmentSize))
wgPacket := wgPacketBuf[:wgPacketLength]
wgPacketBuf = wgPacketBuf[wgPacketLength:]
Expand Down Expand Up @@ -498,17 +495,9 @@ main:
packetBuf = dst
}

if sqpLength > 0 {
sendQueuedPackets = append(sendQueuedPackets, queuedPacket{
buf: packetBuf[len(packetBuf)-int(sqpLength):],
segmentSize: sqpSegmentSize,
segmentCount: sqpSegmentCount,
})
}

c.putPacketBuf(rqp.buf)

if len(sendQueuedPackets) == 0 {
if len(sendQueuedPackets) == 0 && sqpLength == 0 {
continue main
}

Expand All @@ -526,6 +515,14 @@ main:
}
}

if sqpLength > 0 {
sendQueuedPackets = append(sendQueuedPackets, queuedPacket{
buf: packetBuf[len(packetBuf)-int(sqpLength):],
segmentSize: sqpSegmentSize,
segmentCount: sqpSegmentCount,
})
}

for _, sqp := range sendQueuedPackets {
b := sqp.buf
segmentsRemaining := sqp.segmentCount
Expand Down
27 changes: 14 additions & 13 deletions service/server_mmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,12 @@ func (s *server) relayWgToProxySendmmsg(downlink serverNatDownlinkMmsg) {
msgsReceived += uint64(nr)
burstRecvBatchSize = max(burstRecvBatchSize, nr)

var (
qpLength uint32
qpSegmentSize uint32
qpSegmentCount uint32
)

rmsgvecn := rmsgvec[:nr]

for i := range rmsgvecn {
Expand Down Expand Up @@ -760,12 +766,7 @@ func (s *server) relayWgToProxySendmmsg(downlink serverNatDownlinkMmsg) {
recvSegmentSize = len(wgPacketBuf)
}

var (
recvSegmentCount uint32
qpLength uint32
qpSegmentSize uint32
qpSegmentCount uint32
)
var recvSegmentCount uint32

for len(wgPacketBuf) > 0 {
wgPacketLength := min(len(wgPacketBuf), recvSegmentSize)
Expand Down Expand Up @@ -824,14 +825,14 @@ func (s *server) relayWgToProxySendmmsg(downlink serverNatDownlinkMmsg) {

packetsReceived += uint64(recvSegmentCount)
burstRecvSegmentCount = max(burstRecvSegmentCount, recvSegmentCount)
}

if qpLength > 0 {
queuedPackets = append(queuedPackets, queuedPacket{
buf: sendPacketBuf[len(sendPacketBuf)-int(qpLength):],
segmentSize: qpSegmentSize,
segmentCount: qpSegmentCount,
})
}
if qpLength > 0 {
queuedPackets = append(queuedPackets, queuedPacket{
buf: sendPacketBuf[len(sendPacketBuf)-int(qpLength):],
segmentSize: qpSegmentSize,
segmentCount: qpSegmentCount,
})
}

if len(queuedPackets) == 0 {
Expand Down

0 comments on commit 62b37de

Please sign in to comment.