Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle potentially corrupt data after bad shutdown #21

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {
// Create new sidecar instance
sidecar := sidecar.NewSidecar(&sidecar.SidecarConfig{
GenesisBlockNumber: cfg.GetGenesisBlockNumber(),
}, cfg, mds, p, l, client)
}, cfg, mds, p, sm, l, client)

// RPC channel to notify the RPC server to shutdown gracefully
rpcChannel := make(chan bool)
Expand Down
4 changes: 4 additions & 0 deletions internal/eigenState/avsOperators/avsOperators.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,10 @@ func (a *AvsOperatorsModel) merkelizeState(blockNumber uint64, avsOperators []Re
)
}

func (a *AvsOperatorsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return a.BaseEigenState.DeleteState("registered_avs_operators", startBlockNumber, endBlockNumber, a.Db)
}

func encodeOperatorLeaf(operator string, registered bool) []byte {
return []byte(fmt.Sprintf("%s:%t", operator, registered))
}
Expand Down
30 changes: 30 additions & 0 deletions internal/eigenState/base/baseEigenState.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package base

import (
"database/sql"
"encoding/json"
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/parser"
"github.com/Layr-Labs/go-sidecar/internal/storage"
"go.uber.org/zap"
"gorm.io/gorm"
"slices"
"strings"
)
Expand Down Expand Up @@ -61,3 +63,31 @@ func (b *BaseEigenState) IsInterestingLog(contractsEvents map[string][]string, l
}
return false
}

