Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/fetcher: add support for DutyAggregator #1118

Merged
merged 3 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/eth2util/eth2exp"
)

// New returns a new fetcher instance.
Expand Down Expand Up @@ -71,6 +72,11 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
if err != nil {
return errors.Wrap(err, "fetch proposer data")
}
case core.DutyAggregator:
unsignedSet, err = f.fetchAggregatorData(ctx, duty.Slot, defSet)
if err != nil {
return errors.Wrap(err, "fetch aggregator data")
}
default:
return errors.New("unsupported duty type", z.Str("type", duty.Type.String()))
}
Expand All @@ -89,8 +95,8 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
return nil
}

// RegisterAggSigDB registers a function to get resolved aggregated signed data from the AggSigDB.
// Note: This is not thread safe should be called *before* Fetch.
// RegisterAggSigDB registers a function to get resolved aggregated signed data from AggSigDB.
// Note: This is not thread safe and should only be called *before* Fetch.
func (f *Fetcher) RegisterAggSigDB(fn func(context.Context, core.Duty, core.PubKey) (core.SignedData, error)) {
f.aggSigDBFunc = fn
}
Expand All @@ -108,15 +114,17 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core
return nil, errors.New("invalid attester definition")
}

eth2AttData, ok := dataByCommIdx[attDuty.CommitteeIndex]
commIdx := attDuty.CommitteeIndex

eth2AttData, ok := dataByCommIdx[commIdx]
if !ok {
var err error
eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), attDuty.CommitteeIndex)
eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), commIdx)
if err != nil {
return nil, err
}

dataByCommIdx[attDuty.CommitteeIndex] = eth2AttData
dataByCommIdx[commIdx] = eth2AttData
}

attData := core.AttestationData{
Expand All @@ -130,14 +138,60 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core
return resp, nil
}

func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
resp := make(core.UnsignedDataSet)
for pubkey := range defSet {
// Query AggSigDB for DutyPrepareAggregator to get beacon committee subscription.
prepAggData, err := f.aggSigDBFunc(ctx, core.NewPrepareAggregatorDuty(slot), pubkey)
if err != nil {
return core.UnsignedDataSet{}, err
}

sub, ok := prepAggData.(core.SignedBeaconCommitteeSubscription)
if !ok {
return core.UnsignedDataSet{}, errors.New("invalid beacon committee subscription")
}

res, err := eth2exp.CalculateCommitteeSubscriptionResponse(ctx, f.eth2Cl, &sub.BeaconCommitteeSubscription)
if err != nil {
return core.UnsignedDataSet{}, err
}

// This validator isn't an aggregator for this slot.
if !res.IsAggregator {
continue
xenowits marked this conversation as resolved.
Show resolved Hide resolved
}

// Query AggSigDB for DutyAttester to get attestation data root.
attData, err := f.aggSigDBFunc(ctx, core.NewAttesterDuty(slot), pubkey)
if err != nil {
return core.UnsignedDataSet{}, err
}

att, ok := attData.(core.Attestation)
if !ok {
return core.UnsignedDataSet{}, errors.New("invalid attestation")
}

// Query BN for aggregate attestation.
aggAtt, err := f.eth2Cl.AggregateAttestation(ctx, eth2p0.Slot(slot), att.Data.BeaconBlockRoot)
if err != nil {
return core.UnsignedDataSet{}, err
}

resp[pubkey] = core.AggregatedAttestation{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a core.NewAggregateAttesation(aggAtt) function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. I created a new type for aggregate attestation implementing unsignedData

// AggregatedAttestation wraps Attestation and implements the UnsignedData interface.
type AggregatedAttestation struct {
	eth2p0.Attestation
}

Attestation: *aggAtt,
}
}

return resp, nil
}

