Fix a subtle push hook race in client shutdown.
Prior to this change, a push notification from the server that was queued
before the client was closed could be dequeued and delivered to the OnNotify
hook while the client was waiting to exit. In that case, the Close method
could return before the hook was invoked.
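
The shape of this race can be reproduced with a small model. The sketch below is not the library's code: racyClient, its in channel, and closeReturnsEarly are hypothetical names that stand in for the client, its transport, and a test harness. It assumes only what the message describes, namely that shutdown waits for the reader but not for deliveries started by the reader. The hook is held open on purpose so that the outcome is deterministic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// racyClient is a hypothetical model of the pre-fix shutdown logic: done is
// closed when the reader exits, but hook deliveries are not tracked.
type racyClient struct {
	done chan struct{}
	in   chan string // stand-in for the transport
}

func newRacyClient(hook func(string)) *racyClient {
	c := &racyClient{done: make(chan struct{}), in: make(chan string, 1)}
	go func() {
		defer close(c.done)
		for msg := range c.in {
			go hook(msg) // untracked: nothing waits for this goroutine
		}
	}()
	return c
}

// Close waits for the reader only.
func (c *racyClient) Close() {
	close(c.in)
	<-c.done
}

// closeReturnsEarly reports whether Close returned while the hook was still
// running. The hook blocks on release to make the window easy to observe.
func closeReturnsEarly() bool {
	var finished int32
	release := make(chan struct{})
	hookDone := make(chan struct{})
	c := newRacyClient(func(string) {
		<-release
		atomic.StoreInt32(&finished, 1)
		close(hookDone)
	})
	c.in <- "notify"
	c.Close()
	early := atomic.LoadInt32(&finished) == 0
	close(release) // let the hook finish so no goroutine is leaked
	<-hookDone
	return early
}

func main() {
	fmt.Println(closeReturnsEarly()) // prints true
}
```

In the real client the window is far narrower because the hook is not artificially delayed, which is consistent with the flake being very rare.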

To avoid this scenario, ensure that delivery also gates shutdown by converting
the done channel into a wait group that counts both the reader and each
delivery goroutine. This case is tricky to test explicitly; I noticed it
because of a very rare flake in the test for OnNotify itself. This change
fixes the flake even for -count=1000, but I have not yet added a new test for
this condition.
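
A minimal sketch of the wait-group approach follows, reduced to the shutdown bookkeeping. The names miniClient, in, and deliverAll are hypothetical and exist only for illustration; the real client reads from a channel.Channel and dispatches JSON-RPC responses. The point shown is that the reader registers each delivery with Add while it still holds its own count, so Wait cannot see the counter reach zero between the reader exiting and a delivery starting.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniClient is a hypothetical stand-in for the client that models only the
// shutdown bookkeeping.
type miniClient struct {
	done *sync.WaitGroup // counts the reader and every in-flight delivery
	in   chan string     // stand-in for the transport
}

func newMiniClient(hook func(string)) *miniClient {
	c := &miniClient{done: new(sync.WaitGroup), in: make(chan string, 8)}
	c.done.Add(1) // the reader itself
	go func() {
		defer c.done.Done()
		for msg := range c.in {
			// Add before starting the goroutine, while the reader's own
			// count is still held, so the counter never drops to zero early.
			c.done.Add(1)
			go func(m string) {
				defer c.done.Done()
				hook(m)
			}(msg)
		}
	}()
	return c
}

// Close stops the reader and waits for it and for all deliveries.
func (c *miniClient) Close() {
	close(c.in)
	c.done.Wait()
}

// deliverAll sends n notifications, closes the client, and returns how many
// hook calls had completed by the time Close returned.
func deliverAll(n int) int64 {
	var delivered int64
	c := newMiniClient(func(string) { atomic.AddInt64(&delivered, 1) })
	for i := 0; i < n; i++ {
		c.in <- "notify"
	}
	c.Close()
	return atomic.LoadInt64(&delivered)
}

func main() {
	fmt.Println(deliverAll(100)) // prints 100
}
```

Because Wait returns only after every Done, a caller can read state written by the hooks once Close has returned without further synchronization.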

This bug would only affect clients using OnNotify, and expecting hooks to have
finished their work by the time Close returns (as the test does). That is now
true, but was not in rare cases before.
creachadair committed Aug 5, 2021
1 parent 86c5633 commit ad3affb
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions client.go
@@ -16,7 +16,7 @@ import (
 // A Client is a JSON-RPC 2.0 client. The client sends requests and receives
 // responses on a channel.Channel provided by the caller.
 type Client struct {
-	done chan struct{} // closed when the reader is done at shutdown time
+	done *sync.WaitGroup // done when the reader is finished at shutdown time
 
 	log func(string, ...interface{}) // write debug logs here
 	enctx encoder
@@ -36,7 +36,7 @@ type Client struct {
 // NewClient returns a new client that communicates with the server via ch.
 func NewClient(ch channel.Channel, opts *ClientOptions) *Client {
 	c := &Client{
-		done: make(chan struct{}),
+		done: new(sync.WaitGroup),
 		log: opts.logger(),
 		allow1: opts.allowV1(),
 		enctx: opts.encodeContext(),
@@ -57,8 +57,9 @@ func NewClient(ch channel.Channel, opts *ClientOptions) *Client {
 	// back to pending requests by their ID. Outbound requests do not queue;
 	// they are sent synchronously in the Send method.
 
+	c.done.Add(1)
 	go func() {
-		defer close(c.done)
+		defer c.done.Done()
 		for c.accept(ch) == nil {
 		}
 	}()
@@ -85,7 +86,9 @@ func (c *Client) accept(ch channel.Receiver) error {
 	}
 
 	c.log("Received %d responses", len(in))
+	c.done.Add(1)
 	go func() {
+		defer c.done.Done()
 		c.mu.Lock()
 		defer c.mu.Unlock()
 		for _, rsp := range in {
@@ -367,7 +370,8 @@ func (c *Client) Close() error {
 	c.mu.Lock()
 	c.stop(errClientStopped)
 	c.mu.Unlock()
-	<-c.done
+	c.done.Wait()
+
 	// Don't remark on a closed channel or EOF as a noteworthy failure.
 	if isUninteresting(c.err) {
 		return nil
