Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong sync committee participation association #2682

Merged
merged 12 commits into from
Nov 27, 2023
300 changes: 299 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (
"eth2-exporter/utils"
"eth2-exporter/version"
"fmt"
"math"
"math/big"
"net/http"
"strconv"
"strings"
"sync"
"time"

"github.com/coocood/freecache"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/pkg/errors"
utilMath "github.com/protolambda/zrnt/eth2/util/math"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -52,7 +55,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -359,6 +362,10 @@ func main() {
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
case "export-sync-committee-periods":
exportSyncCommitteePeriods(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun)
case "export-sync-committee-validator-stats":
exportSyncCommitteeValidatorStats(rpcClient, opts.StartDay, opts.EndDay, opts.DryRun, true)
case "fix-exec-transactions-count":
err = fixExecTransactionsCount()
default:
Expand Down Expand Up @@ -1233,3 +1240,294 @@ OUTER:

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}

/*
Instead of deleting entries from the sync_committee table in a prod environment and wait for the exporter to sync back all entries,
this method will replace each sync committee period one by one with the new one. Which is much nicer for a prod environment.
*/
func exportSyncCommitteePeriods(rpcClient rpc.Client, startDay, endDay uint64, dryRun bool) {
var currEpoch = uint64(0)
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved

firstPeriod := utils.SyncPeriodOfEpoch(utils.Config.Chain.ClConfig.AltairForkEpoch)
if startDay > 0 {
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(startDay)
firstPeriod = utils.SyncPeriodOfEpoch(firstEpoch)
}

if endDay <= 0 {
var err error
currEpoch, err = db.GetLatestFinalizedEpoch()
if err != nil {
logrus.WithError(err).Errorf("error getting latest finalized epoch")
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
return
}
if currEpoch > 0 { // guard against underflows
currEpoch = currEpoch - 1
}
} else {
_, lastEpoch := utils.GetFirstAndLastEpochForDay(endDay)
currEpoch = lastEpoch
}

lastPeriod := utils.SyncPeriodOfEpoch(uint64(currEpoch)) + 1 // we can look into the future

start := time.Now()
for p := firstPeriod; p <= lastPeriod; p++ {
t0 := time.Now()

err := reExportSyncCommittee(rpcClient, p, dryRun)
if err != nil {
if strings.Contains(err.Error(), "not found 404") {
logrus.WithField("period", p).Infof("reached max period, stopping")
break
} else {
logrus.WithError(err).WithField("period", p).Errorf("error re-exporting sync_committee")
return
}
}

logrus.WithFields(logrus.Fields{
"period": p,
"epoch": utils.FirstEpochOfSyncPeriod(p),
"duration": time.Since(t0),
}).Infof("re-exported sync_committee")
}

logrus.Infof("finished all exporting sync_committee for periods %v - %v, took %v", firstPeriod, lastPeriod, time.Since(start))
}

func exportSyncCommitteeValidatorStats(rpcClient rpc.Client, startDay, endDay uint64, dryRun, skipPhase1 bool) {
var lastEpoch = uint64(0)
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved

if endDay <= 0 {
var err error
lastEpoch, err = db.GetLatestFinalizedEpoch()
if err != nil {
logrus.WithError(err).Errorf("error getting latest finalized epoch")
return
}
if lastEpoch > 0 { // guard against underflows
lastEpoch = lastEpoch - 1
}
} else {
_, lastEpoch = utils.GetFirstAndLastEpochForDay(endDay)
}

start := time.Now()

epochsPerDay := utils.EpochsPerDay()
if lastEpoch < epochsPerDay {
logrus.Infof("skipping exporting stats, first day has not been indexed yet")
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
return
}
currentDay := lastEpoch / epochsPerDay
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
previousDay := currentDay - 1

for day := startDay; day <= previousDay; day++ {
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
startDay := time.Now()
err := UpdateValidatorStatisticsSyncData(day, rpcClient, dryRun)
if err != nil {
utils.LogError(err, fmt.Errorf("error exporting stats for day %v", day), 0)
break
}

logrus.Infof("finished updating validators_stats for day %v, took %v", day, time.Since(startDay))
}

logrus.Infof("finished all exporting stats for days %v - %v, took %v", startDay, previousDay, time.Since(start))
}

func UpdateValidatorStatisticsSyncData(day uint64, client rpc.Client, dryRun bool) error {
exportStart := time.Now()
firstEpoch, lastEpoch := utils.GetFirstAndLastEpochForDay(day)

logrus.Infof("exporting statistics for day %v (epoch %v to %v)", day, firstEpoch, lastEpoch)

if err := db.CheckIfDayIsFinalized(day); err != nil && !dryRun {
return err
}

logrus.Infof("getting exported state for day %v", day)

var err error
maxValidatorIndex := uint64(1999999) //bt.GetMaxValidatorindexForEpoch(lastEpoch)
// if err != nil {
// logrus.Errorf("error getting max validator index for epoch %v: %v", lastEpoch, err)
// return err
// }
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
validators := make([]uint64, 0, maxValidatorIndex)
validatorData := make([]*types.ValidatorStatsTableDbRow, 0, maxValidatorIndex)
validatorDataMux := &sync.Mutex{}

logrus.Infof("processing statistics for validators 0-%d", maxValidatorIndex)
for i := uint64(0); i <= maxValidatorIndex; i++ {
validators = append(validators, i)
validatorData = append(validatorData, &types.ValidatorStatsTableDbRow{
ValidatorIndex: i,
Day: int64(day),
})
}

g := &errgroup.Group{}

g.Go(func() error {
if err := db.GatherValidatorSyncDutiesForDay(validators, day, validatorData, validatorDataMux); err != nil {
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("error in GatherValidatorSyncDutiesForDay: %w", err)
}
return nil
})

var statisticsData1d []*types.ValidatorStatsTableDbRow
g.Go(func() error {
var err error
statisticsData1d, err = db.GatherStatisticsForDay(int64(day) - 1) // convert to int64 to avoid underflows
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
return nil
})

err = g.Wait()
if err != nil {
return err
}

onlySyncValidatorData := make([]*types.ValidatorStatsTableDbRow, 0, len(validatorData))
for index := range validatorData {

if validatorData[index].ParticipatedSync > 0 || validatorData[index].MissedSync > 0 || validatorData[index].OrphanedSync > 0 {
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
onlySyncValidatorData = append(onlySyncValidatorData, validatorData[index])
}
}

if len(onlySyncValidatorData) == 0 {
return nil // no sync committee yet skip
}

logrus.Infof("statistics data collection for day %v completed", day)

// calculate cl income data & update totals
for _, data := range onlySyncValidatorData {

previousDayData := &types.ValidatorStatsTableDbRow{
ValidatorIndex: math.MaxUint64,
}

if day == 0 {
previousDayData.ValidatorIndex = data.ValidatorIndex
}
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved

if data.ValidatorIndex < uint64(len(statisticsData1d)) && day > 0 {
previousDayData = statisticsData1d[data.ValidatorIndex]
}

if data.ValidatorIndex != previousDayData.ValidatorIndex {
return fmt.Errorf("logic error when retrieving previous day data for validator %v (%v wanted, %v retrieved)", data.ValidatorIndex, data.ValidatorIndex, previousDayData.ValidatorIndex)
}

// update sync total
data.ParticipatedSyncTotal = previousDayData.ParticipatedSyncTotal + data.ParticipatedSync
data.MissedSyncTotal = previousDayData.MissedSyncTotal + data.MissedSync
data.OrphanedSyncTotal = previousDayData.OrphanedSyncTotal + data.OrphanedSync
}

var statisticsDataToday []*types.ValidatorStatsTableDbRow
if dryRun {
var err error
statisticsDataToday, err = db.GatherStatisticsForDay(int64(day)) // convert to int64 to avoid underflows
if err != nil {
return fmt.Errorf("error in GatherPreviousDayStatisticsData: %w", err)
}
}

tx, err := db.WriterDb.Beginx()
if err != nil {
return fmt.Errorf("error retrieving raw sql connection: %w", err)
}
defer tx.Rollback()

logrus.Infof("updating statistics data into the validator_stats table %v | %v", len(onlySyncValidatorData), len(validatorData))

for _, data := range onlySyncValidatorData {
if dryRun {
logrus.Infof(
"validator %v: participated sync: %v -> %v, missed sync: %v -> %v, orphaned sync: %v -> %v, total participated: %v -> %v, total missed sync: %v -> %v, total orphaned sync: %v -> %v",
data.ValidatorIndex, statisticsDataToday[data.ValidatorIndex].ParticipatedSync, data.ParticipatedSync, statisticsDataToday[data.ValidatorIndex].MissedSync, data.MissedSync, statisticsDataToday[data.ValidatorIndex].OrphanedSync,
data.OrphanedSync, statisticsDataToday[data.ValidatorIndex].ParticipatedSyncTotal, data.ParticipatedSyncTotal, statisticsDataToday[data.ValidatorIndex].MissedSyncTotal, data.MissedSyncTotal, statisticsDataToday[data.ValidatorIndex].OrphanedSyncTotal, data.OrphanedSyncTotal,
)
} else {
tx.Exec(`
UPDATE validator_stats set
participated_sync = $1,
participated_sync_total = $2,
missed_sync = $3,
missed_sync_total = $4,
orphaned_sync = $5,
orphaned_sync_total = $6
WHERE day = $7 AND validatorindex = $8`,
data.ParticipatedSync, data.ParticipatedSyncTotal,
data.MissedSync, data.MissedSyncTotal,
data.OrphanedSync, data.OrphanedSyncTotal,
data.Day, data.ValidatorIndex)
}
LuccaBitfly marked this conversation as resolved.
Show resolved Hide resolved
}

if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("error during statistics data insert: %w", err)
}

logrus.Infof("statistics sync re-export of day %v completed, took %v", day, time.Since(exportStart))
return nil
}

