diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index bfb54748c933..eb6b9af79b5d 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -65,20 +65,21 @@ func (s *Service) processPendingAtts(ctx context.Context) error { hasStateSummary := featureconfig.Get().NewStateMgmt && s.db.HasStateSummary(ctx, bRoot) if s.db.HasBlock(ctx, bRoot) && (s.db.HasState(ctx, bRoot) || hasStateSummary) { numberOfBlocksRecoveredFromAtt.Inc() - for _, att := range attestations { + for _, signedAtt := range attestations { + att := signedAtt.Message // The pending attestations can arrive in both aggregated and unaggregated forms, // each from has distinct validation steps. if helpers.IsAggregated(att.Aggregate) { // Save the pending aggregated attestation to the pool if it passes the aggregated // validation steps. - if s.validateBlockInAttestation(ctx, att) && s.validateAggregatedAtt(ctx, att) { + if s.validateBlockInAttestation(ctx, signedAtt) && s.validateAggregatedAtt(ctx, att) { if err := s.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil { return err } numberOfAttsRecovered.Inc() - // Broadcasting the attestation again once a node is able to process it. - if err := s.p2p.Broadcast(ctx, att); err != nil { + // Broadcasting the signed attestation again once a node is able to process it. + if err := s.p2p.Broadcast(ctx, signedAtt); err != nil { log.WithError(err).Error("Failed to broadcast") } } @@ -93,8 +94,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error { } numberOfAttsRecovered.Inc() - // Broadcasting the attestation again once a node is able to process it. - if err := s.p2p.Broadcast(ctx, att); err != nil { + // Broadcasting the signed attestation again once a node is able to process it. + if err := s.p2p.Broadcast(ctx, signedAtt); err != nil { log.WithError(err).Error("Failed to broadcast") } } @@ -112,7 +113,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error { // Pending attestation's missing block has not arrived yet. log.WithFields(logrus.Fields{ "currentSlot": s.chain.CurrentSlot(), - "attSlot": attestations[0].Aggregate.Data.Slot, + "attSlot": attestations[0].Message.Aggregate.Data.Slot, "attCount": len(attestations), "blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])), }).Debug("Requesting block for pending attestation") @@ -123,7 +124,7 @@ func (s *Service) processPendingAtts(ctx context.Context) error { return nil } pid := pids[rand.Int()%len(pids)] - targetSlot := helpers.SlotToEpoch(attestations[0].Aggregate.Data.Target.Epoch) + targetSlot := helpers.SlotToEpoch(attestations[0].Message.Aggregate.Data.Target.Epoch) for _, p := range pids { if cs, _ := s.p2p.Peers().ChainState(p); cs != nil && cs.HeadSlot >= targetSlot { pid = p @@ -144,14 +145,14 @@ func (s *Service) processPendingAtts(ctx context.Context) error { // This defines how pending attestations is saved in the map. The key is the // root of the missing block. The value is the list of pending attestations // that voted for that block root. -func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) { - root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot) +func (s *Service) savePendingAtt(att *ethpb.SignedAggregateAttestationAndProof) { + root := bytesutil.ToBytes32(att.Message.Aggregate.Data.BeaconBlockRoot) s.pendingAttsLock.Lock() defer s.pendingAttsLock.Unlock() _, ok := s.blkRootToPendingAtts[root] if !ok { - s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att} + s.blkRootToPendingAtts[root] = []*ethpb.SignedAggregateAttestationAndProof{att} return } @@ -171,7 +172,7 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) { for bRoot, atts := range s.blkRootToPendingAtts { for i := len(atts) - 1; i >= 0; i-- { - if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch { + if slot >= atts[i].Message.Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch { // Remove the pending attestation from the list in place. atts = append(atts[:i], atts[i+1:]...) numberOfAttsNotRecovered.Inc() diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go index 55278c80bce1..b1a2894c73e8 100644 --- a/beacon-chain/sync/pending_attestations_queue_test.go +++ b/beacon-chain/sync/pending_attestations_queue_test.go @@ -44,11 +44,11 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) { p2p: p1, db: db, chain: &mock.ChainService{Genesis: roughtime.Now()}, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), } a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{Target: ðpb.Checkpoint{}}}} - r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.AggregateAttestationAndProof{a} + r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a}} if err := r.processPendingAtts(context.Background()); err != nil { t.Fatal(err) } @@ -66,7 +66,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) { p2p: p1, db: db, chain: &mock.ChainService{Genesis: roughtime.Now()}, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), attPool: attestations.NewPool(), } @@ -83,7 +83,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) { r.db.SaveBlock(context.Background(), b) r.db.SaveState(context.Background(), s, r32) - r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{a} + r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: a}} if err := r.processPendingAtts(context.Background()); err != nil { t.Fatal(err) } @@ -172,7 +172,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { FinalizedCheckPoint: ðpb.Checkpoint{ Epoch: 0, }}, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), attPool: attestations.NewPool(), } @@ -182,7 +182,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) { s, _ := beaconstate.InitializeFromProto(&pb.BeaconState{}) r.db.SaveState(context.Background(), s, r32) - r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{aggregateAndProof} + r.blkRootToPendingAtts[r32] = []*ethpb.SignedAggregateAttestationAndProof{{Message: aggregateAndProof}} if err := r.processPendingAtts(context.Background()); err != nil { t.Fatal(err) } @@ -205,7 +205,7 @@ func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) { defer dbtest.TeardownDB(t, db) s := &Service{ - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), } // 100 Attestations per block root. @@ -214,15 +214,18 @@ func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) { r3 := [32]byte{'C'} for i := 0; i < 100; i++ { - s.savePendingAtt(ðpb.AggregateAttestationAndProof{ - Aggregate: ðpb.Attestation{ - Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r1[:]}}}) - s.savePendingAtt(ðpb.AggregateAttestationAndProof{ - Aggregate: ðpb.Attestation{ - Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r2[:]}}}) - s.savePendingAtt(ðpb.AggregateAttestationAndProof{ - Aggregate: ðpb.Attestation{ - Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r3[:]}}}) + s.savePendingAtt(ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r1[:]}}}}) + s.savePendingAtt(ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r2[:]}}}}) + s.savePendingAtt(ðpb.SignedAggregateAttestationAndProof{ + Message: ðpb.AggregateAttestationAndProof{ + Aggregate: ðpb.Attestation{ + Data: ðpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r3[:]}}}}) } if len(s.blkRootToPendingAtts[r1]) != 100 { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index b9181711cea7..4f1bea073421 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -77,7 +77,7 @@ func NewRegularSync(cfg *Config) *Service { attestationNotifier: cfg.AttestationNotifier, slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), seenPendingBlocks: make(map[[32]byte]bool), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), stateNotifier: cfg.StateNotifier, blockNotifier: cfg.BlockNotifier, blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), @@ -102,7 +102,7 @@ type Service struct { chain blockchainService slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock seenPendingBlocks map[[32]byte]bool - blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof + blkRootToPendingAtts map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof pendingAttsLock sync.RWMutex pendingQueueLock sync.RWMutex chainStarted bool diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 01910a8ce38a..6bea5d9725e7 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -12,15 +12,15 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { - a, ok := msg.(*ethpb.AggregateAttestationAndProof) + a, ok := msg.(*ethpb.SignedAggregateAttestationAndProof) if !ok { - return fmt.Errorf("message was not type *eth.AggregateAttestationAndProof, type=%T", msg) + return fmt.Errorf("message was not type *eth.SignedAggregateAttestationAndProof, type=%T", msg) } - if a.Aggregate == nil || a.Aggregate.Data == nil { + if a.Message.Aggregate == nil || a.Message.Aggregate.Data == nil { return errors.New("nil aggregate") } - r.setAggregatorIndexSlotSeen(a.Aggregate.Data.Slot, a.AggregatorIndex) + r.setAggregatorIndexSlotSeen(a.Message.Aggregate.Data.Slot, a.Message.AggregatorIndex) - return r.attPool.SaveAggregatedAttestation(a.Aggregate) + return r.attPool.SaveAggregatedAttestation(a.Message.Aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go index df28f11f0f91..bac2dd2a4370 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go @@ -18,12 +18,12 @@ func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) { seenAttestationCache: c, } - a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100} + a := ðpb.SignedAggregateAttestationAndProof{Message: ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100}} if err := r.beaconAggregateProofSubscriber(context.Background(), a); err != nil { t.Fatal(err) } - if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{a.Aggregate}) { + if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{a.Message.Aggregate}) { t.Error("Did not save aggregated attestation") } } diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index d386821970f7..2bc154f11796 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -66,7 +66,7 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms if seen { return false } - if !r.validateBlockInAttestation(ctx, m.Message) { + if !r.validateBlockInAttestation(ctx, m) { return false } @@ -131,14 +131,15 @@ func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateA return true } -func (r *Service) validateBlockInAttestation(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool { +func (r *Service) validateBlockInAttestation(ctx context.Context, s *ethpb.SignedAggregateAttestationAndProof) bool { + a := s.Message // Verify the block being voted and the processed state is in DB. The block should have passed validation if it's in the DB. hasStateSummary := featureconfig.Get().NewStateMgmt && r.db.HasStateSummary(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) hasState := r.db.HasState(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) || hasStateSummary hasBlock := r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) if !(hasState && hasBlock) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. - r.savePendingAtt(a) + r.savePendingAtt(s) return false } return true diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go index dc8d8f32803a..c24c8a1d7fad 100644 --- a/beacon-chain/sync/validate_aggregate_proof_test.go +++ b/beacon-chain/sync/validate_aggregate_proof_test.go @@ -3,12 +3,12 @@ package sync import ( "bytes" "context" - lru "github.com/hashicorp/golang-lru" "reflect" "strings" "testing" "time" + lru "github.com/hashicorp/golang-lru" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -140,7 +140,7 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) { db: db, initialSync: &mockSync.Sync{IsSyncing: false}, attPool: attestations.NewPool(), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), seenAttestationCache: c, } diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index 0390e777aed4..24e4cbb7d355 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -83,7 +83,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p hasBlock := s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) if !(hasState && hasBlock) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. - s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att}) + s.savePendingAtt(ð.SignedAggregateAttestationAndProof{Message: ð.AggregateAttestationAndProof{Aggregate: att}}) return false } diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go index ccca1aa0680d..bdb593239b6a 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go @@ -38,7 +38,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) { p2p: p, db: db, chain: chain, - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), seenAttestationCache: c, }