func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
resp := make(core.UnsignedDataSet)
for pubkey := range defSet {
// Fetch previously aggregated randao reveal from AggSigDB
dutyRandao := core.Duty{
Slot: slot,
Type: core.DutyRandao,
}
dutyRandao := core.NewRandaoDuty(slot)
randaoData, err := f.aggSigDBFunc(ctx, dutyRandao, pubkey)
if err != nil {
return nil, err
Expand Down
115 changes: 111 additions & 4 deletions core/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func TestFetchAttester(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyAttester, Slot: slot}

duty := core.NewAttesterDuty(slot)
bmock, err := beaconmock.New()
require.NoError(t, err)
fetch, err := fetcher.New(bmock)
Expand All @@ -93,6 +92,97 @@ func TestFetchAttester(t *testing.T) {
require.NoError(t, err)
}

func TestFetchAggregator(t *testing.T) {
ctx := context.Background()

const (
slot = 1
vIdxA = 2
vIdxB = 3
commIdxA = 4
commIdxB = 5
commLen = 6
)

duty := core.NewAggregatorDuty(slot)

pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{
vIdxA: testutil.RandomCorePubKey(t),
vIdxB: testutil.RandomCorePubKey(t),
}

defSet := core.DutyDefinitionSet{
pubkeysByIdx[vIdxA]: core.NewEmptyDefinition(),
pubkeysByIdx[vIdxB]: core.NewEmptyDefinition(),
}

signedCommSubByPubKey := map[core.PubKey]core.SignedData{
pubkeysByIdx[vIdxA]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxA, slot, commIdxA),
pubkeysByIdx[vIdxB]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxB, slot, commIdxB),
}

attByPubKey := map[core.PubKey]core.SignedData{
pubkeysByIdx[vIdxA]: core.Attestation{
Attestation: *testutil.RandomAttestation(),
},
pubkeysByIdx[vIdxB]: core.Attestation{
Attestation: *testutil.RandomAttestation(),
},
}

bmock, err := beaconmock.New()
require.NoError(t, err)

bmock.BeaconCommitteesAtEpochFunc = func(_ context.Context, _ string, _ eth2p0.Epoch) ([]*eth2v1.BeaconCommittee, error) {
return []*eth2v1.BeaconCommittee{
beaconCommittee(commIdxA, commLen),
beaconCommittee(commIdxB, commLen),
}, nil
}

bmock.AggregateAttestationFunc = func(ctx context.Context, slot eth2p0.Slot, root eth2p0.Root) (*eth2p0.Attestation, error) {
for _, att := range attByPubKey {
a := att.(core.Attestation)
if a.Data.BeaconBlockRoot == root {
return &a.Attestation, nil
}
}

return &eth2p0.Attestation{}, nil
}

fetch, err := fetcher.New(bmock)
require.NoError(t, err)

fetch.RegisterAggSigDB(func(ctx context.Context, duty core.Duty, key core.PubKey) (core.SignedData, error) {
if duty.Type == core.DutyAttester {
return attByPubKey[key], nil
}

return signedCommSubByPubKey[key], nil
})

err = fetch.Fetch(ctx, duty, defSet)
require.NoError(t, err)

fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
require.Equal(t, duty, resDuty)
require.Len(t, resDataSet, 2)

for pubkey, aggAtt := range resDataSet {
aggregated, ok := aggAtt.(core.AggregatedAttestation)
require.True(t, ok)

att, ok := attByPubKey[pubkey].(core.Attestation)
require.True(t, ok)

require.Equal(t, aggregated.Attestation, att.Attestation)
}

return nil
})
}

func TestFetchProposer(t *testing.T) {
ctx := context.Background()

Expand All @@ -119,7 +209,7 @@ func TestFetchProposer(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyProposer, Slot: slot}
duty := core.NewProposerDuty(slot)

randaoA := testutil.RandomCoreSignature()
randaoB := testutil.RandomCoreSignature()
Expand Down Expand Up @@ -186,7 +276,7 @@ func TestFetchBuilderProposer(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyBuilderProposer, Slot: slot}
duty := core.NewBuilderProposerDuty(slot)

randaoA := testutil.RandomCoreSignature()
randaoB := testutil.RandomCoreSignature()
Expand Down Expand Up @@ -252,3 +342,20 @@ func assertRandaoBlindedBlock(t *testing.T, randao eth2p0.BLSSignature, block co
require.Fail(t, "invalid block")
}
}

