Skip to content

Commit

Permalink
(BIDS-2369) add total withdrawal columns (#2653)
Browse files Browse the repository at this point in the history
  • Loading branch information
LuccaBitfly authored Nov 2, 2023
1 parent 0c45392 commit 120ccfe
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 3 deletions.
120 changes: 119 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var opts = struct {
DataConcurrency uint64
Transformers string
Table string
Columns string
Family string
Key string
ValidatorNameRanges string
Expand All @@ -51,7 +52,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand All @@ -68,6 +69,7 @@ func main() {
flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size")
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})")
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()
Expand Down Expand Up @@ -355,6 +357,8 @@ func main() {
if err != nil {
logrus.Fatal(err)
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
default:
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand Down Expand Up @@ -1002,3 +1006,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) {

logrus.Info("historic price update run completed")
}

func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) {
start := time.Now()
logrus.Infof("exporting stats totals for columns '%v'", columns)

// validate columns input
columnsSlice := strings.Split(columns, ",")
validColumns := []string{
"cl_rewards_gwei_total",
"el_rewards_wei_total",
"mev_rewards_wei_total",
"missed_attestations_total",
"participated_sync_total",
"missed_sync_total",
"orphaned_sync_total",
"withdrawals_total",
"withdrawals_amount_total",
}

OUTER:
for _, c := range columnsSlice {
for _, vc := range validColumns {
if c == vc {
// valid column found, continue to next column from input
continue OUTER
}
}
// no valid column matched, exit with error
utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{
"usedColumn": c,
"validColumns": validColumns,
})
}

// build insert query from input columns
var totalClauses []string
var conflictClauses []string

for _, col := range columnsSlice {
totalClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col)
totalClauses = append(totalClauses, totalClause)

conflictClause := fmt.Sprintf("%s = excluded.%s", col, col)
conflictClauses = append(conflictClauses, conflictClause)
}

insertQuery := fmt.Sprintf(`
INSERT INTO validator_stats (validatorindex, day, %s)
SELECT
vs1.validatorindex,
vs1.day,
%s
FROM validator_stats vs1
LEFT JOIN validator_stats vs2
ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex
WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3
ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`,
strings.Join(columnsSlice, ",\n\t"),
strings.Join(totalClauses, ",\n\t\t"),
strings.Join(conflictClauses, ",\n\t"))

for day := dayStart; day <= dayEnd; day++ {
timeDay := time.Now()
logrus.Infof("exporting total sync and for columns %v for day %v", columns, day)

// get max validator index for day
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1)
maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch)
if err != nil {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: could not get max validator index", 0, map[string]interface{}{
"epoch": firstEpoch,
})
} else if maxValidatorIndex == uint64(0) {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: no validator found", 0, map[string]interface{}{
"epoch": firstEpoch,
})
}

ctx := context.Background()
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))

batchSize := 1000

// insert stats totals for each batch of validators
for b := 0; b <= int(maxValidatorIndex); b += batchSize {
start := b
end := b + batchSize // exclusive
if int(maxValidatorIndex) < end {
end = int(maxValidatorIndex)
}

g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}

_, err = db.WriterDb.Exec(insertQuery, day, start, end)
return err
})
}
if err = g.Wait(); err != nil {
utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{
"day": day,
"columns": columns,
})
}
logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay))
}

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}
13 changes: 13 additions & 0 deletions db/migrations/20231102075500_add_total_withdrawals_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query - add total withdrawal count and amount columns to stats';
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query - remove total withdrawal count and amount columns from stats';
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total;
-- +goose StatementEnd
10 changes: 10 additions & 0 deletions db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
// update mev reward total
data.MEVRewardsWeiTotal = previousDayData.MEVRewardsWeiTotal.Add(data.MEVRewardsWei)

// update withdrawal total
data.WithdrawalsTotal = previousDayData.WithdrawalsTotal + data.Withdrawals
data.WithdrawalsAmountTotal = previousDayData.WithdrawalsAmountTotal + data.WithdrawalsAmount

if statisticsData1d != nil && len(statisticsData1d) > index {
data.ClPerformance1d = data.ClRewardsGWeiTotal - statisticsData1d[index].ClRewardsGWeiTotal
data.ElPerformance1d = data.ElRewardsWeiTotal.Sub(statisticsData1d[index].ElRewardsWeiTotal)
Expand Down Expand Up @@ -302,7 +306,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
"deposits",
"deposits_amount",
"withdrawals",
"withdrawals_total",
"withdrawals_amount",
"withdrawals_amount_total",
"cl_rewards_gwei",
"cl_rewards_gwei_total",
"el_rewards_wei",
Expand Down Expand Up @@ -338,7 +344,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
validatorData[i].Deposits,
validatorData[i].DepositsAmount,
validatorData[i].Withdrawals,
validatorData[i].WithdrawalsTotal,
validatorData[i].WithdrawalsAmount,
validatorData[i].WithdrawalsAmountTotal,
validatorData[i].ClRewardsGWei,
validatorData[i].ClRewardsGWeiTotal,
validatorData[i].ElRewardsWei,
Expand Down Expand Up @@ -1141,7 +1149,9 @@ func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error
COALESCE(deposits, 0) AS deposits,
COALESCE(deposits_amount, 0) AS deposits_amount,
COALESCE(withdrawals, 0) AS withdrawals,
COALESCE(withdrawals_total, 0) AS withdrawals_total,
COALESCE(withdrawals_amount, 0) AS withdrawals_amount,
COALESCE(withdrawals_amount_total, 0) AS withdrawals_amount_total,
COALESCE(cl_rewards_gwei, 0) AS cl_rewards_gwei,
COALESCE(cl_rewards_gwei_total, 0) AS cl_rewards_gwei_total,
COALESCE(el_rewards_wei, 0) AS el_rewards_wei,
Expand Down
6 changes: 4 additions & 2 deletions types/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,10 @@ type ValidatorStatsTableDbRow struct {
Deposits int64 `db:"deposits"`
DepositsAmount int64 `db:"deposits_amount"`

Withdrawals int64 `db:"withdrawals"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
Withdrawals int64 `db:"withdrawals"`
WithdrawalsTotal int64 `db:"withdrawals_total"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
WithdrawalsAmountTotal int64 `db:"withdrawals_amount_total"`

ClRewardsGWei int64 `db:"cl_rewards_gwei"`
ClRewardsGWeiTotal int64 `db:"cl_rewards_gwei_total"`
Expand Down

0 comments on commit 120ccfe

Please sign in to comment.