implement consolidation request log crawler
pk910 committed Oct 8, 2024
1 parent 394f0ef commit 26b9d3e
Showing 8 changed files with 773 additions and 6 deletions.
54 changes: 54 additions & 0 deletions db/consolidation_requests.go
@@ -8,6 +8,60 @@ import (
"github.com/jmoiron/sqlx"
)

// InsertConsolidationRequestTxs inserts a batch of consolidation request transactions
// with a single multi-row statement (upsert on conflict for PostgreSQL, INSERT OR
// REPLACE for SQLite).
func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12

args := make([]any, len(consolidationTxs)*fieldCount)
for i, consolidationTx := range consolidationTxs {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = consolidationTx.BlockNumber
args[argIdx+1] = consolidationTx.BlockIndex
args[argIdx+2] = consolidationTx.BlockTime
args[argIdx+3] = consolidationTx.BlockRoot
args[argIdx+4] = consolidationTx.ForkId
args[argIdx+5] = consolidationTx.SourceAddress
args[argIdx+6] = consolidationTx.SourcePubkey
args[argIdx+7] = consolidationTx.TargetPubkey
args[argIdx+8] = consolidationTx.TxHash
args[argIdx+9] = consolidationTx.TxSender
args[argIdx+10] = consolidationTx.TxTarget
args[argIdx+11] = consolidationTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}
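Usage sketch (not part of this commit): a crawled batch would be handed to this helper inside a single transaction, so readers never observe a partially written batch. The wrapper name persistConsolidationRequestTxs and the writerDb handle are illustrative assumptions.

package main

import (
	"github.com/ethpandaops/dora/db"
	"github.com/ethpandaops/dora/dbtypes"
	"github.com/jmoiron/sqlx"
)

// persistConsolidationRequestTxs is a hypothetical caller: it writes one
// crawled batch atomically. writerDb is assumed to be the write-side *sqlx.DB.
func persistConsolidationRequestTxs(writerDb *sqlx.DB, txs []*dbtypes.ConsolidationRequestTx) error {
	dbTx, err := writerDb.Beginx()
	if err != nil {
		return err
	}
	defer dbTx.Rollback() // harmless after a successful Commit

	if err := db.InsertConsolidationRequestTxs(txs, dbTx); err != nil {
		return err
	}
	return dbTx.Commit()
}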

func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
14 changes: 14 additions & 0 deletions db/forks.go
@@ -137,3 +137,17 @@ func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error

return nil
}

// GetForkById loads a single fork from the database by id. It returns nil
// both when the fork does not exist and when the query fails.
func GetForkById(forkId uint64) *dbtypes.Fork {
var fork dbtypes.Fork

err := ReaderDb.Get(&fork, `SELECT fork_id, base_slot, base_root, leaf_slot, leaf_root, parent_fork
FROM forks
WHERE fork_id = $1
`, forkId)
if err != nil {
return nil
}

return &fork
}
72 changes: 72 additions & 0 deletions db/schema/sqlite/20241006182734_pectra-updates3.sql
@@ -0,0 +1,72 @@
-- +goose Up
-- +goose StatementBegin

CREATE TABLE IF NOT EXISTS "consolidation_request_txs" (
block_number BIGINT NOT NULL,
block_index INT NOT NULL,
block_time BIGINT NOT NULL,
block_root BLOB NOT NULL,
fork_id BIGINT NOT NULL DEFAULT 0,
source_address BLOB NOT NULL,
source_pubkey BLOB NULL,
target_pubkey BLOB NULL,
tx_hash BLOB NULL,
tx_sender BLOB NOT NULL,
tx_target BLOB NOT NULL,
dequeue_block BIGINT NOT NULL,
CONSTRAINT consolidation_pkey PRIMARY KEY (block_root, block_index)
);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_block_number_idx"
ON "consolidation_request_txs"
("block_number" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
ON "consolidation_request_txs"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
ON "consolidation_request_txs"
("fork_id" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
ON "consolidation_request_txs"
("dequeue_block" ASC);

CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" (
block_number BIGINT NOT NULL,
block_index INT NOT NULL,
block_time BIGINT NOT NULL,
block_root BLOB NOT NULL,
fork_id BIGINT NOT NULL DEFAULT 0,
source_address BLOB NOT NULL,
validator_pubkey BLOB NOT NULL,
amount BIGINT NOT NULL,
tx_hash BLOB NULL,
tx_sender BLOB NOT NULL,
tx_target BLOB NOT NULL,
dequeue_block BIGINT NOT NULL,
CONSTRAINT withdrawal_request_txs_pkey PRIMARY KEY (block_root, block_index)
);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_block_number_idx"
ON "withdrawal_request_txs"
("block_number" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
ON "withdrawal_request_txs"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
ON "withdrawal_request_txs"
("fork_id" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
ON "withdrawal_request_txs"
("dequeue_block" ASC);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
SELECT 'NOT SUPPORTED';
-- +goose StatementEnd
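Usage sketch (assumed, not from this commit): the source_address index above is the kind of access path an explorer view would rely on. db.ReaderDb mirrors the read helper used in db/forks.go; the helper name and the LIMIT are illustrative.

import (
	"github.com/ethpandaops/dora/db"
	"github.com/ethpandaops/dora/dbtypes"
)

// getConsolidationRequestTxsBySource is a hypothetical lookup: consolidation
// request transactions sent from one source address, newest first. The WHERE
// clause is served by consolidation_request_txs_source_addr_idx.
func getConsolidationRequestTxsBySource(sourceAddress []byte) ([]*dbtypes.ConsolidationRequestTx, error) {
	var requests []*dbtypes.ConsolidationRequestTx
	err := db.ReaderDb.Select(&requests, `
		SELECT block_number, block_index, block_time, block_root, fork_id,
		       source_address, source_pubkey, target_pubkey, tx_hash,
		       tx_sender, tx_target, dequeue_block
		FROM consolidation_request_txs
		WHERE source_address = $1
		ORDER BY block_number DESC
		LIMIT 25`, sourceAddress)
	return requests, err
}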
30 changes: 30 additions & 0 deletions dbtypes/dbtypes.go
@@ -265,6 +265,21 @@ type ConsolidationRequest struct {
TxHash []byte `db:"tx_hash"`
}

type ConsolidationRequestTx struct {
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourcePubkey []byte `db:"source_pubkey"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
}

type WithdrawalRequest struct {
SlotNumber uint64 `db:"slot_number"`
SlotRoot []byte `db:"slot_root"`
@@ -277,3 +292,18 @@ type WithdrawalRequest struct {
Amount uint64 `db:"amount"`
TxHash []byte `db:"tx_hash"`
}

type WithdrawalRequestTx struct {
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
ValidatorPubkey []byte `db:"validator_pubkey"`
Amount uint64 `db:"amount"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
}
29 changes: 23 additions & 6 deletions indexer/beacon/forkcache.go
@@ -7,6 +7,7 @@ import (
"sync"

"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
Expand All @@ -19,15 +20,17 @@ type forkCache struct {
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
parentIdCache *lru.Cache[ForkKey, ForkKey]

forkProcessLock sync.Mutex
}

// newForkCache creates a new instance of the forkCache struct.
func newForkCache(indexer *Indexer) *forkCache {
return &forkCache{
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
}
}

@@ -117,11 +120,23 @@ func (cache *forkCache) removeFork(forkId ForkKey) {
// getParentForkIds returns the parent fork ids of the given fork.
func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey {
parentForks := []ForkKey{forkId}

- thisFork := cache.getForkById(forkId)
- for thisFork != nil && thisFork.parentFork != 0 {
- parentForks = append(parentForks, thisFork.parentFork)
- thisFork = cache.getForkById(thisFork.parentFork)
+ parentForkId := forkId
+ thisFork := cache.getForkById(forkId)
+
+ for parentForkId > 1 {
+ if thisFork != nil {
+ parentForkId = thisFork.parentFork
+ } else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
+ parentForkId = cachedParent
+ } else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil {
+ parentForkId = ForkKey(dbFork.ParentFork)
+ cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork))
+ } else {
+ break
+ }
+
+ thisFork = cache.getForkById(parentForkId)
+ parentForks = append(parentForks, parentForkId)
}

return parentForks
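Note: with this change, parent resolution walks three tiers in order: the in-memory fork map for live forks, the bounded LRU parentIdCache for recently pruned forks, and finally the database via GetForkById, whose result is memoized back into the LRU. The loop condition parentForkId > 1 stops the walk once a root fork id (0 or 1) is reached.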
@@ -200,6 +215,8 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo
continue
}

cache.parentIdCache.Add(fork.forkId, fork.parentFork) // keep the parent link cached; the finalized fork is evicted from forkMap below

delete(cache.forkMap, fork.forkId)
}

4 changes: 4 additions & 0 deletions indexer/beacon/forkdetection.go
Expand Up @@ -113,6 +113,7 @@ func (cache *forkCache) processBlock(block *Block) error {
newFork := &newForkInfo{
fork: fork,
}
cache.parentIdCache.Add(fork.forkId, fork.parentFork)
newForks = append(newForks, newFork)

fmt.Fprintf(&logbuf, ", head1(%v): %v [%v]", fork.forkId, block.Slot, block.Root.String())
@@ -142,6 +143,7 @@ func (cache *forkCache) processBlock(block *Block) error {
fork: otherFork,
updateRoots: updatedRoots,
}
cache.parentIdCache.Add(otherFork.forkId, otherFork.parentFork)
newForks = append(newForks, newFork)

if updatedFork != nil {
@@ -185,6 +187,7 @@ func (cache *forkCache) processBlock(block *Block) error {
fork: fork,
updateRoots: updatedRoots,
}
cache.parentIdCache.Add(fork.forkId, fork.parentFork)
newForks = append(newForks, newFork)

if updatedFork != nil {
@@ -321,6 +324,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip
if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId {
for _, fork := range forks {
fork.parentFork = forkId
cache.parentIdCache.Add(fork.forkId, fork.parentFork)
}

updatedFork = &updateForkInfo{
5 changes: 5 additions & 0 deletions indexer/beacon/indexer_getter.go
@@ -236,3 +236,8 @@ func (indexer *Indexer) GetEpochStats(epoch phase0.Epoch, overrideForkId *ForkKe

return bestEpochStats
}

// GetParentForkIds returns the parent fork ids of the given fork.
func (indexer *Indexer) GetParentForkIds(forkId ForkKey) []ForkKey {
return indexer.forkCache.getParentForkIds(forkId)
}
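A usage sketch for the new getter (hypothetical caller, not part of the commit): since the returned slice starts with the fork itself and walks up toward the root, ancestry checks reduce to a linear scan.

import "github.com/ethpandaops/dora/indexer/beacon"

// isForkAncestor reports whether ancestorId lies on the parent chain of
// forkId (including forkId itself). Illustrative helper only.
func isForkAncestor(indexer *beacon.Indexer, forkId, ancestorId beacon.ForkKey) bool {
	for _, id := range indexer.GetParentForkIds(forkId) {
		if id == ancestorId {
			return true
		}
	}
	return false
}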