Skip to content

Commit

Permalink
core/dutydb: add aggregator duty (#1139)
Browse files Browse the repository at this point in the history
Adds support for duty aggregator to dutydb.

category: feature 
ticket: #1115
  • Loading branch information
corverroos authored Sep 20, 2022
1 parent 8c668e9 commit eb44fb3
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 24 deletions.
156 changes: 147 additions & 9 deletions core/dutydb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func NewMemDB(deadliner core.Deadliner) *MemDB {
attKeysBySlot: make(map[int64][]pkKey),
builderProDuties: make(map[int64]*eth2api.VersionedBlindedBeaconBlock),
proDuties: make(map[int64]*spec.VersionedBeaconBlock),
aggDuties: make(map[aggKey]core.AggregatedAttestation),
aggKeysBySlot: make(map[int64][]aggKey),
shutdown: make(chan struct{}),
deadliner: deadliner,
}
Expand All @@ -44,17 +46,29 @@ func NewMemDB(deadliner core.Deadliner) *MemDB {
// MemDB is a in-memory dutyDB implementation.
// It is a placeholder for the badgerDB implementation.
type MemDB struct {
mu sync.Mutex
attDuties map[attKey]*eth2p0.AttestationData
attPubKeys map[pkKey]core.PubKey
attKeysBySlot map[int64][]pkKey
attQueries []attQuery
mu sync.Mutex

// DutyAttester
attDuties map[attKey]*eth2p0.AttestationData
attPubKeys map[pkKey]core.PubKey
attKeysBySlot map[int64][]pkKey
attQueries []attQuery

// DutyBuilderProposer
builderProDuties map[int64]*eth2api.VersionedBlindedBeaconBlock
builderProQueries []builderProQuery
proDuties map[int64]*spec.VersionedBeaconBlock
proQueries []proQuery
shutdown chan struct{}
deadliner core.Deadliner

// DutyProposer
proDuties map[int64]*spec.VersionedBeaconBlock
proQueries []proQuery

// DutyAggregator
aggDuties map[aggKey]core.AggregatedAttestation
aggKeysBySlot map[int64][]aggKey
aggQueries []aggQuery

shutdown chan struct{}
deadliner core.Deadliner
}

// Shutdown results in all blocking queries to return shutdown errors.
Expand Down Expand Up @@ -107,6 +121,14 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig
}
}
db.resolveAttQueriesUnsafe()
case core.DutyAggregator:
for _, unsignedData := range unsignedSet {
err := db.storeAggAttestationUnsafe(unsignedData)
if err != nil {
return err
}
}
db.resolveAggQueriesUnsafe()
default:
return errors.New("unsupported duty type", z.Str("type", duty.Type.String()))
}
Expand Down Expand Up @@ -198,6 +220,42 @@ func (db *MemDB) AwaitAttestation(ctx context.Context, slot int64, commIdx int64
}
}

// AwaitAggAttestation blocks and returns the aggregated attestation for the slot
// and attestation when available.
func (db *MemDB) AwaitAggAttestation(ctx context.Context, slot int64, attestationRoot eth2p0.Root,
) (*eth2p0.Attestation, error) {
db.mu.Lock()
response := make(chan core.AggregatedAttestation, 1) // Buffer of one so resolving never blocks
db.aggQueries = append(db.aggQueries, aggQuery{
Key: aggKey{
Slot: slot,
Root: attestationRoot,
},
Response: response,
})
db.resolveAggQueriesUnsafe()
db.mu.Unlock()

select {
case <-db.shutdown:
return nil, errors.New("dutydb shutdown")
case <-ctx.Done():
return nil, ctx.Err()
case value := <-response:
// Clone before returning.
clone, err := value.Clone()
if err != nil {
return nil, err
}
aggAtt, ok := clone.(core.AggregatedAttestation)
if !ok {
return nil, errors.Wrap(err, "invalid aggregated attestation")
}

return &aggAtt.Attestation, nil
}
}

