Skip to content

Commit

Permalink
(BIDS-2465) use new table validator_stats_total for all fields in val…
Browse files Browse the repository at this point in the history
…idator_stats
  • Loading branch information
guybrush committed Sep 7, 2023
1 parent d7d21fd commit 1669aa4
Show file tree
Hide file tree
Showing 4 changed files with 398 additions and 6 deletions.
299 changes: 298 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (

"github.com/coocood/freecache"
_ "github.com/jackc/pgx/v4/stdlib"
"github.com/lib/pq"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"

"flag"
Expand Down Expand Up @@ -47,7 +49,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, reexport-validator-stats-totals")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -213,9 +215,304 @@ func main() {
indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient)
case "migrate-last-attestation-slot-bigtable":
migrateLastAttestationSlotToBigtable()
case "reexport-validator-stats-totals":
err = reexportValidatorStatsTotals()
case "debug-perf":
err = debugPerf()
default:
utils.LogFatal(nil, "unknown command", 0)
}
if err != nil {
logrus.Fatal(err)
}
}

func debugPerf() error {
var err error
valis := []uint64{}
err = db.ReaderDb.Select(&valis, `select validatorindex from blocks_withdrawals order by block_slot desc limit 100;`)
if err != nil {
return err
}
fmt.Printf("valis: %+v\n", valis)

t0 := time.Now()
for i := 0; i < 100; i++ {
s, err := debugPerfA(valis)
if err != nil {
return err
}
if i == 0 {
fmt.Println(s)
}
}
t1 := time.Now()
for i := 0; i < 100; i++ {
s, err := debugPerfB(valis)
if err != nil {
return err
}
if i == 0 {
fmt.Println(s)
}
}
t2 := time.Now()
for i := 0; i < 100; i++ {
s, err := debugPerfC(valis)
if err != nil {
return err
}
if i == 0 {
fmt.Println(s)
}
}
t3 := time.Now()
logrus.WithFields(logrus.Fields{"a": t1.Sub(t0), "b": t2.Sub(t1), "c": t3.Sub(t2)}).Infof("debug perf")
return nil
}

func debugPerfA(valis []uint64) (decimal.Decimal, error) {
sum := decimal.NewFromInt(0)
data := []struct {
Validatorindex uint64 `db:"index"`
WithdrawalsTotal decimal.Decimal `db:"total"`
}{}
err := db.ReaderDb.Select(&data, `
SELECT validatorindex as index, COALESCE(sum(amount), 0) as total
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1'
WHERE w.block_slot >= (SELECT (coalesce(max(day),-1)+1)*7200 FROM validator_stats) AND validatorindex = ANY($1)
GROUP BY validatorindex`, pq.Array(valis))
if err != nil {
return sum, err
}
for _, d := range data {
sum = sum.Add(d.WithdrawalsTotal)
}
return sum, err
}

func debugPerfB(valis []uint64) (decimal.Decimal, error) {
sum := decimal.NewFromInt(0)
data := []struct {
Validatorindex uint64 `db:"index"`
WithdrawalsTotal decimal.Decimal `db:"total"`
}{}
tx, err := db.ReaderDb.Beginx()
if err != nil {
return sum, err
}
defer tx.Rollback()
var maxStatsDay uint64
err = tx.Get(&maxStatsDay, `select (coalesce(max(day),-1)+1)*7200 from validator_stats`)
if err != nil {
return sum, err
}
err = tx.Select(&data, `
SELECT validatorindex as index, COALESCE(sum(amount), 0) as total
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1'
WHERE w.block_slot >= $2 AND validatorindex = ANY($1)
GROUP BY validatorindex`, pq.Array(valis), maxStatsDay)
if err != nil {
return sum, err
}
err = tx.Commit()
if err != nil {
return sum, err
}
for _, d := range data {
sum = sum.Add(d.WithdrawalsTotal)
}
return sum, err
}

func debugPerfC(valis []uint64) (decimal.Decimal, error) {
sum := decimal.NewFromInt(0)
data := []struct {
Validatorindex uint64 `db:"index"`
WithdrawalsTotal decimal.Decimal `db:"total"`
}{}
err := db.ReaderDb.Select(&data, `
SELECT validatorindex as index, COALESCE(sum(amount), 0) as total
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1'
WHERE validatorindex = ANY($1)
GROUP BY validatorindex`, pq.Array(valis))
if err != nil {
return sum, err
}
for _, d := range data {
sum = sum.Add(d.WithdrawalsTotal)
}
return sum, err
}

