Skip to content

Commit

Permalink
Restart node instead of panicing and exiting (#218)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
Currently the node will only switch into blocksync mode if the whole
process is restarted, this change aims to add a detection go routine and
restart the node if it's behind configurable

## Testing performed to validate your change

Artificially introduced latency so that the node would fall behind, and
see that it self restarted node

![image](https://user-images.githubusercontent.com/18161326/232617966-22f628ba-a939-415d-a34a-e81bd218b6cd.png)


![image](https://user-images.githubusercontent.com/18161326/232618092-ce17acd5-24ff-45a0-8402-1d51f07fb4bb.png)
  • Loading branch information
BrandonWeng authored Apr 19, 2023
1 parent f0572fe commit 9523b30
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 39 deletions.
7 changes: 7 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,3 +1092,10 @@ func (app *BaseApp) startCompactionRoutine(db dbm.DB) {
}
}()
}

func (app *BaseApp) Close() error {
if err := app.appStore.db.Close(); err != nil {
return err
}
return app.snapshotManager.Close()
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.1
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tendermint/btcd v0.1.1
github.com/tendermint/crypto v0.0.0-20191022145703-50d29ede1e15
github.com/tendermint/go-amino v0.16.0
Expand Down Expand Up @@ -119,7 +120,6 @@ require (
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/zondax/hid v0.9.0 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opencensus.io v0.23.0 // indirect
Expand All @@ -145,7 +145,7 @@ replace (
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.7.0
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.1.181
github.com/tendermint/tendermint => github.com/sei-protocol/sei-tendermint v0.2.7

// latest grpc doesn't work with with our modified proto compiler, so we need to enforce
// the following version across all dependencies.
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -689,8 +689,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/sei-protocol/sei-iavl v0.1.2 h1:jEYkZv83DbTRapJtkT5gBFa2uEBwD4udw8AiYx0gcsI=
github.com/sei-protocol/sei-iavl v0.1.2/go.mod h1:7PfkEVT5dcoQE+s/9KWdoXJ8VVVP1QpYYPLdxlkSXFk=
github.com/sei-protocol/sei-tendermint v0.1.181 h1:ycC21MKAorOGFhdzN5HjZlA0qtIAX4iMlXTSErTLqtM=
github.com/sei-protocol/sei-tendermint v0.1.181/go.mod h1:+BtDvAwTkE64BlxzpH9ZP7S6vUYT9wRXiZa/WW8/o4g=
github.com/sei-protocol/sei-tendermint v0.2.7 h1:uVweGrk1pa3Dla/i6fv/mJY0rGd18tgK+q4dxT6LxzM=
github.com/sei-protocol/sei-tendermint v0.2.7/go.mod h1:+BtDvAwTkE64BlxzpH9ZP7S6vUYT9wRXiZa/WW8/o4g=
github.com/sei-protocol/sei-tm-db v0.0.5 h1:3WONKdSXEqdZZeLuWYfK5hP37TJpfaUa13vAyAlvaQY=
github.com/sei-protocol/sei-tm-db v0.0.5/go.mod h1:Cpa6rGyczgthq7/0pI31jys2Fw0Nfrc+/jKdP1prVqY=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
10 changes: 2 additions & 8 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,10 @@ func New(clientCtx client.Context, logger log.Logger) *Server {
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the Tendermint JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
func (s *Server) Start(cfg config.Config, apiMetrics *telemetry.Metrics) error {
s.mtx.Lock()
if cfg.Telemetry.Enabled {
m, err := telemetry.New(cfg.Telemetry)
if err != nil {
s.mtx.Unlock()
return err
}

s.metrics = m
s.metrics = apiMetrics
s.registerMetrics()
}

Expand Down
85 changes: 61 additions & 24 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
crgserver "github.com/cosmos/cosmos-sdk/server/rosetta/lib/server"
"github.com/cosmos/cosmos-sdk/server/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/telemetry"
)

const (
Expand Down Expand Up @@ -179,13 +180,41 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
}

// amino is needed here for backwards compatibility of REST routes
err = startInProcess(serverCtx, clientCtx, appCreator, tracerProviderOptions)
errCode, ok := err.(ErrorCode)
if !ok {
return err
exitCode := RestartErrorCode

serverCtx.Logger.Info("Creating node metrics provider")
nodeMetricsProvider := node.DefaultMetricsProvider(serverCtx.Config.Instrumentation)(clientCtx.ChainID)

config, _ := config.GetConfig(serverCtx.Viper)
apiMetrics, err := telemetry.New(config.Telemetry)
if err != nil {
return fmt.Errorf("failed to initialize telemetry: %w", err)
}

serverCtx.Logger.Debug(fmt.Sprintf("received quit signal: %d", errCode.Code))
restartCoolDownDuration := time.Second * time.Duration(serverCtx.Config.SelfRemediation.RestartCooldownSeconds)
// Set the first restart time to be now - restartCoolDownDuration so that the first restart can trigger whenever
canRestartAfter := time.Now().Add(-restartCoolDownDuration)
for {
err = startInProcess(
serverCtx,
clientCtx,
appCreator,
tracerProviderOptions,
nodeMetricsProvider,
apiMetrics,
canRestartAfter,
)
errCode, ok := err.(ErrorCode)
exitCode = errCode.Code
if !ok {
return err
}
if exitCode != RestartErrorCode {
break
}
serverCtx.Logger.Info("restarting node...")
canRestartAfter = time.Now().Add(restartCoolDownDuration)
}
return nil
},
}
Expand Down Expand Up @@ -270,15 +299,21 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
svr.Wait()
}()

var restartCh chan struct{}
if ctx.Config.P2P.SelfKillNoPeers {
restartCh = make(chan struct{})
}
restartCh := make(chan struct{})

// Wait for SIGINT or SIGTERM signal
return WaitForQuitSignals(restartCh)
return WaitForQuitSignals(restartCh, time.Now())
}

func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator, tracerProviderOptions []trace.TracerProviderOption) error {
func startInProcess(
ctx *Context,
clientCtx client.Context,
appCreator types.AppCreator,
tracerProviderOptions []trace.TracerProviderOption,
nodeMetricsProvider *node.NodeMetrics,
apiMetrics *telemetry.Metrics,
canRestartAfter time.Time,
) error {
cfg := ctx.Config
home := cfg.RootDir
goCtx, cancel := context.WithCancel(context.Background())
Expand All @@ -287,12 +322,11 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
return err
return fmt.Errorf("failed to create cpuProfile file %w", err)
}

ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile)
if err := pprof.StartCPUProfile(f); err != nil {
return err
return fmt.Errorf("failed to start CPU Profiler %w", err)
}

cpuProfileCleanup = func() {
Expand Down Expand Up @@ -330,9 +364,8 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
restartCh chan struct{}
gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly)
)
if ctx.Config.P2P.SelfKillNoPeers {
restartCh = make(chan struct{})
}

