Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More efficient aggregation on demand #5354

Merged
merged 6 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//shared/hashutil:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
],
)

Expand All @@ -31,6 +32,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//shared/bls:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
],
Expand Down
58 changes: 58 additions & 0 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,68 @@ package kv
import (
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
)

// AggregateUnaggregatedAttestations aggregates the unaggregated attestations and save the
// newly aggregated attestations in the pool.
// It tracks the unaggregated attestations that weren't able to aggregate to prevent
// the deletion of unaggregated attestations in the pool.
func (p *AttCaches) AggregateUnaggregatedAttestations() error {
attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation)
unaggregatedAtts := p.UnaggregatedAttestations()
for _, att := range unaggregatedAtts {
attDataRoot, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return err
}
attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
}

// Aggregate unaggregated attestations from the pool and save them in the pool.
// Track the unaggregated attestations that aren't able to aggregate.
leftOverUnaggregatedAtt := make(map[[32]byte]bool)
for _, atts := range attsByDataRoot {
aggregatedAtts := make([]*ethpb.Attestation, 0, len(atts))
processedAtts, err := helpers.AggregateAttestations(atts)
if err != nil {
return err
}
for _, att := range processedAtts {
if helpers.IsAggregated(att) {
aggregatedAtts = append(aggregatedAtts, att)
} else {
h, err := ssz.HashTreeRoot(att)
if err != nil {
return err
}
leftOverUnaggregatedAtt[h] = true
}
}
if err := p.SaveAggregatedAttestations(aggregatedAtts); err != nil {
return err
}
}

// Remove the unaggregated attestations from the pool that were successfully aggregated.
for _, att := range unaggregatedAtts {
h, err := ssz.HashTreeRoot(att)
if err != nil {
return err
}
if leftOverUnaggregatedAtt[h] {
continue
}
if err := p.DeleteUnaggregatedAttestation(att); err != nil {
return err
}
}

return nil
}

// SaveAggregatedAttestation saves an aggregated attestation in cache.
func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
if att == nil || att.Data == nil {
Expand Down
38 changes: 31 additions & 7 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,45 @@ package kv
import (
"reflect"
"sort"
"strings"
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/shared/bls"
)

func TestKV_Aggregated_NotAggregated(t *testing.T) {
func TestKV_AggregateUnaggregatedAttestations(t *testing.T) {
cache := NewAttCaches()
priv := bls.RandKey()
sig1 := priv.Sign([]byte{'a'})
sig2 := priv.Sign([]byte{'b'})
att1 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig1.Marshal()}
att2 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1010}, Signature: sig1.Marshal()}
att3 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig1.Marshal()}
att4 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()}
att5 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig1.Marshal()}
att6 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1010}, Signature: sig1.Marshal()}
att7 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig1.Marshal()}
att8 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()}
atts := []*ethpb.Attestation{att1, att2, att3, att4, att5, att6, att7, att8}
if err := cache.SaveUnaggregatedAttestations(atts); err != nil {
t.Fatal(err)
}
if err := cache.AggregateUnaggregatedAttestations(); err != nil {
t.Fatal(err)
}

att := &ethpb.Attestation{AggregationBits: bitfield.Bitlist{0b11}, Data: &ethpb.AttestationData{}}

wanted := "attestation is not aggregated"
if err := cache.SaveAggregatedAttestation(att); !strings.Contains(err.Error(), wanted) {
t.Error("Did not received wanted error")
if len(cache.AggregatedAttestationsBySlotIndex(1, 0)) != 1 {
t.Fatal("Did not aggregate correctly")
}
if len(cache.AggregatedAttestationsBySlotIndex(2, 0)) != 1 {
t.Fatal("Did not aggregate correctly")
}
if len(cache.UnAggregatedAttestationsBySlotIndex(1, 0)) != 0 {
t.Fatal("Did not clear unaggregated correctly")
}
if len(cache.UnAggregatedAttestationsBySlotIndex(2, 0)) != 0 {
t.Fatal("Did not clear unaggregated correctly")
}
}

Expand Down
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// for aggregator actor.
type Pool interface {
// For Aggregated attestations
AggregateUnaggregatedAttestations() error
SaveAggregatedAttestation(att *ethpb.Attestation) error
SaveAggregatedAttestations(atts []*ethpb.Attestation) error
AggregatedAttestations() []*ethpb.Attestation
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {

attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation)

atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...)
atts = append(atts, s.pool.BlockAttestations()...)
if err := s.pool.AggregateUnaggregatedAttestations(); err != nil {
return err
}
atts := append(s.pool.AggregatedAttestations(), s.pool.BlockAttestations()...)
atts = append(atts, s.pool.ForkchoiceAttestations()...)

// Consolidate attestations by aggregating them by similar data root.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,24 @@ func TestBatchAttestations_Multiple(t *testing.T) {
t.Fatal(err)
}

wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[0], aggregatedAtts[0], blockAtts[0]})
wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[0], blockAtts[0]})
if err != nil {
t.Fatal(err)
}
aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[1], aggregatedAtts[1], blockAtts[1]})
aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[1], blockAtts[1]})
if err != nil {
t.Fatal(err)
}
wanted = append(wanted, aggregated...)
aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[2], aggregatedAtts[2], blockAtts[2]})
aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[2], blockAtts[2]})
if err != nil {
t.Fatal(err)
}

wanted = append(wanted, aggregated...)
if err := s.pool.AggregateUnaggregatedAttestations(); err != nil {
return
}
received := s.pool.ForkchoiceAttestations()

sort.Slice(received, func(i, j int) bool {
Expand Down
18 changes: 3 additions & 15 deletions beacon-chain/rpc/validator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,10 @@ func (as *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
}

// Retrieve the unaggregated attestation from pool.
unaggregatedAtts := as.AttPool.UnAggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
// In case there's left over aggregated attestations in the pool.
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts, err = helpers.AggregateAttestations(append(aggregatedAtts, unaggregatedAtts...))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get aggregate attestations: %v", err)
}

// Save the aggregated attestations to the pool.
if err := as.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not save aggregated attestations: %v", err)
}
if err := as.AttPool.DeleteUnaggregatedAttestations(unaggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not delete unaggregated attestations: %v", err)
if err := as.AttPool.AggregateUnaggregatedAttestations(); err != nil {
return nil, status.Errorf(codes.Internal, "Could not aggregate unaggregated attestations")
}
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)

// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
Expand Down