// see BIDS-2465
func reexportValidatorStatsTotals() error {
var maxValidatorIndex int
err := db.WriterDb.Get(&maxValidatorIndex, `select max(validatorindex) from validators`)
if err != nil {
return fmt.Errorf("error getting maxValidatorIndex: %w", err)
}
var maxValidatorStatsDay int
err = db.WriterDb.Get(&maxValidatorStatsDay, `select max(day) from validator_stats`)
if err != nil {
return fmt.Errorf("error getting maxValidatorStatsDay: %w", err)
}
logrus.WithFields(logrus.Fields{
"maxValidatorIndex": maxValidatorIndex,
"maxValidatorStatsDay": maxValidatorStatsDay,
"opts.StartDay": opts.StartDay,
"opts.EndDay": opts.EndDay,
}).Infof("reexporting validator-stats-total")
validatorsPerBatch := 10000
for day := opts.StartDay; day <= opts.EndDay; day++ {
startTime := time.Now()
for i := 0; i < maxValidatorIndex; i += validatorsPerBatch {
start := i
end := i + validatorsPerBatch
_, err = db.WriterDb.Exec(`
INSERT INTO validator_stats (
validatorindex,
day,
missed_attestations_total,
orphaned_attestations_total,
proposed_blocks_total,
missed_blocks_total,
orphaned_blocks_total,
attester_slashings_total,
proposer_slashings_total,
participated_sync_total,
missed_sync_total,
orphaned_sync_total,
withdrawals_total,
withdrawals_amount_total,
el_rewards_wei_total,
cl_rewards_gwei_total,
mev_rewards_wei_total,
deposits_total,
deposits_amount_total
)
(
SELECT
vs1.validatorindex,
vs1.day,
COALESCE(vs1.missed_attestations ,0) + COALESCE(vs2.missed_attestations_total ,0)
COALESCE(vs1.orphaned_attestations,0) + COALESCE(vs2.orphaned_attestations_total,0)
COALESCE(vs1.proposed_blocks ,0) + COALESCE(vs2.proposed_blocks_total ,0)
COALESCE(vs1.missed_blocks ,0) + COALESCE(vs2.missed_blocks_total ,0)
COALESCE(vs1.orphaned_blocks ,0) + COALESCE(vs2.orphaned_blocks_total ,0)
COALESCE(vs1.attester_slashings ,0) + COALESCE(vs2.attester_slashings_total ,0)
COALESCE(vs1.proposer_slashings ,0) + COALESCE(vs2.proposer_slashings_total ,0)
COALESCE(vs1.participated_sync ,0) + COALESCE(vs2.participated_sync_total ,0)
COALESCE(vs1.missed_sync ,0) + COALESCE(vs2.missed_sync_total ,0)
COALESCE(vs1.orphaned_sync ,0) + COALESCE(vs2.orphaned_sync_total ,0)
COALESCE(vs1.withdrawals ,0) + COALESCE(vs2.withdrawals_total ,0)
COALESCE(vs1.withdrawals_amount ,0) + COALESCE(vs2.withdrawals_amount_total ,0)
COALESCE(vs1.el_rewards_wei ,0) + COALESCE(vs2.el_rewards_wei_total ,0)
COALESCE(vs1.cl_rewards_gwei ,0) + COALESCE(vs2.cl_rewards_gwei_total ,0)
COALESCE(vs1.mev_rewards_wei ,0) + COALESCE(vs2.mev_rewards_wei_total ,0)
COALESCE(vs1.deposits ,0) + COALESCE(vs2.deposits_total ,0)
COALESCE(vs1.deposits_amount ,0) + COALESCE(vs2.deposits_amount_total ,0)
FROM validator_stats vs1
LEFT JOIN validator_stats vs2 ON vs2.day = vs1.day-1 AND vs2.validatorindex = vs1.validatorindex
WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3
)
ON CONFLICT (validatorindex, day) DO UPDATE SET
missed_attestations_total = excluded.missed_attestations_total
orphaned_attestations_total = excluded.orphaned_attestations_total
proposed_blocks_total = excluded.proposed_blocks_total
missed_blocks_total = excluded.missed_blocks_total
orphaned_blocks_total = excluded.orphaned_blocks_total
attester_slashings_total = excluded.attester_slashings_total
proposer_slashings_total = excluded.proposer_slashings_total
participated_sync_total = excluded.participated_sync_total
missed_sync_total = excluded.missed_sync_total
orphaned_sync_total = excluded.orphaned_sync_total
withdrawals_total = excluded.withdrawals_total
withdrawals_amount_total = excluded.withdrawals_amount_total
el_rewards_wei_total = excluded.el_rewards_wei_total
cl_rewards_gwei_total = excluded.cl_rewards_gwei_total
mev_rewards_wei_total = excluded.mev_rewards_wei_total
deposits_total = excluded.deposits_total
deposits_amount_total = excluded.deposits_amount_total
`, day, start, end)
if err != nil {
return fmt.Errorf("error updating totals for day %v (batch %v-%v): %w", day, start, end, err)
}
}
logrus.WithFields(logrus.Fields{"day": day, "duration": time.Since(startTime)}).Infof("reexported validator-stats-totals-day")
}

_, err = db.WriterDb.Exec(`
INSERT INTO validator_stats_totals (
validatorindex ,
missed_attestations ,
orphaned_attestations,
proposed_blocks ,
missed_blocks ,
orphaned_blocks ,
attester_slashings ,
proposer_slashings ,
participated_sync ,
missed_sync ,
orphaned_sync ,
withdrawals ,
withdrawals_amount ,
el_rewards_wei ,
cl_rewards_gwei ,
mev_rewards_wei ,
deposits ,
deposits_amount
)
(
SELECT
day ,
validatorindex ,
missed_attestations_total ,
orphaned_attestations_total,
proposed_blocks_total ,
missed_blocks_total ,
orphaned_blocks_total ,
attester_slashings_total ,
proposer_slashings_total ,
participated_sync_total ,
missed_sync_total ,
orphaned_sync_total ,
withdrawals_total ,
withdrawals_amount_total ,
el_rewards_wei_total ,
cl_rewards_gwei_total ,
mev_rewards_wei_total ,
deposits_total ,
deposits_amount_total
FROM validator_stats WHERE day = $1
)
ON CONFLICT (validatorindex)
missed_attestations = excluded.missed_attestations
orphaned_attestations = excluded.orphaned_attestations
proposed_blocks = excluded.proposed_blocks
missed_blocks = excluded.missed_blocks
orphaned_blocks = excluded.orphaned_blocks
attester_slashings = excluded.attester_slashings
proposer_slashings = excluded.proposer_slashings
participated_sync = excluded.participated_sync
missed_sync = excluded.missed_sync
orphaned_sync = excluded.orphaned_sync
withdrawals = excluded.withdrawals
withdrawals_amount = excluded.withdrawals_amount
el_rewards_wei = excluded.el_rewards_wei
cl_rewards_gwei = excluded.cl_rewards_gwei
mev_rewards_wei = excluded.mev_rewards_wei
deposits = excluded.deposits
deposits_amount = excluded.deposits_amount
`, maxValidatorStatsDay)
if err != nil {
return fmt.Errorf("error updating validator_stats_totals (maxValidatorStatsDay=%v): %w", maxValidatorStatsDay, err)
}
logrus.WithFields(logrus.Fields{"maxValidatorStatsDay": maxValidatorStatsDay}).Infof("updated validator_stats_totals")
return nil
}

