From 120ccfec79426a8d2f476035075566a73d30dfa4 Mon Sep 17 00:00:00 2001 From: Lucca <109136188+LuccaBitfly@users.noreply.github.com> Date: Thu, 2 Nov 2023 08:00:30 +0100 Subject: [PATCH] (BIDS-2369) add total withdrawal columns (#2653) --- cmd/misc/main.go | 120 +++++++++++++++++- ...1102075500_add_total_withdrawals_stats.sql | 13 ++ db/statistics.go | 10 ++ types/exporter.go | 6 +- 4 files changed, 146 insertions(+), 3 deletions(-) create mode 100644 db/migrations/20231102075500_add_total_withdrawals_stats.sql diff --git a/cmd/misc/main.go b/cmd/misc/main.go index d541875e32..fda294d879 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -43,6 +43,7 @@ var opts = struct { DataConcurrency uint64 Transformers string Table string + Columns string Family string Key string ValidatorNameRanges string @@ -51,7 +52,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -68,6 +69,7 @@ func main() { flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size") flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer") flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})") + flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command") dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them") versionFlag := flag.Bool("version", false, "Show version and exit") flag.Parse() @@ -355,6 +357,8 @@ func main() { if err != nil { logrus.Fatal(err) } + case "export-stats-totals": + exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) default: utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0) } @@ -1002,3 +1006,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) { logrus.Info("historic price update run completed") } + +func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) { + start := time.Now() + logrus.Infof("exporting stats totals for columns '%v'", columns) + + // validate columns input + columnsSlice := strings.Split(columns, ",") + validColumns := []string{ + "cl_rewards_gwei_total", + "el_rewards_wei_total", + "mev_rewards_wei_total", + "missed_attestations_total", + "participated_sync_total", + "missed_sync_total", + "orphaned_sync_total", + "withdrawals_total", + "withdrawals_amount_total", + } + +OUTER: + for _, c := range columnsSlice { + for _, vc := range validColumns { + if c == vc { + // valid column found, continue to next column from input + continue OUTER + } + } + // no valid column matched, exit with error + utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{ + "usedColumn": c, + "validColumns": validColumns, + }) + } + + // build insert query from input columns + var totalClauses []string + var conflictClauses []string + + for _, col := range columnsSlice { + totalClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col) + totalClauses = append(totalClauses, totalClause) + + conflictClause := fmt.Sprintf("%s = excluded.%s", col, col) + conflictClauses = append(conflictClauses, conflictClause) + } + + insertQuery := fmt.Sprintf(` + INSERT INTO validator_stats (validatorindex, day, %s) + SELECT + vs1.validatorindex, + vs1.day, + %s + FROM validator_stats vs1 + LEFT JOIN validator_stats vs2 + ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex + WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3 + ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`, + strings.Join(columnsSlice, ",\n\t"), + strings.Join(totalClauses, ",\n\t\t"), + strings.Join(conflictClauses, ",\n\t")) + + for day := dayStart; day <= dayEnd; day++ { + timeDay := time.Now() + logrus.Infof("exporting total sync and for columns %v for day %v", columns, day) + + // get max validator index for day + firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1) + maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch) + if err != nil { + utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: could not get max validator index", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } else if maxValidatorIndex == uint64(0) { + utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: no validator found", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } + + ctx := context.Background() + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(int(concurrency)) + + batchSize := 1000 + + // insert stats totals for each batch of validators + for b := 0; b <= int(maxValidatorIndex); b += batchSize { + start := b + end := b + batchSize // exclusive + if int(maxValidatorIndex) < end { + end = int(maxValidatorIndex) + } + + g.Go(func() error { + select { + case <-gCtx.Done(): + return gCtx.Err() + default: + } + + _, err = db.WriterDb.Exec(insertQuery, day, start, end) + return err + }) + } + if err = g.Wait(); err != nil { + utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{ + "day": day, + "columns": columns, + }) + } + logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay)) + } + + logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start)) +} diff --git a/db/migrations/20231102075500_add_total_withdrawals_stats.sql b/db/migrations/20231102075500_add_total_withdrawals_stats.sql new file mode 100644 index 0000000000..fd33067507 --- /dev/null +++ b/db/migrations/20231102075500_add_total_withdrawals_stats.sql @@ -0,0 +1,13 @@ +-- +goose Up +-- +goose StatementBegin +SELECT 'up SQL query - add total withdrawal count and amount columns to stats'; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +SELECT 'down SQL query - remove total withdrawal count and amount columns from stats'; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total; +-- +goose StatementEnd diff --git a/db/statistics.go b/db/statistics.go index 9bb50c391a..6062ab3c5a 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -212,6 +212,10 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { // update mev reward total data.MEVRewardsWeiTotal = previousDayData.MEVRewardsWeiTotal.Add(data.MEVRewardsWei) + // update withdrawal total + data.WithdrawalsTotal = previousDayData.WithdrawalsTotal + data.Withdrawals + data.WithdrawalsAmountTotal = previousDayData.WithdrawalsAmountTotal + data.WithdrawalsAmount + if statisticsData1d != nil && len(statisticsData1d) > index { data.ClPerformance1d = data.ClRewardsGWeiTotal - statisticsData1d[index].ClRewardsGWeiTotal data.ElPerformance1d = data.ElRewardsWeiTotal.Sub(statisticsData1d[index].ElRewardsWeiTotal) @@ -302,7 +306,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { "deposits", "deposits_amount", "withdrawals", + "withdrawals_total", "withdrawals_amount", + "withdrawals_amount_total", "cl_rewards_gwei", "cl_rewards_gwei_total", "el_rewards_wei", @@ -338,7 +344,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { validatorData[i].Deposits, validatorData[i].DepositsAmount, validatorData[i].Withdrawals, + validatorData[i].WithdrawalsTotal, validatorData[i].WithdrawalsAmount, + validatorData[i].WithdrawalsAmountTotal, validatorData[i].ClRewardsGWei, validatorData[i].ClRewardsGWeiTotal, validatorData[i].ElRewardsWei, @@ -1141,7 +1149,9 @@ func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error COALESCE(deposits, 0) AS deposits, COALESCE(deposits_amount, 0) AS deposits_amount, COALESCE(withdrawals, 0) AS withdrawals, + COALESCE(withdrawals_total, 0) AS withdrawals_total, COALESCE(withdrawals_amount, 0) AS withdrawals_amount, + COALESCE(withdrawals_amount_total, 0) AS withdrawals_amount_total, COALESCE(cl_rewards_gwei, 0) AS cl_rewards_gwei, COALESCE(cl_rewards_gwei_total, 0) AS cl_rewards_gwei_total, COALESCE(el_rewards_wei, 0) AS el_rewards_wei, diff --git a/types/exporter.go b/types/exporter.go index e1b773a62a..6e03595233 100644 --- a/types/exporter.go +++ b/types/exporter.go @@ -645,8 +645,10 @@ type ValidatorStatsTableDbRow struct { Deposits int64 `db:"deposits"` DepositsAmount int64 `db:"deposits_amount"` - Withdrawals int64 `db:"withdrawals"` - WithdrawalsAmount int64 `db:"withdrawals_amount"` + Withdrawals int64 `db:"withdrawals"` + WithdrawalsTotal int64 `db:"withdrawals_total"` + WithdrawalsAmount int64 `db:"withdrawals_amount"` + WithdrawalsAmountTotal int64 `db:"withdrawals_amount_total"` ClRewardsGWei int64 `db:"cl_rewards_gwei"` ClRewardsGWeiTotal int64 `db:"cl_rewards_gwei_total"`