Skip to content

Commit

Permalink
core/fetcher: add support for DutyAggregator (#1118)
Browse files Browse the repository at this point in the history
Add support for `DutyAggregator` to `fetcher`.

category: feature
ticket: #1088
  • Loading branch information
xenowits authored Sep 13, 2022
1 parent 2f60f09 commit eed2dad
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 17 deletions.
72 changes: 63 additions & 9 deletions core/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/eth2util/eth2exp"
)

// New returns a new fetcher instance.
Expand Down Expand Up @@ -71,6 +72,11 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
if err != nil {
return errors.Wrap(err, "fetch proposer data")
}
case core.DutyAggregator:
unsignedSet, err = f.fetchAggregatorData(ctx, duty.Slot, defSet)
if err != nil {
return errors.Wrap(err, "fetch aggregator data")
}
default:
return errors.New("unsupported duty type", z.Str("type", duty.Type.String()))
}
Expand All @@ -89,8 +95,8 @@ func (f *Fetcher) Fetch(ctx context.Context, duty core.Duty, defSet core.DutyDef
return nil
}

// RegisterAggSigDB registers a function to get resolved aggregated signed data from the AggSigDB.
// Note: This is not thread safe should be called *before* Fetch.
// RegisterAggSigDB registers a function to get resolved aggregated signed data from AggSigDB.
// Note: This is not thread safe and should only be called *before* Fetch.
func (f *Fetcher) RegisterAggSigDB(fn func(context.Context, core.Duty, core.PubKey) (core.SignedData, error)) {
f.aggSigDBFunc = fn
}
Expand All @@ -108,15 +114,17 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core
return nil, errors.New("invalid attester definition")
}

eth2AttData, ok := dataByCommIdx[attDuty.CommitteeIndex]
commIdx := attDuty.CommitteeIndex

eth2AttData, ok := dataByCommIdx[commIdx]
if !ok {
var err error
eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), attDuty.CommitteeIndex)
eth2AttData, err = f.eth2Cl.AttestationData(ctx, eth2p0.Slot(uint64(slot)), commIdx)
if err != nil {
return nil, err
}

dataByCommIdx[attDuty.CommitteeIndex] = eth2AttData
dataByCommIdx[commIdx] = eth2AttData
}

attData := core.AttestationData{
Expand All @@ -130,14 +138,60 @@ func (f *Fetcher) fetchAttesterData(ctx context.Context, slot int64, defSet core
return resp, nil
}

func (f *Fetcher) fetchAggregatorData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
resp := make(core.UnsignedDataSet)
for pubkey := range defSet {
// Query AggSigDB for DutyPrepareAggregator to get beacon committee subscription.
prepAggData, err := f.aggSigDBFunc(ctx, core.NewPrepareAggregatorDuty(slot), pubkey)
if err != nil {
return core.UnsignedDataSet{}, err
}

sub, ok := prepAggData.(core.SignedBeaconCommitteeSubscription)
if !ok {
return core.UnsignedDataSet{}, errors.New("invalid beacon committee subscription")
}

res, err := eth2exp.CalculateCommitteeSubscriptionResponse(ctx, f.eth2Cl, &sub.BeaconCommitteeSubscription)
if err != nil {
return core.UnsignedDataSet{}, err
}

// This validator isn't an aggregator for this slot.
if !res.IsAggregator {
continue
}

// Query AggSigDB for DutyAttester to get attestation data root.
attData, err := f.aggSigDBFunc(ctx, core.NewAttesterDuty(slot), pubkey)
if err != nil {
return core.UnsignedDataSet{}, err
}

att, ok := attData.(core.Attestation)
if !ok {
return core.UnsignedDataSet{}, errors.New("invalid attestation")
}

// Query BN for aggregate attestation.
aggAtt, err := f.eth2Cl.AggregateAttestation(ctx, eth2p0.Slot(slot), att.Data.BeaconBlockRoot)
if err != nil {
return core.UnsignedDataSet{}, err
}

resp[pubkey] = core.AggregatedAttestation{
Attestation: *aggAtt,
}
}

return resp, nil
}