func reExportSyncCommittee(rpcClient rpc.Client, p uint64, dryRun bool) error {
if dryRun {
var currentData []struct {
ValidatorIndex uint64 `db:"validatorindex"`
CommitteeIndex uint64 `db:"committeeindex"`
}

err := db.WriterDb.Select(&currentData, `SELECT validatorindex, committeeindex FROM sync_committees WHERE period = $1`, p)
if err != nil {
return errors.Wrap(err, "select old entries")
}

newData, err := exporter.GetSyncCommitteAtPeriod(rpcClient, p)
if err != nil {
return errors.Wrap(err, "export")
}

// now we compare currentData with newData and print any difference in committeeindex
for _, d := range currentData {
for _, n := range newData {
if d.ValidatorIndex == n.ValidatorIndex && d.CommitteeIndex != n.CommitteeIndex {
logrus.Infof("validator %v has different committeeindex: %v -> %v", d.ValidatorIndex, d.CommitteeIndex, n.CommitteeIndex)
}
}
}
return nil
} else {
tx, err := db.WriterDb.Beginx()
if err != nil {
return errors.Wrap(err, "tx")
}

defer tx.Rollback()
_, err = tx.Exec(`DELETE FROM sync_committees WHERE period = $1`, p)
if err != nil {
return errors.Wrap(err, "delete old entries")
}

err = exporter.ExportSyncCommitteeAtPeriod(rpcClient, p, tx)
if err != nil {
return errors.Wrap(err, "export")
}

return tx.Commit()
}
}
Loading
Loading