Skip to content

Commit

Permalink
add consolidation & withdrawal matchers
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Oct 8, 2024
1 parent 8c48ada commit dfe4b80
Show file tree
Hide file tree
Showing 19 changed files with 785 additions and 149 deletions.
1 change: 1 addition & 0 deletions config/default.config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ executionapi:
url: "http://127.0.0.1:8545"

depositLogBatchSize: 1000
electraDeployBlock: 0 # el block number from where to crawl the electra system contracts (should be <=, but close to electra fork activation block)

# indexer keeps track of the latest epochs in memory.
indexer:
Expand Down
79 changes: 79 additions & 0 deletions db/consolidation_request_txs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package db

import (
"fmt"
"strings"

"github.com/ethpandaops/dora/dbtypes"
"github.com/jmoiron/sqlx"
)

func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12

args := make([]any, len(consolidationTxs)*fieldCount)
for i, consolidationTx := range consolidationTxs {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = consolidationTx.BlockNumber
args[argIdx+1] = consolidationTx.BlockIndex
args[argIdx+2] = consolidationTx.BlockTime
args[argIdx+3] = consolidationTx.BlockRoot
args[argIdx+4] = consolidationTx.ForkId
args[argIdx+5] = consolidationTx.SourceAddress
args[argIdx+6] = consolidationTx.SourcePubkey
args[argIdx+7] = consolidationTx.TargetPubkey
args[argIdx+8] = consolidationTx.TxHash
args[argIdx+9] = consolidationTx.TxSender
args[argIdx+10] = consolidationTx.TxTarget
args[argIdx+11] = consolidationTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

func GetConsolidationRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.ConsolidationRequestTx {
consolidationTxs := []*dbtypes.ConsolidationRequestTx{}

err := ReaderDb.Select(&consolidationTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block
FROM consolidation_request_txs
WHERE dequeue_block >= $1 AND dequeue_block <= $2
ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
`, dequeueFirst, dequeueLast)
if err != nil {
logger.Errorf("Error while fetching consolidation request txs: %v", err)
return nil
}

return consolidationTxs
}
89 changes: 31 additions & 58 deletions db/consolidation_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,18 @@ import (
"github.com/jmoiron/sqlx"
)

func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequestTx, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12

args := make([]any, len(consolidationTxs)*fieldCount)
for i, consolidationTx := range consolidationTxs {
if i > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "(")
for f := 0; f < fieldCount; f++ {
if f > 0 {
fmt.Fprintf(&sql, ", ")
}
fmt.Fprintf(&sql, "$%v", argIdx+f+1)

}
fmt.Fprintf(&sql, ")")

args[argIdx+0] = consolidationTx.BlockNumber
args[argIdx+1] = consolidationTx.BlockIndex
args[argIdx+2] = consolidationTx.BlockTime
args[argIdx+3] = consolidationTx.BlockRoot
args[argIdx+4] = consolidationTx.ForkId
args[argIdx+5] = consolidationTx.SourceAddress
args[argIdx+6] = consolidationTx.SourcePubkey
args[argIdx+7] = consolidationTx.TargetPubkey
args[argIdx+8] = consolidationTx.TxHash
args[argIdx+9] = consolidationTx.TxSender
args[argIdx+10] = consolidationTx.TxTarget
args[argIdx+11] = consolidationTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

_, err := tx.Exec(sql.String(), args...)
if err != nil {
return err
}
return nil
}

func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest, tx *sqlx.Tx) error {
var sql strings.Builder
fmt.Fprint(&sql,
EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_requests ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_requests ",
}),
"(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash)",
"(slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number)",
" VALUES ",
)
argIdx := 0
fieldCount := 11
fieldCount := 12

args := make([]interface{}, len(consolidations)*fieldCount)
for i, consolidation := range consolidations {
Expand All @@ -100,6 +46,7 @@ func InsertConsolidationRequests(consolidations []*dbtypes.ConsolidationRequest,
args[argIdx+8] = consolidation.TargetIndex
args[argIdx+9] = consolidation.TargetPubkey[:]
args[argIdx+10] = consolidation.TxHash[:]
args[argIdx+11] = consolidation.BlockNumber
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
Expand All @@ -119,7 +66,7 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc
fmt.Fprint(&sql, `
WITH cte AS (
SELECT
slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash
slot_number, slot_root, slot_index, orphaned, fork_id, source_address, source_index, source_pubkey, target_index, target_pubkey, tx_hash, block_number
FROM consolidation_requests
`)

Expand Down Expand Up @@ -212,7 +159,8 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc
null AS source_pubkey,
0 AS target_index,
null AS target_pubkey,
null AS tx_hash
null AS tx_hash,
0 AS block_number
FROM cte
UNION ALL SELECT * FROM (
SELECT * FROM cte
Expand All @@ -235,3 +183,28 @@ func GetConsolidationRequestsFiltered(offset uint64, limit uint32, finalizedBloc

return consolidationRequests[1:], consolidationRequests[0].SlotNumber, nil
}

func GetConsolidationRequestsByElBlockRange(firstSlot uint64, lastSlot uint64) []*dbtypes.ConsolidationRequest {
consolidationRequests := []*dbtypes.ConsolidationRequest{}

err := ReaderDb.Select(&consolidationRequests, `
SELECT consolidation_requests.*
FROM consolidation_requests
WHERE block_number >= $1 AND block_number <= $2
ORDER BY block_number ASC, slot_index ASC
`, firstSlot, lastSlot)
if err != nil {
logger.Errorf("Error while fetching consolidation requests: %v", err)
return nil
}

return consolidationRequests
}

func UpdateConsolidationRequestTxHash(slotRoot []byte, slotIndex uint64, txHash []byte, tx *sqlx.Tx) error {
_, err := tx.Exec(`UPDATE consolidation_requests SET tx_hash = $1 WHERE slot_root = $2 AND slot_index = $3`, txHash, slotRoot, slotIndex)
if err != nil {
return err
}
return nil
}
4 changes: 2 additions & 2 deletions db/schema/pgsql/20240805095505_pectra-updates2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
DROP TABLE IF EXISTS public."consolidations";

CREATE TABLE IF NOT EXISTS public."consolidation_requests" (
slot_number INT NOT NULL,
slot_number BIGINT NOT NULL,
slot_root bytea NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
Expand Down Expand Up @@ -42,7 +42,7 @@ CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
DROP TABLE IF EXISTS public."el_requests";

CREATE TABLE IF NOT EXISTS public."withdrawal_requests" (
slot_number INT NOT NULL,
slot_number BIGINT NOT NULL,
slot_root bytea NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
Expand Down
31 changes: 31 additions & 0 deletions db/schema/pgsql/20241006182734_pectra-updates3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
ON public."consolidation_request_txs"
("dequeue_block" ASC NULLS FIRST);

-- add block_number to consolidation_requests
ALTER TABLE public."consolidation_requests"
ADD "block_number" BIGINT NOT NULL DEFAULT 0;

UPDATE public."consolidation_requests"
SET "block_number" = (
SELECT eth_block_number
FROM public."slots"
WHERE public."slots".root = public."consolidation_requests".slot_root
);

CREATE INDEX IF NOT EXISTS "consolidation_requests_block_number_idx"
ON public."consolidation_requests"
("block_number" ASC NULLS FIRST);

CREATE TABLE IF NOT EXISTS public."withdrawal_request_txs" (
block_number BIGINT NOT NULL,
block_index INT NOT NULL,
Expand Down Expand Up @@ -65,6 +80,22 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
ON public."withdrawal_request_txs"
("dequeue_block" ASC NULLS FIRST);

-- add block_number to withdrawal_requests
ALTER TABLE public."withdrawal_requests"
ADD "block_number" BIGINT NOT NULL DEFAULT 0;

UPDATE public."withdrawal_requests"
SET "block_number" = (
SELECT eth_block_number
FROM public."slots"
WHERE public."slots".root = public."withdrawal_requests".slot_root
);

CREATE INDEX IF NOT EXISTS "withdrawal_requests_block_number_idx"
ON public."withdrawal_requests"
("block_number" ASC NULLS FIRST);


-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
Expand Down
4 changes: 2 additions & 2 deletions db/schema/sqlite/20240805095505_pectra-updates2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
DROP TABLE IF EXISTS "consolidations";

CREATE TABLE IF NOT EXISTS "consolidation_requests" (
slot_number INT NOT NULL,
slot_number BIGINT NOT NULL,
slot_root BLOB NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
Expand Down Expand Up @@ -42,7 +42,7 @@ CREATE INDEX IF NOT EXISTS "consolidation_requests_fork_idx"
DROP TABLE IF EXISTS "el_requests";

CREATE TABLE IF NOT EXISTS "withdrawal_requests" (
slot_number INT NOT NULL,
slot_number BIGINT NOT NULL,
slot_root BLOB NOT NULL,
slot_index INT NOT NULL,
orphaned bool NOT NULL DEFAULT FALSE,
Expand Down
30 changes: 30 additions & 0 deletions db/schema/sqlite/20241006182734_pectra-updates3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,21 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_dequeue_block_idx"
ON "consolidation_request_txs"
("dequeue_block" ASC);

-- add block_number to consolidation_requests
ALTER TABLE "consolidation_requests"
ADD "block_number" BIGINT NOT NULL DEFAULT 0;

UPDATE "consolidation_requests"
SET "block_number" = (
SELECT eth_block_number
FROM "slots"
WHERE "slots".root = "consolidation_requests".slot_root
);

CREATE INDEX IF NOT EXISTS "consolidation_requests_block_number_idx"
ON "consolidation_requests"
("block_number" ASC);

CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" (
block_number BIGINT NOT NULL,
block_index INT NOT NULL,
Expand Down Expand Up @@ -65,6 +80,21 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_dequeue_block_idx"
ON "withdrawal_request_txs"
("dequeue_block" ASC);

-- add block_number to withdrawal_requests
ALTER TABLE "withdrawal_requests"
ADD "block_number" BIGINT NOT NULL DEFAULT 0;

UPDATE "withdrawal_requests"
SET "block_number" = (
SELECT eth_block_number
FROM "slots"
WHERE "slots".root = "withdrawal_requests".slot_root
);

CREATE INDEX IF NOT EXISTS "withdrawal_requests_block_number_idx"
ON "withdrawal_requests"
("block_number" ASC);

-- +goose StatementEnd
-- +goose Down
-- +goose StatementBegin
Expand Down
1 change: 0 additions & 1 deletion db/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ func GetFirstRootAfterSlot(slot uint64, withOrphaned bool) []byte {
SELECT root FROM slots WHERE slot >= $1 `+statusFilter+` AND status != 0 ORDER BY slot ASC LIMIT 1
`, slot)
if err != nil {
logger.Errorf("Error while fetching first root after %v: %v", slot, err)
return nil
}
return result
Expand Down
Loading

0 comments on commit dfe4b80

Please sign in to comment.