From 5826b61b2e0ad94d65fd791dc4fbd91a8428a921 Mon Sep 17 00:00:00 2001 From: sergerad Date: Sun, 14 May 2023 15:52:56 +1200 Subject: [PATCH 01/14] Add common.RetryForever() and use for concurrent operations that can fail --- consensus/polybft/polybft.go | 9 ++++--- go.mod | 1 + go.sum | 2 ++ helper/common/common.go | 12 +++++++++ jsonrpc/jsonrpc.go | 8 ++++-- network/gossip.go | 10 +++++--- server/server.go | 15 ++++++----- tracker/event_tracker.go | 48 ++++++++++++++++++++---------------- 8 files changed, 68 insertions(+), 37 deletions(-) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 1941038b06..324e726d21 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -2,6 +2,7 @@ package polybft import ( + "context" "encoding/json" "fmt" "path/filepath" @@ -354,17 +355,17 @@ func (p *Polybft) Start() error { } // start syncing - go func() { + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { blockHandler := func(b *types.FullBlock) bool { p.runtime.OnBlockInserted(b) - return false } - if err := p.syncer.Sync(blockHandler); err != nil { p.logger.Error("blocks synchronization failed", "error", err) + return err } - }() + return nil + }) // start consensus runtime if err := p.startRuntime(); err != nil { diff --git a/go.mod b/go.mod index 4cb639fd50..62c463531d 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/dave/jennifer v1.6.1 github.com/quasilyte/go-ruleguard v0.3.19 github.com/quasilyte/go-ruleguard/dsl v0.3.22 + github.com/sethvargo/go-retry v0.2.4 golang.org/x/sync v0.2.0 google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 gopkg.in/DataDog/dd-trace-go.v1 v1.50.1 diff --git a/go.sum b/go.sum index 490445c85d..7e33310893 100644 --- a/go.sum +++ b/go.sum @@ -619,6 +619,8 @@ github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOa github.com/secure-systems-lab/go-securesystemslib v0.5.0 h1:oTiNu0QnulMQgN/hLK124wJD/r2f9ZhIUuKIeBsCBT8= github.com/secure-systems-lab/go-securesystemslib v0.5.0/go.mod h1:uoCqUC0Ap7jrBSEanxT+SdACYJTVplRXWLkGMuDjXqk= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec= +github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0= diff --git a/helper/common/common.go b/helper/common/common.go index 954d6af4ae..188fdd9c66 100644 --- a/helper/common/common.go +++ b/helper/common/common.go @@ -1,6 +1,7 @@ package common import ( + "context" "encoding/binary" "encoding/json" "errors" @@ -18,6 +19,7 @@ import ( "time" "github.com/0xPolygon/polygon-edge/helper/hex" + "github.com/sethvargo/go-retry" ) var ( @@ -30,6 +32,16 @@ var ( errInvalidDuration = errors.New("invalid duration") ) +// RetryForever will execute a function until it completes without error +func RetryForever(ctx context.Context, interval time.Duration, fn func(context.Context) error) { + _ = retry.Do(ctx, retry.NewConstant(interval), func(context.Context) error { + if err := fn(ctx); err != nil { + return retry.RetryableError(err) + } + return nil + }) +} + // Min returns the strictly lower number func Min(a, b uint64) uint64 { if a < b { diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 6df997eb58..1a8b30afb2 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -1,6 +1,7 @@ package jsonrpc import ( + "context" "encoding/json" "fmt" "io" @@ -9,6 +10,7 @@ import ( "sync" "time" + "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/versioning" "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" @@ -126,11 +128,13 @@ func (j *JSONRPC) setupHTTP() error { ReadHeaderTimeout: 60 * time.Second, } - go func() { + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { if err := srv.Serve(lis); err != nil { j.logger.Error("closed http connection", "err", err) + return err } - }() + return nil + }) return nil } diff --git a/network/gossip.go b/network/gossip.go index 1f3914f039..fe505edc63 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -6,7 +6,9 @@ import ( "reflect" "sync" "sync/atomic" + "time" + "github.com/0xPolygon/polygon-edge/helper/common" "github.com/hashicorp/go-hclog" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -102,16 +104,16 @@ func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{}, continue } - go func() { + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { obj := t.createObj() if err := proto.Unmarshal(msg.Data, obj); err != nil { t.logger.Error("failed to unmarshal topic", "err", err) - - return + return err } handler(obj, msg.GetFrom()) - }() + return nil + }) } } diff --git a/server/server.go b/server/server.go index 960f8952ee..8cee685bf2 100644 --- a/server/server.go +++ b/server/server.go @@ -917,11 +917,14 @@ func (s *Server) setupGRPC() error { return err } - go func() { + // Start server with infinite retries + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { if err := s.grpcServer.Serve(lis); err != nil { s.logger.Error(err.Error()) + return err } - }() + return nil + }) s.logger.Info("GRPC server running", "addr", s.config.GRPCAddr.String()) @@ -996,13 +999,13 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { ReadHeaderTimeout: 60 * time.Second, } - go func() { - s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) - + s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) } - }() + return nil + }) return srv } diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 882e7bab6e..ad999d9a1d 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -2,7 +2,9 @@ package tracker import ( "context" + "time" + "github.com/0xPolygon/polygon-edge/helper/common" hcf "github.com/hashicorp/go-hclog" "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/blocktracker" @@ -64,6 +66,26 @@ func (e *EventTracker) Start(ctx context.Context) error { blockMaxBacklog := e.numBlockConfirmations*2 + 1 blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog)) + go func() { + <-ctx.Done() + blockTracker.Close() + store.Close() + }() + + // Init and start block tracker concurrently, with infinite retries + go common.RetryForever(ctx, time.Second, func(context.Context) error { + if err := blockTracker.Init(); err != nil { + e.logger.Error("failed to init blocktracker", "error", err) + return err + } + + if err := blockTracker.Start(); err != nil { + e.logger.Error("failed to start blocktracker", "error", err) + return err + } + return nil + }) + tt, err := tracker.NewTracker(provider.Eth(), tracker.WithBatchSize(10), tracker.WithBlockTracker(blockTracker), @@ -79,30 +101,14 @@ func (e *EventTracker) Start(ctx context.Context) error { if err != nil { return err } - - go func() { - if err := blockTracker.Init(); err != nil { - e.logger.Error("failed to init blocktracker", "error", err) - - return - } - - if err := blockTracker.Start(); err != nil { - e.logger.Error("failed to start blocktracker", "error", err) - } - }() - - go func() { - <-ctx.Done() - blockTracker.Close() - store.Close() - }() - - go func() { + // Sync concurrently, with infinite retries + go common.RetryForever(ctx, time.Second, func(context.Context) error { //nolint: errcheck if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) + return err } - }() + return nil + }) return nil } From 4b7892cc0981bf34703f57b353028666db9d1f0e Mon Sep 17 00:00:00 2001 From: sergerad Date: Sun, 14 May 2023 18:49:13 +1200 Subject: [PATCH 02/14] Add unit tests for RetryForever() --- helper/common/common_test.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/helper/common/common_test.go b/helper/common/common_test.go index 938ec30632..3ca6b53e56 100644 --- a/helper/common/common_test.go +++ b/helper/common/common_test.go @@ -1,7 +1,9 @@ package common import ( + "context" "encoding/json" + "errors" "math/big" "testing" "time" @@ -98,3 +100,31 @@ func Test_Duration_Marshal_UnmarshalJSON(t *testing.T) { require.Equal(t, origTimer, otherTimer) }) } + +func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) { + interval := time.Millisecond * 10 + ended := false + go func() { + RetryForever(context.Background(), interval, func(ctx context.Context) error { + return errors.New("") + }) + ended = true + }() + time.Sleep(interval * 10) + require.False(t, ended) +} + +func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + default: + cancel() + return errors.New("") + } + }) + <-ctx.Done() + require.True(t, errors.Is(ctx.Err(), context.Canceled)) +} From 86cdbd3e70c0e94a2fbc3da52d25c02c8f462ff5 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 15 May 2023 19:23:24 +1200 Subject: [PATCH 03/14] Fix missing return err statement --- server/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/server.go b/server/server.go index 8cee685bf2..ad33e557b9 100644 --- a/server/server.go +++ b/server/server.go @@ -1003,6 +1003,7 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { go common.RetryForever(context.Background(), time.Second, func(context.Context) error { if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) + return err } return nil }) From 2dbddca5bc6a62a0d50465d4f0b1e450c7738400 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 15 May 2023 19:28:11 +1200 Subject: [PATCH 04/14] Update prometheus error logic --- server/server.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/server.go b/server/server.go index ad33e557b9..3fe54ed53a 100644 --- a/server/server.go +++ b/server/server.go @@ -1001,7 +1001,10 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) go common.RetryForever(context.Background(), time.Second, func(context.Context) error { - if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + if err := srv.ListenAndServe(); err != nil { + if errors.Is(err, http.ErrServerClosed) { + return nil + } s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) return err } From 629dc0fc8c0469e713b0a2b1ed68c271045984a5 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 15 May 2023 19:34:31 +1200 Subject: [PATCH 05/14] Revert gossip retry --- consensus/ibft/signer/helper_test.go | 4 ++-- network/gossip.go | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/consensus/ibft/signer/helper_test.go b/consensus/ibft/signer/helper_test.go index 4386f63ee7..c5a24331b4 100644 --- a/consensus/ibft/signer/helper_test.go +++ b/consensus/ibft/signer/helper_test.go @@ -73,7 +73,7 @@ func Test_wrapCommitHash(t *testing.T) { assert.Equal(t, expectedOutput, output) } -//nolint +// nolint func Test_getOrCreateECDSAKey(t *testing.T) { t.Parallel() @@ -184,7 +184,7 @@ func Test_getOrCreateECDSAKey(t *testing.T) { } } -//nolint +// nolint func Test_getOrCreateBLSKey(t *testing.T) { t.Parallel() diff --git a/network/gossip.go b/network/gossip.go index fe505edc63..1f3914f039 100644 --- a/network/gossip.go +++ b/network/gossip.go @@ -6,9 +6,7 @@ import ( "reflect" "sync" "sync/atomic" - "time" - "github.com/0xPolygon/polygon-edge/helper/common" "github.com/hashicorp/go-hclog" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -104,16 +102,16 @@ func (t *Topic) readLoop(sub *pubsub.Subscription, handler func(obj interface{}, continue } - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + go func() { obj := t.createObj() if err := proto.Unmarshal(msg.Data, obj); err != nil { t.logger.Error("failed to unmarshal topic", "err", err) - return err + + return } handler(obj, msg.GetFrom()) - return nil - }) + }() } } From b3c741187ebe0dfee104dbe10d455eecbc339bc6 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 15 May 2023 19:44:55 +1200 Subject: [PATCH 06/14] Revert unwanted fmt --- consensus/ibft/signer/helper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/ibft/signer/helper_test.go b/consensus/ibft/signer/helper_test.go index c5a24331b4..4386f63ee7 100644 --- a/consensus/ibft/signer/helper_test.go +++ b/consensus/ibft/signer/helper_test.go @@ -73,7 +73,7 @@ func Test_wrapCommitHash(t *testing.T) { assert.Equal(t, expectedOutput, output) } -// nolint +//nolint func Test_getOrCreateECDSAKey(t *testing.T) { t.Parallel() @@ -184,7 +184,7 @@ func Test_getOrCreateECDSAKey(t *testing.T) { } } -// nolint +//nolint func Test_getOrCreateBLSKey(t *testing.T) { t.Parallel() From 52e6ce901506c6f1cf600238b1775ae5794ae54e Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 16 May 2023 08:23:53 +1200 Subject: [PATCH 07/14] Only do sync concurrently in event_tracker.go --- tracker/event_tracker.go | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index ad999d9a1d..5c1b0204c8 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -2,6 +2,7 @@ package tracker import ( "context" + "fmt" "time" "github.com/0xPolygon/polygon-edge/helper/common" @@ -60,31 +61,24 @@ func (e *EventTracker) Start(ctx context.Context) error { store, err := NewEventTrackerStore(e.dbPath, e.numBlockConfirmations, e.subscriber, e.logger) if err != nil { - return err + return fmt.Errorf("failed to create event tracker store: %w", err) } blockMaxBacklog := e.numBlockConfirmations*2 + 1 blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog)) - go func() { <-ctx.Done() blockTracker.Close() store.Close() }() - // Init and start block tracker concurrently, with infinite retries - go common.RetryForever(ctx, time.Second, func(context.Context) error { - if err := blockTracker.Init(); err != nil { - e.logger.Error("failed to init blocktracker", "error", err) - return err - } - - if err := blockTracker.Start(); err != nil { - e.logger.Error("failed to start blocktracker", "error", err) - return err - } - return nil - }) + // Start block tracker + if err := blockTracker.Init(); err != nil { + return fmt.Errorf("failed to init blocktracker: %w", err) + } + if err := blockTracker.Start(); err != nil { + return fmt.Errorf("failed to start blocktracker: %w", err) + } tt, err := tracker.NewTracker(provider.Eth(), tracker.WithBatchSize(10), @@ -99,10 +93,10 @@ func (e *EventTracker) Start(ctx context.Context) error { }), ) if err != nil { - return err + return fmt.Errorf("failed to create tracker: %w", err) } - // Sync concurrently, with infinite retries - go common.RetryForever(ctx, time.Second, func(context.Context) error { //nolint: errcheck + // Sync concurrently, retry indefinitely + go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) return err From 74e3cba5c2a0957db51ba750b9473d647cbf7a09 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 16 May 2023 10:02:10 +1200 Subject: [PATCH 08/14] RM some retries --- consensus/polybft/polybft.go | 20 ++++++++------------ jsonrpc/jsonrpc.go | 13 ++++--------- tracker/event_tracker.go | 1 + 3 files changed, 13 insertions(+), 21 deletions(-) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 324e726d21..1a3952dec8 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -2,7 +2,6 @@ package polybft import ( - "context" "encoding/json" "fmt" "path/filepath" @@ -355,17 +354,14 @@ func (p *Polybft) Start() error { } // start syncing - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { - blockHandler := func(b *types.FullBlock) bool { - p.runtime.OnBlockInserted(b) - return false - } - if err := p.syncer.Sync(blockHandler); err != nil { - p.logger.Error("blocks synchronization failed", "error", err) - return err - } - return nil - }) + blockHandler := func(b *types.FullBlock) bool { + p.runtime.OnBlockInserted(b) + return false + } + if err := p.syncer.Sync(blockHandler); err != nil { + p.logger.Error("blocks synchronization failed", "error", err) + return err + } // start consensus runtime if err := p.startRuntime(); err != nil { diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 1a8b30afb2..bcabbb5598 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -1,7 +1,6 @@ package jsonrpc import ( - "context" "encoding/json" "fmt" "io" @@ -10,7 +9,6 @@ import ( "sync" "time" - "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/versioning" "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" @@ -128,13 +126,10 @@ func (j *JSONRPC) setupHTTP() error { ReadHeaderTimeout: 60 * time.Second, } - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { - if err := srv.Serve(lis); err != nil { - j.logger.Error("closed http connection", "err", err) - return err - } - return nil - }) + if err := srv.Serve(lis); err != nil { + j.logger.Error("closed http connection", "err", err) + return err + } return nil } diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 5c1b0204c8..a2fdd09ccb 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -80,6 +80,7 @@ func (e *EventTracker) Start(ctx context.Context) error { return fmt.Errorf("failed to start blocktracker: %w", err) } + // Run tracker tt, err := tracker.NewTracker(provider.Eth(), tracker.WithBatchSize(10), tracker.WithBlockTracker(blockTracker), From b7f825a07837ab09d9f4627ce351f0079aa4c3b7 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 16 May 2023 11:54:40 +1200 Subject: [PATCH 09/14] Revert "Only do sync concurrently in event_tracker.go" This reverts commit a758a1c5fe1924b0fb76419dded4f04f04ea13c3. --- tracker/event_tracker.go | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index a2fdd09ccb..3c9c341763 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -2,7 +2,6 @@ package tracker import ( "context" - "fmt" "time" "github.com/0xPolygon/polygon-edge/helper/common" @@ -61,24 +60,31 @@ func (e *EventTracker) Start(ctx context.Context) error { store, err := NewEventTrackerStore(e.dbPath, e.numBlockConfirmations, e.subscriber, e.logger) if err != nil { - return fmt.Errorf("failed to create event tracker store: %w", err) + return err } blockMaxBacklog := e.numBlockConfirmations*2 + 1 blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog)) + go func() { <-ctx.Done() blockTracker.Close() store.Close() }() - // Start block tracker - if err := blockTracker.Init(); err != nil { - return fmt.Errorf("failed to init blocktracker: %w", err) - } - if err := blockTracker.Start(); err != nil { - return fmt.Errorf("failed to start blocktracker: %w", err) - } + // Init and start block tracker concurrently, with infinite retries + go common.RetryForever(ctx, time.Second, func(context.Context) error { + if err := blockTracker.Init(); err != nil { + e.logger.Error("failed to init blocktracker", "error", err) + return err + } + + if err := blockTracker.Start(); err != nil { + e.logger.Error("failed to start blocktracker", "error", err) + return err + } + return nil + }) // Run tracker tt, err := tracker.NewTracker(provider.Eth(), @@ -94,10 +100,10 @@ func (e *EventTracker) Start(ctx context.Context) error { }), ) if err != nil { - return fmt.Errorf("failed to create tracker: %w", err) + return err } - // Sync concurrently, retry indefinitely - go common.RetryForever(ctx, time.Second, func(context.Context) error { + // Sync concurrently, with infinite retries + go common.RetryForever(ctx, time.Second, func(context.Context) error { //nolint: errcheck if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) return err From b855a0d7bee185ad0dc133fb1bf9e062f868959f Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 16 May 2023 12:36:46 +1200 Subject: [PATCH 10/14] Revert "RM some retries" This reverts commit b1ae839df523fff5ce13337d78616617a1d4e434. --- consensus/polybft/polybft.go | 20 ++++++++++++-------- jsonrpc/jsonrpc.go | 13 +++++++++---- tracker/event_tracker.go | 1 - 3 files changed, 21 insertions(+), 13 deletions(-) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 1a3952dec8..324e726d21 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -2,6 +2,7 @@ package polybft import ( + "context" "encoding/json" "fmt" "path/filepath" @@ -354,14 +355,17 @@ func (p *Polybft) Start() error { } // start syncing - blockHandler := func(b *types.FullBlock) bool { - p.runtime.OnBlockInserted(b) - return false - } - if err := p.syncer.Sync(blockHandler); err != nil { - p.logger.Error("blocks synchronization failed", "error", err) - return err - } + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + blockHandler := func(b *types.FullBlock) bool { + p.runtime.OnBlockInserted(b) + return false + } + if err := p.syncer.Sync(blockHandler); err != nil { + p.logger.Error("blocks synchronization failed", "error", err) + return err + } + return nil + }) // start consensus runtime if err := p.startRuntime(); err != nil { diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index bcabbb5598..1a8b30afb2 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -1,6 +1,7 @@ package jsonrpc import ( + "context" "encoding/json" "fmt" "io" @@ -9,6 +10,7 @@ import ( "sync" "time" + "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/versioning" "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" @@ -126,10 +128,13 @@ func (j *JSONRPC) setupHTTP() error { ReadHeaderTimeout: 60 * time.Second, } - if err := srv.Serve(lis); err != nil { - j.logger.Error("closed http connection", "err", err) - return err - } + go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + if err := srv.Serve(lis); err != nil { + j.logger.Error("closed http connection", "err", err) + return err + } + return nil + }) return nil } diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 3c9c341763..ad999d9a1d 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -86,7 +86,6 @@ func (e *EventTracker) Start(ctx context.Context) error { return nil }) - // Run tracker tt, err := tracker.NewTracker(provider.Eth(), tracker.WithBatchSize(10), tracker.WithBlockTracker(blockTracker), From 6dc28e07e90718b5633f4a0e6582dafa4bf486f0 Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 17 May 2023 06:25:27 +1200 Subject: [PATCH 11/14] RM retries --- jsonrpc/jsonrpc.go | 8 ++------ server/server.go | 17 ++++++----------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/jsonrpc/jsonrpc.go b/jsonrpc/jsonrpc.go index 1a8b30afb2..6df997eb58 100644 --- a/jsonrpc/jsonrpc.go +++ b/jsonrpc/jsonrpc.go @@ -1,7 +1,6 @@ package jsonrpc import ( - "context" "encoding/json" "fmt" "io" @@ -10,7 +9,6 @@ import ( "sync" "time" - "github.com/0xPolygon/polygon-edge/helper/common" "github.com/0xPolygon/polygon-edge/versioning" "github.com/gorilla/websocket" "github.com/hashicorp/go-hclog" @@ -128,13 +126,11 @@ func (j *JSONRPC) setupHTTP() error { ReadHeaderTimeout: 60 * time.Second, } - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + go func() { if err := srv.Serve(lis); err != nil { j.logger.Error("closed http connection", "err", err) - return err } - return nil - }) + }() return nil } diff --git a/server/server.go b/server/server.go index 3fe54ed53a..f6180f4fe2 100644 --- a/server/server.go +++ b/server/server.go @@ -918,13 +918,11 @@ func (s *Server) setupGRPC() error { } // Start server with infinite retries - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + go func() { if err := s.grpcServer.Serve(lis); err != nil { s.logger.Error(err.Error()) - return err } - return nil - }) + }() s.logger.Info("GRPC server running", "addr", s.config.GRPCAddr.String()) @@ -1000,16 +998,13 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { } s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) - go common.RetryForever(context.Background(), time.Second, func(context.Context) error { + go func() { if err := srv.ListenAndServe(); err != nil { - if errors.Is(err, http.ErrServerClosed) { - return nil + if !errors.Is(err, http.ErrServerClosed) { + s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) } - s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err) - return err } - return nil - }) + }() return srv } From 10b3199c88f18cf497156923e0dc082ac6f8b500 Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 17 May 2023 06:28:06 +1200 Subject: [PATCH 12/14] Lint --- consensus/polybft/polybft.go | 5 ++++- helper/common/common.go | 1 + helper/common/common_test.go | 4 ++++ server/server.go | 1 + tracker/event_tracker.go | 6 +++--- 5 files changed, 13 insertions(+), 4 deletions(-) diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 324e726d21..3993a07e7c 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -354,16 +354,19 @@ func (p *Polybft) Start() error { return fmt.Errorf("failed to start syncer. Error: %w", err) } - // start syncing + // sync concurrently, retrying indefinitely go common.RetryForever(context.Background(), time.Second, func(context.Context) error { blockHandler := func(b *types.FullBlock) bool { p.runtime.OnBlockInserted(b) + return false } if err := p.syncer.Sync(blockHandler); err != nil { p.logger.Error("blocks synchronization failed", "error", err) + return err } + return nil }) diff --git a/helper/common/common.go b/helper/common/common.go index 188fdd9c66..11139435d3 100644 --- a/helper/common/common.go +++ b/helper/common/common.go @@ -38,6 +38,7 @@ func RetryForever(ctx context.Context, interval time.Duration, fn func(context.C if err := fn(ctx); err != nil { return retry.RetryableError(err) } + return nil }) } diff --git a/helper/common/common_test.go b/helper/common/common_test.go index 3ca6b53e56..0d1799916a 100644 --- a/helper/common/common_test.go +++ b/helper/common/common_test.go @@ -104,10 +104,12 @@ func Test_Duration_Marshal_UnmarshalJSON(t *testing.T) { func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) { interval := time.Millisecond * 10 ended := false + go func() { RetryForever(context.Background(), interval, func(ctx context.Context) error { return errors.New("") }) + ended = true }() time.Sleep(interval * 10) @@ -119,9 +121,11 @@ func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) { RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error { select { case <-ctx.Done(): + return nil default: cancel() + return errors.New("") } }) diff --git a/server/server.go b/server/server.go index f6180f4fe2..3855e8b147 100644 --- a/server/server.go +++ b/server/server.go @@ -998,6 +998,7 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server { } s.logger.Info("Prometheus server started", "addr=", listenAddr.String()) + go func() { if err := srv.ListenAndServe(); err != nil { if !errors.Is(err, http.ErrServerClosed) { diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index ad999d9a1d..438239f965 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -72,7 +72,7 @@ func (e *EventTracker) Start(ctx context.Context) error { store.Close() }() - // Init and start block tracker concurrently, with infinite retries + // Init and start block tracker concurrently, retrying indefinitely go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := blockTracker.Init(); err != nil { e.logger.Error("failed to init blocktracker", "error", err) @@ -101,8 +101,8 @@ func (e *EventTracker) Start(ctx context.Context) error { if err != nil { return err } - // Sync concurrently, with infinite retries - go common.RetryForever(ctx, time.Second, func(context.Context) error { //nolint: errcheck + // Sync concurrently, retrying indefinitely + go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) return err From 80ba145622da0c234a231ac63e94ae839a8c7ea3 Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 17 May 2023 08:46:15 +1200 Subject: [PATCH 13/14] Add timeout to shouldend test --- helper/common/common_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/helper/common/common_test.go b/helper/common/common_test.go index 0d1799916a..6a7af9e4e4 100644 --- a/helper/common/common_test.go +++ b/helper/common/common_test.go @@ -117,7 +117,7 @@ func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) { } func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error { select { case <-ctx.Done(): From 29a0cbaccf15a63dbec5454a22c39e04120ffe1f Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 19 May 2023 21:40:50 +1200 Subject: [PATCH 14/14] Fix lint missed earlier --- tracker/event_tracker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 438239f965..2a2895c02e 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -76,13 +76,16 @@ func (e *EventTracker) Start(ctx context.Context) error { go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := blockTracker.Init(); err != nil { e.logger.Error("failed to init blocktracker", "error", err) + return err } if err := blockTracker.Start(); err != nil { e.logger.Error("failed to start blocktracker", "error", err) + return err } + return nil }) @@ -105,8 +108,10 @@ func (e *EventTracker) Start(ctx context.Context) error { go common.RetryForever(ctx, time.Second, func(context.Context) error { if err := tt.Sync(ctx); err != nil { e.logger.Error("failed to sync", "error", err) + return err } + return nil })