Skip to content

Commit

Permalink
internal/socket: reuse closure in Recv/SendMmsg
Browse files Browse the repository at this point in the history
The closure for the callback to RawConn.Read/Write is responsible for
multiple allocations per call to RecvMmsg and SendMmsg.
The batched read and write are used primarily to avoid per-call
overhead, so any such overhead negates the advantage of using these
functions.

This change introduces a struct type holding all the variables
captured by the closure passed to RawConn.Read/Write. The struct is
reused to amortize the allocations by means of a sync.Pool.
A suitable global sync.Pool instance already existed, for buffers used
to pack mmsg headers.

This change allows to reuse all allocations in WriteBatch. In ReadBatch,
only the returned net.Addr instances still need to be allocated for each
message, which cannot be avoided without fundamental changes to the
package interface.

```
name             old time/op    new time/op    delta
UDP/Batch-1-8      5.34µs ± 1%    5.40µs ± 3%     ~     (p=0.173 n=8+10)
UDP/Batch-2-8      9.74µs ± 1%    9.24µs ± 9%   -5.21%  (p=0.035 n=9+10)
UDP/Batch-4-8      16.2µs ± 4%    16.2µs ± 1%     ~     (p=0.758 n=9+7)
UDP/Batch-8-8      30.0µs ± 4%    30.0µs ± 4%     ~     (p=0.971 n=10+10)
UDP/Batch-16-8     57.3µs ± 3%    60.9µs ±16%   +6.43%  (p=0.031 n=9+9)
UDP/Batch-32-8      115µs ± 5%     119µs ± 6%   +3.15%  (p=0.043 n=10+10)
UDP/Batch-64-8      234µs ±16%     237µs ± 4%     ~     (p=0.173 n=10+8)
UDP/Batch-128-8     447µs ± 4%     470µs ± 7%   +5.22%  (p=0.002 n=10+10)
UDP/Batch-256-8     960µs ±10%     966µs ±19%     ~     (p=0.853 n=10+10)
UDP/Batch-512-8    1.00ms ± 7%    0.99ms ± 7%     ~     (p=0.387 n=9+9)

name             old alloc/op   new alloc/op   delta
UDP/Batch-1-8        232B ± 0%       52B ± 0%  -77.59%  (p=0.000 n=10+10)
UDP/Batch-2-8        280B ± 0%      104B ± 0%  -62.86%  (p=0.000 n=10+10)
UDP/Batch-4-8        384B ± 0%      208B ± 0%  -45.83%  (p=0.000 n=10+10)
UDP/Batch-8-8        592B ± 0%      416B ± 0%  -29.73%  (p=0.000 n=10+10)
UDP/Batch-16-8     1.01kB ± 0%    0.83kB ± 0%  -17.46%  (p=0.000 n=10+10)
UDP/Batch-32-8     1.84kB ± 0%    1.66kB ± 0%   -9.57%  (p=0.002 n=8+10)
UDP/Batch-64-8     3.51kB ± 0%    3.33kB ± 0%   -5.00%  (p=0.000 n=10+8)
UDP/Batch-128-8    6.84kB ± 0%    6.66kB ± 0%   -2.57%  (p=0.001 n=7+7)
UDP/Batch-256-8    13.5kB ± 0%    13.3kB ± 0%   -1.33%  (p=0.000 n=10+10)
UDP/Batch-512-8    14.7kB ± 0%    14.5kB ± 0%   -1.19%  (p=0.000 n=8+8)

name             old allocs/op  new allocs/op  delta
UDP/Batch-1-8        8.00 ± 0%      2.00 ± 0%  -75.00%  (p=0.000 n=10+10)
UDP/Batch-2-8        10.0 ± 0%       4.0 ± 0%  -60.00%  (p=0.000 n=10+10)
UDP/Batch-4-8        14.0 ± 0%       8.0 ± 0%  -42.86%  (p=0.000 n=10+10)
UDP/Batch-8-8        22.0 ± 0%      16.0 ± 0%  -27.27%  (p=0.000 n=10+10)
UDP/Batch-16-8       38.0 ± 0%      32.0 ± 0%  -15.79%  (p=0.000 n=10+10)
UDP/Batch-32-8       70.0 ± 0%      64.0 ± 0%   -8.57%  (p=0.000 n=10+10)
UDP/Batch-64-8        134 ± 0%       128 ± 0%   -4.48%  (p=0.000 n=10+10)
UDP/Batch-128-8       262 ± 0%       256 ± 0%   -2.29%  (p=0.000 n=10+10)
UDP/Batch-256-8       518 ± 0%       512 ± 0%   -1.16%  (p=0.000 n=10+10)
UDP/Batch-512-8       562 ± 0%       556 ± 0%   -1.07%  (p=0.000 n=10+10)
```

Contributes to golang/go#26838

Change-Id: I16ecfc38dbb5a4d9b1ceacd1dd99fda38f346807
GitHub-Last-Rev: d1dda93
GitHub-Pull-Request: #126
Reviewed-on: https://go-review.googlesource.com/c/net/+/380934
Run-TryBot: Ian Lance Taylor <[email protected]>
TryBot-Result: Gopher Robot <[email protected]>
Reviewed-by: Ian Lance Taylor <[email protected]>
Trust: Michael Knyszek <[email protected]>
  • Loading branch information
matzf authored and ianlancetaylor committed Jan 27, 2022
1 parent 2fabfed commit cd36cc0
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 34 deletions.
80 changes: 73 additions & 7 deletions internal/socket/mmsghdr_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ package socket

import (
"net"
"os"
"sync"
"syscall"
)

type mmsghdrs []mmsghdr
Expand Down Expand Up @@ -93,22 +95,86 @@ func (p *mmsghdrsPacker) pack(ms []Message, parseFn func([]byte, string) (net.Ad
return hs
}

var defaultMmsghdrsPool = mmsghdrsPool{
// syscaller is a helper to invoke recvmmsg and sendmmsg via the RawConn.Read/Write interface.
// It is reusable, to amortize the overhead of allocating a closure for the function passed to
// RawConn.Read/Write.
type syscaller struct {
n int
operr error
hs mmsghdrs
flags int

boundRecvmmsgF func(uintptr) bool
boundSendmmsgF func(uintptr) bool
}

func (r *syscaller) init() {
r.boundRecvmmsgF = r.recvmmsgF
r.boundSendmmsgF = r.sendmmsgF
}

func (r *syscaller) recvmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
r.n = 0
r.operr = nil
r.hs = hs
r.flags = flags
if err := c.Read(r.boundRecvmmsgF); err != nil {
return r.n, err
}
if r.operr != nil {
return r.n, os.NewSyscallError("recvmmsg", r.operr)
}
return r.n, nil
}

func (r *syscaller) recvmmsgF(s uintptr) bool {
r.n, r.operr = recvmmsg(s, r.hs, r.flags)
return ioComplete(r.flags, r.operr)
}

func (r *syscaller) sendmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
r.n = 0
r.operr = nil
r.hs = hs
r.flags = flags
if err := c.Write(r.boundSendmmsgF); err != nil {
return r.n, err
}
if r.operr != nil {
return r.n, os.NewSyscallError("sendmmsg", r.operr)
}
return r.n, nil
}

func (r *syscaller) sendmmsgF(s uintptr) bool {
r.n, r.operr = sendmmsg(s, r.hs, r.flags)
return ioComplete(r.flags, r.operr)
}

// mmsgTmps holds reusable temporary helpers for recvmmsg and sendmmsg.
type mmsgTmps struct {
packer mmsghdrsPacker
syscaller syscaller
}

var defaultMmsgTmpsPool = mmsgTmpsPool{
p: sync.Pool{
New: func() interface{} {
return new(mmsghdrsPacker)
tmps := new(mmsgTmps)
tmps.syscaller.init()
return tmps
},
},
}

type mmsghdrsPool struct {
type mmsgTmpsPool struct {
p sync.Pool
}

func (p *mmsghdrsPool) Get() *mmsghdrsPacker {
return p.p.Get().(*mmsghdrsPacker)
func (p *mmsgTmpsPool) Get() *mmsgTmps {
return p.p.Get().(*mmsgTmps)
}

func (p *mmsghdrsPool) Put(packer *mmsghdrsPacker) {
p.p.Put(packer)
func (p *mmsgTmpsPool) Put(tmps *mmsgTmps) {
p.p.Put(tmps)
}
37 changes: 10 additions & 27 deletions internal/socket/rawconn_mmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,23 @@ package socket

import (
"net"
"os"
)

func (c *Conn) recvMsgs(ms []Message, flags int) (int, error) {
for i := range ms {
ms[i].raceWrite()
}
packer := defaultMmsghdrsPool.Get()
defer defaultMmsghdrsPool.Put(packer)
tmps := defaultMmsgTmpsPool.Get()
defer defaultMmsgTmpsPool.Put(tmps)
var parseFn func([]byte, string) (net.Addr, error)
if c.network != "tcp" {
parseFn = parseInetAddr
}
hs := packer.pack(ms, parseFn, nil)
var operr error
var n int
fn := func(s uintptr) bool {
n, operr = recvmmsg(s, hs, flags)
return ioComplete(flags, operr)
}
if err := c.c.Read(fn); err != nil {
hs := tmps.packer.pack(ms, parseFn, nil)
n, err := tmps.syscaller.recvmmsg(c.c, hs, flags)
if err != nil {
return n, err
}
if operr != nil {
return n, os.NewSyscallError("recvmmsg", operr)
}
if err := hs[:n].unpack(ms[:n], parseFn, c.network); err != nil {
return n, err
}
Expand All @@ -45,25 +36,17 @@ func (c *Conn) sendMsgs(ms []Message, flags int) (int, error) {
for i := range ms {
ms[i].raceRead()
}
packer := defaultMmsghdrsPool.Get()
defer defaultMmsghdrsPool.Put(packer)
tmps := defaultMmsgTmpsPool.Get()
defer defaultMmsgTmpsPool.Put(tmps)
var marshalFn func(net.Addr, []byte) int
if c.network != "tcp" {
marshalFn = marshalInetAddr
}
hs := packer.pack(ms, nil, marshalFn)
var operr error
var n int
fn := func(s uintptr) bool {
n, operr = sendmmsg(s, hs, flags)
return ioComplete(flags, operr)
}
if err := c.c.Write(fn); err != nil {
hs := tmps.packer.pack(ms, nil, marshalFn)
n, err := tmps.syscaller.sendmmsg(c.c, hs, flags)
if err != nil {
return n, err
}
if operr != nil {
return n, os.NewSyscallError("sendmmsg", operr)
}
if err := hs[:n].unpack(ms[:n], nil, ""); err != nil {
return n, err
}
Expand Down

0 comments on commit cd36cc0

Please sign in to comment.