Skip to content

Commit

Permalink
start consolidation & withdrawal indexers
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Oct 8, 2024
1 parent 7fb76b1 commit 8c48ada
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
17 changes: 17 additions & 0 deletions db/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,23 @@ func GetHighestRootBeforeSlot(slot uint64, withOrphaned bool) []byte {
return result
}

func GetFirstRootAfterSlot(slot uint64, withOrphaned bool) []byte {
var result []byte
statusFilter := ""
if !withOrphaned {
statusFilter = "AND status != 2"
}

err := ReaderDb.Get(&result, `
SELECT root FROM slots WHERE slot >= $1 `+statusFilter+` AND status != 0 ORDER BY slot ASC LIMIT 1
`, slot)
if err != nil {
logger.Errorf("Error while fetching first root after %v: %v", slot, err)
return nil
}
return result
}

func GetSlotAssignment(slot uint64) uint64 {
proposer := uint64(math.MaxInt64)
err := ReaderDb.Get(&proposer, `
Expand Down
18 changes: 17 additions & 1 deletion indexer/execution/consolidation_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (ds *ConsolidationIndexer) runConsolidationIndexerLoop() {
defer utils.HandleSubroutinePanic("ConsolidationIndexer.runConsolidationIndexerLoop")

for {
time.Sleep(60 * time.Second)
time.Sleep(30 * time.Second)
ds.logger.Debugf("run consolidation indexer logic")

err := ds.runConsolidationIndexer()
Expand All @@ -84,6 +84,22 @@ func (ds *ConsolidationIndexer) runConsolidationIndexer() error {
ds.loadState()
}

specs := ds.indexer.chainState.GetSpecs()
if ds.indexer.chainState.CurrentEpoch() < phase0.Epoch(*specs.ElectraForkEpoch) {
// skip consolidation indexer before Electra fork
return nil
}

if ds.state.FinalBlock == 0 {
// start from electra fork block
electraSlot := ds.indexer.chainState.EpochToSlot(phase0.Epoch(*specs.ElectraForkEpoch))
dbSlotRoot := db.GetFirstRootAfterSlot(uint64(electraSlot), false)
if dbSlotRoot == nil {
dbSlot := db.GetSlotByRoot(dbSlotRoot)
ds.state.FinalBlock = *dbSlot.EthBlockNumber
}
}

finalizedEpoch, _ := ds.indexer.chainState.GetFinalizedCheckpoint()
if finalizedEpoch > 0 {
finalizedBlockNumber := ds.getFinalizedBlockNumber()
Expand Down
18 changes: 17 additions & 1 deletion indexer/execution/withdrawal_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (ds *WithdrawalIndexer) runWithdrawalIndexerLoop() {
defer utils.HandleSubroutinePanic("WithdrawalIndexer.runWithdrawalIndexerLoop")

for {
time.Sleep(60 * time.Second)
time.Sleep(30 * time.Second)
ds.logger.Debugf("run withdrawal indexer logic")

err := ds.runWithdrawalIndexer()
Expand All @@ -84,6 +84,22 @@ func (ds *WithdrawalIndexer) runWithdrawalIndexer() error {
ds.loadState()
}

specs := ds.indexer.chainState.GetSpecs()
if ds.indexer.chainState.CurrentEpoch() < phase0.Epoch(*specs.ElectraForkEpoch) {
// skip consolidation indexer before Electra fork
return nil
}

if ds.state.FinalBlock == 0 {
// start from electra fork block
electraSlot := ds.indexer.chainState.EpochToSlot(phase0.Epoch(*specs.ElectraForkEpoch))
dbSlotRoot := db.GetFirstRootAfterSlot(uint64(electraSlot), false)
if dbSlotRoot == nil {
dbSlot := db.GetSlotByRoot(dbSlotRoot)
ds.state.FinalBlock = *dbSlot.EthBlockNumber
}
}

finalizedEpoch, _ := ds.indexer.chainState.GetFinalizedCheckpoint()
if finalizedEpoch > 0 {
finalizedBlockNumber := ds.getFinalizedBlockNumber()
Expand Down
2 changes: 2 additions & 0 deletions services/chainservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func (cs *ChainService) StartService() error {

// add execution indexers
execindexer.NewDepositIndexer(executionIndexerCtx)
execindexer.NewConsolidationIndexer(executionIndexerCtx)
execindexer.NewWithdrawalIndexer(executionIndexerCtx)

// start MEV relay indexer
cs.mevRelayIndexer.StartUpdater()
Expand Down

0 comments on commit 8c48ada

Please sign in to comment.