// PubKeyByAttestation implements core.DutyDB, see its godoc.
func (db *MemDB) PubKeyByAttestation(_ context.Context, slot, commIdx, valCommIdx int64) (core.PubKey, error) {
db.mu.Lock()
Expand Down Expand Up @@ -261,6 +319,52 @@ func (db *MemDB) storeAttestationUnsafe(pubkey core.PubKey, unsignedData core.Un
return nil
}

// storeAggAttestationUnsafe stores the unsigned aggregated attestation. It is unsafe since it assumes the lock is held.
func (db *MemDB) storeAggAttestationUnsafe(unsignedData core.UnsignedData) error {
cloned, err := unsignedData.Clone() // Clone before storing.
if err != nil {
return err
}

aggAtt, ok := cloned.(core.AggregatedAttestation)
if !ok {
return errors.New("invalid unsigned aggregated attestation")
}

aggRoot, err := aggAtt.Attestation.Data.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "hash aggregated attestation root")
}

slot := int64(aggAtt.Attestation.Data.Slot)

// Store key and value for PubKeyByAttestation
key := aggKey{
Slot: slot,
Root: aggRoot,
}
if existing, ok := db.aggDuties[key]; ok {
existingRoot, err := existing.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "attestation root")
}

providedRoot, err := aggAtt.HashTreeRoot()
if err != nil {
return errors.Wrap(err, "attestation root")
}

if existingRoot != providedRoot {
return errors.New("clashing aggregated attestation")
}
} else {
db.aggDuties[key] = aggAtt
db.aggKeysBySlot[slot] = append(db.aggKeysBySlot[slot], key)
}

return nil
}

// storeBeaconBlockUnsafe stores the unsigned BeaconBlock. It is unsafe since it assumes the lock is held.
func (db *MemDB) storeBeaconBlockUnsafe(unsignedData core.UnsignedData) error {
cloned, err := unsignedData.Clone() // Clone before storing.
Expand Down Expand Up @@ -371,6 +475,23 @@ func (db *MemDB) resolveProQueriesUnsafe() {
db.proQueries = unresolved
}

// resolveAggQueriesUnsafe resolve any aggQuery to a result if found.
// It is unsafe since it assume that the lock is held.
func (db *MemDB) resolveAggQueriesUnsafe() {
var unresolved []aggQuery
for _, query := range db.aggQueries {
value, ok := db.aggDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
continue
}

query.Response <- value
}

db.aggQueries = unresolved
}

// resolveBuilderProQueriesUnsafe resolve any builderProQuery to a result if found.
// It is unsafe since it assume that the lock is held.
func (db *MemDB) resolveBuilderProQueriesUnsafe() {
Expand Down Expand Up @@ -401,6 +522,11 @@ func (db *MemDB) deleteDutyUnsafe(duty core.Duty) error {
delete(db.attDuties, attKey{Slot: key.Slot, CommIdx: key.CommIdx})
}
delete(db.attKeysBySlot, duty.Slot)
case core.DutyAggregator:
for _, key := range db.aggKeysBySlot[duty.Slot] {
delete(db.aggDuties, key)
}
delete(db.aggKeysBySlot, duty.Slot)
default:
return errors.New("unknown duty type")
}
Expand All @@ -421,6 +547,12 @@ type pkKey struct {
ValCommIdx int64
}

// aggKey is the key to lookup an aggregated attestation by root in the DB.
type aggKey struct {
Slot int64
Root eth2p0.Root
}