// beaconCommittee returns a BeaconCommittee with the given committee index and a list of commLen validator indexes.
func beaconCommittee(commIdx, commLen int) *eth2v1.BeaconCommittee {
var (
slot = eth2p0.Slot(1)
vals []eth2p0.ValidatorIndex
)
for idx := 1; idx <= commLen; idx++ {
vals = append(vals, eth2p0.ValidatorIndex(idx))
}

return &eth2v1.BeaconCommittee{
Slot: slot,
Index: eth2p0.CommitteeIndex(commIdx),
Validators: vals,
}
}
3 changes: 1 addition & 2 deletions core/signeddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,8 +803,7 @@ func (s *SignedAggregateAndProof) UnmarshalJSON(input []byte) error {
}

// cloneJSONMarshaler clones the marshaler by serialising to-from json
// since eth2 types contains pointers. The result is stored
// in the value pointed to by v.
// since eth2 types contain pointers. The result is stored in the value pointed to by v.
func cloneJSONMarshaler(data json.Marshaler, v any) error {
bytes, err := data.MarshalJSON()
if err != nil {
Expand Down
36 changes: 35 additions & 1 deletion core/unsigneddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (

var (
_ UnsignedData = AttestationData{}
_ UnsignedData = AggregatedAttestation{}
_ UnsignedData = VersionedBeaconBlock{}
_ UnsignedData = VersionedBlindedBeaconBlock{}
)

// AttestationData wraps the eth2 attestation data and adds the original duty.
// The original duty allows mapping the partial signed response from the VC
// backed to the validator pubkey via the aggregation bits field.
// back to the validator pubkey via the aggregation bits field.
type AttestationData struct {
Data eth2p0.AttestationData
Duty eth2v1.AttesterDuty
Expand Down Expand Up @@ -81,6 +82,39 @@ type attestationDataJSON struct {
Duty *eth2v1.AttesterDuty `json:"attestation_duty"`
}

// AggregatedAttestation wraps Attestation and implements the UnsignedData interface.
type AggregatedAttestation struct {
eth2p0.Attestation
}

func (a AggregatedAttestation) Clone() (UnsignedData, error) {
var resp AggregatedAttestation
err := cloneJSONMarshaler(a, &resp)
if err != nil {
return nil, errors.Wrap(err, "clone aggregated attestation")
}

return resp, nil
}

func (a AggregatedAttestation) MarshalJSON() ([]byte, error) {
resp, err := json.Marshal(a)
if err != nil {
return nil, errors.Wrap(err, "marshal aggregated attestation")
}

return resp, nil
}

func (a *AggregatedAttestation) UnmarshalJSON(input []byte) error { //nolint:revive
var att AggregatedAttestation
if err := json.Unmarshal(input, &att); err != nil {
return errors.Wrap(err, "unmarshal aggregated attestation")
}

return nil
}

// NewVersionedBeaconBlock validates and returns a new wrapped VersionedBeaconBlock.
func NewVersionedBeaconBlock(block *spec.VersionedBeaconBlock) (VersionedBeaconBlock, error) {
switch block.Version {
Expand Down
2 changes: 1 addition & 1 deletion eth2util/eth2exp/attagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestIsAggregator(t *testing.T) {
})
}

// beaconCommittees returns a BeaconCommittee with the list of commLen validator indexes.
// beaconCommittee returns a BeaconCommittee with the list of commLen validator indexes.
func beaconCommittee(commLen int) *eth2v1.BeaconCommittee {
var (
slot = eth2p0.Slot(1)
Expand Down
13 changes: 13 additions & 0 deletions testutil/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ func RandomBeaconCommitteeSubscription() *eth2exp.BeaconCommitteeSubscription {
}
}

// RandomSignedBeaconCommitteeSubscription returns a SignedBeaconCommitteeSubscription with the inputs and a random slot signature.
func RandomSignedBeaconCommitteeSubscription(vIdx, slot, commIdx int) core.SignedBeaconCommitteeSubscription {
return core.SignedBeaconCommitteeSubscription{
BeaconCommitteeSubscription: eth2exp.BeaconCommitteeSubscription{
ValidatorIndex: eth2p0.ValidatorIndex(vIdx),
Slot: eth2p0.Slot(slot),
CommitteeIndex: eth2p0.CommitteeIndex(commIdx),
CommitteesAtSlot: rand.Uint64(),
SlotSignature: RandomEth2Signature(),
},
}
}

func RandomSyncAggregate(t *testing.T) *altair.SyncAggregate {
t.Helper()

Expand Down