Skip to content

Commit

Permalink
Merge pull request #2564 from gobitfly/BIDS-2472/follow_up
Browse files Browse the repository at this point in the history
Bids 2472/follow up
  • Loading branch information
peterbitfly authored Sep 20, 2023
2 parents 84b773f + 6047115 commit 76f0f2a
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
9 changes: 8 additions & 1 deletion cmd/migrations/bigtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func main() {

// export epoch data to bigtable
g := new(errgroup.Group)
g.SetLimit(5)
g.SetLimit(6)
g.Go(func() error {
err = db.BigtableClient.SaveValidatorBalances(epoch, data.Validators)
if err != nil {
Expand Down Expand Up @@ -155,6 +155,13 @@ func main() {
}
return nil
})
g.Go(func() error {
err = db.BigtableClient.MigrateIncomeDataV1V2Schema(epoch)
if err != nil {
return fmt.Errorf("error exporting sync committee duties to bigtable: %v", err)
}
return nil
})

err = g.Wait()
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -2195,3 +2195,93 @@ func (bigtable *Bigtable) reversedPaddedEpoch(epoch uint64) string {
func (bigtable *Bigtable) reversedPaddedSlot(slot uint64) string {
return fmt.Sprintf("%09d", MAX_CL_BLOCK_NUMBER-slot)
}

func (bigtable *Bigtable) MigrateIncomeDataV1V2Schema(epoch uint64) error {
type validatorEpochData struct {
ValidatorIndex uint64
IncomeDetails *itypes.ValidatorEpochIncome
}

epochData := make(map[uint64]*validatorEpochData)
filter := gcp_bigtable.ChainFilters(gcp_bigtable.FamilyFilter(INCOME_DETAILS_COLUMN_FAMILY), gcp_bigtable.LatestNFilter(1))
ctx := context.Background()

prefixEpochRange := gcp_bigtable.PrefixRange(fmt.Sprintf("%s:e:b:%s", bigtable.chainId, fmt.Sprintf("%09d", (MAX_EPOCH+1)-epoch)))

err := bigtable.tableBeaconchain.ReadRows(ctx, prefixEpochRange, func(r gcp_bigtable.Row) bool {
// logger.Infof("processing row %v", r.Key())

keySplit := strings.Split(r.Key(), ":")

rowKeyEpoch, err := strconv.ParseUint(keySplit[3], 10, 64)
if err != nil {
logger.Errorf("error parsing epoch from row key %v: %v", r.Key(), err)
return false
}

rowKeyEpoch = MAX_EPOCH - rowKeyEpoch

if epoch != rowKeyEpoch {
logger.Errorf("retrieved different epoch than requested, requested: %d, retrieved: %d", epoch, rowKeyEpoch)
}

// logger.Infof("epoch is %d", rowKeyEpoch)

for columnFamily, readItems := range r {

for _, ri := range readItems {

if ri.Column == "stats:sum" { // skip migrating the total epoch income stats
continue
}

validator, err := strconv.ParseUint(strings.TrimPrefix(ri.Column, columnFamily+":"), 10, 64)
if err != nil {
logger.Errorf("error parsing validator from column key %v: %v", ri.Column, err)
return false
}

// logger.Infof("retrieved field %s from column family %s for validator %d", ri.Column, columnFamily, validator)
if columnFamily == INCOME_DETAILS_COLUMN_FAMILY {
if epochData[validator] == nil {
epochData[validator] = &validatorEpochData{
ValidatorIndex: validator,
}
}
// logger.Infof("processing income details data for validator %d", validator)
incomeDetails := &itypes.ValidatorEpochIncome{}
err = proto.Unmarshal(ri.Value, incomeDetails)
if err != nil {
logger.Errorf("error decoding validator income data for row %v: %v", r.Key(), err)
return false
}

epochData[validator].IncomeDetails = incomeDetails
} else {
logger.Errorf("retrieved unexpected column family %s", columnFamily)
}
}
}

return true
}, gcp_bigtable.RowFilter(filter))

if err != nil {
return err
}

incomeData := make(map[uint64]*itypes.ValidatorEpochIncome)
for _, validator := range epochData {
if validator.IncomeDetails == nil {
continue
}
incomeData[validator.ValidatorIndex] = validator.IncomeDetails
}

err = bigtable.SaveValidatorIncomeDetails(epoch, incomeData)
if err != nil {
return err
}

return nil
}
16 changes: 16 additions & 0 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,10 @@ func GetAddressWithdrawalsTotal(address []byte) (uint64, error) {
var total uint64

err := ReaderDb.Get(&total, `
/*+
BitmapScan(w)
NestLoop(b w)
*/
SELECT
COALESCE(sum(w.amount), 0) as total
FROM blocks_withdrawals w
Expand All @@ -2601,6 +2605,10 @@ func GetDashboardWithdrawalsCount(validators []uint64) (uint64, error) {
var count uint64
validatorFilter := pq.Array(validators)
err := ReaderDb.Get(&count, `
/*+
BitmapScan(w)
NestLoop(b w)
*/
SELECT count(*)
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
Expand All @@ -2622,6 +2630,10 @@ func GetDashboardWithdrawals(validators []uint64, limit uint64, offset uint64, o
}
validatorFilter := pq.Array(validators)
err := ReaderDb.Select(&withdrawals, fmt.Sprintf(`
/*+
BitmapScan(w)
NestLoop(b w)
*/
SELECT
w.block_slot as slot,
w.withdrawalindex as index,
Expand Down Expand Up @@ -2652,6 +2664,10 @@ func GetValidatorWithdrawalsCount(validator uint64) (count, lastWithdrawalEpoch

r := &dbResponse{}
err = ReaderDb.Get(r, `
/*+
BitmapScan(w)
NestLoop(b w)
*/
SELECT count(*) as withdrawals_count, COALESCE(max(block_slot), 0) as last_withdawal_slot
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
Expand Down

0 comments on commit 76f0f2a

Please sign in to comment.