From 187e69a2cc575077b9b8acec15198d508aca8818 Mon Sep 17 00:00:00 2001 From: LUCCA DUKIC <109136188+LuccaBitfly@users.noreply.github.com> Date: Thu, 28 Sep 2023 10:18:31 +0200 Subject: [PATCH] (BIDS-2369) add total withdrawals to stats --- cmd/misc/main.go | 120 +++++++++++++++++- db/db.go | 10 +- .../temp_add_total_withdrawals_stats.sql | 13 ++ handlers/api.go | 10 +- utils/utils.go | 27 ++++ 5 files changed, 169 insertions(+), 11 deletions(-) create mode 100644 db/migrations/temp_add_total_withdrawals_stats.sql diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 2ee39146c4..0432c1b88a 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -41,6 +41,7 @@ var opts = struct { DataConcurrency uint64 Transformers string Table string + Columns string Family string Key string DryRun bool @@ -48,7 +49,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators, export-stats-totals") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -64,6 +65,7 @@ func main() { flag.Uint64Var(&opts.DataConcurrency, "data.concurrency", 30, "Concurrency to use when indexing data from bigtable") flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size") flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer") + flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command") dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them") versionFlag := flag.Bool("version", false, "Show version and exit") flag.Parse() @@ -310,6 +312,8 @@ func main() { if err != nil { logrus.Fatal(err) } + case "export-stats-totals": + exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) default: utils.LogFatal(nil, "unknown command", 0) } @@ -783,3 +787,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) { logrus.Info("historic price update run completed") } + +func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) { + start := time.Now() + logrus.Infof("exporting stats totals for columns '%v'", columns) + + // validate columns input + columnsSlice := strings.Split(columns, ",") + validColumns := []string{ + "cl_rewards_gwei_total", + "el_rewards_wei_total", + "mev_rewards_wei_total", + "missed_attestations_total", + "participated_sync_total", + "missed_sync_total", + "orphaned_sync_total", + "withdrawals_total", + "withdrawals_amount_total", + } + +OUTER: + for _, c := range columnsSlice { + for _, vc := range validColumns { + if c == vc { + // valid column found, continue to next column from input + continue OUTER + } + } + // no valid column matched, exit with error + utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{ + "usedColumn": c, + "validColumns": validColumns, + }) + } + + // build insert query from input columns + var selectClauses []string + var conflictClauses []string + + for _, col := range columnsSlice { + selectClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col) + selectClauses = append(selectClauses, selectClause) + + conflictClause := fmt.Sprintf("%s_total = excluded.%s", col, col) + conflictClauses = append(conflictClauses, conflictClause) + } + + insertQuery := fmt.Sprintf(` + INSERT INTO validator_stats (%s) + SELECT + vs1.validatorindex, + vs1.day, + %s + FROM validator_stats vs1 + LEFT JOIN validator_stats vs2 + ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex + WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3 + ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`, + strings.Join(columnsSlice, ",\n\t"), + strings.Join(selectClauses, ",\n\t\t"), + strings.Join(conflictClauses, ",\n\t")) + + for day := dayStart; day <= dayEnd; day++ { + timeDay := time.Now() + logrus.Infof("exporting total sync and for columns %v for day %v", columns, day) + + // get max validator index for day + firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1) + maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch) + if err != nil { + utils.LogFatal(err, "error in GetAggregatedValidatorIncomeDetailsHistory: could not get max validator index from validator income history", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } else if maxValidatorIndex == uint64(0) { + utils.LogFatal(err, "error in GetAggregatedValidatorIncomeDetailsHistory: no validator found", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } + + ctx := context.Background() + g, gCtx := errgroup.WithContext(ctx) + g.SetLimit(int(concurrency)) + + batchSize := 1000 + + // insert stats totals for each batch of validators + for b := 0; b <= int(maxValidatorIndex); b += batchSize { + start := b + end := b + batchSize // exclusive + if int(maxValidatorIndex) < end { + end = int(maxValidatorIndex) + } + + g.Go(func() error { + select { + case <-gCtx.Done(): + return gCtx.Err() + default: + } + + _, err = db.WriterDb.Exec(insertQuery, day, start, end) + return err + }) + } + if err = g.Wait(); err != nil { + utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{ + "day": day, + "columns": columns, + }) + } + logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay)) + } + + logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start)) +} diff --git a/db/db.go b/db/db.go index 11a12567a4..41c1c03b69 100644 --- a/db/db.go +++ b/db/db.go @@ -2646,18 +2646,18 @@ func GetTotalWithdrawalsCount(validators []uint64) (uint64, error) { err = ReaderDb.Get(&count, ` WITH today AS ( - SELECT COUNT(*) as count_today + SELECT COUNT(*) as count FROM blocks_withdrawals w INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1' WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2 ), stats AS ( - SELECT COALESCE(SUM(withdrawals), 0) as total_count + SELECT COALESCE(SUM(withdrawals_total), 0) as count FROM validator_stats - WHERE validatorindex = ANY($1) + WHERE validatorindex = ANY($1) AND day = $3 ) - SELECT today.count_today + stats.total_count - FROM today, stats;`, validatorFilter, cutoffSlot) + SELECT today.count + stats.count + FROM today, stats;`, validatorFilter, cutoffSlot, lastExportedDay) if err != nil { if err == sql.ErrNoRows { return 0, nil diff --git a/db/migrations/temp_add_total_withdrawals_stats.sql b/db/migrations/temp_add_total_withdrawals_stats.sql new file mode 100644 index 0000000000..a8da6fd71e --- /dev/null +++ b/db/migrations/temp_add_total_withdrawals_stats.sql @@ -0,0 +1,13 @@ +-- +goose Up +-- +goose StatementBegin +SELECT 'up SQL query - add total withdrawal count and amount columns to stats'; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT; +ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +SELECT 'down SQL query - remove total withdrawal count and amount columns from stats'; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total; +ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount; +-- +goose StatementEnd diff --git a/handlers/api.go b/handlers/api.go index 16861f77e1..066d06199f 100644 --- a/handlers/api.go +++ b/handlers/api.go @@ -1562,7 +1562,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) { WITH today AS ( SELECT w.validatorindex, - COALESCE(SUM(w.amount), 0) as amount_today + COALESCE(SUM(w.amount), 0) as amount FROM blocks_withdrawals w INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1' WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2 @@ -1571,15 +1571,15 @@ func apiValidator(w http.ResponseWriter, r *http.Request) { stats AS ( SELECT vs.validatorindex, - COALESCE(SUM(vs.withdrawals_amount), 0) as total_amount + COALESCE(SUM(vs.withdrawals_amount_total), 0) as amount FROM validator_stats vs - WHERE vs.validatorindex = ANY($1) + WHERE vs.validatorindex = ANY($1) AND day = $3 GROUP BY vs.validatorindex ), withdrawals_summary AS ( SELECT COALESCE(t.validatorindex, s.validatorindex) as validatorindex, - COALESCE(t.amount_today, 0) + COALESCE(s.total_amount, 0) as total + COALESCE(t.amount, 0) + COALESCE(s.amount, 0) as total FROM today t FULL JOIN stats s ON t.validatorindex = s.validatorindex ) @@ -1598,7 +1598,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) { LEFT JOIN withdrawals_summary ws ON ws.validatorindex = v.validatorindex WHERE v.validatorindex = ANY($1) ORDER BY v.validatorindex; - `, pq.Array(queryIndices), cutoffSlot) + `, pq.Array(queryIndices), cutoffSlot, lastExportedDay) if err != nil { logger.Warnf("error retrieving validator data from db: %v", err) sendErrorResponse(w, r.URL.String(), "could not retrieve db results") diff --git a/utils/utils.go b/utils/utils.go index 419536a6af..91fe3d15d1 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -26,6 +26,7 @@ import ( "os" "os/signal" "path/filepath" + "reflect" "regexp" "runtime" "sort" @@ -1571,3 +1572,29 @@ func GetCurrentFuncName() string { pc, _, _, _ := runtime.Caller(1) return runtime.FuncForPC(pc).Name() } + +// CheckAndInverse checks if all bool fields of a struct are true and returns the struct with all bool fields inverted. +// +// Parameters: +// - `v` : the struct to check, should only contain bool fields +// +// Returns: +// - `bool` : true if all bool fields are true +// - `interface{}` : the struct with all bool fields inverted +func CheckAndInverse(v interface{}) (bool, interface{}) { + val := reflect.ValueOf(v) + allTrue := true + inverse := reflect.New(val.Type()).Elem() + + for i := 0; i < val.NumField(); i++ { + field := val.Field(i) + if field.Kind() == reflect.Bool { + if !field.Bool() { + allTrue = false + } + inverse.Field(i).SetBool(!field.Bool()) + } + } + + return allTrue, inverse.Interface() +}