restartCh = make(chan struct{})

if gRPCOnly {
ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled")
Expand All @@ -347,12 +380,13 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
abciclient.NewLocalClient(ctx.Logger, app),
nil,
tracerProviderOptions,
nodeMetricsProvider,
)
if err != nil {
return err
return fmt.Errorf("error creating node: %w", err)
}
if err := tmNode.Start(goCtx); err != nil {
return err
return fmt.Errorf("error starting node: %w", err)
}
}

Expand All @@ -378,14 +412,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
errCh := make(chan error)

go func() {
if err := apiSrv.Start(config); err != nil {
if err := apiSrv.Start(config, apiMetrics); err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
return err
return fmt.Errorf("error starting api server: %w", err)

case <-time.After(types.ServerStartTime): // assume server started successfully
}
Expand Down Expand Up @@ -415,7 +449,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
// we do not need to start Rosetta or handle any Tendermint related processes.
if gRPCOnly {
// wait for signal capture and gracefully return
return WaitForQuitSignals(restartCh)
return WaitForQuitSignals(restartCh, canRestartAfter)
}

var rosettaSrv crgserver.Server
Expand Down Expand Up @@ -481,9 +515,12 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}
}

ctx.Logger.Info("exiting...")
ctx.Logger.Info("close any other open resource...")
if err := app.Close(); err != nil {
ctx.Logger.Error("error closing database", "err", err)
}
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals(restartCh)
return WaitForQuitSignals(restartCh, canRestartAfter)
}
3 changes: 3 additions & 0 deletions server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ type (

// CommitMultiStore Returns the multistore instance
CommitMultiStore() sdk.CommitMultiStore

// Close any open resources
Close() error
}

