Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(BIDS-2369) add total withdrawal columns #2653

Merged
merged 2 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 119 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var opts = struct {
DataConcurrency uint64
Transformers string
Table string
Columns string
Family string
Key string
ValidatorNameRanges string
Expand All @@ -51,7 +52,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand All @@ -68,6 +69,7 @@ func main() {
flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size")
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})")
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()
Expand Down Expand Up @@ -355,6 +357,8 @@ func main() {
if err != nil {
logrus.Fatal(err)
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
default:
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand Down Expand Up @@ -1002,3 +1006,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) {

logrus.Info("historic price update run completed")
}

func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) {
start := time.Now()
logrus.Infof("exporting stats totals for columns '%v'", columns)

// validate columns input
columnsSlice := strings.Split(columns, ",")
validColumns := []string{
"cl_rewards_gwei_total",
"el_rewards_wei_total",
"mev_rewards_wei_total",
"missed_attestations_total",
"participated_sync_total",
"missed_sync_total",
"orphaned_sync_total",
"withdrawals_total",
"withdrawals_amount_total",
}

OUTER:
for _, c := range columnsSlice {
for _, vc := range validColumns {
if c == vc {
// valid column found, continue to next column from input
continue OUTER
}
}
// no valid column matched, exit with error
utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{
"usedColumn": c,
"validColumns": validColumns,
})
}

// build insert query from input columns
var totalClauses []string
var conflictClauses []string

for _, col := range columnsSlice {
totalClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col)
totalClauses = append(totalClauses, totalClause)

conflictClause := fmt.Sprintf("%s = excluded.%s", col, col)
conflictClauses = append(conflictClauses, conflictClause)
}

insertQuery := fmt.Sprintf(`
INSERT INTO validator_stats (validatorindex, day, %s)
SELECT
vs1.validatorindex,
vs1.day,
%s
FROM validator_stats vs1
LEFT JOIN validator_stats vs2
ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex
WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3
ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`,
strings.Join(columnsSlice, ",\n\t"),
strings.Join(totalClauses, ",\n\t\t"),
strings.Join(conflictClauses, ",\n\t"))

for day := dayStart; day <= dayEnd; day++ {
timeDay := time.Now()
logrus.Infof("exporting total sync and for columns %v for day %v", columns, day)

// get max validator index for day
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1)
maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch)
if err != nil {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: could not get max validator index", 0, map[string]interface{}{
"epoch": firstEpoch,
})
} else if maxValidatorIndex == uint64(0) {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: no validator found", 0, map[string]interface{}{
"epoch": firstEpoch,
})
}

ctx := context.Background()
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))

batchSize := 1000

// insert stats totals for each batch of validators
for b := 0; b <= int(maxValidatorIndex); b += batchSize {
start := b
end := b + batchSize // exclusive
if int(maxValidatorIndex) < end {
end = int(maxValidatorIndex)
}

g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}

_, err = db.WriterDb.Exec(insertQuery, day, start, end)
return err
})
}
if err = g.Wait(); err != nil {
utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{
"day": day,
"columns": columns,
})
}
logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay))
}

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}
13 changes: 13 additions & 0 deletions db/migrations/20231102075500_add_total_withdrawals_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query - add total withdrawal count and amount columns to stats';
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query - remove total withdrawal count and amount columns from stats';
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total;
-- +goose StatementEnd
10 changes: 10 additions & 0 deletions db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
// update mev reward total
data.MEVRewardsWeiTotal = previousDayData.MEVRewardsWeiTotal.Add(data.MEVRewardsWei)

// update withdrawal total
data.WithdrawalsTotal = previousDayData.WithdrawalsTotal + data.Withdrawals
data.WithdrawalsAmountTotal = previousDayData.WithdrawalsAmountTotal + data.WithdrawalsAmount

if statisticsData1d != nil && len(statisticsData1d) > index {
data.ClPerformance1d = data.ClRewardsGWeiTotal - statisticsData1d[index].ClRewardsGWeiTotal
data.ElPerformance1d = data.ElRewardsWeiTotal.Sub(statisticsData1d[index].ElRewardsWeiTotal)
Expand Down Expand Up @@ -302,7 +306,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
"deposits",
"deposits_amount",
"withdrawals",
"withdrawals_total",
"withdrawals_amount",
"withdrawals_amount_total",
"cl_rewards_gwei",
"cl_rewards_gwei_total",
"el_rewards_wei",
Expand Down Expand Up @@ -338,7 +344,9 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error {
validatorData[i].Deposits,
validatorData[i].DepositsAmount,
validatorData[i].Withdrawals,
validatorData[i].WithdrawalsTotal,
validatorData[i].WithdrawalsAmount,
validatorData[i].WithdrawalsAmountTotal,
validatorData[i].ClRewardsGWei,
validatorData[i].ClRewardsGWeiTotal,
validatorData[i].ElRewardsWei,
Expand Down Expand Up @@ -1141,7 +1149,9 @@ func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error
COALESCE(deposits, 0) AS deposits,
COALESCE(deposits_amount, 0) AS deposits_amount,
COALESCE(withdrawals, 0) AS withdrawals,
COALESCE(withdrawals_total, 0) AS withdrawals_total,
COALESCE(withdrawals_amount, 0) AS withdrawals_amount,
COALESCE(withdrawals_amount_total, 0) AS withdrawals_amount_total,
COALESCE(cl_rewards_gwei, 0) AS cl_rewards_gwei,
COALESCE(cl_rewards_gwei_total, 0) AS cl_rewards_gwei_total,
COALESCE(el_rewards_wei, 0) AS el_rewards_wei,
Expand Down
6 changes: 4 additions & 2 deletions types/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,10 @@ type ValidatorStatsTableDbRow struct {
Deposits int64 `db:"deposits"`
DepositsAmount int64 `db:"deposits_amount"`

Withdrawals int64 `db:"withdrawals"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
Withdrawals int64 `db:"withdrawals"`
WithdrawalsTotal int64 `db:"withdrawals_total"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
WithdrawalsAmountTotal int64 `db:"withdrawals_amount_total"`

ClRewardsGWei int64 `db:"cl_rewards_gwei"`
ClRewardsGWeiTotal int64 `db:"cl_rewards_gwei_total"`
Expand Down
Loading