func (f *Fetcher) fetchProposerData(ctx context.Context, slot int64, defSet core.DutyDefinitionSet) (core.UnsignedDataSet, error) {
resp := make(core.UnsignedDataSet)
for pubkey := range defSet {
// Fetch previously aggregated randao reveal from AggSigDB
dutyRandao := core.Duty{
Slot: slot,
Type: core.DutyRandao,
}
dutyRandao := core.NewRandaoDuty(slot)
randaoData, err := f.aggSigDBFunc(ctx, dutyRandao, pubkey)
if err != nil {
return nil, err
Expand Down
115 changes: 111 additions & 4 deletions core/fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ func TestFetchAttester(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewAttesterDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewAttesterDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyAttester, Slot: slot}

duty := core.NewAttesterDuty(slot)
bmock, err := beaconmock.New()
require.NoError(t, err)
fetch, err := fetcher.New(bmock)
Expand All @@ -93,6 +92,97 @@ func TestFetchAttester(t *testing.T) {
require.NoError(t, err)
}

func TestFetchAggregator(t *testing.T) {
ctx := context.Background()

const (
slot = 1
vIdxA = 2
vIdxB = 3
commIdxA = 4
commIdxB = 5
commLen = 6
)

duty := core.NewAggregatorDuty(slot)

pubkeysByIdx := map[eth2p0.ValidatorIndex]core.PubKey{
vIdxA: testutil.RandomCorePubKey(t),
vIdxB: testutil.RandomCorePubKey(t),
}

defSet := core.DutyDefinitionSet{
pubkeysByIdx[vIdxA]: core.NewEmptyDefinition(),
pubkeysByIdx[vIdxB]: core.NewEmptyDefinition(),
}

signedCommSubByPubKey := map[core.PubKey]core.SignedData{
pubkeysByIdx[vIdxA]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxA, slot, commIdxA),
pubkeysByIdx[vIdxB]: testutil.RandomSignedBeaconCommitteeSubscription(vIdxB, slot, commIdxB),
}

attByPubKey := map[core.PubKey]core.SignedData{
pubkeysByIdx[vIdxA]: core.Attestation{
Attestation: *testutil.RandomAttestation(),
},
pubkeysByIdx[vIdxB]: core.Attestation{
Attestation: *testutil.RandomAttestation(),
},
}

bmock, err := beaconmock.New()
require.NoError(t, err)

bmock.BeaconCommitteesAtEpochFunc = func(_ context.Context, _ string, _ eth2p0.Epoch) ([]*eth2v1.BeaconCommittee, error) {
return []*eth2v1.BeaconCommittee{
beaconCommittee(commIdxA, commLen),
beaconCommittee(commIdxB, commLen),
}, nil
}

bmock.AggregateAttestationFunc = func(ctx context.Context, slot eth2p0.Slot, root eth2p0.Root) (*eth2p0.Attestation, error) {
for _, att := range attByPubKey {
a := att.(core.Attestation)
if a.Data.BeaconBlockRoot == root {
return &a.Attestation, nil
}
}

return &eth2p0.Attestation{}, nil
}

fetch, err := fetcher.New(bmock)
require.NoError(t, err)

fetch.RegisterAggSigDB(func(ctx context.Context, duty core.Duty, key core.PubKey) (core.SignedData, error) {
if duty.Type == core.DutyAttester {
return attByPubKey[key], nil
}

return signedCommSubByPubKey[key], nil
})

err = fetch.Fetch(ctx, duty, defSet)
require.NoError(t, err)

fetch.Subscribe(func(ctx context.Context, resDuty core.Duty, resDataSet core.UnsignedDataSet) error {
require.Equal(t, duty, resDuty)
require.Len(t, resDataSet, 2)

for pubkey, aggAtt := range resDataSet {
aggregated, ok := aggAtt.(core.AggregatedAttestation)
require.True(t, ok)

att, ok := attByPubKey[pubkey].(core.Attestation)
require.True(t, ok)

require.Equal(t, aggregated.Attestation, att.Attestation)
}

return nil
})
}

func TestFetchProposer(t *testing.T) {
ctx := context.Background()

Expand All @@ -119,7 +209,7 @@ func TestFetchProposer(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyProposer, Slot: slot}
duty := core.NewProposerDuty(slot)

randaoA := testutil.RandomCoreSignature()
randaoB := testutil.RandomCoreSignature()
Expand Down Expand Up @@ -186,7 +276,7 @@ func TestFetchBuilderProposer(t *testing.T) {
pubkeysByIdx[vIdxA]: core.NewProposerDefinition(&dutyA),
pubkeysByIdx[vIdxB]: core.NewProposerDefinition(&dutyB),
}
duty := core.Duty{Type: core.DutyBuilderProposer, Slot: slot}
duty := core.NewBuilderProposerDuty(slot)

randaoA := testutil.RandomCoreSignature()
randaoB := testutil.RandomCoreSignature()
Expand Down Expand Up @@ -252,3 +342,20 @@ func assertRandaoBlindedBlock(t *testing.T, randao eth2p0.BLSSignature, block co
require.Fail(t, "invalid block")
}
}

// beaconCommittee returns a BeaconCommittee with the given committee index and a list of commLen validator indexes.
func beaconCommittee(commIdx, commLen int) *eth2v1.BeaconCommittee {
var (
slot = eth2p0.Slot(1)
vals []eth2p0.ValidatorIndex
)
for idx := 1; idx <= commLen; idx++ {
vals = append(vals, eth2p0.ValidatorIndex(idx))
}

return &eth2v1.BeaconCommittee{
Slot: slot,
Index: eth2p0.CommitteeIndex(commIdx),
Validators: vals,
}
}
3 changes: 1 addition & 2 deletions core/signeddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,8 +803,7 @@ func (s *SignedAggregateAndProof) UnmarshalJSON(input []byte) error {
}

// cloneJSONMarshaler clones the marshaler by serialising to-from json
// since eth2 types contains pointers. The result is stored
// in the value pointed to by v.
// since eth2 types contain pointers. The result is stored in the value pointed to by v.
func cloneJSONMarshaler(data json.Marshaler, v any) error {
bytes, err := data.MarshalJSON()
if err != nil {
Expand Down
36 changes: 35 additions & 1 deletion core/unsigneddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ import (

var (
_ UnsignedData = AttestationData{}
_ UnsignedData = AggregatedAttestation{}
_ UnsignedData = VersionedBeaconBlock{}
_ UnsignedData = VersionedBlindedBeaconBlock{}
)

// AttestationData wraps the eth2 attestation data and adds the original duty.
// The original duty allows mapping the partial signed response from the VC
// backed to the validator pubkey via the aggregation bits field.
// back to the validator pubkey via the aggregation bits field.
type AttestationData struct {
Data eth2p0.AttestationData
Duty eth2v1.AttesterDuty
Expand Down Expand Up @@ -81,6 +82,39 @@ type attestationDataJSON struct {
Duty *eth2v1.AttesterDuty `json:"attestation_duty"`
}

// AggregatedAttestation wraps Attestation and implements the UnsignedData interface.
type AggregatedAttestation struct {
eth2p0.Attestation
}

func (a AggregatedAttestation) Clone() (UnsignedData, error) {
var resp AggregatedAttestation
err := cloneJSONMarshaler(a, &resp)
if err != nil {
return nil, errors.Wrap(err, "clone aggregated attestation")
}

return resp, nil
}

func (a AggregatedAttestation) MarshalJSON() ([]byte, error) {
resp, err := json.Marshal(a)
if err != nil {
return nil, errors.Wrap(err, "marshal aggregated attestation")
}

return resp, nil
}

func (a *AggregatedAttestation) UnmarshalJSON(input []byte) error { //nolint:revive
var att AggregatedAttestation
if err := json.Unmarshal(input, &att); err != nil {
return errors.Wrap(err, "unmarshal aggregated attestation")
}

return nil
}

// NewVersionedBeaconBlock validates and returns a new wrapped VersionedBeaconBlock.
func NewVersionedBeaconBlock(block *spec.VersionedBeaconBlock) (VersionedBeaconBlock, error) {
switch block.Version {
Expand Down
2 changes: 1 addition & 1 deletion eth2util/eth2exp/attagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestIsAggregator(t *testing.T) {
})
}

// beaconCommittees returns a BeaconCommittee with the list of commLen validator indexes.
// beaconCommittee returns a BeaconCommittee with the list of commLen validator indexes.
func beaconCommittee(commLen int) *eth2v1.BeaconCommittee {
var (
slot = eth2p0.Slot(1)
Expand Down
13 changes: 13 additions & 0 deletions testutil/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,19 @@ func RandomBeaconCommitteeSubscription() *eth2exp.BeaconCommitteeSubscription {
}
}

// RandomSignedBeaconCommitteeSubscription returns a SignedBeaconCommitteeSubscription with the inputs and a random slot signature.
func RandomSignedBeaconCommitteeSubscription(vIdx, slot, commIdx int) core.SignedBeaconCommitteeSubscription {
return core.SignedBeaconCommitteeSubscription{
BeaconCommitteeSubscription: eth2exp.BeaconCommitteeSubscription{
ValidatorIndex: eth2p0.ValidatorIndex(vIdx),
Slot: eth2p0.Slot(slot),
CommitteeIndex: eth2p0.CommitteeIndex(commIdx),
CommitteesAtSlot: rand.Uint64(),
SlotSignature: RandomEth2Signature(),
},
}
}

func RandomSyncAggregate(t *testing.T) *altair.SyncAggregate {
t.Helper()

Expand Down

0 comments on commit eed2dad

Please sign in to comment.