diff --git a/db/consolidation_requests.go b/db/consolidation_requests.go
index 574c762c..3afc72ee 100644
--- a/db/consolidation_requests.go
+++ b/db/consolidation_requests.go
@@ -8,6 +8,60 @@ import (
 	"github.com/jmoiron/sqlx"
 )
 
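+// InsertConsolidationRequestTxs bulk-inserts consolidation request transactions
+// in a single multi-row statement. On conflicting (block_root, block_index)
+// keys the row is replaced (sqlite) or only its fork_id is updated (pgsql), so
+// re-indexed requests are moved to the surviving fork without duplication.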
+func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
+	var sql strings.Builder
+	fmt.Fprint(&sql,
+		EngineQuery(map[dbtypes.DBEngineType]string{
+			dbtypes.DBEnginePgsql:  "INSERT INTO consolidation_request_txs ",
+			dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
+		}),
+		"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)",
+		" VALUES ",
+	)
+	argIdx := 0
+	fieldCount := 12
+
+	args := make([]any, len(consolidationTxs)*fieldCount)
+	for i, consolidationTx := range consolidationTxs {
+		if i > 0 {
+			fmt.Fprintf(&sql, ", ")
+		}
+		fmt.Fprintf(&sql, "(")
+		for f := 0; f < fieldCount; f++ {
+			if f > 0 {
+				fmt.Fprintf(&sql, ", ")
+			}
+			fmt.Fprintf(&sql, "$%v", argIdx+f+1)
+		}
+		fmt.Fprintf(&sql, ")")
+
+		args[argIdx+0] = consolidationTx.BlockNumber
+		args[argIdx+1] = consolidationTx.BlockIndex
+		args[argIdx+2] = consolidationTx.BlockTime
+		args[argIdx+3] = consolidationTx.BlockRoot
+		args[argIdx+4] = consolidationTx.ForkId
+		args[argIdx+5] = consolidationTx.SourceAddress
+		args[argIdx+6] = consolidationTx.SourcePubkey
+		args[argIdx+7] = consolidationTx.TargetPubkey
+		args[argIdx+8] = consolidationTx.TxHash
+		args[argIdx+9] = consolidationTx.TxSender
+		args[argIdx+10] = consolidationTx.TxTarget
+		args[argIdx+11] = consolidationTx.DequeueBlock
+		argIdx += fieldCount
+	}
+	fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
+		dbtypes.DBEnginePgsql:  " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id",
+		dbtypes.DBEngineSqlite: "",
+	}))
+
+	_, err := tx.Exec(sql.String(), args...)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, tx *sqlx.Tx) error {
 	var sql strings.Builder
 	fmt.Fprint(&sql,
diff --git a/db/forks.go b/db/forks.go
index 0d21bc73..74cb3318 100644
--- a/db/forks.go
+++ b/db/forks.go
@@ -137,3 +137,17 @@ func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error
 
 	return nil
 }
+
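+// GetForkById loads a single fork from the forks table.
+// It returns nil if the fork is unknown or the query fails.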
+func GetForkById(forkId uint64) *dbtypes.Fork {
+	var fork dbtypes.Fork
+
+	err := ReaderDb.Get(&fork, `SELECT fork_id, base_slot, base_root, leaf_slot, leaf_root, parent_fork
+		FROM forks
+		WHERE fork_id = $1
+	`, forkId)
+	if err != nil {
+		return nil
+	}
+
+	return &fork
+}
diff --git a/db/schema/sqlite/20241006182734_pectra-updates3.sql b/db/schema/sqlite/20241006182734_pectra-updates3.sql
new file mode 100644
index 00000000..c87d9ed4
--- /dev/null
+++ b/db/schema/sqlite/20241006182734_pectra-updates3.sql
@@ -0,0 +1,72 @@
+-- +goose Up
+-- +goose StatementBegin
+
+CREATE TABLE IF NOT EXISTS "consolidation_request_txs" (
+    block_number BIGINT NOT NULL,
+    block_index INT NOT NULL,
+    block_time BIGINT NOT NULL,
+    block_root BLOB NOT NULL,
+    fork_id BIGINT NOT NULL DEFAULT 0,
+    source_address BLOB NOT NULL,
+    source_pubkey BLOB NULL,
+    target_pubkey BLOB NULL,
+    tx_hash BLOB NULL,
+    tx_sender BLOB NOT NULL,
+    tx_target BLOB NOT NULL,
+    dequeue_block BIGINT NOT NULL,
+    CONSTRAINT consolidation_request_txs_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_block_number_idx"
+    ON "consolidation_request_txs"
+    ("block_number" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
+    ON "consolidation_request_txs"
+    ("source_address" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
+    ON "consolidation_request_txs"
+    ("fork_id" ASC);
+
+CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
+    ON "consolidation_request_txs"
+    ("dequeue_block" ASC);
+
+CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" (
+    block_number BIGINT NOT NULL,
+    block_index INT NOT NULL,
+    block_time BIGINT NOT NULL,
+    block_root BLOB NOT NULL,
+    fork_id BIGINT NOT NULL DEFAULT 0,
+    source_address BLOB NOT NULL,
+    validator_pubkey BLOB NOT NULL,
+    amount BIGINT NOT NULL,
+    tx_hash BLOB NULL,
+    tx_sender BLOB NOT NULL,
+    tx_target BLOB NOT NULL,
+    dequeue_block BIGINT NOT NULL,
+    CONSTRAINT withdrawal_request_txs_pkey PRIMARY KEY (block_root, block_index)
+);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_block_number_idx"
+    ON "withdrawal_request_txs"
+    ("block_number" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
+    ON "withdrawal_request_txs"
+    ("source_address" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
+    ON "withdrawal_request_txs"
+    ("fork_id" ASC);
+
+CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
+    ON "withdrawal_request_txs"
+    ("dequeue_block" ASC);
+
+-- +goose StatementEnd
+-- +goose Down
+-- +goose StatementBegin
+SELECT 'NOT SUPPORTED';
+-- +goose StatementEnd
diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go
index 5e015950..9ab9aacc 100644
--- a/dbtypes/dbtypes.go
+++ b/dbtypes/dbtypes.go
@@ -265,6 +265,21 @@ type ConsolidationRequest struct {
 	TxHash        []byte `db:"tx_hash"`
 }
 
+type ConsolidationRequestTx struct {
+	BlockNumber   uint64 `db:"block_number"`
+	BlockIndex    uint64 `db:"block_index"`
+	BlockTime     uint64 `db:"block_time"`
+	BlockRoot     []byte `db:"block_root"`
+	ForkId        uint64 `db:"fork_id"`
+	SourceAddress []byte `db:"source_address"`
+	SourcePubkey  []byte `db:"source_pubkey"`
+	TargetPubkey  []byte `db:"target_pubkey"`
+	TxHash        []byte `db:"tx_hash"`
+	TxSender      []byte `db:"tx_sender"`
+	TxTarget      []byte `db:"tx_target"`
+	DequeueBlock  uint64 `db:"dequeue_block"`
+}
+
 type WithdrawalRequest struct {
 	SlotNumber uint64 `db:"slot_number"`
 	SlotRoot   []byte `db:"slot_root"`
@@ -277,3 +292,18 @@ type WithdrawalRequest struct {
 	Amount          uint64 `db:"amount"`
 	TxHash          []byte `db:"tx_hash"`
 }
+
+type WithdrawalRequestTx struct {
+	BlockNumber     uint64 `db:"block_number"`
+	BlockIndex      uint64 `db:"block_index"`
+	BlockTime       uint64 `db:"block_time"`
+	BlockRoot       []byte `db:"block_root"`
+	ForkId          uint64 `db:"fork_id"`
+	SourceAddress   []byte `db:"source_address"`
+	ValidatorPubkey []byte `db:"validator_pubkey"`
+	Amount          uint64 `db:"amount"`
+	TxHash          []byte `db:"tx_hash"`
+	TxSender        []byte `db:"tx_sender"`
+	TxTarget        []byte `db:"tx_target"`
+	DequeueBlock    uint64 `db:"dequeue_block"`
+}
diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go
index 347aa68a..b1325e14 100644
--- a/indexer/beacon/forkcache.go
+++ b/indexer/beacon/forkcache.go
@@ -7,6 +7,7 @@ import (
 	"sync"
 
 	"github.com/attestantio/go-eth2-client/spec/phase0"
+	"github.com/ethereum/go-ethereum/common/lru"
 	"github.com/ethpandaops/dora/db"
 	"github.com/ethpandaops/dora/dbtypes"
 	"github.com/jmoiron/sqlx"
@@ -19,6 +20,7 @@ type forkCache struct {
 	forkMap         map[ForkKey]*Fork
 	finalizedForkId ForkKey
 	lastForkId      ForkKey
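+	// parentIdCache remembers the parent fork id of forks that were finalized
+	// and pruned from forkMap, so ancestry walks can avoid a db roundtrip.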
+	parentIdCache   *lru.Cache[ForkKey, ForkKey]
 
 	forkProcessLock sync.Mutex
 }
@@ -26,8 +28,9 @@ type forkCache struct {
 
 // newForkCache creates a new instance of the forkCache struct.
 func newForkCache(indexer *Indexer) *forkCache {
 	return &forkCache{
-		indexer: indexer,
-		forkMap: make(map[ForkKey]*Fork),
+		indexer:       indexer,
+		forkMap:       make(map[ForkKey]*Fork),
+		parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
 	}
 }
@@ -117,11 +120,23 @@ func (cache *forkCache) removeFork(forkId ForkKey) {
 // getParentForkIds returns the parent fork ids of the given fork.
 func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey {
 	parentForks := []ForkKey{forkId}
-
+	parentForkId := forkId
 	thisFork := cache.getForkById(forkId)
-	for thisFork != nil && thisFork.parentFork != 0 {
-		parentForks = append(parentForks, thisFork.parentFork)
-		thisFork = cache.getForkById(thisFork.parentFork)
+
+	for parentForkId > 1 {
+		if thisFork != nil {
+			parentForkId = thisFork.parentFork
+		} else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
+			parentForkId = cachedParent
+		} else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil {
+			parentForkId = ForkKey(dbFork.ParentFork)
+			cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork))
+		} else {
+			break
+		}
+
+		thisFork = cache.getForkById(parentForkId)
+		parentForks = append(parentForks, parentForkId)
 	}
 
 	return parentForks
@@ -200,6 +215,8 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo
 			continue
 		}
 
+		cache.parentIdCache.Add(fork.forkId, fork.parentFork)
+
 		delete(cache.forkMap, fork.forkId)
 	}
 
diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go
index 4db82f3d..b1d80793 100644
--- a/indexer/beacon/forkdetection.go
+++ b/indexer/beacon/forkdetection.go
@@ -113,6 +113,7 @@ func (cache *forkCache) processBlock(block *Block) error {
 			newFork := &newForkInfo{
 				fork: fork,
 			}
+			cache.parentIdCache.Add(fork.forkId, fork.parentFork)
 			newForks = append(newForks, newFork)
 
 			fmt.Fprintf(&logbuf, ", head1(%v): %v [%v]", fork.forkId, block.Slot, block.Root.String())
@@ -142,6 +143,7 @@ func (cache *forkCache) processBlock(block *Block) error {
 				fork:        otherFork,
 				updateRoots: updatedRoots,
 			}
+			cache.parentIdCache.Add(otherFork.forkId, otherFork.parentFork)
 			newForks = append(newForks, newFork)
 
 			if updatedFork != nil {
@@ -185,6 +187,7 @@ func (cache *forkCache) processBlock(block *Block) error {
 				fork:        fork,
 				updateRoots: updatedRoots,
 			}
+			cache.parentIdCache.Add(fork.forkId, fork.parentFork)
 			newForks = append(newForks, newFork)
 
 			if updatedFork != nil {
@@ -321,6 +324,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip
 	if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId {
 		for _, fork := range forks {
 			fork.parentFork = forkId
+			cache.parentIdCache.Add(fork.forkId, fork.parentFork)
 		}
 
 		updatedFork = &updateForkInfo{
diff --git a/indexer/beacon/indexer_getter.go b/indexer/beacon/indexer_getter.go
index 7ae1ef2b..a4236ca5 100644
--- a/indexer/beacon/indexer_getter.go
+++ b/indexer/beacon/indexer_getter.go
@@ -236,3 +236,8 @@ func (indexer *Indexer) GetEpochStats(epoch phase0.Epoch, overrideForkId *ForkKe
 
 	return bestEpochStats
 }
+
+// GetParentForkIds returns the parent fork ids of the given fork.
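+// The result starts with the given fork id itself, followed by its ancestors
+// in ascending distance. Forks that were already finalized and pruned from the
+// cache are resolved via the parent id cache or the forks table.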
+func (indexer *Indexer) GetParentForkIds(forkId ForkKey) []ForkKey {
+	return indexer.forkCache.getParentForkIds(forkId)
+}
diff --git a/indexer/execution/consolidation_indexer.go b/indexer/execution/consolidation_indexer.go
new file mode 100644
index 00000000..53dd515f
--- /dev/null
+++ b/indexer/execution/consolidation_indexer.go
@@ -0,0 +1,571 @@
+package execution
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"math"
+	"math/big"
+	"time"
+
+	"github.com/attestantio/go-eth2-client/spec/phase0"
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/jmoiron/sqlx"
+	"github.com/sirupsen/logrus"
+
+	"github.com/ethpandaops/dora/clients/execution"
+	"github.com/ethpandaops/dora/db"
+	"github.com/ethpandaops/dora/dbtypes"
+	"github.com/ethpandaops/dora/indexer/beacon"
+	"github.com/ethpandaops/dora/utils"
+)
+
+const consolidationContractAddr = "0x"
+const consolidationDequeueRate = 1
+
+type ConsolidationIndexer struct {
+	indexer         *IndexerCtx
+	logger          logrus.FieldLogger
+	state           *consolidationIndexerState
+	batchSize       int
+	contractAddress common.Address
+	forkStates      map[beacon.ForkKey]*consolidationIndexerForkState
+}
+
+type consolidationIndexerState struct {
+	FinalBlock    uint64                                            `json:"final_block"`
+	FinalQueueLen uint64                                            `json:"final_queue"`
+	ForkStates    map[beacon.ForkKey]*consolidationIndexerForkState `json:"fork_states"`
+}
+
+type consolidationIndexerForkState struct {
+	Block    uint64 `json:"b"`
+	QueueLen uint64 `json:"q"`
+}
+
+func NewConsolidationIndexer(indexer *IndexerCtx) *ConsolidationIndexer {
+	batchSize := utils.Config.ExecutionApi.DepositLogBatchSize
+	if batchSize == 0 {
+		batchSize = 1000
+	}
+
+	ci := &ConsolidationIndexer{
+		indexer:         indexer,
+		logger:          indexer.logger.WithField("indexer", "consolidation"),
+		batchSize:       batchSize,
+		contractAddress: common.HexToAddress(consolidationContractAddr),
+		forkStates:      map[beacon.ForkKey]*consolidationIndexerForkState{},
+	}
+
+	go ci.runConsolidationIndexerLoop()
+
+	return ci
+}
+
+func (ds *ConsolidationIndexer) runConsolidationIndexerLoop() {
+	defer utils.HandleSubroutinePanic("ConsolidationIndexer.runConsolidationIndexerLoop")
+
+	for {
+		time.Sleep(60 * time.Second)
+		ds.logger.Debugf("run consolidation indexer logic")
+
+		err := ds.runConsolidationIndexer()
+		if err != nil {
+			ds.logger.Errorf("consolidation indexer error: %v", err)
+		}
+	}
+}
+
+func (ds *ConsolidationIndexer) runConsolidationIndexer() error {
+	// get indexer state
+	if ds.state == nil {
+		ds.loadState()
+	}
+
+	finalizedEpoch, _ := ds.indexer.chainState.GetFinalizedCheckpoint()
+	if finalizedEpoch > 0 {
+		finalizedBlockNumber := ds.getFinalizedBlockNumber()
+
+		if finalizedBlockNumber == 0 {
+			return fmt.Errorf("finalized block not found in cache or db")
+		}
+
+		if finalizedBlockNumber < ds.state.FinalBlock {
+			return fmt.Errorf("finalized block number (%v) smaller than index state (%v)", finalizedBlockNumber, ds.state.FinalBlock)
+		}
+
+		if finalizedBlockNumber > ds.state.FinalBlock {
+			err := ds.processFinalizedBlocks(finalizedBlockNumber)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
+	return ds.processRecentBlocks()
+}
+
+func (ds *ConsolidationIndexer) loadState() {
+	syncState := consolidationIndexerState{}
+	db.GetExplorerState("indexer.consolidationstate", &syncState)
+	ds.state = &syncState
+}
+
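+// getFinalizedBlockNumber resolves the execution block number of the finalized
+// checkpoint, first from the beacon block cache and then from the database.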
+func (ds *ConsolidationIndexer) getFinalizedBlockNumber() uint64 {
+	var finalizedBlockNumber uint64
+
+	_, finalizedRoot := ds.indexer.chainState.GetFinalizedCheckpoint()
+	if finalizedBlock := ds.indexer.beaconIndexer.GetBlockByRoot(finalizedRoot); finalizedBlock != nil {
+		if indexVals := finalizedBlock.GetBlockIndex(); indexVals != nil {
+			finalizedBlockNumber = indexVals.ExecutionNumber
+		}
+	}
+
+	if finalizedBlockNumber == 0 {
+		// load from db
+		if finalizedBlock := db.GetSlotByRoot(finalizedRoot[:]); finalizedBlock != nil && finalizedBlock.EthBlockNumber != nil {
+			finalizedBlockNumber = *finalizedBlock.EthBlockNumber
+		}
+	}
+
+	return finalizedBlockNumber
+}
+
+func (ds *ConsolidationIndexer) loadFilteredLogs(ctx context.Context, client *execution.Client, query ethereum.FilterQuery) ([]types.Log, error) {
+	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
+	defer cancel()
+
+	return client.GetRPCClient().GetEthClient().FilterLogs(ctx, query)
+}
+
+func (ds *ConsolidationIndexer) loadTransactionByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Transaction, error) {
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	tx, _, err := client.GetRPCClient().GetEthClient().TransactionByHash(ctx, hash)
+	return tx, err
+}
+
+func (ds *ConsolidationIndexer) loadHeaderByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) {
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	return client.GetRPCClient().GetHeaderByHash(ctx, hash)
+}
+
+func (ds *ConsolidationIndexer) processFinalizedBlocks(finalizedBlockNumber uint64) error {
+	clients := ds.indexer.getFinalizedClients(execution.AnyClient)
+	if len(clients) == 0 {
+		return fmt.Errorf("no ready execution client found")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	retryCount := 0
+
+	for ds.state.FinalBlock < finalizedBlockNumber {
+		client := clients[retryCount%len(clients)]
+
+		batchSize := uint64(ds.batchSize)
+		if retryCount > 0 {
+			batchSize /= uint64(math.Pow(2, float64(retryCount)))
+			if batchSize < 10 {
+				batchSize = 10
+			}
+		}
+
+		toBlock := ds.state.FinalBlock + batchSize
+		if toBlock > finalizedBlockNumber {
+			toBlock = finalizedBlockNumber
+		}
+
+		query := ethereum.FilterQuery{
+			FromBlock: big.NewInt(0).SetUint64(ds.state.FinalBlock + 1),
+			ToBlock:   big.NewInt(0).SetUint64(toBlock),
+			Addresses: []common.Address{
+				ds.contractAddress,
+			},
+		}
+
+		logs, err := ds.loadFilteredLogs(ctx, client, query)
+		if err != nil {
+			if retryCount < 3 {
+				retryCount++
+				continue
+			}
+
+			return fmt.Errorf("error fetching consolidation contract logs: %v", err)
+		}
+
+		retryCount = 0
+
+		var txHash []byte
+		var txDetails *types.Transaction
+		var txBlockHeader *types.Header
+
+		requestTxs := []*dbtypes.ConsolidationRequestTx{}
+		queueBlock := ds.state.FinalBlock
+		queueLength := ds.state.FinalQueueLen
+
+		ds.logger.Infof("received consolidation logs for blocks %v - %v: %v events", ds.state.FinalBlock, toBlock, len(logs))
+
+		for idx := range logs {
+			log := &logs[idx]
+
+			if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
+				txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash)
+				if err != nil {
+					return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
+				}
+
+				txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash)
+				if err != nil {
+					return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err)
+				}
+
+				txHash = log.TxHash[:]
+			}
+
+			txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
+			if err != nil {
+				return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
+			}
+			txTo := *txDetails.To()
+
+			requestTx := ds.parseRequestLog(log, &queueBlock, &queueLength)
+			if requestTx == nil {
+				continue
+			}
+
+			requestTx.BlockTime = txBlockHeader.Time
+			requestTx.TxSender = txFrom[:]
+			requestTx.TxTarget = txTo[:]
+
+			requestTxs = append(requestTxs, requestTx)
+		}
+
+		if queueBlock < toBlock {
+			dequeuedRequests := (toBlock - queueBlock) * consolidationDequeueRate
+			if dequeuedRequests > queueLength {
+				queueLength = 0
+			} else {
+				queueLength -= dequeuedRequests
+			}
+
+			queueBlock = toBlock
+		}
+
+		if len(requestTxs) > 0 {
+			ds.logger.Infof("crawled consolidation transactions for blocks %v - %v: %v consolidations", ds.state.FinalBlock, toBlock, len(requestTxs))
+		}
+
+		err = ds.persistFinalizedRequestTxs(toBlock, queueLength, requestTxs)
+		if err != nil {
+			return fmt.Errorf("could not persist consolidation txs: %v", err)
+		}
+
+		// cooldown to avoid rate limiting from external archive nodes
+		time.Sleep(1 * time.Second)
+	}
+	return nil
+}
+
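+// parseRequestLog decodes a single consolidation request event and tracks the
+// request queue: the queue shrinks by consolidationDequeueRate for every
+// elapsed block and grows by one per request, which yields the dequeueBlock
+// estimate for when the request will be processed on the beacon chain.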
+func (ds *ConsolidationIndexer) parseRequestLog(log *types.Log, queueBlock, queueLength *uint64) *dbtypes.ConsolidationRequestTx {
+	// data layout:
+	// 0-20: sender address (20 bytes)
+	// 20-68: source pubkey (48 bytes)
+	// 68-116: target pubkey (48 bytes)
+
+	if len(log.Data) < 116 {
+		ds.logger.Warnf("invalid consolidation log data length: %v", len(log.Data))
+		return nil
+	}
+
+	senderAddr := log.Data[:20]
+	sourcePubkey := log.Data[20:68]
+	targetPubkey := log.Data[68:116]
+
+	if *queueBlock > log.BlockNumber {
+		ds.logger.Warnf("consolidation request for block %v received after block %v", log.BlockNumber, *queueBlock)
+		return nil
+	} else if *queueBlock < log.BlockNumber {
+		dequeuedRequests := (log.BlockNumber - *queueBlock) * consolidationDequeueRate
+		if dequeuedRequests > *queueLength {
+			*queueLength = 0
+		} else {
+			*queueLength -= dequeuedRequests
+		}
+
+		*queueBlock = log.BlockNumber
+	}
+
+	dequeueBlock := log.BlockNumber + (*queueLength / consolidationDequeueRate)
+	*queueLength++
+
+	requestTx := &dbtypes.ConsolidationRequestTx{
+		BlockNumber:   log.BlockNumber,
+		BlockIndex:    uint64(log.Index),
+		BlockRoot:     log.BlockHash[:],
+		SourceAddress: senderAddr,
+		SourcePubkey:  sourcePubkey,
+		TargetPubkey:  targetPubkey,
+		TxHash:        log.TxHash[:],
+		DequeueBlock:  dequeueBlock,
+	}
+
+	return requestTx
+}
+
+func (ds *ConsolidationIndexer) processRecentBlocks() error {
+	headForks := ds.indexer.getForksWithClients(execution.AnyClient)
+	for _, headFork := range headForks {
+		err := ds.processRecentBlocksForFork(headFork)
+		if err != nil {
+			if headFork.canonical {
+				ds.logger.Errorf("could not process recent events from canonical fork %v: %v", headFork.forkId, err)
+			} else {
+				ds.logger.Warnf("could not process recent events from fork %v: %v", headFork.forkId, err)
+			}
+		}
+	}
+	return nil
+}
+
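+// processRecentBlocksForFork indexes consolidation request events for a single
+// head fork. It resumes from this fork's last processed block, falling back to
+// the closest known parent fork state and finally to the finalized state.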
+func (ds *ConsolidationIndexer) processRecentBlocksForFork(headFork *forkWithClients) error {
+	elHeadBlock := ds.indexer.beaconIndexer.GetCanonicalHead(&headFork.forkId)
+	if elHeadBlock == nil {
+		return fmt.Errorf("head block not found")
+	}
+
+	elHeadBlockIndex := elHeadBlock.GetBlockIndex()
+	if elHeadBlockIndex == nil {
+		return fmt.Errorf("head block index not found")
+	}
+
+	elHeadBlockNumber := elHeadBlockIndex.ExecutionNumber
+	if elHeadBlockNumber > 0 {
+		elHeadBlockNumber--
+	}
+
+	startBlockNumber := ds.state.FinalBlock + 1
+	startQueueLen := ds.state.FinalQueueLen
+
+	// get last processed block for this fork
+	if forkState := ds.forkStates[headFork.forkId]; forkState != nil && forkState.Block <= elHeadBlockNumber {
+		if forkState.Block == elHeadBlockNumber {
+			return nil // already processed
+		}
+
+		startBlockNumber = forkState.Block + 1
+		startQueueLen = forkState.QueueLen
+	} else {
+		for _, parentForkId := range ds.indexer.beaconIndexer.GetParentForkIds(headFork.forkId) {
+			if parentForkState := ds.forkStates[parentForkId]; parentForkState != nil && parentForkState.Block <= elHeadBlockNumber {
+				startBlockNumber = parentForkState.Block + 1
+				startQueueLen = parentForkState.QueueLen
+			}
+		}
+	}
+
+	var resError error
+	var ctxCancel context.CancelFunc
+	defer func() {
+		if ctxCancel != nil {
+			ctxCancel()
+		}
+	}()
+
+	queueBlock := startBlockNumber
+
+	for startBlockNumber <= elHeadBlockNumber {
+		var toBlock uint64
+		var logs []types.Log
+		var reqError error
+		var txHash []byte
+		var txDetails *types.Transaction
+		var txBlockHeader *types.Header
+
+		requestTxs := []*dbtypes.ConsolidationRequestTx{}
+
+		for retryCount := 0; retryCount < 3; retryCount++ {
+			client := headFork.clients[retryCount%len(headFork.clients)]
+
+			batchSize := uint64(ds.batchSize)
+			if retryCount > 0 {
+				batchSize /= uint64(math.Pow(2, float64(retryCount)))
+				if batchSize < 10 {
+					batchSize = 10
+				}
+			}
+
+			toBlock = startBlockNumber + batchSize
+			if toBlock > elHeadBlockNumber {
+				toBlock = elHeadBlockNumber
+			}
+
+			if ctxCancel != nil {
+				ctxCancel()
+			}
+			ctx, cancel := context.WithTimeout(context.Background(), 600*time.Second)
+			ctxCancel = cancel
+
+			query := ethereum.FilterQuery{
+				FromBlock: big.NewInt(0).SetUint64(startBlockNumber),
+				ToBlock:   big.NewInt(0).SetUint64(toBlock),
+				Addresses: []common.Address{
+					ds.contractAddress,
+				},
+			}
+
+			logs, reqError = ds.loadFilteredLogs(ctx, client, query)
+			if reqError != nil {
+				ds.logger.Warnf("error fetching consolidation contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError)
+				continue
+			}
+
+			for idx := range logs {
+				log := &logs[idx]
+
+				if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) {
+					var err error
+
+					txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash)
+					if err != nil {
+						return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err)
+					}
+
+					txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash)
+					if err != nil {
+						return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err)
+					}
+
+					txHash = log.TxHash[:]
+				}
+
+				txFrom, err := types.Sender(types.LatestSignerForChainID(txDetails.ChainId()), txDetails)
+				if err != nil {
+					return fmt.Errorf("could not decode tx sender (%v): %v", log.TxHash, err)
+				}
+				txTo := *txDetails.To()
+
+				requestTx := ds.parseRequestLog(log, &queueBlock, &startQueueLen)
+				if requestTx == nil {
+					continue
+				}
+
+				if clBlock := ds.indexer.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)); len(clBlock) > 0 {
+					requestTx.ForkId = uint64(clBlock[0].GetForkId())
+				} else {
+					requestTx.ForkId = uint64(headFork.forkId)
+				}
+
+				requestTx.BlockTime = txBlockHeader.Time
+				requestTx.TxSender = txFrom[:]
+				requestTx.TxTarget = txTo[:]
+
+				requestTxs = append(requestTxs, requestTx)
+			}
+
+			if queueBlock < toBlock {
+				dequeuedRequests := (toBlock - queueBlock) * consolidationDequeueRate
+				if dequeuedRequests > startQueueLen {
+					startQueueLen = 0
+				} else {
+					startQueueLen -= dequeuedRequests
+				}
+
+				queueBlock = toBlock
+			}
+
+			if len(requestTxs) > 0 {
+				ds.logger.Infof("crawled recent consolidations for fork %v (%v-%v): %v consolidations", headFork.forkId, startBlockNumber, toBlock, len(requestTxs))
+
+				err := ds.persistRecentRequestTxs(headFork.forkId, queueBlock, startQueueLen, requestTxs)
+				if err != nil {
+					return fmt.Errorf("could not persist consolidation txs: %v", err)
+				}
+
+				time.Sleep(1 * time.Second)
+			}
+
+			break
+		}
+
+		if reqError != nil {
+			return fmt.Errorf("error fetching consolidation contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError)
+		}
+
+		startBlockNumber = toBlock + 1
+	}
+
+	return resError
+}
+
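+// persistFinalizedRequestTxs stores crawled request transactions in chunks of
+// 500 rows and advances the finalized indexer state in one db transaction.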
+func (ds *ConsolidationIndexer) persistFinalizedRequestTxs(finalBlockNumber, finalQueueLen uint64, requests []*dbtypes.ConsolidationRequestTx) error {
+	return db.RunDBTransaction(func(tx *sqlx.Tx) error {
+		requestCount := len(requests)
+		for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 {
+			endIdx := requestIdx + 500
+			if endIdx > requestCount {
+				endIdx = requestCount
+			}
+
+			err := db.InsertConsolidationRequestTxs(requests[requestIdx:endIdx], tx)
+			if err != nil {
+				return fmt.Errorf("error while inserting consolidation txs: %v", err)
+			}
+		}
+
+		ds.state.FinalBlock = finalBlockNumber
+		ds.state.FinalQueueLen = finalQueueLen
+
+		return ds.persistState(tx)
+	})
+}
+
+func (ds *ConsolidationIndexer) persistRecentRequestTxs(forkId beacon.ForkKey, finalBlockNumber, finalQueueLen uint64, requests []*dbtypes.ConsolidationRequestTx) error {
+	return db.RunDBTransaction(func(tx *sqlx.Tx) error {
+		requestCount := len(requests)
+		for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 {
+			endIdx := requestIdx + 500
+			if endIdx > requestCount {
+				endIdx = requestCount
+			}
+
+			err := db.InsertConsolidationRequestTxs(requests[requestIdx:endIdx], tx)
+			if err != nil {
+				return fmt.Errorf("error while inserting consolidation txs: %v", err)
+			}
+		}
+
+		ds.forkStates[forkId] = &consolidationIndexerForkState{
+			Block:    finalBlockNumber,
+			QueueLen: finalQueueLen,
+		}
+
+		return ds.persistState(tx)
+	})
+}
+
+func (ds *ConsolidationIndexer) persistState(tx *sqlx.Tx) error {
+	finalizedBlockNumber := ds.getFinalizedBlockNumber()
+	for forkId, forkState := range ds.forkStates {
+		if forkState.Block < finalizedBlockNumber {
+			delete(ds.forkStates, forkId)
+		}
+	}
+
+	ds.state.ForkStates = ds.forkStates
+
+	err := db.SetExplorerState("indexer.consolidationstate", ds.state, tx)
+	if err != nil {
+		return fmt.Errorf("error while updating consolidation indexer state: %v", err)
+	}
+
+	return nil
+}