From b3baa4a2cceef9f0952071269e79d015435b8240 Mon Sep 17 00:00:00 2001
From: Manuel <5877862+manuelsc@users.noreply.github.com>
Date: Thu, 9 Nov 2023 12:46:27 +0100
Subject: [PATCH 01/11] (NOBIDS) fix sync committee committee_index order

---
 exporter/sync_committees.go | 13 ++-----------
 handlers/api.go             | 31 +++++++++++++++++++++----------
 handlers/validator.go       |  2 +-
 3 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/exporter/sync_committees.go b/exporter/sync_committees.go
index ca140a8263..a8941b05d6 100644
--- a/exporter/sync_committees.go
+++ b/exporter/sync_committees.go
@@ -75,6 +75,8 @@ func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error {
 	logger.Infof("exporting sync committee assignments for period %v (epoch %v to %v)", p, firstEpoch, lastEpoch)
 
+	// Note that the order in which we receive the validators from the node is crucial,
+	// as it determines which bit represents them in the block's sync aggregate bits.
 	c, err := rpcClient.GetSyncCommittee(fmt.Sprintf("%d", stateID), epoch)
 	if err != nil {
 		return err
 	}
@@ -89,17 +91,6 @@ func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error {
 		validatorsU64[i] = idxU64
 	}
 
-	dedupMap := make(map[uint64]bool)
-
-	for _, validator := range validatorsU64 {
-		dedupMap[validator] = true
-	}
-
-	validatorsU64 = make([]uint64, 0, len(dedupMap))
-	for validator := range dedupMap {
-		validatorsU64 = append(validatorsU64, validator)
-	}
-
 	// start := time.Now()
 	//
 	// firstSlot := firstEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
diff --git a/handlers/api.go b/handlers/api.go
index 1b51248f1b..823a6956d1 100644
--- a/handlers/api.go
+++ b/handlers/api.go
@@ -732,6 +732,8 @@ func ApiSyncCommittee(w http.ResponseWriter, r *http.Request) {
 		period = utils.SyncPeriodOfEpoch(services.LatestEpoch()) + 1
 	}
 
+	// Beware that we do not deduplicate here, since a validator can be part of the same sync committee period multiple times
+	// and the order of the committeeindex is important; deduplicating would mess up that order.
 	rows, err := db.ReaderDb.Query(`SELECT period, GREATEST(period*$2, $3) AS start_epoch, ((period+1)*$2)-1 AS end_epoch, ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators FROM sync_committees WHERE period = $1 GROUP BY period`, period, utils.Config.Chain.ClConfig.EpochsPerSyncCommitteePeriod, utils.Config.Chain.ClConfig.AltairForkEpoch)
 	if err != nil {
 		logger.WithError(err).WithField("url", r.URL.String()).Errorf("error querying db")
@@ -1020,15 +1022,24 @@ func ApiDashboard(w http.ResponseWriter, r *http.Request) {
 }
 
 func getSyncCommitteeInfoForValidators(validators []uint64, period uint64) ([]interface{}, error) {
-	rows, err := db.ReaderDb.Query(
-		`SELECT
-			period,
-			GREATEST(period*$3, $4) AS start_epoch,
-			((period+1)*$3)-1 AS end_epoch,
-			ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators
-		FROM sync_committees
-		WHERE period = $1 AND validatorindex = ANY($2)
-		GROUP BY period`,
+	rows, err := db.ReaderDb.Query(`
+		WITH
+			data as (
+				SELECT
+					period,
+					validatorindex,
+					max(committeeindex) as committeeindex
+				FROM sync_committees
+				WHERE period = $1 AND validatorindex = ANY($2)
+				group by period, validatorindex
+			)
+		SELECT
+			period,
+			GREATEST(period*$3, $4) AS start_epoch,
+			((period+1)*$3)-1 AS end_epoch,
+			ARRAY_AGG(validatorindex ORDER BY committeeindex) AS validators
+		FROM data
+		group by period;`,
 		period, pq.Array(validators), utils.Config.Chain.ClConfig.EpochsPerSyncCommitteePeriod, utils.Config.Chain.ClConfig.AltairForkEpoch,
 	)
@@ -1218,7 +1229,7 @@ func 
getSyncCommitteeSlotsStatistics(validators []uint64, epoch uint64) (types.S Validators pq.Int64Array `db:"validators"` } query, args, err := sqlx.In(` - SELECT period, COALESCE(ARRAY_AGG(validatorindex), '{}') AS validators + SELECT period, COALESCE(ARRAY_AGG(distinct validatorindex), '{}') AS validators FROM sync_committees WHERE period IN (?) AND validatorindex IN (?) GROUP BY period diff --git a/handlers/validator.go b/handlers/validator.go index 0dfc3849ee..5a1caa7be2 100644 --- a/handlers/validator.go +++ b/handlers/validator.go @@ -1992,7 +1992,7 @@ func ValidatorSync(w http.ResponseWriter, r *http.Request) { var syncPeriods []uint64 = []uint64{} err = db.ReaderDb.Select(&syncPeriods, ` - SELECT period + SELECT distinct period FROM sync_committees WHERE validatorindex = $1 ORDER BY period desc`, validatorIndex) From 859a441ff7e284becd60f2ed909514d53a6c7d6b Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 13 Nov 2023 16:19:21 +0100 Subject: [PATCH 02/11] (NOBIDS) misc tool to correct data --- cmd/misc/main.go | 274 +++++++++++++++++++++++++++++++++++- db/statistics.go | 26 ++-- exporter/sync_committees.go | 97 ++++++++----- 3 files changed, 346 insertions(+), 51 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index ac265938cc..ecdba6b175 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -12,14 +12,17 @@ import ( "eth2-exporter/utils" "eth2-exporter/version" "fmt" + "math" "math/big" "net/http" "strconv" "strings" + "sync" "time" "github.com/coocood/freecache" _ "github.com/jackc/pgx/v5/stdlib" + "github.com/pkg/errors" utilMath "github.com/protolambda/zrnt/eth2/util/math" "golang.org/x/sync/errgroup" @@ -52,7 +55,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee, export-sync-committee-ph2") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -359,6 +362,10 @@ func main() { } case "export-stats-totals": exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) + case "export-sync-committee": + exportSyncCommittee(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, false) + case "export-sync-committee-ph2": + exportSyncCommittee(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, true) case "fix-exec-transactions-count": err = fixExecTransactionsCount() default: @@ -1233,3 +1240,268 @@ OUTER: logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - 
%v, took %v", columns, dayStart, dayEnd, time.Since(start)) } + +func exportSyncCommittee(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun, skipPhase1 bool) { + var currEpoch = uint64(0) + + firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) + if endDay <= 0 { + currEpoch := services.LatestFinalizedEpoch() + if currEpoch > 0 { // guard against underflows + currEpoch = currEpoch - 1 + } + } else { + _, lastEpoch := utils.GetFirstAndLastEpochForDay(endDay) + currEpoch = lastEpoch + } + + if startDay > 0 { + firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay) + firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch) + } + + lastPeriod := utils.SyncPeriodOfEpoch(uint64(currEpoch)) + 1 // we can look into the future + + if !skipPhase1 { + logrus.Infof("Phase 1: Re exporting data for sync_committee table") + for p := firstPeriod; p <= lastPeriod; p++ { + t0 := time.Now() + + err := reExportSyncCommittee(rpcClient, p, dryRun) + if err != nil { + if strings.Contains(err.Error(), "not found 404") { + logrus.WithField("period", p).Infof("reached max period, stopping") + break + } else { + logrus.WithError(err).WithField("period", p).Errorf("error re-exporting sync_committee") + return + } + } + + logrus.WithFields(logrus.Fields{ + "period": p, + "epoch": utils.FirstEpochOfSyncPeriod(p), + "duration": time.Since(t0), + }).Infof("re-exported sync_committee") + } + } + + logrus.Infof("Phase 2: Updating validator_stats table") + start := time.Now() + + epochsPerDay := utils.EpochsPerDay() + if currEpoch < epochsPerDay { + logrus.Infof("skipping exporting stats, first day has not been indexed yet") + return + } + currentDay := currEpoch / epochsPerDay + previousDay := currentDay - 1 + + for day := startDay; day <= previousDay; day++ { + startDay := time.Now() + err := UpdateValidatorStatisticsSyncData(bt, day, rpcClient, dryRun) + if err != nil { + utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0) + break + } + + logrus.Infof("finished updating validators_stats for day %v, took %v", day, time.Since(startDay)) + } + + logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, previousDay, time.Since(start)) +} + +func UpdateValidatorStatisticsSyncData(bt *db.Bigtable, day uint64, client rpc.Client, dryRun bool) error { + exportStart := time.Now() + firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) + + logrus.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch) + + if err := db.CheckIfDayIsFinalized(day); err != nil && !dryRun { + return err + } + + logrus.Infof("getting exported state for day %v", day) + + var err error + maxValidatorIndex := uint64(1999999) //bt.GetMaxValidatorindexForEpoch(lastEpoch) + // if err != nil { + // logrus.Errorf("error getting max validator index for epoch %v: %v", lastEpoch, err) + // return err + // } + validators := make([]uint64, 0, maxValidatorIndex) + validatorData := make([]*types.ValidatorStatsTableDbRow, 0, maxValidatorIndex) + validatorDataMux := &sync.Mutex{} + + logrus.Infof("processing statistics for validators 0-%d", maxValidatorIndex) + for i := uint64(0); i <= maxValidatorIndex; i++ { + validators = append(validators, i) + validatorData = append(validatorData, &types.ValidatorStatsTableDbRow{ + ValidatorIndex: i, + Day: int64(day), + }) + } + + g := &errgroup.Group{} + + g.Go(func() error { + if err := db.GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { + return 
fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err) + } + return nil + }) + + var statisticsData1d []*types.ValidatorStatsTableDbRow + g.Go(func() error { + var err error + statisticsData1d, err = db.GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows + if err != nil { + return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) + } + return nil + }) + + err = g.Wait() + if err != nil { + return err + } + + onlySyncValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData)) + for index := range validatorData { + + if validatorData[index].ParticipatedSync > 0 || validatorData[index].MissedSync > 0 || validatorData[index].OrphanedSync > 0 { + onlySyncValidatorData = append(onlySyncValidatorData, validatorData[index]) + } + } + + if len(onlySyncValidatorData) == 0 { + return nil // no sync committee yet skip + } + + logrus.Infof("statistics data collection for day %v completed", day) + + // calculate cl income data & update totals + for _, data := range onlySyncValidatorData { + + previousDayData := &types.ValidatorStatsTableDbRow{ + ValidatorIndex: math.MaxUint64, + } + + if day == 0 { + previousDayData.ValidatorIndex = data.ValidatorIndex + } + + if data.ValidatorIndex < uint64(len(statisticsData1d)) && day > 0 { + previousDayData = statisticsData1d[data.ValidatorIndex] + } + + if data.ValidatorIndex != previousDayData.ValidatorIndex { + return fmt.Errorf("logic error when retrieving previous day data for validator %v (%v wanted, %v retrieved)", data.ValidatorIndex, data.ValidatorIndex, previousDayData.ValidatorIndex) + } + + // update sync total + data.ParticipatedSyncTotal = previousDayData.ParticipatedSyncTotal + data.ParticipatedSync + data.MissedSyncTotal = previousDayData.MissedSyncTotal + data.MissedSync + data.OrphanedSyncTotal = previousDayData.OrphanedSyncTotal + data.OrphanedSync + } + + var statisticsDataToday []*types.ValidatorStatsTableDbRow + if dryRun { + var err error + statisticsDataToday, err = db.GatherStatisticsForDay(int64(day)) // convert to int64 to avoid underflows + if err != nil { + return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) + } + } + + tx, err := db.WriterDb.Beginx() + if err != nil { + return fmt.Errorf("error retrieving raw sql connection: %w", err) + } + defer tx.Rollback() + + logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncValidatorData), len(validatorData)) + + for _, data := range onlySyncValidatorData { + if dryRun { + logrus.Infof( + "validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v, total participated: %v -> %v, total missed sync: %v -> %v, total orphaned sync: %v -> %v", + data.ValidatorIndex, statisticsDataToday[data.ValidatorIndex].ParticipatedSync, data.ParticipatedSync, statisticsDataToday[data.ValidatorIndex].MissedSync, data.MissedSync, statisticsDataToday[data.ValidatorIndex].OrphanedSync, + data.OrphanedSync, statisticsDataToday[data.ValidatorIndex].ParticipatedSyncTotal, data.ParticipatedSyncTotal, statisticsDataToday[data.ValidatorIndex].MissedSyncTotal, data.MissedSyncTotal, statisticsDataToday[data.ValidatorIndex].OrphanedSyncTotal, data.OrphanedSyncTotal, + ) + } else { + tx.Exec(` + UPDATE validator_stats set + participated_sync = $1, + participated_sync_total = $2, + missed_sync = $3, + missed_sync_total = $4, + orphaned_sync = $5, + orphaned_sync_total = $6 + WHERE day = $7 AND validatorindex = $8`, + data.ParticipatedSync, data.ParticipatedSyncTotal, + 
data.MissedSync, data.MissedSyncTotal, + data.OrphanedSync, data.OrphanedSyncTotal, + data.Day, data.ValidatorIndex) + } + } + + if err != nil { + return err + } + + err = tx.Commit() + if err != nil { + return fmt.Errorf("error during statistics data insert: %w", err) + } + + logrus.Infof("statistics sync re-export of day %v completed, took %v", day, time.Since(exportStart)) + return nil +} + +func reExportSyncCommittee(rpcClient rpc.Client, p uint64, dryRun bool) error { + if dryRun { + var currentData []struct { + ValidatorIndex uint64 `db:"validatorindex"` + CommitteeIndex uint64 `db:"committeeindex"` + } + + err := db.WriterDb.Select(¤tData, `SELECT validatorindex, committeeindex FROM sync_committees WHERE period = $1`, p) + if err != nil { + return errors.Wrap(err, "select old entries") + } + + newData, err := exporter.GetSyncCommitteAtPeriod(rpcClient, p) + if err != nil { + return errors.Wrap(err, "export") + } + + // now we compare currentData with newData and print any difference in committeeindex + for _, d := range currentData { + for _, n := range newData { + if d.ValidatorIndex == n.ValidatorIndex && d.CommitteeIndex != n.CommitteeIndex { + logrus.Infof("validator %v has different committeeindex: %v -> %v", d.ValidatorIndex, d.CommitteeIndex, n.CommitteeIndex) + } + } + } + return nil + } else { + tx, err := db.WriterDb.Beginx() + if err != nil { + return errors.Wrap(err, "tx") + } + + defer tx.Rollback() + _, err = tx.Exec(`DELETE FROM sync_committees WHERE period = $1`, p) + if err != nil { + return errors.Wrap(err, "delete old entries") + } + + err = exporter.ExportSyncCommitteeAtPeriod(rpcClient, p, tx) + if err != nil { + return errors.Wrap(err, "export") + } + + return tx.Commit() + } +} diff --git a/db/statistics.go b/db/statistics.go index 6062ab3c5a..8f00c72717 100644 --- a/db/statistics.go +++ b/db/statistics.go @@ -36,7 +36,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { logger.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch) - if err := checkIfDayIsFinalized(day); err != nil { + if err := CheckIfDayIsFinalized(day); err != nil { return err } @@ -105,7 +105,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { return nil }) g.Go(func() error { - if err := gatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { + if err := GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err) } return nil @@ -138,7 +138,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData1d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData1d, err = gatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows + statisticsData1d, err = GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -147,7 +147,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData7d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData7d, err = gatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows + statisticsData7d, err = GatherStatisticsForDay(int64(day) - 7) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in 
GatherPreviousDayStatisticsData: %w", err) } @@ -156,7 +156,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData31d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData31d, err = gatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows + statisticsData31d, err = GatherStatisticsForDay(int64(day) - 31) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -165,7 +165,7 @@ func WriteValidatorStatisticsForDay(day uint64, client rpc.Client) error { var statisticsData365d []*types.ValidatorStatsTableDbRow g.Go(func() error { var err error - statisticsData31d, err = gatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows + statisticsData31d, err = GatherStatisticsForDay(int64(day) - 365) // convert to int64 to avoid underflows if err != nil { return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) } @@ -884,7 +884,7 @@ func gatherValidatorDepositWithdrawals(day uint64, data []*types.ValidatorStatsT return nil } -func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error { +func GatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*types.ValidatorStatsTableDbRow, mux *sync.Mutex) error { exportStart := time.Now() defer func() { metrics.TaskDuration.WithLabelValues("db_update_validator_sync_stats").Observe(time.Since(exportStart).Seconds()) @@ -898,9 +898,11 @@ func gatherValidatorSyncDutiesForDay(validators []uint64, day uint64, data []*ty return nil } logger := logger.WithFields(logrus.Fields{ - "day": day, - "firstEpoch": firstEpoch, - "lastEpoch": lastEpoch, + "day": day, + "firstEpoch": firstEpoch, + "lastEpoch": lastEpoch, + "startPeriod": utils.SyncPeriodOfEpoch(firstEpoch), + "endPeriod": utils.SyncPeriodOfEpoch(lastEpoch), }) logger.Infof("gathering sync duties") @@ -1105,7 +1107,7 @@ func gatherValidatorMissedAttestationsStatisticsForDay(validators []uint64, day return nil } -func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) { +func GatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error) { if day < 0 { return nil, nil @@ -1790,7 +1792,7 @@ func WriteGraffitiStatisticsForDay(day int64) error { return nil } -func checkIfDayIsFinalized(day uint64) error { +func CheckIfDayIsFinalized(day uint64) error { _, lastEpoch := utils.GetFirstAndLastEpochForDay(day) latestFinalizedEpoch, err := GetLatestFinalizedEpoch() diff --git a/exporter/sync_committees.go b/exporter/sync_committees.go index a8941b05d6..e92733e3af 100644 --- a/exporter/sync_committees.go +++ b/exporter/sync_committees.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" ) @@ -44,7 +45,7 @@ func exportSyncCommittees(rpcClient rpc.Client) error { _, exists := dbPeriodsMap[p] if !exists { t0 := time.Now() - err = exportSyncCommitteeAtPeriod(rpcClient, p) + err = ExportSyncCommitteeAtPeriod(rpcClient, p, nil) if err != nil { return fmt.Errorf("error exporting sync-committee at period %v: %w", p, err) } @@ -58,7 +59,48 @@ func exportSyncCommittees(rpcClient rpc.Client) error { return nil } -func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error { +func ExportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64, providedTx *sqlx.Tx) error { + + data, err := GetSyncCommitteAtPeriod(rpcClient, p) + if err != nil { 
+ return err + } + + tx := providedTx + if tx == nil { + tx, err = db.WriterDb.Beginx() + if err != nil { + return err + } + defer tx.Rollback() + } + + nArgs := 3 + valueArgs := make([]interface{}, len(data)*nArgs) + valueIds := make([]string, len(data)) + for i, entry := range data { + valueArgs[i*nArgs+0] = entry.Period + valueArgs[i*nArgs+1] = entry.ValidatorIndex + valueArgs[i*nArgs+2] = entry.CommitteeIndex + valueIds[i] = fmt.Sprintf("($%d,$%d,$%d)", i*nArgs+1, i*nArgs+2, i*nArgs+3) + } + _, err = tx.Exec( + fmt.Sprintf(` + INSERT INTO sync_committees (period, validatorindex, committeeindex) + VALUES %s ON CONFLICT (period, validatorindex, committeeindex) DO NOTHING`, + strings.Join(valueIds, ",")), + valueArgs...) + if err != nil { + return err + } + + if providedTx == nil { + return tx.Commit() + } + return nil +} + +func GetSyncCommitteAtPeriod(rpcClient rpc.Client, p uint64) ([]SyncCommittee, error) { stateID := uint64(0) if p > 0 { @@ -79,53 +121,32 @@ func exportSyncCommitteeAtPeriod(rpcClient rpc.Client, p uint64) error { // and determines which bit reflects them in the block sync aggregate bits c, err := rpcClient.GetSyncCommittee(fmt.Sprintf("%d", stateID), epoch) if err != nil { - return err + return nil, err } validatorsU64 := make([]uint64, len(c.Validators)) for i, idxStr := range c.Validators { idxU64, err := strconv.ParseUint(idxStr, 10, 64) if err != nil { - return err + return nil, err } validatorsU64[i] = idxU64 } - // start := time.Now() - // - // firstSlot := firstEpoch * utils.Config.Chain.ClConfig.SlotsPerEpoch - // lastSlot := lastEpoch*utils.Config.Chain.ClConfig.SlotsPerEpoch + utils.Config.Chain.ClConfig.SlotsPerEpoch - 1 - // logger.Infof("exporting sync committee assignments for period %v (epoch %v to %v, slot %v to %v) to bigtable", p, firstEpoch, lastEpoch, firstSlot, lastSlot) - - // err = db.BigtableClient.SaveSyncCommitteesAssignments(firstSlot, lastSlot, validatorsU64) - // if err != nil { - // return fmt.Errorf("error saving sync committee assignments: %v", err) - // } - // logger.Infof("exported sync committee assignments for period %v to bigtable in %v", p, time.Since(start)) - tx, err := db.WriterDb.Beginx() - if err != nil { - return err - } - defer tx.Rollback() - - nArgs := 3 - valueArgs := make([]interface{}, len(validatorsU64)*nArgs) - valueIds := make([]string, len(validatorsU64)) + result := make([]SyncCommittee, len(validatorsU64)) for i, idxU64 := range validatorsU64 { - valueArgs[i*nArgs+0] = p - valueArgs[i*nArgs+1] = idxU64 - valueArgs[i*nArgs+2] = i - valueIds[i] = fmt.Sprintf("($%d,$%d,$%d)", i*nArgs+1, i*nArgs+2, i*nArgs+3) - } - _, err = tx.Exec( - fmt.Sprintf(` - INSERT INTO sync_committees (period, validatorindex, committeeindex) - VALUES %s ON CONFLICT (period, validatorindex, committeeindex) DO NOTHING`, - strings.Join(valueIds, ",")), - valueArgs...) 
- if err != nil { - return err + result = append(result, SyncCommittee{ + Period: p, + ValidatorIndex: idxU64, + CommitteeIndex: uint64(i), + }) } - return tx.Commit() + return result, nil +} + +type SyncCommittee struct { + Period uint64 `json:"period"` + ValidatorIndex uint64 `json:"validatorindex"` + CommitteeIndex uint64 `json:"committeeindex"` } From d5724f7a9b26d92c457a674fa85e6bc4a2980031 Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 13 Nov 2023 16:27:35 +0100 Subject: [PATCH 03/11] (NOBIDS) static fix --- cmd/misc/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index ecdba6b175..fda16dc38d 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -1246,7 +1246,7 @@ func exportSyncCommittee(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) if endDay <= 0 { - currEpoch := services.LatestFinalizedEpoch() + currEpoch = services.LatestFinalizedEpoch() if currEpoch > 0 { // guard against underflows currEpoch = currEpoch - 1 } From cc1ab6f1ca8b96d8ffac3162befa7582364fd67a Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Tue, 14 Nov 2023 07:45:56 +0100 Subject: [PATCH 04/11] (NOBIDS) split commands into two --- cmd/misc/main.go | 63 +++++++++++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 25 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index fda16dc38d..b49a2b01b9 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -55,7 +55,7 @@ var opts = struct { func main() { configPath := flag.String("config", "config/default.config.yml", "Path to the config file") - flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee, export-sync-committee-ph2") + flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats") flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") flag.Uint64Var(&opts.User, "user", 0, "user id") @@ -362,10 +362,10 @@ func main() { } case "export-stats-totals": exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency) - case "export-sync-committee": - exportSyncCommittee(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, false) - case "export-sync-committee-ph2": - exportSyncCommittee(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, true) + case "export-sync-committee-periods": + exportSyncCommitteePeriods(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun) + case "export-sync-committee-validator-stats": + 
exportSyncCommitteeValidatorStats(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, true) case "fix-exec-transactions-count": err = fixExecTransactionsCount() default: @@ -1241,7 +1241,7 @@ OUTER: logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start)) } -func exportSyncCommittee(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun, skipPhase1 bool) { +func exportSyncCommitteePeriods(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun bool) { var currEpoch = uint64(0) firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) @@ -1262,31 +1262,44 @@ func exportSyncCommittee(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay lastPeriod := utils.SyncPeriodOfEpoch(uint64(currEpoch)) + 1 // we can look into the future - if !skipPhase1 { - logrus.Infof("Phase 1: Re exporting data for sync_committee table") - for p := firstPeriod; p <= lastPeriod; p++ { - t0 := time.Now() + start := time.Now() + for p := firstPeriod; p <= lastPeriod; p++ { + t0 := time.Now() - err := reExportSyncCommittee(rpcClient, p, dryRun) - if err != nil { - if strings.Contains(err.Error(), "not found 404") { - logrus.WithField("period", p).Infof("reached max period, stopping") - break - } else { - logrus.WithError(err).WithField("period", p).Errorf("error re-exporting sync_committee") - return - } + err := reExportSyncCommittee(rpcClient, p, dryRun) + if err != nil { + if strings.Contains(err.Error(), "not found 404") { + logrus.WithField("period", p).Infof("reached max period, stopping") + break + } else { + logrus.WithError(err).WithField("period", p).Errorf("error re-exporting sync_committee") + return } + } - logrus.WithFields(logrus.Fields{ - "period": p, - "epoch": utils.FirstEpochOfSyncPeriod(p), - "duration": time.Since(t0), - }).Infof("re-exported sync_committee") + logrus.WithFields(logrus.Fields{ + "period": p, + "epoch": utils.FirstEpochOfSyncPeriod(p), + "duration": time.Since(t0), + }).Infof("re-exported sync_committee") + } + + logrus.Infof("finished all exporting sync_committee for periods %v - %v, took %v", firstPeriod, lastPeriod, time.Since(start)) +} + +func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun, skipPhase1 bool) { + var currEpoch = uint64(0) + + if endDay <= 0 { + currEpoch = services.LatestFinalizedEpoch() + if currEpoch > 0 { // guard against underflows + currEpoch = currEpoch - 1 } + } else { + _, lastEpoch := utils.GetFirstAndLastEpochForDay(endDay) + currEpoch = lastEpoch } - logrus.Infof("Phase 2: Updating validator_stats table") start := time.Now() epochsPerDay := utils.EpochsPerDay() From bc6461a720d728a125f889fdcd9689d5fe33961c Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Tue, 14 Nov 2023 08:01:46 +0100 Subject: [PATCH 05/11] (NOBIDS) db finalized epoch not service --- cmd/misc/main.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index b49a2b01b9..f7228fe8e5 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -1246,7 +1246,12 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, bt *db.Bigtable, startDay, firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) if endDay <= 0 { - currEpoch = services.LatestFinalizedEpoch() + var err error + currEpoch, err = db.GetLatestFinalizedEpoch() + if err != nil { + 
logrus.WithError(err).Errorf("error getting latest finalized epoch")
+			return
+		}
 		if currEpoch > 0 { // guard against underflows
 			currEpoch = currEpoch - 1
 		}
@@ -1291,7 +1296,12 @@ func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, bt *db.Bigtable, st
 	var currEpoch = uint64(0)
 
 	if endDay <= 0 {
-		currEpoch = services.LatestFinalizedEpoch()
+		var err error
+		currEpoch, err = db.GetLatestFinalizedEpoch()
+		if err != nil {
+			logrus.WithError(err).Errorf("error getting latest finalized epoch")
+			return
+		}
 		if currEpoch > 0 { // guard against underflows
 			currEpoch = currEpoch - 1
 		}

From bf2fd38536d3615b5cac044512325cb361434b25 Mon Sep 17 00:00:00 2001
From: Manuel <5877862+manuelsc@users.noreply.github.com>
Date: Thu, 23 Nov 2023 10:09:20 +0100
Subject: [PATCH 06/11] (NOBIDS) incorporated feedback

---
 cmd/misc/main.go            | 41 ++++++++++++++++++++++++-------------------
 exporter/sync_committees.go |  7 +------
 2 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/cmd/misc/main.go b/cmd/misc/main.go
index f7228fe8e5..1914ba5ab9 100644
--- a/cmd/misc/main.go
+++ b/cmd/misc/main.go
@@ -363,9 +363,9 @@ func main() {
 	case "export-stats-totals":
 		exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
 	case "export-sync-committee-periods":
-		exportSyncCommitteePeriods(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun)
+		exportSyncCommitteePeriods(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun)
 	case "export-sync-committee-validator-stats":
-		exportSyncCommitteeValidatorStats(rpcClient, bt, opts.StartDay, opts.EndDay, opts.DryRun, true)
+		exportSyncCommitteeValidatorStats(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun, true)
 	case "fix-exec-transactions-count":
 		err = fixExecTransactionsCount()
 	default:
@@ -1241,10 +1241,19 @@ OUTER:
 	logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
 }
 
-func exportSyncCommitteePeriods(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun bool) {
+/*
+Instead of deleting entries from the sync_committee table in a prod environment and waiting for the exporter to sync back all entries,
+this method replaces each sync committee period one by one with the new data, which is much nicer for a prod environment.
+*/ +func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, dryRun bool) { var currEpoch = uint64(0) firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) + if startDay > 0 { + firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay) + firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch) + } + if endDay <= 0 { var err error currEpoch, err = db.GetLatestFinalizedEpoch() @@ -1260,11 +1269,6 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, bt *db.Bigtable, startDay, currEpoch = lastEpoch } - if startDay > 0 { - firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay) - firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch) - } - lastPeriod := utils.SyncPeriodOfEpoch(uint64(currEpoch)) + 1 // we can look into the future start := time.Now() @@ -1292,37 +1296,36 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, bt *db.Bigtable, startDay, logrus.Infof("finished all exporting sync_committee for periods %v - %v, took %v", firstPeriod, lastPeriod, time.Since(start)) } -func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, bt *db.Bigtable, startDay, endDay uint64, dryRun, skipPhase1 bool) { - var currEpoch = uint64(0) +func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay uint64, dryRun, skipPhase1 bool) { + var lastEpoch = uint64(0) if endDay <= 0 { var err error - currEpoch, err = db.GetLatestFinalizedEpoch() + lastEpoch, err = db.GetLatestFinalizedEpoch() if err != nil { logrus.WithError(err).Errorf("error getting latest finalized epoch") return } - if currEpoch > 0 { // guard against underflows - currEpoch = currEpoch - 1 + if lastEpoch > 0 { // guard against underflows + lastEpoch = lastEpoch - 1 } } else { - _, lastEpoch := utils.GetFirstAndLastEpochForDay(endDay) - currEpoch = lastEpoch + _, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay) } start := time.Now() epochsPerDay := utils.EpochsPerDay() - if currEpoch < epochsPerDay { + if lastEpoch < epochsPerDay { logrus.Infof("skipping exporting stats, first day has not been indexed yet") return } - currentDay := currEpoch / epochsPerDay + currentDay := lastEpoch / epochsPerDay previousDay := currentDay - 1 for day := startDay; day <= previousDay; day++ { startDay := time.Now() - err := UpdateValidatorStatisticsSyncData(bt, day, rpcClient, dryRun) + err := UpdateValidatorStatisticsSyncData(day, rpcClient, dryRun) if err != nil { utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0) break @@ -1334,7 +1337,7 @@ func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, bt *db.Bigtable, st logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, previousDay, time.Since(start)) } -func UpdateValidatorStatisticsSyncData(bt *db.Bigtable, day uint64, client rpc.Client, dryRun bool) error { +func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error { exportStart := time.Now() firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day) diff --git a/exporter/sync_committees.go b/exporter/sync_committees.go index e92733e3af..3f04286db5 100644 --- a/exporter/sync_committees.go +++ b/exporter/sync_committees.go @@ -124,17 +124,12 @@ func GetSyncCommitteAtPeriod(rpcClient rpc.Client, p uint64) ([]SyncCommittee, e return nil, err } - validatorsU64 := make([]uint64, len(c.Validators)) + result := make([]SyncCommittee, len(c.Validators)) for i, idxStr := range c.Validators { idxU64, err := strconv.ParseUint(idxStr, 10, 64) if err != nil { return nil, err } - validatorsU64[i] = 
idxU64 - } - - result := make([]SyncCommittee, len(validatorsU64)) - for i, idxU64 := range validatorsU64 { result = append(result, SyncCommittee{ Period: p, ValidatorIndex: idxU64, From c6cdffc5c902eadc3e99cb3e4f4faccd8adc1423 Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 27 Nov 2023 10:49:26 +0100 Subject: [PATCH 07/11] (NOBIDS) included feedback --- cmd/misc/main.go | 84 ++++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 1914ba5ab9..729fe2d867 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -1246,7 +1246,7 @@ Instead of deleting entries from the sync_committee table in a prod environment this method will replace each sync committee period one by one with the new one. Which is much nicer for a prod environment. */ func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, dryRun bool) { - var currEpoch = uint64(0) + var lastEpoch = uint64(0) firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch) if startDay > 0 { @@ -1256,20 +1256,19 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, d if endDay <= 0 { var err error - currEpoch, err = db.GetLatestFinalizedEpoch() + lastEpoch, err = db.GetLatestFinalizedEpoch() if err != nil { - logrus.WithError(err).Errorf("error getting latest finalized epoch") + utils.LogError(err, "error getting latest finalized epoch", 0) return } - if currEpoch > 0 { // guard against underflows - currEpoch = currEpoch - 1 + if lastEpoch > 0 { // guard against underflows + lastEpoch = lastEpoch - 1 } } else { - _, lastEpoch := utils.GetFirstAndLastEpochForDay(endDay) - currEpoch = lastEpoch + _, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay) } - lastPeriod := utils.SyncPeriodOfEpoch(uint64(currEpoch)) + 1 // we can look into the future + lastPeriod := utils.SyncPeriodOfEpoch(uint64(lastEpoch)) + 1 // we can look into the future start := time.Now() for p := firstPeriod; p <= lastPeriod; p++ { @@ -1281,7 +1280,9 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, d logrus.WithField("period", p).Infof("reached max period, stopping") break } else { - logrus.WithError(err).WithField("period", p).Errorf("error re-exporting sync_committee") + utils.LogError(err, "error re-exporting sync_committee", 0, map[string]interface{}{ + "period": p, + }) return } } @@ -1297,33 +1298,30 @@ func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, d } func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay uint64, dryRun, skipPhase1 bool) { - var lastEpoch = uint64(0) - if endDay <= 0 { - var err error - lastEpoch, err = db.GetLatestFinalizedEpoch() + lastEpoch, err := db.GetLatestFinalizedEpoch() if err != nil { - logrus.WithError(err).Errorf("error getting latest finalized epoch") + utils.LogError(err, "error getting latest finalized epoch", 0) return } if lastEpoch > 0 { // guard against underflows lastEpoch = lastEpoch - 1 } - } else { - _, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay) - } - start := time.Now() + _, err = db.GetLastExportedStatisticDay() + if err != nil { + logrus.Infof("skipping exporting stats, first day has not been indexed yet") + return + } - epochsPerDay := utils.EpochsPerDay() - if lastEpoch < epochsPerDay { - logrus.Infof("skipping exporting stats, first day has not been indexed yet") - return + epochsPerDay := utils.EpochsPerDay() + 
currentDay := lastEpoch / epochsPerDay + endDay = currentDay - 1 // current day will be picked up by exporter } - currentDay := lastEpoch / epochsPerDay - previousDay := currentDay - 1 - for day := startDay; day <= previousDay; day++ { + start := time.Now() + + for day := startDay; day <= endDay; day++ { startDay := time.Now() err := UpdateValidatorStatisticsSyncData(day, rpcClient, dryRun) if err != nil { @@ -1334,7 +1332,7 @@ func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay ui logrus.Infof("finished updating validators_stats for day %v, took %v", day, time.Since(startDay)) } - logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, previousDay, time.Since(start)) + logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, endDay, time.Since(start)) } func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error { @@ -1350,18 +1348,24 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo logrus.Infof("getting exported state for day %v", day) var err error - maxValidatorIndex := uint64(1999999) //bt.GetMaxValidatorindexForEpoch(lastEpoch) - // if err != nil { - // logrus.Errorf("error getting max validator index for epoch %v: %v", lastEpoch, err) - // return err - // } - validators := make([]uint64, 0, maxValidatorIndex) + var maxValidatorIndex uint64 + err = db.ReaderDb.Get(&maxValidatorIndex, `SELECT MAX(validatorindex) FROM validator_stats WHERE day = $1`, day) + if err != nil { + utils.LogFatal(err, "error: could not get max validator index", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } else if maxValidatorIndex == uint64(0) { + utils.LogFatal(err, "error: no validator found", 0, map[string]interface{}{ + "epoch": firstEpoch, + }) + } + maxValidatorIndex += 10000 // add some buffer, exact number is not important. 
Should just be bigger than max validators that can join in a day + validatorData := make([]*types.ValidatorStatsTableDbRow, 0, maxValidatorIndex) validatorDataMux := &sync.Mutex{} logrus.Infof("processing statistics for validators 0-%d", maxValidatorIndex) for i := uint64(0); i <= maxValidatorIndex; i++ { - validators = append(validators, i) validatorData = append(validatorData, &types.ValidatorStatsTableDbRow{ ValidatorIndex: i, Day: int64(day), @@ -1371,7 +1375,7 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo g := &errgroup.Group{} g.Go(func() error { - if err := db.GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil { + if err := db.GatherValidatorSyncDutiesForDay(nil, day, validatorData, validatorDataMux); err != nil { return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err) } return nil @@ -1392,31 +1396,27 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo return err } - onlySyncValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData)) + onlySyncCommitteeValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData)) for index := range validatorData { if validatorData[index].ParticipatedSync > 0 || validatorData[index].MissedSync > 0 || validatorData[index].OrphanedSync > 0 { - onlySyncValidatorData = append(onlySyncValidatorData, validatorData[index]) + onlySyncCommitteeValidatorData = append(onlySyncCommitteeValidatorData, validatorData[index]) } } - if len(onlySyncValidatorData) == 0 { + if len(onlySyncCommitteeValidatorData) == 0 { return nil // no sync committee yet skip } logrus.Infof("statistics data collection for day %v completed", day) // calculate cl income data & update totals - for _, data := range onlySyncValidatorData { + for _, data := range onlySyncCommitteeValidatorData { previousDayData := &types.ValidatorStatsTableDbRow{ ValidatorIndex: math.MaxUint64, } - if day == 0 { - previousDayData.ValidatorIndex = data.ValidatorIndex - } - if data.ValidatorIndex < uint64(len(statisticsData1d)) && day > 0 { previousDayData = statisticsData1d[data.ValidatorIndex] } From 8ed667a9804b593f5ae62b670edd7f68e11d8b53 Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 27 Nov 2023 11:34:36 +0100 Subject: [PATCH 08/11] (NOBIDS) fix build --- cmd/misc/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 729fe2d867..ae2375a21a 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -1446,9 +1446,9 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo } defer tx.Rollback() - logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncValidatorData), len(validatorData)) + logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncCommitteeValidatorData), len(validatorData)) - for _, data := range onlySyncValidatorData { + for _, data := range onlySyncCommitteeValidatorData { if dryRun { logrus.Infof( "validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v, total participated: %v -> %v, total missed sync: %v -> %v, total orphaned sync: %v -> %v", From 4751080fef48296216cec2be28dbbc8909f0e418 Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 27 Nov 2023 12:33:48 +0100 Subject: [PATCH 09/11] (NOBIDS) removed total update logic, rather use the misc total 
export tool after --- cmd/misc/main.go | 52 +++++++++--------------------------------------- 1 file changed, 9 insertions(+), 43 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index ae2375a21a..650d1635cb 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -12,7 +12,6 @@ import ( "eth2-exporter/utils" "eth2-exporter/version" "fmt" - "math" "math/big" "net/http" "strconv" @@ -1333,6 +1332,7 @@ func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay ui } logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, endDay, time.Since(start)) + logrus.Infof("REMEMBER: To execute export-stats-totals now to update the totals") } func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error { @@ -1381,16 +1381,6 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo return nil }) - var statisticsData1d []*types.ValidatorStatsTableDbRow - g.Go(func() error { - var err error - statisticsData1d, err = db.GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows - if err != nil { - return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err) - } - return nil - }) - err = g.Wait() if err != nil { return err @@ -1410,27 +1400,6 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo logrus.Infof("statistics data collection for day %v completed", day) - // calculate cl income data & update totals - for _, data := range onlySyncCommitteeValidatorData { - - previousDayData := &types.ValidatorStatsTableDbRow{ - ValidatorIndex: math.MaxUint64, - } - - if data.ValidatorIndex < uint64(len(statisticsData1d)) && day > 0 { - previousDayData = statisticsData1d[data.ValidatorIndex] - } - - if data.ValidatorIndex != previousDayData.ValidatorIndex { - return fmt.Errorf("logic error when retrieving previous day data for validator %v (%v wanted, %v retrieved)", data.ValidatorIndex, data.ValidatorIndex, previousDayData.ValidatorIndex) - } - - // update sync total - data.ParticipatedSyncTotal = previousDayData.ParticipatedSyncTotal + data.ParticipatedSync - data.MissedSyncTotal = previousDayData.MissedSyncTotal + data.MissedSync - data.OrphanedSyncTotal = previousDayData.OrphanedSyncTotal + data.OrphanedSync - } - var statisticsDataToday []*types.ValidatorStatsTableDbRow if dryRun { var err error @@ -1451,23 +1420,20 @@ func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun boo for _, data := range onlySyncCommitteeValidatorData { if dryRun { logrus.Infof( - "validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v, total participated: %v -> %v, total missed sync: %v -> %v, total orphaned sync: %v -> %v", + "validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v", data.ValidatorIndex, statisticsDataToday[data.ValidatorIndex].ParticipatedSync, data.ParticipatedSync, statisticsDataToday[data.ValidatorIndex].MissedSync, data.MissedSync, statisticsDataToday[data.ValidatorIndex].OrphanedSync, - data.OrphanedSync, statisticsDataToday[data.ValidatorIndex].ParticipatedSyncTotal, data.ParticipatedSyncTotal, statisticsDataToday[data.ValidatorIndex].MissedSyncTotal, data.MissedSyncTotal, statisticsDataToday[data.ValidatorIndex].OrphanedSyncTotal, data.OrphanedSyncTotal, + data.OrphanedSync, ) } else { tx.Exec(` UPDATE validator_stats set participated_sync = $1, - participated_sync_total = $2, - missed_sync = $3, - missed_sync_total = $4, - orphaned_sync = $5, - 
orphaned_sync_total = $6 - WHERE day = $7 AND validatorindex = $8`, - data.ParticipatedSync, data.ParticipatedSyncTotal, - data.MissedSync, data.MissedSyncTotal, - data.OrphanedSync, data.OrphanedSyncTotal, + missed_sync = $2, + orphaned_sync = $3, + WHERE day = $4 AND validatorindex = $5`, + data.ParticipatedSync, + data.MissedSync, + data.OrphanedSync, data.Day, data.ValidatorIndex) } } From e33248594886973fe278b8dc4e0cab9ca67d17e9 Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:29:15 +0100 Subject: [PATCH 10/11] (NOBIDS) misc export total tool: re-export to today --- cmd/misc/main.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 650d1635cb..2470231cf3 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -12,6 +12,7 @@ import ( "eth2-exporter/utils" "eth2-exporter/version" "fmt" + "math" "math/big" "net/http" "strconv" @@ -1127,6 +1128,12 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) { func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) { start := time.Now() + exportToToday := false + if dayEnd <= 0 { + exportToToday = true + dayEnd = math.MaxInt + } + logrus.Infof("exporting stats totals for columns '%v'", columns) // validate columns input @@ -1234,6 +1241,27 @@ OUTER: "columns": columns, }) } + + if exportToToday { + // update end day since export might take a couple days to finish + lastEpoch, err := db.GetLatestFinalizedEpoch() + if err != nil { + utils.LogError(err, "error getting latest finalized epoch", 0) + return + } + if lastEpoch > 0 { // guard against underflows + lastEpoch = lastEpoch - 1 + } + + _, err = db.GetLastExportedStatisticDay() + if err != nil { + logrus.Infof("skipping exporting stats, first day has not been indexed yet") + return + } + + epochsPerDay := utils.EpochsPerDay() + dayEnd = lastEpoch / epochsPerDay + } logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay)) } From 0e391bdaa568cffacda0ea941c5980ffcf4c6a6c Mon Sep 17 00:00:00 2001 From: Manuel <5877862+manuelsc@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:46:33 +0100 Subject: [PATCH 11/11] (NOBIDS) use last exported day as end day --- cmd/misc/main.go | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/cmd/misc/main.go b/cmd/misc/main.go index 2470231cf3..395887c1af 100644 --- a/cmd/misc/main.go +++ b/cmd/misc/main.go @@ -1243,24 +1243,11 @@ OUTER: } if exportToToday { - // update end day since export might take a couple days to finish - lastEpoch, err := db.GetLatestFinalizedEpoch() + dayEnd, err = db.GetLastExportedStatisticDay() if err != nil { - utils.LogError(err, "error getting latest finalized epoch", 0) + utils.LogError(err, "error getting last exported statistic day", 0) return } - if lastEpoch > 0 { // guard against underflows - lastEpoch = lastEpoch - 1 - } - - _, err = db.GetLastExportedStatisticDay() - if err != nil { - logrus.Infof("skipping exporting stats, first day has not been indexed yet") - return - } - - epochsPerDay := utils.EpochsPerDay() - dayEnd = lastEpoch / epochsPerDay } logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay)) }
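
Note on the ordering requirement this series is built around: the sync aggregate in every block carries one participation bit per committee position, so bit i refers to the i-th committee entry exactly as the beacon node returns it, and the same validator index can occupy several positions within one period. The Go sketch below illustrates that mapping; the helper name and the assumption that bits are packed little-endian within each byte (as in the SSZ bitvector encoding) are illustrative only and not part of the patches above.

package main

import "fmt"

// syncParticipationByValidator maps a block's sync aggregate bits onto the
// committee in the exact order returned by the node: bit i of the aggregate
// refers to committee position i, packed little-endian within each byte.
// The committee slice may contain the same validator index more than once.
func syncParticipationByValidator(committee []uint64, aggregateBits []byte) map[uint64]bool {
	participation := make(map[uint64]bool, len(committee))
	for i, validatorIndex := range committee {
		byteIdx := i / 8
		bitSet := byteIdx < len(aggregateBits) && aggregateBits[byteIdx]&(1<<uint(i%8)) != 0
		// A validator holding several committee positions counts as participating
		// if any of its positions has the bit set.
		participation[validatorIndex] = participation[validatorIndex] || bitSet
	}
	return participation
}

func main() {
	// Toy example: four committee positions, bits 0b00000101 -> positions 0 and 2 set.
	committee := []uint64{42, 7, 42, 99} // validator 42 holds two positions
	fmt.Println(syncParticipationByValidator(committee, []byte{0b00000101}))
}

This is also why the deduplication removed in PATCH 01 was harmful: collapsing duplicate validator indices shifts every later committee position and, with it, the bit-to-validator mapping, and why the queries above aggregate with ORDER BY committeeindex rather than deduplicating.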