Skip to content

Commit

Permalink
core: Simplify shared listeners, fix deadline bug
Browse files Browse the repository at this point in the history
When this listener code was first written, UsagePool didn't exist. We
can simplify much of the wrapped listener logic by utilizing UsagePool.

This also fixes a bug where new servers were able to clear deadlines
set by old servers, even if the old server didn't get booted out of its
Accept() call yet. And with the deadline cleared, they never would.
(Sometimes. Based on reports and difficulty of reproducing the bug,
this behavior was extremely rare.) I don't know why that happened
exactly, maybe some polling mechanism in the kernel and if the timings
worked out just wrong it would expose the bug.

Anyway, now we ensure that only the closer that set the deadline is the
same one that clears it, ensuring that old servers always return out of
Accept(), because the deadline doesn't get cleared until they do.

Of course, all this hinges on the hope that my suspicions in the middle
of the night are correct and that kernels work the way I think they do
in my head.

Also minor enhancement to UsagePool where if a value errors upon
construction (a very real possibility with listeners), it is removed from
the pool. Not 100% sure the sync logic is correct there, or maybe we
don't have to even put it in the pool until after construction, but it's
subtle either way and I think this is safe... right?
  • Loading branch information
mholt committed Jan 11, 2022
1 parent c634bbe commit 64a3218
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 168 deletions.
305 changes: 138 additions & 167 deletions listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package caddy

import (
"fmt"
"log"
"net"
"strconv"
"strings"
Expand All @@ -26,124 +25,90 @@ import (
"time"
)

// Listen returns a listener suitable for use in a Caddy module.
// Always be sure to close listeners when you are done with them.
// Listen is like net.Listen, except Caddy's listeners can overlap
// each other: multiple listeners may be created on the same socket
// at the same time. This is useful because during config changes,
// the new config is started while the old config is still running.
// When Caddy listeners are closed, the closing logic is virtualized
// so the underlying socket isn't actually closed until all uses of
// the socket have been finished. Always be sure to close listeners
// when you are done with them, just like normal listeners.
func Listen(network, addr string) (net.Listener, error) {
lnKey := network + "/" + addr

listenersMu.Lock()
defer listenersMu.Unlock()

// if listener already exists, increment usage counter, then return listener
if lnGlobal, ok := listeners[lnKey]; ok {
atomic.AddInt32(&lnGlobal.usage, 1)
return &fakeCloseListener{
usage: &lnGlobal.usage,
deadline: &lnGlobal.deadline,
deadlineMu: &lnGlobal.deadlineMu,
key: lnKey,
Listener: lnGlobal.ln,
}, nil
}

// or, create new one and save it
ln, err := net.Listen(network, addr)
sharedLn, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
ln, err := net.Listen(network, addr)
if err != nil {
return nil, err
}
return &sharedListener{Listener: ln, key: lnKey}, nil
})
if err != nil {
return nil, err
}

// make sure to start its usage counter at 1
lnGlobal := &globalListener{usage: 1, ln: ln}
listeners[lnKey] = lnGlobal

return &fakeCloseListener{
usage: &lnGlobal.usage,
deadline: &lnGlobal.deadline,
deadlineMu: &lnGlobal.deadlineMu,
key: lnKey,
Listener: ln,
}, nil
return &fakeCloseListener{sharedListener: sharedLn.(*sharedListener)}, nil
}

// ListenPacket returns a net.PacketConn suitable for use in a Caddy module.
// It is like Listen except for PacketConns.
// Always be sure to close the PacketConn when you are done.
func ListenPacket(network, addr string) (net.PacketConn, error) {
lnKey := network + "/" + addr

listenersMu.Lock()
defer listenersMu.Unlock()

// if listener already exists, increment usage counter, then return listener
if lnGlobal, ok := listeners[lnKey]; ok {
atomic.AddInt32(&lnGlobal.usage, 1)
log.Printf("[DEBUG] %s: Usage counter should not go above 2 or maybe 3, is now: %d", lnKey, atomic.LoadInt32(&lnGlobal.usage)) // TODO: remove
return &fakeClosePacketConn{usage: &lnGlobal.usage, key: lnKey, PacketConn: lnGlobal.pc}, nil
}

// or, create new one and save it
pc, err := net.ListenPacket(network, addr)
sharedPc, _, err := listenerPool.LoadOrNew(lnKey, func() (Destructor, error) {
pc, err := net.ListenPacket(network, addr)
if err != nil {
return nil, err
}
return &sharedPacketConn{PacketConn: pc, key: lnKey}, nil
})
if err != nil {
return nil, err
}

// make sure to start its usage counter at 1
lnGlobal := &globalListener{usage: 1, pc: pc}
listeners[lnKey] = lnGlobal

return &fakeClosePacketConn{usage: &lnGlobal.usage, key: lnKey, PacketConn: pc}, nil
return &fakeClosePacketConn{sharedPacketConn: sharedPc.(*sharedPacketConn)}, nil
}

