Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add common.RetryForever() and use for concurrent sync operations #1503

Merged
merged 14 commits into from
May 22, 2023
12 changes: 8 additions & 4 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package polybft

import (
"context"
"encoding/json"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -353,18 +354,21 @@ func (p *Polybft) Start() error {
return fmt.Errorf("failed to start syncer. Error: %w", err)
}

// start syncing
go func() {
// sync concurrently, retrying indefinitely
go common.RetryForever(context.Background(), time.Second, func(context.Context) error {
blockHandler := func(b *types.FullBlock) bool {
p.runtime.OnBlockInserted(b)

return false
}

if err := p.syncer.Sync(blockHandler); err != nil {
p.logger.Error("blocks synchronization failed", "error", err)

return err
}
}()

return nil
})

// start consensus runtime
if err := p.startRuntime(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ require (
github.com/dave/jennifer v1.6.1
github.com/quasilyte/go-ruleguard v0.3.19
github.com/quasilyte/go-ruleguard/dsl v0.3.22
github.com/sethvargo/go-retry v0.2.4
golang.org/x/sync v0.2.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
gopkg.in/DataDog/dd-trace-go.v1 v1.50.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOa
github.com/secure-systems-lab/go-securesystemslib v0.5.0 h1:oTiNu0QnulMQgN/hLK124wJD/r2f9ZhIUuKIeBsCBT8=
github.com/secure-systems-lab/go-securesystemslib v0.5.0/go.mod h1:uoCqUC0Ap7jrBSEanxT+SdACYJTVplRXWLkGMuDjXqk=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sethvargo/go-retry v0.2.4 h1:T+jHEQy/zKJf5s95UkguisicE0zuF9y7+/vgz08Ocec=
github.com/sethvargo/go-retry v0.2.4/go.mod h1:1afjQuvh7s4gflMObvjLPaWgluLLyhA1wmVZ6KLpICw=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=
github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0=
Expand Down
13 changes: 13 additions & 0 deletions helper/common/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"encoding/binary"
"encoding/json"
"errors"
Expand All @@ -18,6 +19,7 @@ import (
"time"

"github.com/0xPolygon/polygon-edge/helper/hex"
"github.com/sethvargo/go-retry"
)

var (
Expand All @@ -30,6 +32,17 @@ var (
errInvalidDuration = errors.New("invalid duration")
)

// RetryForever will execute a function until it completes without error
func RetryForever(ctx context.Context, interval time.Duration, fn func(context.Context) error) {
sergerad marked this conversation as resolved.
Show resolved Hide resolved
_ = retry.Do(ctx, retry.NewConstant(interval), func(context.Context) error {
if err := fn(ctx); err != nil {
return retry.RetryableError(err)
}

return nil
})
}

// Min returns the strictly lower number
func Min(a, b uint64) uint64 {
if a < b {
Expand Down
34 changes: 34 additions & 0 deletions helper/common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package common

import (
"context"
"encoding/json"
"errors"
"math/big"
"testing"
"time"
Expand Down Expand Up @@ -98,3 +100,35 @@ func Test_Duration_Marshal_UnmarshalJSON(t *testing.T) {
require.Equal(t, origTimer, otherTimer)
})
}

func TestRetryForever_AlwaysReturnError_ShouldNeverEnd(t *testing.T) {
interval := time.Millisecond * 10
ended := false

go func() {
RetryForever(context.Background(), interval, func(ctx context.Context) error {
return errors.New("")
})

ended = true
}()
time.Sleep(interval * 10)
require.False(t, ended)
}

func TestRetryForever_ReturnNilAfterFirstRun_ShouldEnd(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
RetryForever(ctx, time.Millisecond*100, func(ctx context.Context) error {
select {
case <-ctx.Done():

return nil
default:
cancel()

return errors.New("")
}
})
<-ctx.Done()
require.True(t, errors.Is(ctx.Err(), context.Canceled))
}
11 changes: 7 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ func (s *Server) setupGRPC() error {
return err
}

// Start server with infinite retries
go func() {
if err := s.grpcServer.Serve(lis); err != nil {
s.logger.Error(err.Error())
Expand Down Expand Up @@ -996,11 +997,13 @@ func (s *Server) startPrometheusServer(listenAddr *net.TCPAddr) *http.Server {
ReadHeaderTimeout: 60 * time.Second,
}

go func() {
s.logger.Info("Prometheus server started", "addr=", listenAddr.String())
s.logger.Info("Prometheus server started", "addr=", listenAddr.String())

if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
vcastellm marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
go func() {
if err := srv.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
s.logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
}
}
}()

Expand Down
53 changes: 32 additions & 21 deletions tracker/event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package tracker

import (
"context"
"time"

"github.com/0xPolygon/polygon-edge/helper/common"
hcf "github.com/hashicorp/go-hclog"
"github.com/umbracle/ethgo"
"github.com/umbracle/ethgo/blocktracker"
Expand Down Expand Up @@ -64,6 +66,29 @@ func (e *EventTracker) Start(ctx context.Context) error {
blockMaxBacklog := e.numBlockConfirmations*2 + 1
blockTracker := blocktracker.NewBlockTracker(provider.Eth(), blocktracker.WithBlockMaxBacklog(blockMaxBacklog))

go func() {
<-ctx.Done()
blockTracker.Close()
store.Close()
}()

// Init and start block tracker concurrently, retrying indefinitely
go common.RetryForever(ctx, time.Second, func(context.Context) error {
vcastellm marked this conversation as resolved.
Show resolved Hide resolved
if err := blockTracker.Init(); err != nil {
e.logger.Error("failed to init blocktracker", "error", err)

return err
}

if err := blockTracker.Start(); err != nil {
e.logger.Error("failed to start blocktracker", "error", err)

return err
}

return nil
})

tt, err := tracker.NewTracker(provider.Eth(),
tracker.WithBatchSize(10),
tracker.WithBlockTracker(blockTracker),
Expand All @@ -79,30 +104,16 @@ func (e *EventTracker) Start(ctx context.Context) error {
if err != nil {
return err
}

go func() {
if err := blockTracker.Init(); err != nil {
e.logger.Error("failed to init blocktracker", "error", err)

return
}

if err := blockTracker.Start(); err != nil {
e.logger.Error("failed to start blocktracker", "error", err)
}
}()

go func() {
<-ctx.Done()
blockTracker.Close()
store.Close()
}()

go func() {
// Sync concurrently, retrying indefinitely
go common.RetryForever(ctx, time.Second, func(context.Context) error {
if err := tt.Sync(ctx); err != nil {
e.logger.Error("failed to sync", "error", err)

return err
}
}()

return nil
})

return nil
}