From 1dc5a5a9a7d87d834bc973a78b0c0c591bde2848 Mon Sep 17 00:00:00 2001 From: Ciaran McVeigh Date: Thu, 21 Jul 2022 08:59:59 +0100 Subject: [PATCH] dutydb memory implementation for builder proposer --- core/dutydb/memory.go | 120 ++++++++++++++++++++++++++++++---- core/dutydb/memory_test.go | 129 +++++++++++++++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 11 deletions(-) diff --git a/core/dutydb/memory.go b/core/dutydb/memory.go index 69e252275..57d166672 100644 --- a/core/dutydb/memory.go +++ b/core/dutydb/memory.go @@ -19,6 +19,7 @@ import ( "context" "sync" + eth2api "github.com/attestantio/go-eth2-client/api" "github.com/attestantio/go-eth2-client/spec" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" @@ -30,23 +31,26 @@ import ( // NewMemDB returns a new in-memory dutyDB instance. func NewMemDB() *MemDB { return &MemDB{ - attDuties: make(map[attKey]*eth2p0.AttestationData), - attPubKeys: make(map[pkKey]core.PubKey), - proDuties: make(map[int64]*spec.VersionedBeaconBlock), - shutdown: make(chan struct{}), + attDuties: make(map[attKey]*eth2p0.AttestationData), + attPubKeys: make(map[pkKey]core.PubKey), + builderProDuties: make(map[int64]*eth2api.VersionedBlindedBeaconBlock), + proDuties: make(map[int64]*spec.VersionedBeaconBlock), + shutdown: make(chan struct{}), } } // MemDB is a in-memory dutyDB implementation. // It is a placeholder for the badgerDB implementation. type MemDB struct { - mu sync.Mutex - attDuties map[attKey]*eth2p0.AttestationData - attPubKeys map[pkKey]core.PubKey - attQueries []attQuery - proDuties map[int64]*spec.VersionedBeaconBlock - proQueries []proQuery - shutdown chan struct{} + mu sync.Mutex + attDuties map[attKey]*eth2p0.AttestationData + attPubKeys map[pkKey]core.PubKey + attQueries []attQuery + builderProDuties map[int64]*eth2api.VersionedBlindedBeaconBlock + builderProQueries []builderProQuery + proDuties map[int64]*spec.VersionedBeaconBlock + proQueries []proQuery + shutdown chan struct{} } // Shutdown results in all blocking queries to return shutdown errors. @@ -73,6 +77,18 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig } } db.resolveProQueriesUnsafe() + case core.DutyBuilderProposer: + // Sanity check max one builder proposer per slot + if len(unsignedSet) > 1 { + return errors.New("unexpected builder proposer data set length", z.Int("n", len(unsignedSet))) + } + for _, unsignedData := range unsignedSet { + err := db.storeBlindedBeaconBlockUnsafe(unsignedData) + if err != nil { + return err + } + } + db.resolveBuilderProQueriesUnsafe() case core.DutyAttester: for pubkey, unsignedData := range unsignedSet { err := db.storeAttestationUnsafe(pubkey, unsignedData) @@ -109,6 +125,27 @@ func (db *MemDB) AwaitBeaconBlock(ctx context.Context, slot int64) (*spec.Versio } } +// AwaitBlindedBeaconBlock implements core.DutyDB, see its godoc. +func (db *MemDB) AwaitBlindedBeaconBlock(ctx context.Context, slot int64) (*eth2api.VersionedBlindedBeaconBlock, error) { + db.mu.Lock() + response := make(chan *eth2api.VersionedBlindedBeaconBlock, 1) + db.builderProQueries = append(db.builderProQueries, builderProQuery{ + Key: slot, + Response: response, + }) + db.resolveBuilderProQueriesUnsafe() + db.mu.Unlock() + + select { + case <-db.shutdown: + return nil, errors.New("dutydb shutdown") + case <-ctx.Done(): + return nil, ctx.Err() + case block := <-response: + return block, nil + } +} + // AwaitAttestation implements core.DutyDB, see its godoc. func (db *MemDB) AwaitAttestation(ctx context.Context, slot int64, commIdx int64) (*eth2p0.AttestationData, error) { db.mu.Lock() @@ -233,6 +270,44 @@ func (db *MemDB) storeBeaconBlockUnsafe(unsignedData core.UnsignedData) error { return nil } +// storeBlindedBeaconBlockUnsafe stores the unsigned BlindedBeaconBlock. It is unsafe since it assumes the lock is held. +func (db *MemDB) storeBlindedBeaconBlockUnsafe(unsignedData core.UnsignedData) error { + cloned, err := unsignedData.Clone() // Clone before storing. + if err != nil { + return err + } + + block, ok := cloned.(core.VersionedBlindedBeaconBlock) + if !ok { + return errors.New("invalid unsigned blinded block") + } + + slot, err := block.Slot() + if err != nil { + return err + } + + if existing, ok := db.builderProDuties[int64(slot)]; ok { + existingRoot, err := existing.Root() + if err != nil { + return errors.Wrap(err, "blinded block root") + } + + providedRoot, err := block.Root() + if err != nil { + return errors.Wrap(err, "blinded block root") + } + + if existingRoot != providedRoot { + return errors.New("clashing blinded blocks") + } + } else { + db.builderProDuties[int64(slot)] = &block.VersionedBlindedBeaconBlock + } + + return nil +} + // resolveAttQueriesUnsafe resolve any attQuery to a result if found. // It is unsafe since it assume that the lock is held. func (db *MemDB) resolveAttQueriesUnsafe() { @@ -267,6 +342,23 @@ func (db *MemDB) resolveProQueriesUnsafe() { db.proQueries = unresolved } +// resolveBuilderProQueriesUnsafe resolve any builderProQuery to a result if found. +// It is unsafe since it assume that the lock is held. +func (db *MemDB) resolveBuilderProQueriesUnsafe() { + var unresolved []builderProQuery + for _, query := range db.builderProQueries { + value, ok := db.builderProDuties[query.Key] + if !ok { + unresolved = append(unresolved, query) + continue + } + + query.Response <- value + } + + db.builderProQueries = unresolved +} + // attKey is the key to lookup an attester value in the DB. type attKey struct { Slot int64 @@ -291,3 +383,9 @@ type proQuery struct { Key int64 Response chan<- *spec.VersionedBeaconBlock } + +// builderProQuery is a waiting builderProQuery with a response channel. +type builderProQuery struct { + Key int64 + Response chan<- *eth2api.VersionedBlindedBeaconBlock +} diff --git a/core/dutydb/memory_test.go b/core/dutydb/memory_test.go index 8f9b16550..19972f479 100644 --- a/core/dutydb/memory_test.go +++ b/core/dutydb/memory_test.go @@ -20,6 +20,7 @@ import ( "runtime" "testing" + eth2api "github.com/attestantio/go-eth2-client/api" eth2v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec" eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0" @@ -263,3 +264,131 @@ func TestMemDBClashProposer(t *testing.T) { }) require.ErrorContains(t, err, "clashing blocks") } + +func TestMemDBBuilderProposer(t *testing.T) { + ctx := context.Background() + db := dutydb.NewMemDB() + + const queries = 3 + slots := [queries]int64{123, 456, 789} + + type response struct { + block *eth2api.VersionedBlindedBeaconBlock + } + var awaitResponse [queries]chan response + for i := 0; i < queries; i++ { + awaitResponse[i] = make(chan response) + go func(slot int) { + block, err := db.AwaitBlindedBeaconBlock(ctx, slots[slot]) + require.NoError(t, err) + awaitResponse[slot] <- response{block: block} + }(i) + } + + blocks := make([]*eth2api.VersionedBlindedBeaconBlock, queries) + pubkeysByIdx := make(map[eth2p0.ValidatorIndex]core.PubKey) + for i := 0; i < queries; i++ { + blocks[i] = ð2api.VersionedBlindedBeaconBlock{ + Version: spec.DataVersionBellatrix, + Bellatrix: testutil.RandomBellatrixBlindedBeaconBlock(t), + } + blocks[i].Bellatrix.Slot = eth2p0.Slot(slots[i]) + blocks[i].Bellatrix.ProposerIndex = eth2p0.ValidatorIndex(i) + pubkeysByIdx[eth2p0.ValidatorIndex(i)] = testutil.RandomCorePubKey(t) + } + + // Store the Blocks + for i := 0; i < queries; i++ { + unsigned, err := core.NewVersionedBlindedBeaconBlock(blocks[i]) + require.NoError(t, err) + + duty := core.Duty{Slot: slots[i], Type: core.DutyBuilderProposer} + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkeysByIdx[eth2p0.ValidatorIndex(i)]: unsigned, + }) + require.NoError(t, err) + } + + // Get and assert the proQuery responses + for i := 0; i < queries; i++ { + actualData := <-awaitResponse[i] + require.Equal(t, blocks[i], actualData.block) + } +} + +func TestMemDBClashingBlindedBlocks(t *testing.T) { + ctx := context.Background() + db := dutydb.NewMemDB() + + const slot = 123 + block1 := ð2api.VersionedBlindedBeaconBlock{ + Version: spec.DataVersionBellatrix, + Bellatrix: testutil.RandomBellatrixBlindedBeaconBlock(t), + } + block1.Bellatrix.Slot = eth2p0.Slot(slot) + block2 := ð2api.VersionedBlindedBeaconBlock{ + Version: spec.DataVersionBellatrix, + Bellatrix: testutil.RandomBellatrixBlindedBeaconBlock(t), + } + block2.Bellatrix.Slot = eth2p0.Slot(slot) + pubkey := testutil.RandomCorePubKey(t) + + // Encode the Blocks + unsigned1, err := core.NewVersionedBlindedBeaconBlock(block1) + require.NoError(t, err) + + unsigned2, err := core.NewVersionedBlindedBeaconBlock(block2) + require.NoError(t, err) + + // Store the Blocks + duty := core.Duty{Slot: slot, Type: core.DutyBuilderProposer} + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkey: unsigned1, + }) + require.NoError(t, err) + + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkey: unsigned2, + }) + require.ErrorContains(t, err, "clashing blinded blocks") +} + +func TestMemDBClashBuilderProposer(t *testing.T) { + ctx := context.Background() + db := dutydb.NewMemDB() + + const slot = 123 + + block := ð2api.VersionedBlindedBeaconBlock{ + Version: spec.DataVersionBellatrix, + Bellatrix: testutil.RandomBellatrixBlindedBeaconBlock(t), + } + block.Bellatrix.Slot = eth2p0.Slot(slot) + pubkey := testutil.RandomCorePubKey(t) + + // Encode the block + unsigned, err := core.NewVersionedBlindedBeaconBlock(block) + require.NoError(t, err) + + // Store the Blocks + duty := core.Duty{Slot: slot, Type: core.DutyBuilderProposer} + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkey: unsigned, + }) + require.NoError(t, err) + + // Store same block from same validator to test idempotent inserts + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkey: unsigned, + }) + require.NoError(t, err) + + // Store a different block for the same slot + block.Bellatrix.ProposerIndex++ + unsignedB, err := core.NewVersionedBlindedBeaconBlock(block) + require.NoError(t, err) + err = db.Store(ctx, duty, core.UnsignedDataSet{ + pubkey: unsignedB, + }) + require.ErrorContains(t, err, "clashing blinded blocks") +}