diff --git a/core/fetcher/fetcher.go b/core/fetcher/fetcher.go index 55a5ce89e..e81a66ee1 100644 --- a/core/fetcher/fetcher.go +++ b/core/fetcher/fetcher.go @@ -26,6 +26,7 @@ import ( "github.com/obolnetwork/charon/app/version" "github.com/obolnetwork/charon/app/z" "github.com/obolnetwork/charon/core" + "github.com/obolnetwork/charon/eth2util/eth2exp" ) // New returns a new fetcher instance. @@ -71,6 +72,11 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef if err != nil { return errors.Wrap(err, "fetch proposer data") } + case core.DutyAggregator: + unsignedSet, err = f.fetchAggregatorData(ctx, duty.Slot, defSet) + if err != nil { + return errors.Wrap(err, "fetch aggregator data") + } default: return errors.New("unsupported duty type", z.Str("type", duty.Type.String())) } @@ -89,8 +95,8 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef return nil } -// RegisterAggSigDB registers a function to get resolved aggregated signed data from the AggSigDB. -// Note: This is not thread safe should be called *before* Fetch. +// RegisterAggSigDB registers a function to get resolved aggregated signed data from AggSigDB. +// Note: This is not thread safe and should only be called *before* Fetch. func (f *Fetcher) RegisterAggSigDB(fn func(context.Context, core.Duty, core.PubKey) (core.SignedData, error)) { f.aggSigDBFunc = fn } @@ -108,15 +114,17 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core return nil, errors.New("invalid attester definition") } - eth2AttData, ok := dataByCommIdx[attDuty.CommitteeIndex] + commIdx := attDuty.CommitteeIndex + + eth2AttData, ok := dataByCommIdx[commIdx] if !ok { var err error - eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), attDuty.CommitteeIndex) + eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), commIdx) if err != nil { return nil, err } - dataByCommIdx[attDuty.CommitteeIndex] = eth2AttData + dataByCommIdx[commIdx] = eth2AttData } attData := core.AttestationData{ @@ -130,14 +138,60 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core return resp, nil } +func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) { + resp := make(core.UnsignedDataSet) + for pubkey := range defSet { + // Query AggSigDB for DutyPrepareAggregator to get beacon committee subscription. + prepAggData, err := f.aggSigDBFunc(ctx, core.NewPrepareAggregatorDuty(slot), pubkey) + if err != nil { + return core.UnsignedDataSet{}, err + } + + sub, ok := prepAggData.(core.SignedBeaconCommitteeSubscription) + if !ok { + return core.UnsignedDataSet{}, errors.New("invalid beacon committee subscription") + } + + res, err := eth2exp.CalculateCommitteeSubscriptionResponse(ctx, f.eth2Cl, &sub.BeaconCommitteeSubscription) + if err != nil { + return core.UnsignedDataSet{}, err + } + + // This validator isn't an aggregator for this slot. + if !res.IsAggregator { + continue + } + + // Query AggSigDB for DutyAttester to get attestation data root. + attData, err := f.aggSigDBFunc(ctx, core.NewAttesterDuty(slot), pubkey) + if err != nil { + return core.UnsignedDataSet{}, err + } + + att, ok := attData.(core.Attestation) + if !ok { + return core.UnsignedDataSet{}, errors.New("invalid attestation") + } + + // Query BN for aggregate attestation. + aggAtt, err := f.eth2Cl.AggregateAttestation(ctx, eth2p0.Slot(slot), att.Data.BeaconBlockRoot) + if err != nil { + return core.UnsignedDataSet{}, err + } + + resp[pubkey] = core.AggregatedAttestation{ + Attestation: *aggAtt, + } + } + + return resp, nil +} + func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) { resp := make(core.UnsignedDataSet) for pubkey := range defSet { // Fetch previously aggregated randao reveal from AggSigDB - dutyRandao := core.Duty{ - Slot: slot, - Type: core.DutyRandao, - } + dutyRandao := core.NewRandaoDuty(slot) randaoData, err := f.aggSigDBFunc(ctx, dutyRandao, pubkey) if err != nil { return nil, err diff --git a/core/fetcher/fetcher_test.go b/core/fetcher/fetcher_test.go index b5e2541fd..ff96a8a4f 100644 --- a/core/fetcher/fetcher_test.go +++ b/core/fetcher/fetcher_test.go @@ -65,8 +65,7 @@ func TestFetchAttester(t *testing.T) { pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA), pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB), } - duty := core.Duty{Type: core.DutyAttester, Slot: slot} - + duty := core.NewAttesterDuty(slot) bmock, err := beaconmock.New() require.NoError(t, err) fetch, err := fetcher.New(bmock) @@ -93,6 +92,97 @@ func TestFetchAttester(t *testing.T) { require.NoError(t, err) } +func TestFetchAggregator(t *testing.T) { + ctx := context.Background() + + const ( + slot = 1 + vIdxA = 2 + vIdxB = 3 + commIdxA = 4 + commIdxB = 5 + commLen = 6 + ) + + duty := core.NewAggregatorDuty(slot) + + pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{ + vIdxA: testutil.RandomCorePubKey(t), + vIdxB: testutil.RandomCorePubKey(t), + } + + defSet := core.DutyDefinitionSet{ + pubkeysByIdx[vIdxA]: core.NewEmptyDefinition(), + pubkeysByIdx[vIdxB]: core.NewEmptyDefinition(), + } + + signedCommSubByPubKey := map[core.PubKey]core.SignedData{ + pubkeysByIdx[vIdxA]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxA, slot, commIdxA), + pubkeysByIdx[vIdxB]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxB, slot, commIdxB), + } + + attByPubKey := map[core.PubKey]core.SignedData{ + pubkeysByIdx[vIdxA]: core.Attestation{ + Attestation: *testutil.RandomAttestation(), + }, + pubkeysByIdx[vIdxB]: core.Attestation{ + Attestation: *testutil.RandomAttestation(), + }, + } + + bmock, err := beaconmock.New() + require.NoError(t, err) + + bmock.BeaconCommitteesAtEpochFunc = func(_ context.Context, _ string, _ eth2p0.Epoch) ([]*eth2v1.BeaconCommittee, error) { + return []*eth2v1.BeaconCommittee{ + beaconCommittee(commIdxA, commLen), + beaconCommittee(commIdxB, commLen), + }, nil + } + + bmock.AggregateAttestationFunc = func(ctx context.Context, slot eth2p0.Slot, root eth2p0.Root) (*eth2p0.Attestation, error) { + for _, att := range attByPubKey { + a := att.(core.Attestation) + if a.Data.BeaconBlockRoot == root { + return &a.Attestation, nil + } + } + + return ð2p0.Attestation{}, nil + } + + fetch, err := fetcher.New(bmock) + require.NoError(t, err) + + fetch.RegisterAggSigDB(func(ctx context.Context, duty core.Duty, key core.PubKey) (core.SignedData, error) { + if duty.Type == core.DutyAttester { + return attByPubKey[key], nil + } + + return signedCommSubByPubKey[key], nil + }) + + err = fetch.Fetch(ctx, duty, defSet) + require.NoError(t, err) + + fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error { + require.Equal(t, duty, resDuty) + require.Len(t, resDataSet, 2) + + for pubkey, aggAtt := range resDataSet { + aggregated, ok := aggAtt.(core.AggregatedAttestation) + require.True(t, ok) + + att, ok := attByPubKey[pubkey].(core.Attestation) + require.True(t, ok) + + require.Equal(t, aggregated.Attestation, att.Attestation) + } + + return nil + }) +} + func TestFetchProposer(t *testing.T) { ctx := context.Background() @@ -119,7 +209,7 @@ func TestFetchProposer(t *testing.T) { pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA), pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB), } - duty := core.Duty{Type: core.DutyProposer, Slot: slot} + duty := core.NewProposerDuty(slot) randaoA := testutil.RandomCoreSignature() randaoB := testutil.RandomCoreSignature() @@ -186,7 +276,7 @@ func TestFetchBuilderProposer(t *testing.T) { pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA), pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB), } - duty := core.Duty{Type: core.DutyBuilderProposer, Slot: slot} + duty := core.NewBuilderProposerDuty(slot) randaoA := testutil.RandomCoreSignature() randaoB := testutil.RandomCoreSignature() @@ -252,3 +342,20 @@ func assertRandaoBlindedBlock(t *testing.T, randao eth2p0.BLSSignature, block co require.Fail(t, "invalid block") } } + +// beaconCommittee returns a BeaconCommittee with the given committee index and a list of commLen validator indexes. +func beaconCommittee(commIdx, commLen int) *eth2v1.BeaconCommittee { + var ( + slot = eth2p0.Slot(1) + vals []eth2p0.ValidatorIndex + ) + for idx := 1; idx <= commLen; idx++ { + vals = append(vals, eth2p0.ValidatorIndex(idx)) + } + + return ð2v1.BeaconCommittee{ + Slot: slot, + Index: eth2p0.CommitteeIndex(commIdx), + Validators: vals, + } +} diff --git a/core/signeddata.go b/core/signeddata.go index 980a4c3a9..1bd36135b 100644 --- a/core/signeddata.go +++ b/core/signeddata.go @@ -803,8 +803,7 @@ func (s *SignedAggregateAndProof) UnmarshalJSON(input []byte) error { } // cloneJSONMarshaler clones the marshaler by serialising to-from json -// since eth2 types contains pointers. The result is stored -// in the value pointed to by v. +// since eth2 types contain pointers. The result is stored in the value pointed to by v. func cloneJSONMarshaler(data json.Marshaler, v any) error { bytes, err := data.MarshalJSON() if err != nil { diff --git a/core/unsigneddata.go b/core/unsigneddata.go index d5887dede..c7075c979 100644 --- a/core/unsigneddata.go +++ b/core/unsigneddata.go @@ -30,13 +30,14 @@ import ( var ( _ UnsignedData = AttestationData{} + _ UnsignedData = AggregatedAttestation{} _ UnsignedData = VersionedBeaconBlock{} _ UnsignedData = VersionedBlindedBeaconBlock{} ) // AttestationData wraps the eth2 attestation data and adds the original duty. // The original duty allows mapping the partial signed response from the VC -// backed to the validator pubkey via the aggregation bits field. +// back to the validator pubkey via the aggregation bits field. type AttestationData struct { Data eth2p0.AttestationData Duty eth2v1.AttesterDuty @@ -81,6 +82,39 @@ type attestationDataJSON struct { Duty *eth2v1.AttesterDuty `json:"attestation_duty"` } +// AggregatedAttestation wraps Attestation and implements the UnsignedData interface. +type AggregatedAttestation struct { + eth2p0.Attestation +} + +func (a AggregatedAttestation) Clone() (UnsignedData, error) { + var resp AggregatedAttestation + err := cloneJSONMarshaler(a, &resp) + if err != nil { + return nil, errors.Wrap(err, "clone aggregated attestation") + } + + return resp, nil +} + +func (a AggregatedAttestation) MarshalJSON() ([]byte, error) { + resp, err := json.Marshal(a) + if err != nil { + return nil, errors.Wrap(err, "marshal aggregated attestation") + } + + return resp, nil +} + +func (a *AggregatedAttestation) UnmarshalJSON(input []byte) error { //nolint:revive + var att AggregatedAttestation + if err := json.Unmarshal(input, &att); err != nil { + return errors.Wrap(err, "unmarshal aggregated attestation") + } + + return nil +} + // NewVersionedBeaconBlock validates and returns a new wrapped VersionedBeaconBlock. func NewVersionedBeaconBlock(block *spec.VersionedBeaconBlock) (VersionedBeaconBlock, error) { switch block.Version { diff --git a/eth2util/eth2exp/attagg_test.go b/eth2util/eth2exp/attagg_test.go index 1321eabdc..fc455207a 100644 --- a/eth2util/eth2exp/attagg_test.go +++ b/eth2util/eth2exp/attagg_test.go @@ -122,7 +122,7 @@ func TestIsAggregator(t *testing.T) { }) } -// beaconCommittees returns a BeaconCommittee with the list of commLen validator indexes. +// beaconCommittee returns a BeaconCommittee with the list of commLen validator indexes. func beaconCommittee(commLen int) *eth2v1.BeaconCommittee { var ( slot = eth2p0.Slot(1) diff --git a/testutil/random.go b/testutil/random.go index 7b79c347f..7f20940d5 100644 --- a/testutil/random.go +++ b/testutil/random.go @@ -347,6 +347,19 @@ func RandomBeaconCommitteeSubscription() *eth2exp.BeaconCommitteeSubscription { } } +// RandomSignedBeaconCommitteeSubscription returns a SignedBeaconCommitteeSubscription with the inputs and a random slot signature. +func RandomSignedBeaconCommitteeSubscription(vIdx, slot, commIdx int) core.SignedBeaconCommitteeSubscription { + return core.SignedBeaconCommitteeSubscription{ + BeaconCommitteeSubscription: eth2exp.BeaconCommitteeSubscription{ + ValidatorIndex: eth2p0.ValidatorIndex(vIdx), + Slot: eth2p0.Slot(slot), + CommitteeIndex: eth2p0.CommitteeIndex(commIdx), + CommitteesAtSlot: rand.Uint64(), + SlotSignature: RandomEth2Signature(), + }, + } +} + func RandomSyncAggregate(t *testing.T) *altair.SyncAggregate { t.Helper()