// attQuery is a waiting attQuery with a response channel.
type attQuery struct {
Key attKey
Expand All @@ -433,6 +565,12 @@ type proQuery struct {
Response chan<- *spec.VersionedBeaconBlock
}

// aggQuery is a waiting aggQuery with a response channel.
type aggQuery struct {
Key aggKey
Response chan<- core.AggregatedAttestation
}

// builderProQuery is a waiting builderProQuery with a response channel.
type builderProQuery struct {
Key int64
Expand Down
25 changes: 25 additions & 0 deletions core/dutydb/memory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,31 @@ func TestMemDBProposer(t *testing.T) {
}
}

func TestMemDBAggregator(t *testing.T) {
ctx := context.Background()
db := dutydb.NewMemDB(new(testDeadliner))

const queries = 3

for i := 0; i < queries; i++ {
agg := testutil.RandomAttestation()
set := core.UnsignedDataSet{
testutil.RandomCorePubKey(t): core.NewAggregatedAttestation(agg),
}
slot := int64(agg.Data.Slot)
go func() {
err := db.Store(ctx, core.NewAggregatorDuty(slot), set)
require.NoError(t, err)
}()

root, err := agg.Data.HashTreeRoot()
require.NoError(t, err)
resp, err := db.AwaitAggAttestation(ctx, slot, root)
require.NoError(t, err)
require.Equal(t, agg, resp)
}
}

func TestMemDBClashingBlocks(t *testing.T) {
ctx := context.Background()
db := dutydb.NewMemDB(new(testDeadliner))
Expand Down
13 changes: 11 additions & 2 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ type DutyDB interface {
// slot, committee index and validator committee index. This allows mapping of attestation
// data response to validator.
PubKeyByAttestation(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)

// AwaitAggAttestation blocks and returns the aggregated attestation for the slot
// and attestation when available.
AwaitAggAttestation(ctx context.Context, slot int64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)
}

// Consensus comes to consensus on proposed duty data.
Expand Down Expand Up @@ -97,8 +101,8 @@ type ValidatorAPI interface {
// RegisterGetDutyDefinition registers a function to query duty definitions.
RegisterGetDutyDefinition(func(context.Context, Duty) (DutyDefinitionSet, error))

// RegisterAwaitAggregatedAttestation registers a function to query aggregated attestation.
RegisterAwaitAggregatedAttestation(fn func(ctx context.Context, slot int64, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error))
// RegisterAwaitAggAttestation registers a function to query aggregated attestation.
RegisterAwaitAggAttestation(fn func(ctx context.Context, slot int64, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error))

// RegisterAggSigDB registers a function to query aggregated signed data from aggSigDB.
RegisterAggSigDB(fn func(context.Context, Duty, PubKey) (SignedData, error))
Expand Down Expand Up @@ -174,11 +178,13 @@ type wireFuncs struct {
DutyDBAwaitBlindedBeaconBlock func(ctx context.Context, slot int64) (*eth2api.VersionedBlindedBeaconBlock, error)
DutyDBAwaitAttestation func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error)
DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error)
DutyDBAwaitAggAttestation func(ctx context.Context, slot int64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)
VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error))
VAPIRegisterAwaitBeaconBlock func(func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error))
VAPIRegisterAwaitBlindedBeaconBlock func(func(ctx context.Context, slot int64) (*eth2api.VersionedBlindedBeaconBlock, error))
VAPIRegisterGetDutyDefinition func(func(context.Context, Duty) (DutyDefinitionSet, error))
VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx int64) (PubKey, error))
VAPIRegisterAwaitAggAttestation func(func(ctx context.Context, slot int64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error))
VAPISubscribe func(func(context.Context, Duty, ParSignedDataSet) error)
ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error
ParSigDBStoreExternal func(context.Context, Duty, ParSignedDataSet) error
Expand Down Expand Up @@ -223,11 +229,13 @@ func Wire(sched Scheduler,
DutyDBAwaitBeaconBlock: dutyDB.AwaitBeaconBlock,
DutyDBAwaitBlindedBeaconBlock: dutyDB.AwaitBlindedBeaconBlock,
DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation,
DutyDBAwaitAggAttestation: dutyDB.AwaitAggAttestation,
VAPIRegisterAwaitBeaconBlock: vapi.RegisterAwaitBeaconBlock,
VAPIRegisterAwaitBlindedBeaconBlock: vapi.RegisterAwaitBlindedBeaconBlock,
VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation,
VAPIRegisterGetDutyDefinition: vapi.RegisterGetDutyDefinition,
VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation,
VAPIRegisterAwaitAggAttestation: vapi.RegisterAwaitAggAttestation,
VAPISubscribe: vapi.Subscribe,
ParSigDBStoreInternal: parSigDB.StoreInternal,
ParSigDBStoreExternal: parSigDB.StoreExternal,
Expand Down Expand Up @@ -255,6 +263,7 @@ func Wire(sched Scheduler,
w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation)
w.VAPIRegisterGetDutyDefinition(w.SchedulerGetDutyDefinition)
w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation)
w.VAPIRegisterAwaitAggAttestation(w.DutyDBAwaitAggAttestation)
w.VAPISubscribe(w.ParSigDBStoreInternal)
w.ParSigDBSubscribeInternal(w.ParSigExBroadcast)
w.ParSigExSubscribe(w.ParSigDBStoreExternal)
Expand Down
18 changes: 10 additions & 8 deletions core/unsigneddata.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ type attestationDataJSON struct {
Duty *eth2v1.AttesterDuty `json:"attestation_duty"`
}

