From 626063b0cb4b8ee61d6c91efcb29a9a1516697fd Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Thu, 23 May 2024 16:03:28 +0800 Subject: [PATCH] Eliminate busy loop when receiving messages from non-blocking socket For a non-blocking socket, it should wait for events first before receiving messages from the socket, otherwise it would receive empty message and run into a busy loop. Signed-off-by: Quan Tian --- nl/nl_linux.go | 27 ++++++++++++++++++++++++--- nl/nl_linux_test.go | 2 +- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/nl/nl_linux.go b/nl/nl_linux.go index 959a72c3..58266fdd 100644 --- a/nl/nl_linux.go +++ b/nl/nl_linux.go @@ -632,6 +632,10 @@ type NetlinkSocket struct { fd int32 lsa unix.SockaddrNetlink sync.Mutex + + // pfd is non nil when the socket is in non-blocking mode, and is used to wait for events on the socket. + pfd *unix.PollFd + pollTimeout int64 } func getNetlinkSocket(protocol int) (*NetlinkSocket, error) { @@ -728,17 +732,22 @@ func Subscribe(protocol int, groups ...uint) (*NetlinkSocket, error) { return nil, err } + var pfd *unix.PollFd // Sometimes (socket_linux.go:SocketGet), Subscribe is used to create a socket - // that subscirbed to no groups. So we don't need to set nonblock there. + // that subscribes to no groups. So we don't need to set nonblock there. if len(groups) > 0 { if err := unix.SetNonblock(fd, true); err != nil { unix.Close(fd) return nil, err } + pfd = &unix.PollFd{Fd: int32(fd), Events: unix.POLLIN} } s := &NetlinkSocket{ - fd: int32(fd), + fd: int32(fd), + pfd: pfd, + // poll blocks infinitely by default. + pollTimeout: -1, } s.lsa.Family = unix.AF_NETLINK @@ -791,6 +800,13 @@ func (s *NetlinkSocket) Receive() ([]syscall.NetlinkMessage, *unix.SockaddrNetli if fd < 0 { return nil, nil, fmt.Errorf("Receive called on a closed socket") } + // The socket is in non-blocking mode. + if s.pfd != nil { + if _, err := unix.Poll([]unix.PollFd{*s.pfd}, int(atomic.LoadInt64(&s.pollTimeout))); err != nil { + return nil, nil, fmt.Errorf("Error polling the socket: %w", err) + } + } + var fromAddr *unix.SockaddrNetlink var rb [RECEIVE_BUFFER_SIZE]byte nr, from, err := unix.Recvfrom(fd, rb[:], 0) @@ -825,7 +841,12 @@ func (s *NetlinkSocket) SetSendTimeout(timeout *unix.Timeval) error { func (s *NetlinkSocket) SetReceiveTimeout(timeout *unix.Timeval) error { // Set a read timeout of SOCKET_READ_TIMEOUT, this will allow the Read to periodically unblock and avoid that a routine // remains stuck on a recvmsg on a closed fd - return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout) + if err := unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout); err != nil { + return err + } + // Set poll timeout to the same value to allow it to unblock upon timeout. + atomic.StoreInt64(&s.pollTimeout, timeout.Sec*1000+timeout.Usec/1000) + return nil } // SetReceiveBufferSize allows to set a receive buffer size on the socket diff --git a/nl/nl_linux_test.go b/nl/nl_linux_test.go index b911069f..d50a1d4a 100644 --- a/nl/nl_linux_test.go +++ b/nl/nl_linux_test.go @@ -75,7 +75,7 @@ func TestIfSocketCloses(t *testing.T) { for { _, _, err := sk.Receive() // Receive returned because of a timeout and the FD == -1 means that the socket got closed - if err == unix.EAGAIN && nlSock.GetFd() == -1 { + if nlSock.GetFd() == -1 { endCh <- err return }