// AppCreator is a function that allows us to lazily initialize an
Expand Down
11 changes: 9 additions & 2 deletions server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import (
// a command's Context.
const ServerContextKey = sdk.ContextKey("server.context")

// Error code reserved for signalled
const RestartErrorCode = 100

// server context
type Context struct {
Viper *viper.Viper
Expand Down Expand Up @@ -382,7 +385,7 @@ func TrapSignal(cleanupFunc func()) {
}

// WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
func WaitForQuitSignals(restartCh chan struct{}) ErrorCode {
func WaitForQuitSignals(restartCh chan struct{}, canRestartAfter time.Time) ErrorCode {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
if restartCh != nil {
Expand All @@ -391,7 +394,11 @@ func WaitForQuitSignals(restartCh chan struct{}) ErrorCode {
case sig := <-sigs:
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
case <-restartCh:
return ErrorCode{Code: 1}
// If it's in the restart cooldown period
if time.Now().Before(canRestartAfter) {
continue
}
return ErrorCode{Code: RestartErrorCode}
}
}
} else {
Expand Down
78 changes: 78 additions & 0 deletions server/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"errors"
"fmt"
"os"
"os/signal"
"path"
"path/filepath"
"strings"
"syscall"
"testing"
"time"

"github.com/spf13/cobra"
"go.opentelemetry.io/otel/sdk/trace"
Expand Down Expand Up @@ -401,3 +404,78 @@ func TestInterceptConfigsWithBadPermissions(t *testing.T) {
t.Fatalf("Failed to catch permissions error, got: [%T] %v", err, err)
}
}

func TestWaitForQuitSignals(t *testing.T) {
t.Run("WithRestartChannelAndCanRestartAfterNotReached", func(t *testing.T) {
restartCh := make(chan struct{})
go func() {
time.Sleep(100 * time.Millisecond)
restartCh <- struct{}{}
}()

go func() {
time.Sleep(200 * time.Millisecond)
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

errCode := server.WaitForQuitSignals(
restartCh,
time.Now().Add(500*time.Millisecond),
)
expectedCode := int(syscall.SIGTERM) + 128
if errCode.Code != expectedCode {
t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
}
})

t.Run("WithRestartChannelAndCanRestartAfterReached", func(t *testing.T) {
restartCh := make(chan struct{})
go func() {
time.Sleep(100 * time.Millisecond)
restartCh <- struct{}{}
}()

errCode := server.WaitForQuitSignals(
restartCh,
time.Now().Add(-100*time.Millisecond),
)
if errCode.Code != server.RestartErrorCode {
t.Errorf("Expected error code %d, got %d", server.RestartErrorCode, errCode.Code)
}
})

t.Run("WithSIGINT", func(t *testing.T) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT)

go func() {
time.Sleep(100 * time.Millisecond)
syscall.Kill(syscall.Getpid(), syscall.SIGINT)
}()

errCode := server.WaitForQuitSignals(
make(chan struct{}),
time.Now(),
)
expectedCode := int(syscall.SIGINT) + 128
if errCode.Code != expectedCode {
t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
}
})

t.Run("WithSIGTERM", func(t *testing.T) {
go func() {
time.Sleep(100 * time.Millisecond)
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

errCode := server.WaitForQuitSignals(
make(chan struct{}),
time.Now(),
)
expectedCode := int(syscall.SIGTERM) + 128
if errCode.Code != expectedCode {
t.Errorf("Expected error code %d, got %d", expectedCode, errCode.Code)
}
})
}
4 changes: 4 additions & 0 deletions snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func NewManagerWithExtensions(store *Store, multistore types.Snapshotter, extens
}
}

func (m *Manager) Close() error {
return m.store.db.Close()
}

// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
for _, extension := range extensions {
Expand Down
Loading

0 comments on commit 9523b30

Please sign in to comment.