diff --git a/cmd/migrations/bigtable/main.go b/cmd/migrations/bigtable/main.go index f4474e14f9..258cba05dd 100644 --- a/cmd/migrations/bigtable/main.go +++ b/cmd/migrations/bigtable/main.go @@ -119,7 +119,7 @@ func main() { // export epoch data to bigtable g := new(errgroup.Group) - g.SetLimit(5) + g.SetLimit(6) g.Go(func() error { err = db.BigtableClient.SaveValidatorBalances(epoch, data.Validators) if err != nil { @@ -155,6 +155,13 @@ func main() { } return nil }) + g.Go(func() error { + err = db.BigtableClient.MigrateIncomeDataV1V2Schema(epoch) + if err != nil { + return fmt.Errorf("error exporting sync committee duties to bigtable: %v", err) + } + return nil + }) err = g.Wait() if err != nil { diff --git a/db/bigtable.go b/db/bigtable.go index e663e8d529..7a5c7081fe 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -2195,3 +2195,93 @@ func (bigtable *Bigtable) reversedPaddedEpoch(epoch uint64) string { func (bigtable *Bigtable) reversedPaddedSlot(slot uint64) string { return fmt.Sprintf("%09d", MAX_CL_BLOCK_NUMBER-slot) } + +func (bigtable *Bigtable) MigrateIncomeDataV1V2Schema(epoch uint64) error { + type validatorEpochData struct { + ValidatorIndex uint64 + IncomeDetails *itypes.ValidatorEpochIncome + } + + epochData := make(map[uint64]*validatorEpochData) + filter := gcp_bigtable.ChainFilters(gcp_bigtable.FamilyFilter(INCOME_DETAILS_COLUMN_FAMILY), gcp_bigtable.LatestNFilter(1)) + ctx := context.Background() + + prefixEpochRange := gcp_bigtable.PrefixRange(fmt.Sprintf("%s:e:b:%s", bigtable.chainId, fmt.Sprintf("%09d", (MAX_EPOCH+1)-epoch))) + + err := bigtable.tableBeaconchain.ReadRows(ctx, prefixEpochRange, func(r gcp_bigtable.Row) bool { + // logger.Infof("processing row %v", r.Key()) + + keySplit := strings.Split(r.Key(), ":") + + rowKeyEpoch, err := strconv.ParseUint(keySplit[3], 10, 64) + if err != nil { + logger.Errorf("error parsing epoch from row key %v: %v", r.Key(), err) + return false + } + + rowKeyEpoch = MAX_EPOCH - rowKeyEpoch + + if epoch != rowKeyEpoch { + logger.Errorf("retrieved different epoch than requested, requested: %d, retrieved: %d", epoch, rowKeyEpoch) + } + + // logger.Infof("epoch is %d", rowKeyEpoch) + + for columnFamily, readItems := range r { + + for _, ri := range readItems { + + if ri.Column == "stats:sum" { // skip migrating the total epoch income stats + continue + } + + validator, err := strconv.ParseUint(strings.TrimPrefix(ri.Column, columnFamily+":"), 10, 64) + if err != nil { + logger.Errorf("error parsing validator from column key %v: %v", ri.Column, err) + return false + } + + // logger.Infof("retrieved field %s from column family %s for validator %d", ri.Column, columnFamily, validator) + if columnFamily == INCOME_DETAILS_COLUMN_FAMILY { + if epochData[validator] == nil { + epochData[validator] = &validatorEpochData{ + ValidatorIndex: validator, + } + } + // logger.Infof("processing income details data for validator %d", validator) + incomeDetails := &itypes.ValidatorEpochIncome{} + err = proto.Unmarshal(ri.Value, incomeDetails) + if err != nil { + logger.Errorf("error decoding validator income data for row %v: %v", r.Key(), err) + return false + } + + epochData[validator].IncomeDetails = incomeDetails + } else { + logger.Errorf("retrieved unexpected column family %s", columnFamily) + } + } + } + + return true + }, gcp_bigtable.RowFilter(filter)) + + if err != nil { + return err + } + + incomeData := make(map[uint64]*itypes.ValidatorEpochIncome) + for _, validator := range epochData { + if validator.IncomeDetails == nil { + continue + } + incomeData[validator.ValidatorIndex] = validator.IncomeDetails + } + + err = bigtable.SaveValidatorIncomeDetails(epoch, incomeData) + if err != nil { + return err + } + + return nil +}