diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 4e4cc1d1f2..955e9fa5b8 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -18,7 +18,9 @@ import ( "github.com/coocood/freecache" _ "github.com/jackc/pgx/v4/stdlib" + "github.com/lib/pq" utilMath "github.com/protolambda/zrnt/eth2/util/math" + "github.com/shopspring/decimal" "golang.org/x/sync/errgroup" "flag" @@ -47,7 +49,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, reexport-validator-stats-totals") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -213,9 +215,304 @@ func main() { indexMissingBlocks(opts.StartBlock, opts.EndBlock, bt, erigonClient) case "migrate-last-attestation-slot-bigtable": migrateLastAttestationSlotToBigtable() + case "reexport-validator-stats-totals": + err = reexportValidatorStatsTotals() + case "debug-perf": + err = debugPerf() default: utils.LogFatal(nil, "unknown command", 0) } + if err != nil { + logrus.Fatal(err) + } +} + +func debugPerf() error { + var err error + valis := []uint64{} + err = db.ReaderDb.Select(&valis, `select validatorindex from blocks_withdrawals order by block_slot desc limit 100;`) + if err != nil { + return err + } + fmt.Printf("valis: %+v\n", valis) + + t0 := time.Now() + for i := 0; i < 100; i++ { + s, err := debugPerfA(valis) + if err != nil { + return err + } + if i == 0 { + fmt.Println(s) + } + } + t1 := time.Now() + for i := 0; i < 100; i++ { + s, err := debugPerfB(valis) + if err != nil { + return err + } + if i == 0 { + fmt.Println(s) + } + } + t2 := time.Now() + for i := 0; i < 100; i++ { + s, err := debugPerfC(valis) + if err != nil { + return err + } + if i == 0 { + fmt.Println(s) + } + } + t3 := time.Now() + logrus.WithFields(logrus.Fields{"a": t1.Sub(t0), "b": t2.Sub(t1), "c": t3.Sub(t2)}).Infof("debug perf") + return nil +} + +func debugPerfA(valis []uint64) (decimal.Decimal, error) { + sum := decimal.NewFromInt(0) + data := []struct { + Validatorindex uint64 `db:"index"` + WithdrawalsTotal decimal.Decimal `db:"total"` + }{} + err := db.ReaderDb.Select(&data, ` + SELECT validatorindex as index, COALESCE(sum(amount), 0) as total + FROM blocks_withdrawals w + INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1' + WHERE w.block_slot >= (SELECT (coalesce(max(day),-1)+1)*7200 FROM validator_stats) AND validatorindex = ANY($1) + GROUP BY validatorindex`, pq.Array(valis)) + if err != nil { + return sum, err + } + for _, d := range data { + sum = sum.Add(d.WithdrawalsTotal) + } + return sum, err +} + +func debugPerfB(valis []uint64) (decimal.Decimal, error) { + sum := decimal.NewFromInt(0) + data := []struct { + Validatorindex uint64 `db:"index"` + WithdrawalsTotal decimal.Decimal `db:"total"` + }{} + tx, err := db.ReaderDb.Beginx() + if err != nil { + return sum, err + } + defer tx.Rollback() + var maxStatsDay uint64 + err = tx.Get(&maxStatsDay, `select (coalesce(max(day),-1)+1)*7200 from validator_stats`) + if err != nil { + return sum, err + } + err = tx.Select(&data, ` + SELECT validatorindex as index, COALESCE(sum(amount), 0) as total + FROM blocks_withdrawals w + INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1' + WHERE w.block_slot >= $2 AND validatorindex = ANY($1) + GROUP BY validatorindex`, pq.Array(valis), maxStatsDay) + if err != nil { + return sum, err + } + err = tx.Commit() + if err != nil { + return sum, err + } + for _, d := range data { + sum = sum.Add(d.WithdrawalsTotal) + } + return sum, err +} + +func debugPerfC(valis []uint64) (decimal.Decimal, error) { + sum := decimal.NewFromInt(0) + data := []struct { + Validatorindex uint64 `db:"index"` + WithdrawalsTotal decimal.Decimal `db:"total"` + }{} + err := db.ReaderDb.Select(&data, ` + SELECT validatorindex as index, COALESCE(sum(amount), 0) as total + FROM blocks_withdrawals w + INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1' + WHERE validatorindex = ANY($1) + GROUP BY validatorindex`, pq.Array(valis)) + if err != nil { + return sum, err + } + for _, d := range data { + sum = sum.Add(d.WithdrawalsTotal) + } + return sum, err +} + +// see BIDS-2465 +func reexportValidatorStatsTotals() error { + var maxValidatorIndex int + err := db.WriterDb.Get(&maxValidatorIndex, `select max(validatorindex) from validators`) + if err != nil { + return fmt.Errorf("error getting maxValidatorIndex: %w", err) + } + var maxValidatorStatsDay int + err = db.WriterDb.Get(&maxValidatorStatsDay, `select max(day) from validator_stats`) + if err != nil { + return fmt.Errorf("error getting maxValidatorStatsDay: %w", err) + } + logrus.WithFields(logrus.Fields{ + "maxValidatorIndex": maxValidatorIndex, + "maxValidatorStatsDay": maxValidatorStatsDay, + "opts.StartDay": opts.StartDay, + "opts.EndDay": opts.EndDay, + }).Infof("reexporting validator-stats-total") + validatorsPerBatch := 10000 + for day := opts.StartDay; day <= opts.EndDay; day++ { + startTime := time.Now() + for i := 0; i < maxValidatorIndex; i += validatorsPerBatch { + start := i + end := i + validatorsPerBatch + _, err = db.WriterDb.Exec(` + INSERT INTO validator_stats ( + validatorindex, + day, + missed_attestations_total, + orphaned_attestations_total, + proposed_blocks_total, + missed_blocks_total, + orphaned_blocks_total, + attester_slashings_total, + proposer_slashings_total, + participated_sync_total, + missed_sync_total, + orphaned_sync_total, + withdrawals_total, + withdrawals_amount_total, + el_rewards_wei_total, + cl_rewards_gwei_total, + mev_rewards_wei_total, + deposits_total, + deposits_amount_total + ) + ( + SELECT + vs1.validatorindex, + vs1.day, + COALESCE(vs1.missed_attestations ,0) + COALESCE(vs2.missed_attestations_total ,0) + COALESCE(vs1.orphaned_attestations,0) + COALESCE(vs2.orphaned_attestations_total,0) + COALESCE(vs1.proposed_blocks ,0) + COALESCE(vs2.proposed_blocks_total ,0) + COALESCE(vs1.missed_blocks ,0) + COALESCE(vs2.missed_blocks_total ,0) + COALESCE(vs1.orphaned_blocks ,0) + COALESCE(vs2.orphaned_blocks_total ,0) + COALESCE(vs1.attester_slashings ,0) + COALESCE(vs2.attester_slashings_total ,0) + COALESCE(vs1.proposer_slashings ,0) + COALESCE(vs2.proposer_slashings_total ,0) + COALESCE(vs1.participated_sync ,0) + COALESCE(vs2.participated_sync_total ,0) + COALESCE(vs1.missed_sync ,0) + COALESCE(vs2.missed_sync_total ,0) + COALESCE(vs1.orphaned_sync ,0) + COALESCE(vs2.orphaned_sync_total ,0) + COALESCE(vs1.withdrawals ,0) + COALESCE(vs2.withdrawals_total ,0) + COALESCE(vs1.withdrawals_amount ,0) + COALESCE(vs2.withdrawals_amount_total ,0) + COALESCE(vs1.el_rewards_wei ,0) + COALESCE(vs2.el_rewards_wei_total ,0) + COALESCE(vs1.cl_rewards_gwei ,0) + COALESCE(vs2.cl_rewards_gwei_total ,0) + COALESCE(vs1.mev_rewards_wei ,0) + COALESCE(vs2.mev_rewards_wei_total ,0) + COALESCE(vs1.deposits ,0) + COALESCE(vs2.deposits_total ,0) + COALESCE(vs1.deposits_amount ,0) + COALESCE(vs2.deposits_amount_total ,0) + FROM validator_stats vs1 + LEFT JOIN validator_stats vs2 ON vs2.day = vs1.day-1 AND vs2.validatorindex = vs1.validatorindex + WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3 + ) + ON CONFLICT (validatorindex, day) DO UPDATE SET + missed_attestations_total = excluded.missed_attestations_total + orphaned_attestations_total = excluded.orphaned_attestations_total + proposed_blocks_total = excluded.proposed_blocks_total + missed_blocks_total = excluded.missed_blocks_total + orphaned_blocks_total = excluded.orphaned_blocks_total + attester_slashings_total = excluded.attester_slashings_total + proposer_slashings_total = excluded.proposer_slashings_total + participated_sync_total = excluded.participated_sync_total + missed_sync_total = excluded.missed_sync_total + orphaned_sync_total = excluded.orphaned_sync_total + withdrawals_total = excluded.withdrawals_total + withdrawals_amount_total = excluded.withdrawals_amount_total + el_rewards_wei_total = excluded.el_rewards_wei_total + cl_rewards_gwei_total = excluded.cl_rewards_gwei_total + mev_rewards_wei_total = excluded.mev_rewards_wei_total + deposits_total = excluded.deposits_total + deposits_amount_total = excluded.deposits_amount_total + `, day, start, end) + if err != nil { + return fmt.Errorf("error updating totals for day %v (batch %v-%v): %w", day, start, end, err) + } + } + logrus.WithFields(logrus.Fields{"day": day, "duration": time.Since(startTime)}).Infof("reexported validator-stats-totals-day") + } + + _, err = db.WriterDb.Exec(` + INSERT INTO validator_stats_totals ( + validatorindex , + missed_attestations , + orphaned_attestations, + proposed_blocks , + missed_blocks , + orphaned_blocks , + attester_slashings , + proposer_slashings , + participated_sync , + missed_sync , + orphaned_sync , + withdrawals , + withdrawals_amount , + el_rewards_wei , + cl_rewards_gwei , + mev_rewards_wei , + deposits , + deposits_amount + ) + ( + SELECT + day , + validatorindex , + missed_attestations_total , + orphaned_attestations_total, + proposed_blocks_total , + missed_blocks_total , + orphaned_blocks_total , + attester_slashings_total , + proposer_slashings_total , + participated_sync_total , + missed_sync_total , + orphaned_sync_total , + withdrawals_total , + withdrawals_amount_total , + el_rewards_wei_total , + cl_rewards_gwei_total , + mev_rewards_wei_total , + deposits_total , + deposits_amount_total + FROM validator_stats WHERE day = $1 + ) + ON CONFLICT (validatorindex) + missed_attestations = excluded.missed_attestations + orphaned_attestations = excluded.orphaned_attestations + proposed_blocks = excluded.proposed_blocks + missed_blocks = excluded.missed_blocks + orphaned_blocks = excluded.orphaned_blocks + attester_slashings = excluded.attester_slashings + proposer_slashings = excluded.proposer_slashings + participated_sync = excluded.participated_sync + missed_sync = excluded.missed_sync + orphaned_sync = excluded.orphaned_sync + withdrawals = excluded.withdrawals + withdrawals_amount = excluded.withdrawals_amount + el_rewards_wei = excluded.el_rewards_wei + cl_rewards_gwei = excluded.cl_rewards_gwei + mev_rewards_wei = excluded.mev_rewards_wei + deposits = excluded.deposits + deposits_amount = excluded.deposits_amount + `, maxValidatorStatsDay) + if err != nil { + return fmt.Errorf("error updating validator_stats_totals (maxValidatorStatsDay=%v): %w", maxValidatorStatsDay, err) + } + logrus.WithFields(logrus.Fields{"maxValidatorStatsDay": maxValidatorStatsDay}).Infof("updated validator_stats_totals") + return nil } // one time migration of the last attestation slot values from postgres to bigtable diff --git a/db/migrations/20230907120000_add_stats_totals.sql b/db/migrations/20230907120000_add_stats_totals.sql new file mode 100644 index 0000000000..30555926a1 --- /dev/null +++ b/db/migrations/20230907120000_add_stats_totals.sql @@ -0,0 +1,68 @@ +-- +goose Up + +-- +goose StatementBegin +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_attestations_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_attestations_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS proposed_blocks_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_blocks_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_blocks_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS attester_slashings_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS proposer_slashings_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS participated_sync_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS missed_sync_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS orphaned_sync_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS el_rewards_wei_total BIGINT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS cl_rewards_gwei_total BIGINT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS mev_rewards_wei_total BIGINT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS deposits_total INT NOT NULL DEFAULT 0; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS deposits_amount_total BIGINT NOT NULL DEFAULT 0; + +CREATE TABLE IF NOT EXISTS + validator_stats_totals ( + validatorindex INT NOT NULL, + missed_attestations INT NOT NULL, + orphaned_attestations INT NOT NULL, + proposed_blocks INT NOT NULL, + missed_blocks INT NOT NULL, + orphaned_blocks INT NOT NULL, + attester_slashings INT NOT NULL, + proposer_slashings INT NOT NULL, + participated_sync INT NOT NULL, + missed_sync INT NOT NULL, + orphaned_sync INT NOT NULL, + withdrawals INT NOT NULL, + withdrawals_amount BIGINT NOT NULL, + el_rewards_wei BIGINT NOT NULL, + cl_rewards_gwei BIGINT NOT NULL, + mev_rewards_wei BIGINT NOT NULL, + deposits INT NOT NULL, + deposits_amount BIGINT NOT NULL, + PRIMARY KEY (validatorindex) + ); +-- +goose StatementEnd + +-- +goose Down + +-- +goose StatementBegin +-- existed before: missed_attestations_total, participated_sync_total, missed_sync_total, orphaned_sync_total, el_rewards_wei_total, cl_rewards_gwei_total, mev_rewards_wei_total +DROP TABLE IF EXISTS validator_stats_totals; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_attestations_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_attestations_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS proposed_blocks_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_blocks_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_blocks_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS attester_slashings_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS proposer_slashings_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS participated_sync_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS missed_sync_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS orphaned_sync_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS el_rewards_wei_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS cl_rewards_gwei_total; +--ALTER TABLE validator_stats DROP COLUMN IF EXISTS mev_rewards_wei_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS deposits_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS deposits_amount_total; +-- +goose StatementEnd diff --git a/handlers/api.go b/handlers/api.go index d7182cb89f..d8d977962c 100644 --- a/handlers/api.go +++ b/handlers/api.go @@ -1551,7 +1551,22 @@ func apiValidator(w http.ResponseWriter, r *http.Request) { data := make([]*ApiValidatorResponse, 0) - err = db.ReaderDb.Select(&data, ` + tx, err := db.ReaderDb.Beginx() + if err != nil { + utils.LogError(err, "error starting db-tx for apiValidator", 0) + sendErrorResponse(w, r.URL.String(), "sorry something went wrong") + return + } + defer tx.Rollback() + var lastStatsSlot uint64 + err = tx.Get(&lastStatsSlot, `select (coalesce(max(day),-1)+1)*$1 from validator_stats`, utils.SlotsPerDay()) + if err != nil { + utils.LogError(err, "error getting lastStatsSlot for apiValidator", 0) + sendErrorResponse(w, r.URL.String(), "sorry something went wrong") + return + } + + err = tx.Select(&data, ` SELECT validatorindex, '0x' || encode(pubkey, 'hex') as pubkey, withdrawableepoch, '0x' || encode(withdrawalcredentials, 'hex') as withdrawalcredentials, @@ -1561,21 +1576,29 @@ func apiValidator(w http.ResponseWriter, r *http.Request) { exitepoch, status, COALESCE(n.name, '') AS name, - COALESCE(w.total, 0) as total_withdrawals + COALESCE(w.total, 0)+COALESCE(vst.withdrawal_amount_total) as total_withdrawals FROM validators v LEFT JOIN validator_names n ON n.publickey = v.pubkey + LEFT JOIN validator_stats_total vst ON vst.validatorindex = v.validatorindex LEFT JOIN ( SELECT validatorindex as index, COALESCE(sum(amount), 0) as total FROM blocks_withdrawals w INNER JOIN blocks b ON b.blockroot = w.block_root AND status = '1' - WHERE validatorindex = ANY($1) + WHERE w.block_slot >= $2 AND validatorindex = ANY($1) GROUP BY validatorindex ) as w ON w.index = v.validatorindex WHERE validatorindex = ANY($1) ORDER BY validatorindex; - `, pq.Array(queryIndices)) + `, pq.Array(queryIndices), lastStatsSlot) if err != nil { - logger.Warnf("error retrieving validator data from db: %v", err) + utils.LogError(err, "error getting data for apiValidator", 0) + sendErrorResponse(w, r.URL.String(), "could not retrieve db results") + return + } + + err = tx.Commit() + if err != nil { + utils.LogError(err, "error commiting tx for apiValidator", 0) sendErrorResponse(w, r.URL.String(), "could not retrieve db results") return } diff --git a/utils/utils.go b/utils/utils.go index 7f1d072982..35dfeef741 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1110,6 +1110,10 @@ func GetTimeToNextWithdrawal(distance uint64) time.Time { return timeToWithdrawal } +func SlotsPerDay() uint64 { + return EpochsPerDay() * Config.Chain.Config.SlotsPerEpoch +} + func EpochsPerDay() uint64 { day := time.Hour * 24 return (uint64(day.Seconds()) / Config.Chain.Config.SlotsPerEpoch) / Config.Chain.Config.SecondsPerSlot