From 696469aae79dd1dfd2d27e8310a9735305cd0861 Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Wed, 9 Dec 2020 11:25:20 +0000 Subject: [PATCH] fix: avoid potential hang when starting event listener It was possible for NewEvents to never return, blocked on waiting for a WaitGroup to be done. The call to Done was in a goroutine that could exit before reaching the Done call. Replace the WaitGroup with a channel that is closed to signal that initialisation is complete. Also, while we are waiting on the channel, wait on the context so we can exit clealy if the context is canceled. --- chain/events/events.go | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/chain/events/events.go b/chain/events/events.go index dcdf6c16245..1dcf634231c 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -20,8 +20,10 @@ import ( var log = logging.Logger("events") // HeightHandler `curH`-`ts.Height` = `confidence` -type HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error -type RevertHandler func(ctx context.Context, ts *types.TipSet) error +type ( + HeightHandler func(ctx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error + RevertHandler func(ctx context.Context, ts *types.TipSet) error +) type heightHandler struct { confidence int @@ -48,7 +50,7 @@ type Events struct { tsc *tipSetCache lk sync.Mutex - ready sync.WaitGroup + ready chan struct{} readyOnce sync.Once heightEvents @@ -76,15 +78,16 @@ func NewEvents(ctx context.Context, api eventAPI) *Events { }, hcEvents: newHCEvents(ctx, api, tsc, uint64(gcConfidence)), + ready: make(chan struct{}), } - e.ready.Add(1) - go e.listenHeadChanges(ctx) - e.ready.Wait() - - // TODO: cleanup/gc goroutine + // Wait for the first tipset to be seen or bail if shutting down + select { + case <-e.ready: + case <-ctx.Done(): + } return e } @@ -111,13 +114,21 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { notifs, err := e.api.ChainNotify(ctx) if err != nil { - // TODO: retry + // Retry is handled by caller return xerrors.Errorf("listenHeadChanges ChainNotify call failed: %w", err) } - cur, ok := <-notifs // TODO: timeout? - if !ok { - return xerrors.Errorf("notification channel closed") + var cur []*api.HeadChange + var ok bool + + // Wait for first tipset or bail + select { + case cur, ok = <-notifs: + if !ok { + return xerrors.Errorf("notification channel closed") + } + case <-ctx.Done(): + return ctx.Err() } if len(cur) != 1 { @@ -134,8 +145,8 @@ func (e *Events) listenHeadChangesOnce(ctx context.Context) error { e.readyOnce.Do(func() { e.lastTs = cur[0].Val - - e.ready.Done() + // Signal that we have seen first tipset + close(e.ready) }) for notif := range notifs {