Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate busy loop when receiving messages from non-blocking socket #977

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions nl/nl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ type NetlinkSocket struct {
fd int32
lsa unix.SockaddrNetlink
sync.Mutex

// pfd is non nil when the socket is in non-blocking mode, and is used to wait for events on the socket.
pfd *unix.PollFd
pollTimeout int64
}

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
Expand Down Expand Up @@ -728,17 +732,22 @@ func Subscribe(protocol int, groups ...uint) (*NetlinkSocket, error) {
return nil, err
}

var pfd *unix.PollFd
// Sometimes (socket_linux.go:SocketGet), Subscribe is used to create a socket
// that subscirbed to no groups. So we don't need to set nonblock there.
// that subscribes to no groups. So we don't need to set nonblock there.
if len(groups) > 0 {
if err := unix.SetNonblock(fd, true); err != nil {
unix.Close(fd)
return nil, err
}
pfd = &unix.PollFd{Fd: int32(fd), Events: unix.POLLIN}
}

s := &NetlinkSocket{
fd: int32(fd),
fd: int32(fd),
pfd: pfd,
// poll blocks infinitely by default.
pollTimeout: -1,
}
s.lsa.Family = unix.AF_NETLINK

Expand Down Expand Up @@ -791,6 +800,13 @@ func (s *NetlinkSocket) Receive() ([]syscall.NetlinkMessage, *unix.SockaddrNetli
if fd < 0 {
return nil, nil, fmt.Errorf("Receive called on a closed socket")
}
// The socket is in non-blocking mode.
if s.pfd != nil {
if _, err := unix.Poll([]unix.PollFd{*s.pfd}, int(atomic.LoadInt64(&s.pollTimeout))); err != nil {
return nil, nil, fmt.Errorf("Error polling the socket: %w", err)
}
}

var fromAddr *unix.SockaddrNetlink
var rb [RECEIVE_BUFFER_SIZE]byte
nr, from, err := unix.Recvfrom(fd, rb[:], 0)
Expand Down Expand Up @@ -825,7 +841,12 @@ func (s *NetlinkSocket) SetSendTimeout(timeout *unix.Timeval) error {
func (s *NetlinkSocket) SetReceiveTimeout(timeout *unix.Timeval) error {
// Set a read timeout of SOCKET_READ_TIMEOUT, this will allow the Read to periodically unblock and avoid that a routine
// remains stuck on a recvmsg on a closed fd
return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout)
if err := unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout); err != nil {
return err
}
// Set poll timeout to the same value to allow it to unblock upon timeout.
atomic.StoreInt64(&s.pollTimeout, timeout.Sec*1000+timeout.Usec/1000)
return nil
}

// SetReceiveBufferSize allows to set a receive buffer size on the socket
Expand Down
2 changes: 1 addition & 1 deletion nl/nl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestIfSocketCloses(t *testing.T) {
for {
_, _, err := sk.Receive()
// Receive returned because of a timeout and the FD == -1 means that the socket got closed
if err == unix.EAGAIN && nlSock.GetFd() == -1 {
if nlSock.GetFd() == -1 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The err is no longer unix.EAGAIN because Receive doesn't return when blocking on unix.Recvfrom, instead, it blocks on unix.Poll, then after 2s timeout, it calls unix.Recvfrom and gets "bad file descriptor"

endCh <- err
return
}
Expand Down
Loading