Skip to content

Commit

Permalink
(BIDS-2369) add total withdrawals to stats
Browse files Browse the repository at this point in the history
  • Loading branch information
LuccaBitfly committed Sep 28, 2023
1 parent 6cc9381 commit 187e69a
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 11 deletions.
120 changes: 119 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ var opts = struct {
DataConcurrency uint64
Transformers string
Table string
Columns string
Family string
Key string
DryRun bool
}{}

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators, export-stats-totals")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand All @@ -64,6 +65,7 @@ func main() {
flag.Uint64Var(&opts.DataConcurrency, "data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size")
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()
Expand Down Expand Up @@ -310,6 +312,8 @@ func main() {
if err != nil {
logrus.Fatal(err)
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
default:
utils.LogFatal(nil, "unknown command", 0)
}
Expand Down Expand Up @@ -783,3 +787,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) {

logrus.Info("historic price update run completed")
}

func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) {
start := time.Now()
logrus.Infof("exporting stats totals for columns '%v'", columns)

// validate columns input
columnsSlice := strings.Split(columns, ",")
validColumns := []string{
"cl_rewards_gwei_total",
"el_rewards_wei_total",
"mev_rewards_wei_total",
"missed_attestations_total",
"participated_sync_total",
"missed_sync_total",
"orphaned_sync_total",
"withdrawals_total",
"withdrawals_amount_total",
}

OUTER:
for _, c := range columnsSlice {
for _, vc := range validColumns {
if c == vc {
// valid column found, continue to next column from input
continue OUTER
}
}
// no valid column matched, exit with error
utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{
"usedColumn": c,
"validColumns": validColumns,
})
}

// build insert query from input columns
var selectClauses []string
var conflictClauses []string

for _, col := range columnsSlice {
selectClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col)
selectClauses = append(selectClauses, selectClause)

conflictClause := fmt.Sprintf("%s_total = excluded.%s", col, col)
conflictClauses = append(conflictClauses, conflictClause)
}

insertQuery := fmt.Sprintf(`
INSERT INTO validator_stats (%s)
SELECT
vs1.validatorindex,
vs1.day,
%s
FROM validator_stats vs1
LEFT JOIN validator_stats vs2
ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex
WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3
ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`,
strings.Join(columnsSlice, ",\n\t"),
strings.Join(selectClauses, ",\n\t\t"),
strings.Join(conflictClauses, ",\n\t"))

for day := dayStart; day <= dayEnd; day++ {
timeDay := time.Now()
logrus.Infof("exporting total sync and for columns %v for day %v", columns, day)

// get max validator index for day
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1)
maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch)
if err != nil {
utils.LogFatal(err, "error in GetAggregatedValidatorIncomeDetailsHistory: could not get max validator index from validator income history", 0, map[string]interface{}{
"epoch": firstEpoch,
})
} else if maxValidatorIndex == uint64(0) {
utils.LogFatal(err, "error in GetAggregatedValidatorIncomeDetailsHistory: no validator found", 0, map[string]interface{}{
"epoch": firstEpoch,
})
}

ctx := context.Background()
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))

batchSize := 1000

// insert stats totals for each batch of validators
for b := 0; b <= int(maxValidatorIndex); b += batchSize {
start := b
end := b + batchSize // exclusive
if int(maxValidatorIndex) < end {
end = int(maxValidatorIndex)
}

g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}

_, err = db.WriterDb.Exec(insertQuery, day, start, end)
return err
})
}
if err = g.Wait(); err != nil {
utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{
"day": day,
"columns": columns,
})
}
logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay))
}

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}
10 changes: 5 additions & 5 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2646,18 +2646,18 @@ func GetTotalWithdrawalsCount(validators []uint64) (uint64, error) {

err = ReaderDb.Get(&count, `
WITH today AS (
SELECT COUNT(*) as count_today
SELECT COUNT(*) as count
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2
),
stats AS (
SELECT COALESCE(SUM(withdrawals), 0) as total_count
SELECT COALESCE(SUM(withdrawals_total), 0) as count
FROM validator_stats
WHERE validatorindex = ANY($1)
WHERE validatorindex = ANY($1) AND day = $3
)
SELECT today.count_today + stats.total_count
FROM today, stats;`, validatorFilter, cutoffSlot)
SELECT today.count + stats.count
FROM today, stats;`, validatorFilter, cutoffSlot, lastExportedDay)
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
Expand Down
13 changes: 13 additions & 0 deletions db/migrations/temp_add_total_withdrawals_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query - add total withdrawal count and amount columns to stats';
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query - remove total withdrawal count and amount columns from stats';
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount;
-- +goose StatementEnd
10 changes: 5 additions & 5 deletions handlers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1562,7 +1562,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
WITH today AS (
SELECT
w.validatorindex,
COALESCE(SUM(w.amount), 0) as amount_today
COALESCE(SUM(w.amount), 0) as amount
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2
Expand All @@ -1571,15 +1571,15 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
stats AS (
SELECT
vs.validatorindex,
COALESCE(SUM(vs.withdrawals_amount), 0) as total_amount
COALESCE(SUM(vs.withdrawals_amount_total), 0) as amount
FROM validator_stats vs
WHERE vs.validatorindex = ANY($1)
WHERE vs.validatorindex = ANY($1) AND day = $3
GROUP BY vs.validatorindex
),
withdrawals_summary AS (
SELECT
COALESCE(t.validatorindex, s.validatorindex) as validatorindex,
COALESCE(t.amount_today, 0) + COALESCE(s.total_amount, 0) as total
COALESCE(t.amount, 0) + COALESCE(s.amount, 0) as total
FROM today t
FULL JOIN stats s ON t.validatorindex = s.validatorindex
)
Expand All @@ -1598,7 +1598,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
LEFT JOIN withdrawals_summary ws ON ws.validatorindex = v.validatorindex
WHERE v.validatorindex = ANY($1)
ORDER BY v.validatorindex;
`, pq.Array(queryIndices), cutoffSlot)
`, pq.Array(queryIndices), cutoffSlot, lastExportedDay)
if err != nil {
logger.Warnf("error retrieving validator data from db: %v", err)
sendErrorResponse(w, r.URL.String(), "could not retrieve db results")
Expand Down
27 changes: 27 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"os/signal"
"path/filepath"
"reflect"
"regexp"
"runtime"
"sort"
Expand Down Expand Up @@ -1571,3 +1572,29 @@ func GetCurrentFuncName() string {
pc, _, _, _ := runtime.Caller(1)
return runtime.FuncForPC(pc).Name()
}

// CheckAndInverse checks if all bool fields of a struct are true and returns the struct with all bool fields inverted.
//
// Parameters:
// - `v` : the struct to check, should only contain bool fields
//
// Returns:
// - `bool` : true if all bool fields are true
// - `interface{}` : the struct with all bool fields inverted
func CheckAndInverse(v interface{}) (bool, interface{}) {
val := reflect.ValueOf(v)
allTrue := true
inverse := reflect.New(val.Type()).Elem()

for i := 0; i < val.NumField(); i++ {
field := val.Field(i)
if field.Kind() == reflect.Bool {
if !field.Bool() {
allTrue = false
}
inverse.Field(i).SetBool(!field.Bool())
}
}

return allTrue, inverse.Interface()
}

0 comments on commit 187e69a

Please sign in to comment.