From d14674b597bab82994c98fb0da67483c5b7afb84 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 14 Sep 2020 11:16:19 +0200 Subject: [PATCH] go/runtime/client: Fix possible panic on shutdown --- .changelog/3271.bugfix.md | 1 + go/runtime/client/client.go | 34 +++++++++++++++++++++++----------- go/runtime/client/watcher.go | 7 +++---- 3 files changed, 27 insertions(+), 15 deletions(-) create mode 100644 .changelog/3271.bugfix.md diff --git a/.changelog/3271.bugfix.md b/.changelog/3271.bugfix.md new file mode 100644 index 00000000000..c5713695e22 --- /dev/null +++ b/.changelog/3271.bugfix.md @@ -0,0 +1 @@ +go/runtime/client: Fix possible panic on shutdown diff --git a/go/runtime/client/client.go b/go/runtime/client/client.go index 55057f49c50..316ef3bd10a 100644 --- a/go/runtime/client/client.go +++ b/go/runtime/client/client.go @@ -84,15 +84,24 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque } c.Unlock() + // Send a request for watching a new runtime transaction. respCh := make(chan *watchResult) - var requestID hash.Hash - requestID.FromBytes(request.Data) - watcher.newCh <- &watchRequest{ - id: &requestID, + req := &watchRequest{ ctx: ctx, respCh: respCh, } + req.id.FromBytes(request.Data) + select { + case <-ctx.Done(): + // The context we're working in was canceled, abort. + return nil, ctx.Err() + case <-c.common.ctx.Done(): + // Client is shutting down. + return nil, fmt.Errorf("client: shutting down") + case watcher.newCh <- req: + } + // Wait for response, handling retries if/when needed. for { var resp *watchResult var ok bool @@ -100,10 +109,17 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque select { case <-ctx.Done(): // The context we're working in was canceled, abort. - return nil, context.Canceled - + return nil, ctx.Err() + case <-c.common.ctx.Done(): + // Client is shutting down. + return nil, fmt.Errorf("client: shutting down") case resp, ok = <-respCh: - // The main event is getting a response from the watcher, handled below. + if !ok { + return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)") + } + + // The main event is getting a response from the watcher, handled below. If there is + // no result yet, this means that we need to retry publish. if resp.result == nil { break } @@ -111,10 +127,6 @@ func (c *runtimeClient) SubmitTx(ctx context.Context, request *api.SubmitTxReque return resp.result, nil } - if !ok { - return nil, fmt.Errorf("client: block watch channel closed unexpectedly (unknown error)") - } - c.common.p2p.Publish(context.Background(), request.RuntimeID, &p2p.Message{ Tx: &executor.Tx{ Data: request.Data, diff --git a/go/runtime/client/watcher.go b/go/runtime/client/watcher.go index 620b912726e..eb6f53b29f6 100644 --- a/go/runtime/client/watcher.go +++ b/go/runtime/client/watcher.go @@ -19,7 +19,7 @@ const ( ) type watchRequest struct { - id *hash.Hash + id hash.Hash ctx context.Context respCh chan *watchResult height int64 @@ -96,7 +96,6 @@ func (w *blockWatcher) checkBlock(blk *block.Block) { func (w *blockWatcher) watch() { defer func() { - close(w.newCh) for _, watch := range w.watched { close(watch.respCh) } @@ -204,13 +203,13 @@ func (w *blockWatcher) watch() { } case newWatch := <-w.newCh: - w.watched[*newWatch.id] = newWatch + w.watched[newWatch.id] = newWatch res := &watchResult{ groupVersion: latestGroupVersion, } if newWatch.send(res, latestHeight) != nil { - delete(w.watched, *newWatch.id) + delete(w.watched, newWatch.id) } case <-w.stopCh: