Skip to content

Commit

Permalink
add(exporter): sanity checks & more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
invis-bitfly committed Jan 14, 2025
1 parent 635a10d commit 9788a51
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 7 deletions.
40 changes: 38 additions & 2 deletions backend/pkg/exporter/modules/execution_payloads_exporter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package modules

import (
"bytes"
"context"
"database/sql"
"fmt"
Expand Down Expand Up @@ -99,6 +100,7 @@ func (d *executionPayloadsExporter) maintainTable() (err error) {
blockChan := make(chan *types.Eth1BlockIndexed, 1000)
type Result struct {
BlockHash []byte
BlockNumber uint64
FeeRecipientReward decimal.Decimal
}
resData := make([]Result, 0, maxBlock-minBlock+1)
Expand Down Expand Up @@ -137,7 +139,7 @@ func (d *executionPayloadsExporter) maintainTable() (err error) {
if err != nil {
return fmt.Errorf("error converting tx reward to decimal for block %v: %w", block.Number, err)
}
resData = append(resData, Result{BlockHash: hash, FeeRecipientReward: dec})
resData = append(resData, Result{BlockHash: hash, FeeRecipientReward: dec, BlockNumber: block.Number})
}
})

Expand All @@ -154,9 +156,43 @@ func (d *executionPayloadsExporter) maintainTable() (err error) {
if err != nil {
return fmt.Errorf("error processing blocks: %w", err)
}
// sanity checks: check if any block hashes are 0x0000000000000000000000000000000000000000000000000000000000000000 or duplicate, check if count matches expected
seen := make(map[string]bool)
emptyBlockHash := bytes.Repeat([]byte{0}, 32)
err = error(nil)
counter := 0
for _, r := range resData {
if counter > 25 {
err = fmt.Errorf("too many errors, aborting")
log.Error(err, "error processing blocks", 0)
break
}
if len(r.BlockHash) == 0 {
err = fmt.Errorf("error processing blocks: block hash is empty, block number: %v", r.BlockNumber)
log.Error(err, "error processing blocks", 0)
counter++
}
if bytes.Equal(r.BlockHash, emptyBlockHash) {
err = fmt.Errorf("error processing blocks: block hash is all zeros, block number: %v", r.BlockNumber)
log.Error(err, "error processing blocks", 0)
counter++
}
if _, ok := seen[string(r.BlockHash)]; ok {
err = fmt.Errorf("error processing blocks: duplicate block hash, block number: %v", r.BlockNumber)
log.Error(err, "error processing blocks", 0)
counter++
}
seen[string(r.BlockHash)] = true
}
if err != nil {
return err
}

// update the execution_payloads table
if uint64(len(resData)) != maxBlock-minBlock+1 {
return fmt.Errorf("error processing blocks: expected %v blocks, got %v", maxBlock-minBlock+1, len(resData))
}

// update the execution_payloads table
log.Infof("preparing copy update to temp table")

// load data into temp table
Expand Down
46 changes: 41 additions & 5 deletions backend/pkg/exporter/modules/execution_rewards_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"github.com/doug-martin/goqu/v9"
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
)

type executionRewardsFinalizer struct {
ModuleContext
ExportMutex *sync.Mutex
CooldownTs time.Time
}

func NewExecutionRewardFinalizer(moduleContext ModuleContext) ModuleInterface {
Expand All @@ -40,6 +42,9 @@ func (d *executionRewardsFinalizer) OnFinalizedCheckpoint(event *constypes.Stand
}

func (d *executionRewardsFinalizer) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
if time.Now().Before(d.CooldownTs) {
log.Warnf("execution rewards finalizer is on cooldown till %s", d.CooldownTs)
}
// if mutex is locked, return early
if !d.ExportMutex.TryLock() {
log.Infof("execution rewards finalizer is already running")
Expand All @@ -48,6 +53,7 @@ func (d *executionRewardsFinalizer) OnHead(event *constypes.StandardEventHeadRes
defer d.ExportMutex.Unlock()
err = d.maintainTable()
if err != nil {
d.CooldownTs = time.Now().Add(1 * time.Minute)
return fmt.Errorf("error maintaining table: %w", err)
}
return nil
Expand Down Expand Up @@ -80,16 +86,47 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) {
}

// limit to prevent overloading
if latestFinalizedSlot-lastExportedSlot > 250_000 {
latestFinalizedSlot = lastExportedSlot + 250_000
// gnosis has a 5 second slot window, so to prevent hammering the db scale the batch size by the slot time
batch := int64(10_000 * utils.Config.Chain.ClConfig.SlotsPerEpoch)
if latestFinalizedSlot-lastExportedSlot > batch {
latestFinalizedSlot = lastExportedSlot + batch
}

if latestFinalizedSlot <= lastExportedSlot {
log.Debugf("no new finalized slots to export")
return nil
}
// sanity check, check if any non-missed block has a fee_recipient_reward that is NULL
var count struct {
Total int64 `db:"total"`
NonNull int64 `db:"non_null"`
}
gc := goqu.Dialect("postgres").From("blocks").
Select(
goqu.Func("count", goqu.Star()).As("total"),
goqu.Func("count", goqu.I("ep.fee_recipient_reward")).As("non_null"),
).
LeftJoin(
goqu.T("execution_payloads").As("ep"),
goqu.On(goqu.I("ep.block_hash").Eq(goqu.I("blocks.exec_block_hash"))),
).
Where(
goqu.I("slot").Gt(lastExportedSlot),
goqu.I("slot").Lte(latestFinalizedSlot),
goqu.I("status").Eq("1"),
)
query, args, err := gc.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}
err = db.ReaderDb.Get(&count, query, args...)
if err != nil {
return fmt.Errorf("error getting count of non-missed blocks: %w", err)
}
if count.Total != count.NonNull {
return fmt.Errorf("only %v out of %v blocks have non-null fee_recipient_reward", count.NonNull, count.Total)
}
log.Infof("finalized rewards = last exported slot: %v, latest finalized slot: %v", lastExportedSlot, latestFinalizedSlot)

start := time.Now()
ds := goqu.Dialect("postgres").Insert("execution_rewards_finalized").FromQuery(
goqu.From(goqu.T("blocks").As("b")).
Expand Down Expand Up @@ -122,12 +159,11 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) {

log.Debugf("writing execution rewards finalized data")

query, args, err := ds.Prepared(true).ToSQL()
query, args, err = ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}
_, err = db.WriterDb.Exec(query, args...)

if err != nil {
return fmt.Errorf("error inserting data: %w", err)
}
Expand Down

0 comments on commit 9788a51

Please sign in to comment.