From 43bee017aceef8a187acbd25424a2f9d980ce990 Mon Sep 17 00:00:00 2001 From: spletka Date: Thu, 31 Aug 2023 12:52:04 +0200 Subject: [PATCH 1/5] (BIDS-2434) Fixed inclusion distance for missed slots --- db/bigtable.go | 55 +++++++++++++++++++++++++++++++++---------- db/db.go | 28 +++++++++++++++++++++- handlers/validator.go | 7 +----- 3 files changed, 70 insertions(+), 20 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index c6feeb0578..ed9da63b9e 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -851,16 +851,6 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*5)) defer cancel() - slots := []uint64{} - - for slot := startEpoch * utils.Config.Chain.Config.SlotsPerEpoch; slot < (endEpoch+1)*utils.Config.Chain.Config.SlotsPerEpoch; slot++ { - slots = append(slots, slot) - } - orphanedSlotsMap, err := GetOrphanedSlotsMap(slots) - if err != nil { - return nil, err - } - ranges := bigtable.getSlotRanges(startEpoch, endEpoch) res := make(map[uint64][]*types.ValidatorAttestation, len(validators)) @@ -886,6 +876,38 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st if len(columnFilters) == 0 { // special case to retrieve data for all validators filter = gcp_bigtable.FamilyFilter(ATTESTATIONS_FAMILY) } + + maxSlot := (endEpoch + 1) * utils.Config.Chain.Config.SlotsPerEpoch + err := bigtable.tableBeaconchain.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool { + for _, ri := range r[ATTESTATIONS_FAMILY] { + inclusionSlot := max_block_number - uint64(ri.Timestamp)/1000 + if inclusionSlot == max_block_number { + inclusionSlot = 0 + } + + if inclusionSlot > maxSlot { + maxSlot = inclusionSlot + } + } + return true + }, gcp_bigtable.RowFilter(filter)) + if err != nil { + return nil, err + } + + slots := []uint64{} + for slot := startEpoch * utils.Config.Chain.Config.SlotsPerEpoch; slot < maxSlot; slot++ { + slots = append(slots, slot) + } + missedSlotsMap, err := GetMissedSlotsMap(slots) + if err != nil { + return nil, err + } + orphanedSlotsMap, err := GetOrphanedSlotsMap(slots) + if err != nil { + return nil, err + } + err = bigtable.tableBeaconchain.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool { keySplit := strings.Split(r.Key(), ":") @@ -916,11 +938,19 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st res[validator] = make([]*types.ValidatorAttestation, 0) } + missedSlotsCount := uint64(0) + for slot := attesterSlot + 1; slot < inclusionSlot; slot++ { + if missedSlotsMap[slot] || orphanedSlotsMap[slot] { + missedSlotsCount++ + } + } + if len(res[validator]) > 0 && res[validator][len(res[validator])-1].AttesterSlot == attesterSlot { // don't override successful attestion, that was included in a different slot if status == 1 && res[validator][len(res[validator])-1].Status != 1 { - res[validator][len(res[validator])-1].InclusionSlot = inclusionSlot res[validator][len(res[validator])-1].Status = status + res[validator][len(res[validator])-1].InclusionSlot = inclusionSlot + res[validator][len(res[validator])-1].Delay = int64(inclusionSlot - attesterSlot - missedSlotsCount - 1) } } else { res[validator] = append(res[validator], &types.ValidatorAttestation{ @@ -930,10 +960,9 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st CommitteeIndex: 0, Status: status, InclusionSlot: inclusionSlot, - Delay: int64(inclusionSlot) - int64(attesterSlot) - 1, + Delay: int64(inclusionSlot - attesterSlot - missedSlotsCount - 1), }) } - } return true }, gcp_bigtable.RowFilter(filter)) diff --git a/db/db.go b/db/db.go index 45424aff53..957065a0fe 100644 --- a/db/db.go +++ b/db/db.go @@ -3312,6 +3312,32 @@ func GetValidatorPropsosals(validators []uint64, proposals *[]types.ValidatorPro `, validatorsPQArray) } +func GetMissedSlots(slots []uint64) ([]uint64, error) { + slotsPQArray := pq.Array(slots) + missed := []uint64{} + + err := ReaderDb.Select(&missed, ` + SELECT + slot + FROM blocks + WHERE slot = ANY($1) AND status = '2' + `, slotsPQArray) + + return missed, err +} + +func GetMissedSlotsMap(slots []uint64) (map[uint64]bool, error) { + missedSlots, err := GetMissedSlots(slots) + if err != nil { + return nil, err + } + missedSlotsMap := make(map[uint64]bool, len(missedSlots)) + for _, slot := range missedSlots { + missedSlotsMap[slot] = true + } + return missedSlotsMap, nil +} + func GetOrphanedSlots(slots []uint64) ([]uint64, error) { slotsPQArray := pq.Array(slots) orphaned := []uint64{} @@ -3331,7 +3357,7 @@ func GetOrphanedSlotsMap(slots []uint64) (map[uint64]bool, error) { if err != nil { return nil, err } - orphanedSlotsMap := make(map[uint64]bool) + orphanedSlotsMap := make(map[uint64]bool, len(orphanedSlots)) for _, slot := range orphanedSlots { orphanedSlotsMap[slot] = true } diff --git a/handlers/validator.go b/handlers/validator.go index e95527a385..801ec3996a 100644 --- a/handlers/validator.go +++ b/handlers/validator.go @@ -1986,17 +1986,12 @@ func ValidatorSync(w http.ResponseWriter, r *http.Request) { } // Search for the missed slots (status = 2), to see if it was only our validator that missed the slot or if the block was missed - missedSlots := []uint64{} - err = db.ReaderDb.Select(&missedSlots, `SELECT slot FROM blocks WHERE slot = ANY($1) AND status = '2'`, missedSyncSlots) + missedSlotsMap, err := db.GetMissedSlotsMap(missedSyncSlots) if err != nil { logger.WithError(err).Errorf("error getting missed slots data") http.Error(w, "Internal server error", http.StatusInternalServerError) return } - missedSlotsMap := make(map[uint64]bool, len(missedSlots)) - for _, slot := range missedSlots { - missedSlotsMap[slot] = true - } // extract correct slots tableData = make([][]interface{}, length) From 7ada2a482672828d072280223780a92b9884d910 Mon Sep 17 00:00:00 2001 From: spletka Date: Mon, 4 Sep 2023 16:02:29 +0200 Subject: [PATCH 2/5] (BIDS-2434) Rewrite to only one bigtable query --- db/bigtable.go | 135 +++++++++++++++++++++++++++++-------------------- 1 file changed, 79 insertions(+), 56 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index ed9da63b9e..34cce98949 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -878,16 +878,54 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } maxSlot := (endEpoch + 1) * utils.Config.Chain.Config.SlotsPerEpoch + attestationsMap := make(map[uint64]map[uint64][]*types.ValidatorAttestation) + + // Save info for all inclusionSlot for attestations in attestationsMap + // Set the maxSlot to the highest inclusionSlot err := bigtable.tableBeaconchain.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool { + keySplit := strings.Split(r.Key(), ":") + + attesterSlot, err := strconv.ParseUint(keySplit[4], 10, 64) + if err != nil { + logger.Errorf("error parsing slot from row key %v: %v", r.Key(), err) + return false + } + attesterSlot = max_block_number - attesterSlot for _, ri := range r[ATTESTATIONS_FAMILY] { inclusionSlot := max_block_number - uint64(ri.Timestamp)/1000 + + status := uint64(1) if inclusionSlot == max_block_number { inclusionSlot = 0 + status = 0 } if inclusionSlot > maxSlot { maxSlot = inclusionSlot } + + validator, err := strconv.ParseUint(strings.TrimPrefix(ri.Column, ATTESTATIONS_FAMILY+":"), 10, 64) + if err != nil { + logger.Errorf("error parsing validator from column key %v: %v", ri.Column, err) + return false + } + + if attestationsMap[validator] == nil { + attestationsMap[validator] = make(map[uint64][]*types.ValidatorAttestation) + } + + if attestationsMap[validator][attesterSlot] == nil { + attestationsMap[validator][attesterSlot] = make([]*types.ValidatorAttestation, 0) + } + + attestationsMap[validator][attesterSlot] = append(attestationsMap[validator][attesterSlot], &types.ValidatorAttestation{ + Index: validator, + Epoch: attesterSlot / utils.Config.Chain.Config.SlotsPerEpoch, + AttesterSlot: attesterSlot, + CommitteeIndex: 0, + Status: status, + InclusionSlot: inclusionSlot, + }) } return true }, gcp_bigtable.RowFilter(filter)) @@ -895,79 +933,64 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st return nil, err } + // Find all missed and orphaned slots slots := []uint64{} - for slot := startEpoch * utils.Config.Chain.Config.SlotsPerEpoch; slot < maxSlot; slot++ { + for slot := startEpoch * utils.Config.Chain.Config.SlotsPerEpoch; slot <= maxSlot; slot++ { slots = append(slots, slot) } - missedSlotsMap, err := GetMissedSlotsMap(slots) - if err != nil { - return nil, err - } - orphanedSlotsMap, err := GetOrphanedSlotsMap(slots) - if err != nil { - return nil, err - } - err = bigtable.tableBeaconchain.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool { - keySplit := strings.Split(r.Key(), ":") + var missedSlotsMap map[uint64]bool + var orphanedSlotsMap map[uint64]bool - attesterSlot, err := strconv.ParseUint(keySplit[4], 10, 64) - if err != nil { - logger.Errorf("error parsing slot from row key %v: %v", r.Key(), err) - return false - } - attesterSlot = max_block_number - attesterSlot - for _, ri := range r[ATTESTATIONS_FAMILY] { - inclusionSlot := max_block_number - uint64(ri.Timestamp)/1000 + g := new(errgroup.Group) - status := uint64(1) - if inclusionSlot == max_block_number { - inclusionSlot = 0 - status = 0 - } else if orphanedSlotsMap[inclusionSlot] { - status = 0 - } + g.Go(func() error { + missedSlotsMap, err = GetMissedSlotsMap(slots) + return err + }) - validator, err := strconv.ParseUint(strings.TrimPrefix(ri.Column, ATTESTATIONS_FAMILY+":"), 10, 64) - if err != nil { - logger.Errorf("error parsing validator from column key %v: %v", ri.Column, err) - return false - } + g.Go(func() error { + orphanedSlotsMap, err = GetOrphanedSlotsMap(slots) + return err + }) + err = g.Wait() + if err != nil { + return nil, err + } - if res[validator] == nil { - res[validator] = make([]*types.ValidatorAttestation, 0) + // Convert the attestationsMap info to the return format + // Set the delay of the inclusionSlot + for validator, attestations := range attestationsMap { + if res[validator] == nil { + res[validator] = make([]*types.ValidatorAttestation, 0) + } + for _, att := range attestations { + currentAttInfo := att[0] + for _, attInfo := range att { + if currentAttInfo.Status != 1 && attInfo.Status == 1 { + currentAttInfo.Status = attInfo.Status + currentAttInfo.InclusionSlot = attInfo.InclusionSlot + } } missedSlotsCount := uint64(0) - for slot := attesterSlot + 1; slot < inclusionSlot; slot++ { + for slot := currentAttInfo.AttesterSlot + 1; slot < currentAttInfo.InclusionSlot; slot++ { if missedSlotsMap[slot] || orphanedSlotsMap[slot] { missedSlotsCount++ } } + currentAttInfo.Delay = int64(currentAttInfo.InclusionSlot - currentAttInfo.AttesterSlot - missedSlotsCount - 1) - if len(res[validator]) > 0 && res[validator][len(res[validator])-1].AttesterSlot == attesterSlot { - // don't override successful attestion, that was included in a different slot - if status == 1 && res[validator][len(res[validator])-1].Status != 1 { - res[validator][len(res[validator])-1].Status = status - res[validator][len(res[validator])-1].InclusionSlot = inclusionSlot - res[validator][len(res[validator])-1].Delay = int64(inclusionSlot - attesterSlot - missedSlotsCount - 1) - } - } else { - res[validator] = append(res[validator], &types.ValidatorAttestation{ - Index: validator, - Epoch: attesterSlot / utils.Config.Chain.Config.SlotsPerEpoch, - AttesterSlot: attesterSlot, - CommitteeIndex: 0, - Status: status, - InclusionSlot: inclusionSlot, - Delay: int64(inclusionSlot - attesterSlot - missedSlotsCount - 1), - }) - } + res[validator] = append(res[validator], currentAttInfo) } - return true - }, gcp_bigtable.RowFilter(filter)) - if err != nil { - return nil, err + } + + // Sort the result by attesterSlot desc + for validator, att := range res { + sort.Slice(att, func(i, j int) bool { + return att[i].AttesterSlot > att[j].AttesterSlot + }) + res[validator] = att } return res, nil From 5d6cd41b43f974dfd5e21eae2ae261281fbaf85e Mon Sep 17 00:00:00 2001 From: spletka Date: Mon, 4 Sep 2023 16:12:37 +0200 Subject: [PATCH 3/5] (BIDS-2434) Fix for only orphaned slots included --- db/bigtable.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index 34cce98949..4bcf2575ba 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -919,12 +919,9 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } attestationsMap[validator][attesterSlot] = append(attestationsMap[validator][attesterSlot], &types.ValidatorAttestation{ - Index: validator, - Epoch: attesterSlot / utils.Config.Chain.Config.SlotsPerEpoch, - AttesterSlot: attesterSlot, - CommitteeIndex: 0, - Status: status, - InclusionSlot: inclusionSlot, + AttesterSlot: attesterSlot, + InclusionSlot: inclusionSlot, + Status: status, }) } return true @@ -967,6 +964,10 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st for _, att := range attestations { currentAttInfo := att[0] for _, attInfo := range att { + if orphanedSlotsMap[attInfo.InclusionSlot] { + attInfo.Status = 0 + } + if currentAttInfo.Status != 1 && attInfo.Status == 1 { currentAttInfo.Status = attInfo.Status currentAttInfo.InclusionSlot = attInfo.InclusionSlot @@ -979,6 +980,9 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st missedSlotsCount++ } } + currentAttInfo.Index = validator + currentAttInfo.Epoch = currentAttInfo.AttesterSlot / utils.Config.Chain.Config.SlotsPerEpoch + currentAttInfo.CommitteeIndex = 0 currentAttInfo.Delay = int64(currentAttInfo.InclusionSlot - currentAttInfo.AttesterSlot - missedSlotsCount - 1) res[validator] = append(res[validator], currentAttInfo) From 95e164eae649658697fcb782cd4b344ed8a07146 Mon Sep 17 00:00:00 2001 From: spletka Date: Tue, 5 Sep 2023 13:54:30 +0200 Subject: [PATCH 4/5] (BIDS-2434) Refactored attesterSlot --- db/bigtable.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/db/bigtable.go b/db/bigtable.go index 4bcf2575ba..21aa63c174 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -878,6 +878,7 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } maxSlot := (endEpoch + 1) * utils.Config.Chain.Config.SlotsPerEpoch + // map with structure attestationsMap[validator][attesterSlot] attestationsMap := make(map[uint64]map[uint64][]*types.ValidatorAttestation) // Save info for all inclusionSlot for attestations in attestationsMap @@ -919,7 +920,6 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } attestationsMap[validator][attesterSlot] = append(attestationsMap[validator][attesterSlot], &types.ValidatorAttestation{ - AttesterSlot: attesterSlot, InclusionSlot: inclusionSlot, Status: status, }) @@ -961,7 +961,7 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st if res[validator] == nil { res[validator] = make([]*types.ValidatorAttestation, 0) } - for _, att := range attestations { + for attesterSlot, att := range attestations { currentAttInfo := att[0] for _, attInfo := range att { if orphanedSlotsMap[attInfo.InclusionSlot] { @@ -981,9 +981,10 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } } currentAttInfo.Index = validator - currentAttInfo.Epoch = currentAttInfo.AttesterSlot / utils.Config.Chain.Config.SlotsPerEpoch + currentAttInfo.Epoch = attesterSlot / utils.Config.Chain.Config.SlotsPerEpoch currentAttInfo.CommitteeIndex = 0 - currentAttInfo.Delay = int64(currentAttInfo.InclusionSlot - currentAttInfo.AttesterSlot - missedSlotsCount - 1) + currentAttInfo.AttesterSlot = attesterSlot + currentAttInfo.Delay = int64(currentAttInfo.InclusionSlot - attesterSlot - missedSlotsCount - 1) res[validator] = append(res[validator], currentAttInfo) } From 794916cde512deb398b92ccdc98047653f67b6d6 Mon Sep 17 00:00:00 2001 From: spletka Date: Tue, 5 Sep 2023 14:52:04 +0200 Subject: [PATCH 5/5] (BIDS-2434) Fixed wrong attesterSlot usage --- db/bigtable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/bigtable.go b/db/bigtable.go index 21aa63c174..5a044a7281 100644 --- a/db/bigtable.go +++ b/db/bigtable.go @@ -975,7 +975,7 @@ func (bigtable *Bigtable) GetValidatorAttestationHistory(validators []uint64, st } missedSlotsCount := uint64(0) - for slot := currentAttInfo.AttesterSlot + 1; slot < currentAttInfo.InclusionSlot; slot++ { + for slot := attesterSlot + 1; slot < currentAttInfo.InclusionSlot; slot++ { if missedSlotsMap[slot] || orphanedSlotsMap[slot] { missedSlotsCount++ }