// fakeCloseListener's Close() method is a no-op. This allows
// stopping servers that are using the listener without giving
// up the socket; thus, servers become hot-swappable while the
// listener remains running. Listeners should be re-wrapped in
// a new fakeCloseListener each time the listener is reused.
// Other than the 'closed' field (which pertains to this value
// only), the other fields in this struct should be pointers to
// the associated globalListener's struct fields (except 'key'
// which is there for read-only purposes, so it can be a copy).
// fakeCloseListener is a private wrapper over a listener that
// is shared. The state of fakeCloseListener is not shared.
// This allows one user of a socket to "close" the listener
// while in reality the socket stays open for other users of
// the listener. In this way, servers become hot-swappable
// while the listener remains running. Listeners should be
// re-wrapped in a new fakeCloseListener each time the listener
// is reused. This type is atomic and values must not be copied.
type fakeCloseListener struct {
closed int32 // accessed atomically; belongs to this struct only
usage *int32 // accessed atomically; global
deadline *bool // protected by deadlineMu; global
deadlineMu *sync.Mutex // global
key string // global, but read-only, so can be copy
net.Listener // global
closed int32 // accessed atomically; belongs to this struct only
*sharedListener // embedded, so we also become a net.Listener
}

// Accept accepts connections until Close() is called.
func (fcl *fakeCloseListener) Accept() (net.Conn, error) {
// if the listener is already "closed", return error
if atomic.LoadInt32(&fcl.closed) == 1 {
return nil, fcl.fakeClosedErr()
}

// wrap underlying accept
conn, err := fcl.Listener.Accept()
// call underlying accept
conn, err := fcl.sharedListener.Accept()
if err == nil {
return conn, nil
}

// accept returned with error
// TODO: This may be better as a condition variable so the deadline is cleared only once?
fcl.deadlineMu.Lock()
if *fcl.deadline {
switch ln := fcl.Listener.(type) {
case *net.TCPListener:
_ = ln.SetDeadline(time.Time{})
case *net.UnixListener:
_ = ln.SetDeadline(time.Time{})
}
*fcl.deadline = false
}
fcl.deadlineMu.Unlock()

// since Accept() returned an error, it may be because our reference to
// the listener (this fakeCloseListener) may have been closed, i.e. the
// server is shutting down; in that case, we need to clear the deadline
// that we set when Close() was called, and return a non-temporary and
// non-timeout error value to the caller, masking the "true" error, so
// that server loops / goroutines won't retry, linger, and leak
if atomic.LoadInt32(&fcl.closed) == 1 {
// if we canceled the Accept() by setting a deadline
// on the listener, we need to make sure any callers of
// Accept() think the listener was actually closed;
// if we return the timeout error instead, callers might
// simply retry, leaking goroutines for longer
// we dereference the sharedListener explicitly even though it's embedded
// so that it's clear in the code that side-effects are shared with other
// users of this listener, not just our own reference to it; we also don't
// do anything with the error because all we could do is log it, but we
// expliclty assign it to nothing so we don't forget it's there if needed
_ = fcl.sharedListener.clearDeadline()

if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return nil, fcl.fakeClosedErr()
}
Expand All @@ -152,82 +117,54 @@ func (fcl *fakeCloseListener) Accept() (net.Conn, error) {
return nil, err
}

// Close stops accepting new connections without
// closing the underlying listener, unless no one
// else is using it.
// Close stops accepting new connections without closing the
// underlying listener. The underlying listener is only closed
// if the caller is the last known user of the socket.
func (fcl *fakeCloseListener) Close() error {
if atomic.CompareAndSwapInt32(&fcl.closed, 0, 1) {
// unfortunately, there is no way to cancel any
// currently-blocking calls to Accept() that are
// awaiting connections since we're not actually
// closing the listener; so we cheat by setting
// a deadline in the past, which forces it to
// time out; note that this only works for
// certain types of listeners...
fcl.deadlineMu.Lock()
if !*fcl.deadline {
switch ln := fcl.Listener.(type) {
case *net.TCPListener:
_ = ln.SetDeadline(time.Now().Add(-1 * time.Minute))
case *net.UnixListener:
_ = ln.SetDeadline(time.Now().Add(-1 * time.Minute))
}
*fcl.deadline = true
}
fcl.deadlineMu.Unlock()

// since we're no longer using this listener,
// decrement the usage counter and, if no one
// else is using it, close underlying listener
if atomic.AddInt32(fcl.usage, -1) == 0 {
listenersMu.Lock()
delete(listeners, fcl.key)
listenersMu.Unlock()
err := fcl.Listener.Close()
if err != nil {
return err
}
}

// There are two ways I know of to get an Accept()
// function to return to the server loop that called
// it: close the listener, or set a deadline in the
// past. Obviously, we can't close the socket yet
// since others may be using it (hence this whole
// file). But we can set the deadline in the past,
// and this is kind of cheating, but it works, and
// it apparently even works on Windows.
_ = fcl.sharedListener.setDeadline()
listenerPool.Delete(fcl.sharedListener.key)
}

return nil
}

// fakeClosedErr returns an error value that is not temporary
// nor a timeout, suitable for making the caller think the
// listener is actually closed
func (fcl *fakeCloseListener) fakeClosedErr() error {
return &net.OpError{
Op: "accept",
Net: fcl.Listener.Addr().Network(),
Addr: fcl.Listener.Addr(),
Net: fcl.Addr().Network(),
Addr: fcl.Addr(),
Err: errFakeClosed,
}
}

// ErrFakeClosed is the underlying error value returned by
// fakeCloseListener.Accept() after Close() has been called,
// indicating that it is pretending to be closed so that the
// server using it can terminate, while the underlying
// socket is actually left open.
var errFakeClosed = fmt.Errorf("listener 'closed' 😉")

// fakeClosePacketConn is like fakeCloseListener, but for PacketConns.
type fakeClosePacketConn struct {
closed int32 // accessed atomically
usage *int32 // accessed atomically
key string
net.PacketConn
closed int32 // accessed atomically; belongs to this struct only
*sharedPacketConn // embedded, so we also become a net.PacketConn
}

func (fcpc *fakeClosePacketConn) Close() error {
log.Println("[DEBUG] Fake-closing underlying packet conn") // TODO: remove this

if atomic.CompareAndSwapInt32(&fcpc.closed, 0, 1) {
// since we're no longer using this listener,
// decrement the usage counter and, if no one
// else is using it, close underlying listener
if atomic.AddInt32(fcpc.usage, -1) == 0 {
listenersMu.Lock()
delete(listeners, fcpc.key)
listenersMu.Unlock()
err := fcpc.PacketConn.Close()
if err != nil {
return err
}
}
listenerPool.Delete(fcpc.sharedPacketConn.key)
}

return nil
}

Expand All @@ -249,28 +186,64 @@ func (fcpc fakeClosePacketConn) SyscallConn() (syscall.RawConn, error) {
return nil, fmt.Errorf("SyscallConn() not implemented for %T", fcpc.PacketConn)
}

// ErrFakeClosed is the underlying error value returned by
// fakeCloseListener.Accept() after Close() has been called,
// indicating that it is pretending to be closed so that the
// server using it can terminate, while the underlying
// socket is actually left open.
var errFakeClosed = fmt.Errorf("listener 'closed' 😉")

// globalListener keeps global state for a listener
// that may be shared by multiple servers. In other
// words, values in this struct exist only once and
// all other uses of these values point to the ones
// in this struct. In particular, the usage count
// (how many callers are using the listener), the
// actual listener, and synchronization of the
// listener's deadline changes are singular, global
// values that must not be copied.
type globalListener struct {
usage int32 // accessed atomically
deadline bool
// sharedListener is a wrapper over an underlying listener. The listener
// and the other fields on the struct are shared state that is synchronized,
// so sharedListener structs must never be copied (always use a pointer).
type sharedListener struct {
net.Listener
key string // uniquely identifies this listener
deadline bool // whether a deadline is currently set
deadlineMu sync.Mutex
ln net.Listener
pc net.PacketConn
}

func (sl *sharedListener) clearDeadline() error {
var err error
sl.deadlineMu.Lock()
if sl.deadline {
switch ln := sl.Listener.(type) {
case *net.TCPListener:
err = ln.SetDeadline(time.Time{})
case *net.UnixListener:
err = ln.SetDeadline(time.Time{})
}
sl.deadline = false
}
sl.deadlineMu.Unlock()
return err
}

func (sl *sharedListener) setDeadline() error {
timeInPast := time.Now().Add(-1 * time.Minute)
var err error
sl.deadlineMu.Lock()
if !sl.deadline {
switch ln := sl.Listener.(type) {
case *net.TCPListener:
err = ln.SetDeadline(timeInPast)
case *net.UnixListener:
err = ln.SetDeadline(timeInPast)
}
sl.deadline = true
}
sl.deadlineMu.Unlock()
return err
}

// Destruct is called by the UsagePool when the listener is
// finally not being used anymore. It closes the socket.
func (sl *sharedListener) Destruct() error {
return sl.Listener.Close()
}

// sharedPacketConn is like sharedListener, but for net.PacketConns.
type sharedPacketConn struct {
net.PacketConn
key string
}

// Destruct closes the underlying socket.
func (spc *sharedPacketConn) Destruct() error {
return spc.PacketConn.Close()
}

// NetworkAddress contains the individual components
Expand Down Expand Up @@ -445,10 +418,8 @@ type ListenerWrapper interface {
WrapListener(net.Listener) net.Listener
}

var (
listeners = make(map[string]*globalListener)
listenersMu sync.Mutex
)
// listenerPool stores and allows reuse of active listeners.
var listenerPool = NewUsagePool()

const maxPortSpan = 65535

Expand Down
Loading

0 comments on commit 64a3218

Please sign in to comment.