From c3601e8026683ad4f9809eb2f6c42e6eddeece1f Mon Sep 17 00:00:00 2001
From: Ian Denhardt
Date: Sat, 16 Apr 2022 19:02:08 -0400
Subject: [PATCH 1/2] WIP: don't block when queuing methods on *Server.

This should fix #189, though right now the tests are deadlocking and I
don't have the energy to debug, so I'm putting it down.

This also obviates MaxConcurrentCalls; in the current version of the
patch it is still present, but unused. Once this is working, a
subsequent commit will remove the Policy struct.

This significantly restructures the internals of *Server, and I think
the result is easier to understand: the methods on *Server that start
calls just stick the calls in a queue, and there is a goroutine that
pulls them out and handles them. This even gets rid of Server.mu
entirely, since nothing actually needed the lock anymore! Hopefully it
will stay simple when I actually get it working.
---
 server/server.go | 250 ++++++++++++++---------------------------------
 1 file changed, 76 insertions(+), 174 deletions(-)

diff --git a/server/server.go b/server/server.go
index 97520624..34721d3a 100644
--- a/server/server.go
+++ b/server/server.go
@@ -10,6 +10,7 @@ import (
 
 	"capnproto.org/go/capnp/v3"
 	"capnproto.org/go/capnp/v3/exc"
+	"capnproto.org/go/capnp/v3/internal/mpsc"
 )
 
 // A Method describes a single capability method on a server object.
@@ -21,30 +22,24 @@ type Method struct {
 // Call holds the state of an ongoing capability method call.
 // A Call cannot be used after the server method returns.
 type Call struct {
-	args capnp.Struct
+	ctx    context.Context
+	cancel context.CancelFunc
+	method *Method
+	recv   capnp.Recv
+	aq     *answerQueue
+	srv    *Server
 
 	alloced bool
-	alloc   resultsAllocer
 	results capnp.Struct
 
-	ack   chan<- struct{}
 	acked bool
 }
 
-func newCall(args capnp.Struct, ra resultsAllocer) (*Call, <-chan struct{}) {
-	ack := make(chan struct{})
-	return &Call{
-		args:  args,
-		alloc: ra,
-		ack:   ack,
-	}, ack
-}
-
 // Args returns the call's arguments. Args is not safe to
 // reference after a method implementation returns. Args is safe to
 // call and read from multiple goroutines.
 func (c *Call) Args() capnp.Struct {
-	return c.args
+	return c.recv.Args
 }
 
 // AllocResults allocates the results struct. It is an error to call
@@ -55,7 +50,7 @@ func (c *Call) AllocResults(sz capnp.ObjectSize) (capnp.Struct, error) {
 	}
 	var err error
 	c.alloced = true
-	c.results, err = c.alloc.AllocResults(sz)
+	c.results, err = c.recv.Returner.AllocResults(sz)
 	return c.results, err
 }
 
@@ -73,8 +68,8 @@ func (c *Call) Ack() {
 	if c.acked {
 		return
 	}
-	close(c.ack)
 	c.acked = true
+	go c.srv.handleCalls(c.srv.handleCallsCtx)
 }
 
 // Shutdowner is the interface that wraps the Shutdown method.
@@ -88,32 +83,20 @@ type Server struct {
 	methods  sortedMethods
 	brand    interface{}
 	shutdown Shutdowner
-	policy   Policy
-
-	// mu protects the following fields.
-	// mu should never be held while calling application code.
-	mu sync.Mutex
 
-	// ongoing is a fixed-size list of ongoing calls.
-	// It is used as a semaphore: when all elements are set, no new work
-	// can be started until an element is cleared.
-	ongoing []cstate
+	// Cancels handleCallsCtx
+	cancelCalls context.CancelFunc
 
-	// starting is non-nil if start() is waiting for acknowledgement of a
-	// call. It is closed when the acknowledgement is received.
-	starting <-chan struct{}
+	// Context used by the goroutine running handleCalls()
+	handleCallsCtx context.Context
 
-	// full is non-nil if a start() is waiting for a space in ongoing to
-	// free up. It is closed and set to nil when the next call returns.
-	full chan<- struct{}
-
-	// drain is non-nil when Shutdown starts and is closed by the last
-	// call to return.
-	drain chan struct{}
-}
+	// wg is incremented each time a method is queued, and
+	// decremented after it is handled.
+	wg sync.WaitGroup
 
-type cstate struct {
-	cancel context.CancelFunc // nil if slot free
+	// Calls are inserted into this queue, to be handled
+	// by a goroutine running handleCalls()
+	callQueue *mpsc.Queue[*Call]
 }
 
 // Policy is a set of behavioral parameters for a Server.
@@ -135,20 +118,19 @@
 // return or acknowledgment of the previous call. See Call.Ack for more
 // details.
 func New(methods []Method, brand interface{}, shutdown Shutdowner, policy *Policy) *Server {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	srv := &Server{
-		methods:  make(sortedMethods, len(methods)),
-		brand:    brand,
-		shutdown: shutdown,
+		methods:        make(sortedMethods, len(methods)),
+		brand:          brand,
+		shutdown:       shutdown,
+		callQueue:      mpsc.New[*Call](),
+		cancelCalls:    cancel,
+		handleCallsCtx: ctx,
 	}
 	copy(srv.methods, methods)
 	sort.Sort(srv.methods)
-	if policy != nil {
-		srv.policy = *policy
-	}
-	if srv.policy.MaxConcurrentCalls < 1 {
-		srv.policy.MaxConcurrentCalls = 2
-	}
-	srv.ongoing = make([]cstate, srv.policy.MaxConcurrentCalls)
+	go srv.handleCalls(ctx)
 	return srv
 }
 
@@ -186,128 +168,64 @@ func (srv *Server) Recv(ctx context.Context, r capnp.Recv) capnp.PipelineCaller
 	return srv.start(ctx, mm, r)
 }
 
-func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.PipelineCaller {
-	// Acquire "starting" condition variable.
-	srv.mu.Lock()
+func (srv *Server) handleCalls(ctx context.Context) {
 	for {
-		if srv.drain != nil {
-			srv.mu.Unlock()
-			r.Reject(exc.New(exc.Failed, "capnp server", "call after shutdown"))
-			return nil
-		}
-		if srv.starting == nil {
+		call, err := srv.callQueue.Recv(ctx)
+		if err != nil {
 			break
 		}
-		wait := srv.starting
-		srv.mu.Unlock()
-		select {
-		case <-wait:
-		case <-ctx.Done():
-			r.Reject(ctx.Err())
-			return nil
+
+		srv.handleCall(call)
+		if call.acked {
+			// Another goroutine has taken over; time
			// to retire.
+			return
 		}
-		srv.mu.Lock()
 	}
-	starting := make(chan struct{})
-	srv.starting = starting
-
-	// Acquire an ID (semaphore).
-	id := srv.nextID()
-	if id == -1 {
-		full := make(chan struct{})
-		srv.full = full
-		srv.mu.Unlock()
-		select {
-		case <-full:
-		case <-ctx.Done():
-			srv.mu.Lock()
-			srv.starting = nil
-			close(starting)
-			srv.full = nil // full could be nil or non-nil, ensure it is nil.
-			srv.mu.Unlock()
-			r.Reject(ctx.Err())
-			return nil
-		}
-		srv.mu.Lock()
-		id = srv.nextID()
-		if srv.drain != nil {
-			srv.starting = nil
-			close(starting)
-			srv.mu.Unlock()
-			r.Reject(exc.New(exc.Failed, "capnp server", "call after shutdown"))
-			return nil
+	for {
+		// Context has been canceled; drain the rest of the queue,
+		// cancelling each call.
+		call, ok := srv.callQueue.TryRecv()
+		if !ok {
+			return
 		}
+		call.cancel()
+		srv.handleCall(call)
 	}
+}
 
-	// Bookkeeping: set starting to indicate we're waiting for an ack and
-	// record the cancel function for draining.
-	ctx, cancel := context.WithCancel(ctx)
-	srv.ongoing[id] = cstate{cancel}
-	srv.mu.Unlock()
+func (srv *Server) handleCall(c *Call) {
+	defer srv.wg.Done()
+	defer c.cancel()
 
-	// Call implementation function.
-	call, ack := newCall(r.Args, r.Returner)
-	aq := newAnswerQueue(r.Method)
-	done := make(chan struct{})
-	go func() {
-		err := m.Impl(ctx, call)
-		r.ReleaseArgs()
-		if err == nil {
-			aq.fulfill(call.results)
-			r.Returner.Return(nil)
-		} else {
-			aq.reject(err)
-			r.Returner.Return(err)
-		}
-		srv.mu.Lock()
-		srv.ongoing[id].cancel()
-		srv.ongoing[id] = cstate{}
-		if srv.drain != nil && !srv.hasOngoing() {
-			close(srv.drain)
-		}
-		if srv.full != nil {
-			close(srv.full)
-			srv.full = nil
-		}
-		srv.mu.Unlock()
-		close(done)
-	}()
-	var pcall capnp.PipelineCaller
-	select {
-	case <-ack:
-		pcall = aq
-	case <-done:
-		// Implementation functions may not call Ack, which is fine for
-		// smaller functions.
+	err := c.ctx.Err()
+	if err == nil {
+		err = c.method.Impl(c.ctx, c)
 	}
-	srv.mu.Lock()
-	srv.starting = nil
-	close(starting)
-	srv.mu.Unlock()
-	return pcall
-}
-
-// nextID returns the next available index in srv.ongoing or -1 if
-// there are too many ongoing calls. The caller must be holding onto
-// srv.mu.
-func (srv *Server) nextID() int {
-	for i := range srv.ongoing {
-		if srv.ongoing[i].cancel == nil {
-			return i
-		}
+	c.recv.ReleaseArgs()
+	if err == nil {
+		c.aq.fulfill(c.results)
+	} else {
+		c.aq.reject(err)
 	}
-	return -1
+	c.recv.Returner.Return(err)
 }
 
-// hasOngoing reports whether there are any ongoing calls.
-// The caller must be holding onto srv.mu.
-func (srv *Server) hasOngoing() bool {
-	for i := range srv.ongoing {
-		if srv.ongoing[i].cancel != nil {
-			return true
-		}
-	}
-	return false
+func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.PipelineCaller {
+	srv.wg.Add(1)
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	aq := newAnswerQueue(r.Method)
+	srv.callQueue.Send(&Call{
+		ctx:    ctx,
+		cancel: cancel,
+		method: m,
+		recv:   r,
+		aq:     aq,
+		srv:    srv,
+	})
+	return aq
 }
 
 // Brand returns a value that will match IsServer.
@@ -319,24 +237,8 @@ func (srv *Server) Brand() capnp.Brand {
 // Shutdowner passed into NewServer. Shutdown must not be called more
 // than once.
 func (srv *Server) Shutdown() {
-	srv.mu.Lock()
-	if srv.drain != nil {
-		srv.mu.Unlock()
-		panic("capnp server: Shutdown called multiple times")
-	}
-	srv.drain = make(chan struct{})
-	if srv.hasOngoing() {
-		for _, cs := range srv.ongoing {
-			if cs.cancel != nil {
-				cs.cancel()
-			}
-		}
-		srv.mu.Unlock()
-		<-srv.drain
-	} else {
-		close(srv.drain)
-		srv.mu.Unlock()
-	}
+	srv.cancelCalls()
+	srv.wg.Wait()
 	if srv.shutdown != nil {
 		srv.shutdown.Shutdown()
 	}

From 9bd1e541cbfe33581c7c01c58e01447de586ed51 Mon Sep 17 00:00:00 2001
From: Ian Denhardt
Date: Sun, 17 Apr 2022 02:02:46 -0400
Subject: [PATCH 2/2] Fix a deadlock in server.Shutdown()

See the comments documenting ordering requirements; previously, if the
method call was blocking on its own context, it would not be stopped in
response to server.Shutdown(), since the context for handleCalls is
entirely independent.
---
 server/server.go | 69 +++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 56 insertions(+), 13 deletions(-)

diff --git a/server/server.go b/server/server.go
index 34721d3a..90b4b899 100644
--- a/server/server.go
+++ b/server/server.go
@@ -85,9 +85,12 @@ type Server struct {
 	shutdown Shutdowner
 
 	// Cancels handleCallsCtx
-	cancelCalls context.CancelFunc
+	cancelHandleCalls context.CancelFunc
 
-	// Context used by the goroutine running handleCalls()
+	// Context used by the goroutine running handleCalls(). Note
+	// the calls themselves will have different contexts, which
+	// are not children of this context, but are supplied by
+	// start(). See cancelCurrentCall.
 	handleCallsCtx context.Context
 
 	// wg is incremented each time a method is queued, and
@@ -97,6 +100,15 @@ type Server struct {
 	// Calls are inserted into this queue, to be handled
 	// by a goroutine running handleCalls()
 	callQueue *mpsc.Queue[*Call]
+
+	// When a call is in progress, this channel will contain the
+	// CancelFunc for that call's context. A goroutine may receive
+	// on this to fetch the function, and is then responsible for calling
+	// it. This happens in Shutdown().
+	//
+	// The caller must call cancelHandleCalls() *before* calling
+	// the received CancelFunc.
+	cancelCurrentCall chan context.CancelFunc
 }
 
 // Policy is a set of behavioral parameters for a Server.
@@ -121,12 +133,13 @@ func New(methods []Method, brand interface{}, shutdown Shutdowner, policy *Polic
 	ctx, cancel := context.WithCancel(context.Background())
 
 	srv := &Server{
-		methods:        make(sortedMethods, len(methods)),
-		brand:          brand,
-		shutdown:       shutdown,
-		callQueue:      mpsc.New[*Call](),
-		cancelCalls:    cancel,
-		handleCallsCtx: ctx,
+		methods:           make(sortedMethods, len(methods)),
+		brand:             brand,
+		shutdown:          shutdown,
+		callQueue:         mpsc.New[*Call](),
+		cancelHandleCalls: cancel,
+		handleCallsCtx:    ctx,
+		cancelCurrentCall: make(chan context.CancelFunc, 1),
 	}
 	copy(srv.methods, methods)
 	sort.Sort(srv.methods)
@@ -175,7 +188,7 @@ func (srv *Server) handleCalls(ctx context.Context) {
 			break
 		}
 
-		srv.handleCall(call)
+		srv.handleCall(ctx, call)
 		if call.acked {
 			// Another goroutine has taken over; time
 			// to retire.
@@ -190,18 +203,38 @@ func (srv *Server) handleCalls(ctx context.Context) {
 			return
 		}
 		call.cancel()
-		srv.handleCall(call)
+		srv.handleCall(ctx, call)
 	}
 }
 
-func (srv *Server) handleCall(c *Call) {
+func (srv *Server) handleCall(ctx context.Context, c *Call) {
 	defer srv.wg.Done()
 	defer c.cancel()
 
-	err := c.ctx.Err()
+	// Store this in the channel, in case Shutdown() gets called
+	// while we're servicing the method call.
+	srv.cancelCurrentCall <- c.cancel
+	defer func() {
+		select {
+		case <-srv.cancelCurrentCall:
+		default:
+		}
+	}()
+
+	// Handling the contexts is tricky here, since neither one
+	// is necessarily a parent of the other. We need to check
+	// the context that was passed to us (which manages the
+	// handleCalls loop) some time *after* storing c.cancel,
+	// above, to avoid a race between this code and Shutdown(),
+	// which cancels ctx before attempting to receive c.cancel.
+	err := ctx.Err()
+	if err == nil {
+		err = c.ctx.Err()
+	}
 	if err == nil {
 		err = c.method.Impl(c.ctx, c)
 	}
+
 	c.recv.ReleaseArgs()
 	if err == nil {
 		c.aq.fulfill(c.results)
@@ -237,7 +270,17 @@ func (srv *Server) Brand() capnp.Brand {
 // Shutdowner passed into NewServer. Shutdown must not be called more
 // than once.
 func (srv *Server) Shutdown() {
-	srv.cancelCalls()
+	// Cancel the loop in handleCalls(), and then cancel the outstanding
+	// call, if any. The order here is critical; if we cancel the
+	// outstanding call first, the loop may start another call before
+	// we cancel it.
+	srv.cancelHandleCalls()
+	select {
+	case cancel := <-srv.cancelCurrentCall:
+		cancel()
+	default:
+	}
+
 	srv.wg.Wait()
 	if srv.shutdown != nil {
 		srv.shutdown.Shutdown()
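
Below is a standalone sketch (not part of the patch series) of the queueing
pattern the two patches build, so the ordering argument in Shutdown() can be
read end to end. It is a model under stated assumptions: a plain buffered
channel stands in for the unbounded internal/mpsc queue; demoServer, demoCall,
and impl are hypothetical names; and the Ack() handoff to a fresh handleCalls
goroutine, the answerQueue, and the Returner plumbing are omitted. What it
preserves is the shape that matters: start() only enqueues and never blocks,
a single goroutine drains the queue, and Shutdown() cancels the handler loop
*before* cancelling the in-flight call.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// demoCall models *Call: a queued method invocation carrying its own
// context, which is deliberately NOT a child of the handler loop's context.
type demoCall struct {
	ctx    context.Context
	cancel context.CancelFunc
	impl   func(ctx context.Context)
}

type demoServer struct {
	calls             chan *demoCall          // stand-in for the unbounded mpsc.Queue
	wg                sync.WaitGroup          // one count per queued call
	cancelLoop        context.CancelFunc      // cancels the handleCalls loop
	cancelCurrentCall chan context.CancelFunc // capacity 1, as in the patch
}

func newDemoServer() *demoServer {
	ctx, cancel := context.WithCancel(context.Background())
	srv := &demoServer{
		calls:             make(chan *demoCall, 64),
		cancelLoop:        cancel,
		cancelCurrentCall: make(chan context.CancelFunc, 1),
	}
	go srv.handleCalls(ctx)
	return srv
}

// start only enqueues; it never waits for the method to run.
func (srv *demoServer) start(ctx context.Context, impl func(context.Context)) {
	srv.wg.Add(1)
	ctx, cancel := context.WithCancel(ctx)
	srv.calls <- &demoCall{ctx: ctx, cancel: cancel, impl: impl}
}

func (srv *demoServer) handleCalls(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			// Shutdown has begun: drain whatever is still queued,
			// cancelling each call (the patch's TryRecv loop), so
			// wg.Wait cannot hang on never-handled calls.
			for {
				select {
				case call := <-srv.calls:
					call.cancel()
					srv.handleCall(ctx, call)
				default:
					return
				}
			}
		case call := <-srv.calls:
			srv.handleCall(ctx, call)
		}
	}
}

func (srv *demoServer) handleCall(ctx context.Context, c *demoCall) {
	defer srv.wg.Done()
	defer c.cancel()

	// Publish the in-flight call's CancelFunc so Shutdown can reach it.
	srv.cancelCurrentCall <- c.cancel
	defer func() {
		select {
		case <-srv.cancelCurrentCall: // reclaim it if Shutdown didn't
		default:
		}
	}()

	// Check the loop context only *after* publishing c.cancel; Shutdown
	// cancels that context before it tries to receive the CancelFunc.
	if ctx.Err() == nil && c.ctx.Err() == nil {
		c.impl(c.ctx)
	}
}

// Shutdown cancels the loop first, then the in-flight call, then waits.
// Cancelling the call first would let the loop start another call behind
// our back.
func (srv *demoServer) Shutdown() {
	srv.cancelLoop()
	select {
	case cancel := <-srv.cancelCurrentCall:
		cancel()
	default:
	}
	srv.wg.Wait()
}

func main() {
	srv := newDemoServer()
	srv.start(context.Background(), func(ctx context.Context) {
		fmt.Println("method running; blocking on its own context")
		<-ctx.Done()
		fmt.Println("method stopped:", ctx.Err())
	})
	time.Sleep(50 * time.Millisecond) // let handleCalls pick the call up
	srv.Shutdown()
	fmt.Println("shutdown complete")
}

Deleting the cancelCurrentCall handoff from this sketch reproduces the hang
the second patch fixes: the method blocks on its own context, which is not a
child of the loop's context, so nothing else would ever cancel it.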