// one time migration of the last attestation slot values from postgres to bigtable
Expand Down
68 changes: 68 additions & 0 deletions db/migrations/20230907120000_add_stats_totals.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-- +goose Up

-- +goose StatementBegin
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_attestations_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_attestations_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS proposed_blocks_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_blocks_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_blocks_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS attester_slashings_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS proposer_slashings_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS participated_sync_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_sync_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_sync_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS el_rewards_wei_total BIGINT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS cl_rewards_gwei_total BIGINT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS mev_rewards_wei_total BIGINT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS deposits_total INT NOT NULL DEFAULT 0;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS deposits_amount_total BIGINT NOT NULL DEFAULT 0;

CREATE TABLE IF NOT EXISTS
validator_stats_totals (
validatorindex INT NOT NULL,
missed_attestations INT NOT NULL,
orphaned_attestations INT NOT NULL,
proposed_blocks INT NOT NULL,
missed_blocks INT NOT NULL,
orphaned_blocks INT NOT NULL,
attester_slashings INT NOT NULL,
proposer_slashings INT NOT NULL,
participated_sync INT NOT NULL,
missed_sync INT NOT NULL,
orphaned_sync INT NOT NULL,
withdrawals INT NOT NULL,
withdrawals_amount BIGINT NOT NULL,
el_rewards_wei BIGINT NOT NULL,
cl_rewards_gwei BIGINT NOT NULL,
mev_rewards_wei BIGINT NOT NULL,
deposits INT NOT NULL,
deposits_amount BIGINT NOT NULL,
PRIMARY KEY (validatorindex)
);
-- +goose StatementEnd

-- +goose Down

-- +goose StatementBegin
-- existed before: missed_attestations_total, participated_sync_total, missed_sync_total, orphaned_sync_total, el_rewards_wei_total, cl_rewards_gwei_total, mev_rewards_wei_total
DROP TABLE IF EXISTS validator_stats_totals;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_attestations_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_attestations_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS proposed_blocks_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_blocks_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_blocks_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS attester_slashings_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS proposer_slashings_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS participated_sync_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_sync_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_sync_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS el_rewards_wei_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS cl_rewards_gwei_total;
--ALTER TABLE validator_stats DROP COLUMN IF EXISTS mev_rewards_wei_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS deposits_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS deposits_amount_total;
-- +goose StatementEnd
Loading

0 comments on commit 1669aa4

Please sign in to comment.