func (b *BaseEigenState) DeleteState(tableName string, startBlockNumber uint64, endBlockNumber uint64, db *gorm.DB) error {
if endBlockNumber != 0 && endBlockNumber < startBlockNumber {
b.Logger.Sugar().Errorw("Invalid block range",
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return fmt.Errorf("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber")
}

// tokenizing the table name apparently doesnt work, so we need to use Sprintf to include it.
query := fmt.Sprintf(`
delete from %s
where block_number >= @startBlockNumber
`, tableName)
if endBlockNumber > 0 {
query += " and block_number <= @endBlockNumber"
}
res := db.Exec(query,
sql.Named("tableName", tableName),
sql.Named("startBlockNumber", startBlockNumber),
sql.Named("endBlockNumber", endBlockNumber))
if res.Error != nil {
b.Logger.Sugar().Errorw("Failed to delete state", zap.Error(res.Error))
return res.Error
}
return nil
}
4 changes: 4 additions & 0 deletions internal/eigenState/operatorShares/operatorShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,7 @@ func encodeStratTree(strategy string, operatorTreeRoot []byte) []byte {
strategyBytes := []byte(strategy)
return append(strategyBytes, operatorTreeRoot[:]...)
}

func (osm *OperatorSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return osm.BaseEigenState.DeleteState("operator_shares", startBlockNumber, endBlockNumber, osm.Db)
}
4 changes: 4 additions & 0 deletions internal/eigenState/stakerDelegations/stakerDelegations.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,7 @@ func encodeStakerLeaf(staker string, delegated bool) []byte {
func encodeOperatorLeaf(operator string, operatorStakersRoot []byte) []byte {
return append([]byte(operator), operatorStakersRoot[:]...)
}

func (s *StakerDelegationsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return s.BaseEigenState.DeleteState("delegated_stakers", startBlockNumber, endBlockNumber, s.Db)
}
4 changes: 4 additions & 0 deletions internal/eigenState/stakerShares/stakerShares.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,7 @@ func encodeStratTree(strategy string, stakerTreeRoot []byte) []byte {
strategyBytes := []byte(strategy)
return append(strategyBytes, stakerTreeRoot[:]...)
}

func (ss *StakerSharesModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error {
return ss.BaseEigenState.DeleteState("staker_shares", startBlockNumber, endBlockNumber, ss.Db)
}
28 changes: 28 additions & 0 deletions internal/eigenState/stateManager/stateManager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stateManager

import (
"errors"
"fmt"
"github.com/Layr-Labs/go-sidecar/internal/eigenState/types"
"github.com/Layr-Labs/go-sidecar/internal/storage"
Expand Down Expand Up @@ -169,3 +170,30 @@ func (e *EigenStateManager) GetSortedModelIndexes() []int {
slices.Sort(indexes)
return indexes
}

func (e *EigenStateManager) GetLatestStateRoot() (*StateRoot, error) {
root := &StateRoot{}
result := e.Db.Model(&StateRoot{}).Order("eth_block_number desc").First(&root)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, result.Error
}
return root, nil
}

// DeleteCorruptedState deletes state stored that may be incomplete or corrupted
//
// @param startBlock the block number to start deleting state from (inclusive)
// @param endBlock the block number to end deleting state from (inclusive). If 0, delete all state from startBlock
func (e *EigenStateManager) DeleteCorruptedState(startBlock uint64, endBlock uint64) error {
for _, index := range e.GetSortedModelIndexes() {
state := e.StateModels[index]
err := state.DeleteState(startBlock, endBlock)
if err != nil {
return err
}
}
return nil
}
7 changes: 7 additions & 0 deletions internal/eigenState/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type IEigenStateModel interface {
// ClearAccumulatedState
// Clear the accumulated state for the model to free up memory
ClearAccumulatedState(blockNumber uint64) error

// DeleteState used to delete state stored that may be incomplete or corrupted
// to allow for reprocessing of the state
//
// @param startBlockNumber the block number to start deleting state from (inclusive)
// @param endBlockNumber the block number to end deleting state from (inclusive). If 0, delete all state from startBlockNumber
DeleteState(startBlockNumber uint64, endBlockNumber uint64) error
}

// StateTransitions
Expand Down
33 changes: 32 additions & 1 deletion internal/sidecar/blockIndexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,46 @@ func (ct *currentTip) Set(tip uint64) {
}

func (s *Sidecar) IndexFromCurrentToTip(ctx context.Context) error {

latestBlock, err := s.GetLastIndexedBlock()
if err != nil {
return err
}

latestStateRoot, err := s.StateManager.GetLatestStateRoot()
if err != nil {
s.Logger.Sugar().Errorw("Failed to get latest state root", zap.Error(err))
return err
}

s.Logger.Sugar().Infow("Comparing latest block and latest state root",
zap.Int64("latestBlock", latestBlock),
zap.Uint64("latestStateRootBlock", latestStateRoot.EthBlockNumber),
)

if latestBlock == 0 {
s.Logger.Sugar().Infow("No blocks indexed, starting from genesis block", zap.Uint64("genesisBlock", s.Config.GenesisBlockNumber))
latestBlock = int64(s.Config.GenesisBlockNumber)
} else {
latestBlock += 1
// if the latest state root is behind the latest block, delete the corrupted state and set the
// latest block to the latest state root + 1
if latestStateRoot != nil && latestStateRoot.EthBlockNumber < uint64(latestBlock) {
s.Logger.Sugar().Infow("Latest state root is behind latest block, deleting corrupted state",
zap.Uint64("latestStateRoot", latestStateRoot.EthBlockNumber),
zap.Int64("latestBlock", latestBlock),
)
if err := s.StateManager.DeleteCorruptedState(latestStateRoot.EthBlockNumber+1, uint64(latestBlock)); err != nil {
s.Logger.Sugar().Errorw("Failed to delete corrupted state", zap.Error(err))
return err
}
if err := s.Storage.DeleteCorruptedState(uint64(latestStateRoot.EthBlockNumber+1), uint64(latestBlock)); err != nil {
s.Logger.Sugar().Errorw("Failed to delete corrupted state", zap.Error(err))
return err
}
} else {
// otherwise, start from the latest block + 1
latestBlock += 1
}
}

blockNumber, err := s.EthereumClient.GetBlockNumberUint64(ctx)
Expand Down
3 changes: 3 additions & 0 deletions internal/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Sidecar struct {
Storage storage.BlockStore
Pipeline *pipeline.Pipeline
EthereumClient *ethereum.Client
StateManager *stateManager.EigenStateManager
ShutdownChan chan bool
}

Expand All @@ -43,6 +44,7 @@ func NewSidecar(
gCfg *config.Config,
s storage.BlockStore,
p *pipeline.Pipeline,
em *stateManager.EigenStateManager,
l *zap.Logger,
ethClient *ethereum.Client,
) *Sidecar {
Expand All @@ -53,6 +55,7 @@ func NewSidecar(
Storage: s,
Pipeline: p,
EthereumClient: ethClient,
StateManager: em,
ShutdownChan: make(chan bool),
}
}
Expand Down
48 changes: 48 additions & 0 deletions internal/storage/sqlite/storage.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sqlite

import (
"database/sql"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -203,3 +204,50 @@ func (s *SqliteBlockStore) GetLatestActiveAvsOperators(blockNumber uint64, avsDi
}
return rows, nil
}

func (s *SqliteBlockStore) DeleteCorruptedState(startBlockNumber uint64, endBlockNumber uint64) error {
if endBlockNumber != 0 && endBlockNumber < startBlockNumber {
s.Logger.Sugar().Errorw("Invalid block range",
zap.Uint64("startBlockNumber", startBlockNumber),
zap.Uint64("endBlockNumber", endBlockNumber),
)
return fmt.Errorf("Invalid block range; endBlockNumber must be greater than or equal to startBlockNumber")
}

tablesWithBlockNumber := []string{
"transaction_logs",
"transactions",
}

for _, tableName := range tablesWithBlockNumber {
query := fmt.Sprintf(`
delete from %s
where block_number >= @startBlockNumber
`, tableName)
if endBlockNumber > 0 {
query += " and block_number <= @endBlockNumber"
}
res := s.Db.Exec(query,
sql.Named("startBlockNumber", startBlockNumber),
sql.Named("endBlockNumber", endBlockNumber),
)
if res.Error != nil {
return xerrors.Errorf("Failed to delete corrupted state from table '%s': %w", tableName, res.Error)
}
}
blocksQuery := `
delete from blocks
where number >= @startBlockNumber
`
if endBlockNumber > 0 {
blocksQuery += " and number <= @endBlockNumber"
}
res := s.Db.Exec(blocksQuery,
sql.Named("startBlockNumber", startBlockNumber),
sql.Named("endBlockNumber", endBlockNumber),
)
if res.Error != nil {
return xerrors.Errorf("Failed to delete corrupted state from table 'blocks': %w", res.Error)
}
return nil
}
6 changes: 6 additions & 0 deletions internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type BlockStore interface {

// Less generic functions
GetLatestActiveAvsOperators(blockNumber uint64, avsDirectoryAddress string) ([]*ActiveAvsOperator, error)

// DeleteCorruptedState deletes all the corrupted state from the database
//
// @param startBlockNumber: The block number from which to start (inclusive)
// @param endBlockNumber: The block number at which to end (inclusive). If 0, it will delete all the corrupted state from the startBlock
DeleteCorruptedState(startBlockNumber uint64, endBlockNumber uint64) error
}

// Tables
Expand Down
Loading