// AggregatedAttestation wraps Attestation and implements the UnsignedData interface.
// NewAggregatedAttestation returns a new aggregated attestation.
func NewAggregatedAttestation(att *eth2p0.Attestation) AggregatedAttestation {
return AggregatedAttestation{Attestation: *att}
}

// AggregatedAttestation wraps un unsigned aggregated attestation and implements the UnsignedData interface.
type AggregatedAttestation struct {
eth2p0.Attestation
}
Expand All @@ -98,20 +103,17 @@ func (a AggregatedAttestation) Clone() (UnsignedData, error) {
}

func (a AggregatedAttestation) MarshalJSON() ([]byte, error) {
resp, err := json.Marshal(a)
if err != nil {
return nil, errors.Wrap(err, "marshal aggregated attestation")
}

return resp, nil
return a.Attestation.MarshalJSON()
}

func (a *AggregatedAttestation) UnmarshalJSON(input []byte) error { //nolint:revive
var att AggregatedAttestation
var att eth2p0.Attestation
if err := json.Unmarshal(input, &att); err != nil {
return errors.Wrap(err, "unmarshal aggregated attestation")
}

*a = AggregatedAttestation{Attestation: att}

return nil
}

Expand Down
10 changes: 5 additions & 5 deletions core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type Component struct {
awaitAttFunc func(ctx context.Context, slot, commIdx int64) (*eth2p0.AttestationData, error)
awaitBlockFunc func(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error)
awaitBlindedBlockFunc func(ctx context.Context, slot int64) (*eth2api.VersionedBlindedBeaconBlock, error)
awaitAggAttFunc func(ctx context.Context, slot int64, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error)
awaitAggAttFunc func(ctx context.Context, slot int64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)
aggSigDBFunc func(context.Context, core.Duty, core.PubKey) (core.SignedData, error)
dutyDefFunc func(ctx context.Context, duty core.Duty) (core.DutyDefinitionSet, error)
subs []func(context.Context, core.Duty, core.ParSignedDataSet) error
Expand Down Expand Up @@ -181,9 +181,9 @@ func (c *Component) RegisterGetDutyDefinition(fn func(ctx context.Context, duty
c.dutyDefFunc = fn
}

// RegisterAwaitAggregatedAttestation registers a function to query aggregated attestation.
// RegisterAwaitAggAttestation registers a function to query an aggregated attestation.
// It supports a single function, since it is an input of the component.
func (c *Component) RegisterAwaitAggregatedAttestation(fn func(ctx context.Context, slot int64, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error)) {
func (c *Component) RegisterAwaitAggAttestation(fn func(ctx context.Context, slot int64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)) {
c.awaitAggAttFunc = fn
}

Expand Down Expand Up @@ -654,8 +654,8 @@ func (c Component) SubmitBeaconCommitteeSubscriptionsV2(ctx context.Context, sub
return c.getCommResponse(ctx, psigsBySlot)
}

// AggregateAttestation fetches the aggregate attestation given an attestation.
// Note: It queries aggregated attestation from DutyDB (this is blocking).
// AggregateAttestation returns the aggregate attestation for the given attestation root.
// It does a blocking query to DutyAggregator unsigned data from dutyDB.
func (c Component) AggregateAttestation(ctx context.Context, slot eth2p0.Slot, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error) {
return c.awaitAggAttFunc(ctx, int64(slot), attestationDataRoot)
}
Expand Down

0 comments on commit eb44fb3

Please sign in to comment.