Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Added SyncAggregate computation to block production #10009

Merged
merged 26 commits into from
Apr 23, 2024
Merged
2 changes: 1 addition & 1 deletion cl/aggregation/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (p *aggregationPoolImpl) AddAttestation(inAtt *solid.Attestation) error {
return nil
}

if utils.IsSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) {
if utils.IsNonStrictSupersetBitlist(att.AggregationBits(), inAtt.AggregationBits()) {
return ErrIsSuperset
}

Expand Down
14 changes: 9 additions & 5 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync"
"time"

"github.com/Giulio2002/bls"
"github.com/go-chi/chi/v5"
"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -170,10 +169,6 @@ func (a *ApiHandler) produceBeaconBody(ctx context.Context, apiVersion int, base
beaconBody.RandaoReveal = randaoReveal
beaconBody.Graffiti = graffiti
beaconBody.Version = stateVersion
// Sync aggregate is empty for now.
beaconBody.SyncAggregate = &cltypes.SyncAggregate{
SyncCommiteeSignature: bls.InfiniteSignature,
}

// Build execution payload
latestExecutionPayload := baseState.LatestExecutionPayloadHeader()
Expand Down Expand Up @@ -308,6 +303,15 @@ func (a *ApiHandler) produceBeaconBody(ctx context.Context, apiVersion int, base
}
}
}()
// process the sync aggregate in parallel
wg.Add(1)
go func() {
defer wg.Done()
beaconBody.SyncAggregate, err = a.syncMessagePool.GetSyncAggregate(targetSlot-1, blockRoot)
if err != nil {
log.Error("BlockProduction: Failed to get sync aggregate", "err", err)
}
}()
wg.Wait()
if executionPayload == nil {
return nil, 0, fmt.Errorf("failed to produce execution payload")
Expand Down
2 changes: 1 addition & 1 deletion cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
bcfg.CapellaForkEpoch = 1
blocks, preState, postState = tests.GetCapellaRandom()
}
fcu = forkchoice.NewForkChoiceStorageMock()
fcu = forkchoice.NewForkChoiceStorageMock(t)
db = memdb.NewTestDB(t)
blobDb := memdb.NewTestDB(t)
var reader *tests.MockBlockReader
Expand Down
50 changes: 48 additions & 2 deletions cl/phase1/forkchoice/forkchoice_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package forkchoice

import (
"context"
"testing"

"github.com/ledgerwatch/erigon-lib/common"
libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/ledgerwatch/erigon/cl/pool"
"github.com/ledgerwatch/erigon/cl/transition/impl/eth2"
"github.com/ledgerwatch/erigon/cl/validator/sync_contribution_pool"
"go.uber.org/mock/gomock"
)

// Make mocks with maps and simple setters and getters, panic on methods from ForkChoiceStorageWriter
Expand Down Expand Up @@ -47,7 +49,51 @@ type ForkChoiceStorageMock struct {
Pool pool.OperationsPool
}

func NewForkChoiceStorageMock() *ForkChoiceStorageMock {
func makeSyncContributionPoolMock(t *testing.T) sync_contribution_pool.SyncContributionPool {
ctrl := gomock.NewController(t)
type syncContributionKey struct {
slot uint64
subcommitteeIndex uint64
beaconBlockRoot common.Hash
}
u := map[syncContributionKey]*cltypes.Contribution{}
pool := sync_contribution_pool.NewMockSyncContributionPool(ctrl)
pool.EXPECT().AddSyncContribution(gomock.Any(), gomock.Any()).DoAndReturn(func(headState *state.CachingBeaconState, contribution *cltypes.Contribution) error {
key := syncContributionKey{
slot: contribution.Slot,
subcommitteeIndex: contribution.SubcommitteeIndex,
beaconBlockRoot: contribution.BeaconBlockRoot,
}
u[key] = contribution
return nil
}).AnyTimes()
pool.EXPECT().GetSyncContribution(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(slot uint64, subcommitteeIndex uint64, beaconBlockRoot common.Hash) (*cltypes.Contribution, bool) {
key := syncContributionKey{
slot: slot,
subcommitteeIndex: subcommitteeIndex,
beaconBlockRoot: beaconBlockRoot,
}
v, ok := u[key]
return v, ok
}).AnyTimes()
pool.EXPECT().AddSyncCommitteeMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(headState *state.CachingBeaconState, subCommitee uint64, message *cltypes.SyncCommitteeMessage) error {
key := syncContributionKey{
slot: message.Slot,
subcommitteeIndex: subCommitee,
beaconBlockRoot: message.BeaconBlockRoot,
}
u[key] = &cltypes.Contribution{
Slot: message.Slot,
SubcommitteeIndex: subCommitee,
BeaconBlockRoot: message.BeaconBlockRoot,
AggregationBits: make([]byte, cltypes.SyncCommitteeAggregationBitsSize),
}
return nil
}).AnyTimes()
return pool
}

func NewForkChoiceStorageMock(t *testing.T) *ForkChoiceStorageMock {
return &ForkChoiceStorageMock{
Ancestors: make(map[uint64]common.Hash),
AnchorSlotVal: 0,
Expand All @@ -66,9 +112,9 @@ func NewForkChoiceStorageMock() *ForkChoiceStorageMock {
GetFinalityCheckpointsVal: make(map[common.Hash][3]solid.Checkpoint),
LightClientBootstraps: make(map[common.Hash]*cltypes.LightClientBootstrap),
LCUpdates: make(map[uint64]*cltypes.LightClientUpdate),
SyncContributionPool: sync_contribution_pool.NewSyncContributionPoolMock(),
Headers: make(map[common.Hash]*cltypes.BeaconBlockHeader),
GetBeaconCommitteeMock: nil,
SyncContributionPool: makeSyncContributionPoolMock(t),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func setupAggregateAndProofTest(t *testing.T) (AggregateAndProofService, *synced
cn()
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(true, cfg)
forkchoiceMock := forkchoice.NewForkChoiceStorageMock()
forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t)
blockService := NewAggregateAndProofService(ctx, syncedDataManager, forkchoiceMock, cfg, nil, true)
return blockService, syncedDataManager, forkchoiceMock
}
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/blob_sidecar_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func setupBlobSidecarService(t *testing.T, ctrl *gomock.Controller, test bool) (
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(true, cfg)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := forkchoice.NewForkChoiceStorageMock()
forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t)
blockService := NewBlobSidecarService(ctx2, cfg, forkchoiceMock, syncedDataManager, ethClock, test)
return blockService, syncedDataManager, ethClock, forkchoiceMock
}
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/block_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func setupBlockService(t *testing.T, ctrl *gomock.Controller) (BlockService, *sy
cfg := &clparams.MainnetBeaconConfig
syncedDataManager := synced_data.NewSyncedDataManager(true, cfg)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
forkchoiceMock := forkchoice.NewForkChoiceStorageMock()
forkchoiceMock := forkchoice.NewForkChoiceStorageMock(t)
blockService := NewBlockService(context.Background(), db, forkchoiceMock, syncedDataManager, ethClock, cfg, nil)
return blockService, syncedDataManager, ethClock, forkchoiceMock
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne
func (s *syncCommitteeMessagesService) cleanupOldSyncCommitteeMessages() {
headSlot := s.syncedDataManager.HeadSlot()
for k := range s.seenSyncCommitteeMessages {
if headSlot != k.slot {
if headSlot > k.slot+1 {
delete(s.seenSyncCommitteeMessages, k)
}
}
Expand Down
10 changes: 5 additions & 5 deletions cl/sentinel/handlers/heartbeats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestPing(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)
ethClock := getEthClock(t)

_, beaconCfg := clparams.GetConfigsByNetwork(1)
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestGoodbye(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)
ethClock := getEthClock(t)
_, beaconCfg := clparams.GetConfigsByNetwork(1)
c := NewConsensusHandlers(
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestMetadataV2(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)
ethClock := getEthClock(t)
nc := clparams.NetworkConfigs[clparams.MainnetNetwork]
_, beaconCfg := clparams.GetConfigsByNetwork(1)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestMetadataV1(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)

nc := clparams.NetworkConfigs[clparams.MainnetNetwork]
ethClock := getEthClock(t)
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestStatus(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)

hs := handshake.New(ctx, getEthClock(t), &clparams.MainnetBeaconConfig, nil)
s := &cltypes.Status{
Expand Down
8 changes: 4 additions & 4 deletions cl/sentinel/handlers/light_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestLightClientOptimistic(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)

f.NewestLCUpdate = &cltypes.LightClientUpdate{
AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion),
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestLightClientFinality(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)

f.NewestLCUpdate = &cltypes.LightClientUpdate{
AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion),
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestLightClientBootstrap(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)

f.NewestLCUpdate = &cltypes.LightClientUpdate{
AttestedHeader: cltypes.NewLightClientHeader(clparams.AltairVersion),
Expand Down Expand Up @@ -270,7 +270,7 @@ func TestLightClientUpdates(t *testing.T) {
peersPool := peers.NewPool()
beaconDB, indiciesDB := setupStore(t)

f := forkchoice.NewForkChoiceStorageMock()
f := forkchoice.NewForkChoiceStorageMock(t)
ethClock := getEthClock(t)

up := &cltypes.LightClientUpdate{
Expand Down
2 changes: 1 addition & 1 deletion cl/sentinel/service/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func createSentinel(
gossipTopics = append(gossipTopics, sentinel.SyncCommitteeContributionAndProofSsz, sentinel.BeaconAggregateAndProofSsz)
}
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBlobSidecar, int(cfg.BeaconConfig.MaxBlobsPerBlock))...)
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBeaconAttestation, int(cfg.NetworkConfig.AttestationSubnetCount))...)
//gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixBeaconAttestation, int(cfg.NetworkConfig.AttestationSubnetCount))...)
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
gossipTopics = append(gossipTopics, generateSubnetsTopics(gossip.TopicNamePrefixSyncCommittee, int(cfg.BeaconConfig.SyncCommitteeSubnetCount))...)

for _, v := range gossipTopics {
Expand Down
20 changes: 17 additions & 3 deletions cl/utils/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,32 @@ func IsBitOn(b []byte, idx int) bool {
return b[idx/8]&i == i
}

func IsSupersetBitlist(a, b []byte) bool {
// IsNonStrictSupersetBitlist checks if bitlist 'a' is a non-strict superset of bitlist 'b'
func IsNonStrictSupersetBitlist(a, b []byte) bool {
// Ensure 'a' is at least as long as 'b'
if len(a) < len(b) {
return false
}
for i := range b {
if a[i]&b[i] != b[i] {

// Check each bit in 'b' to ensure it is also set in 'a'
for i := 0; i < len(b); i++ {
if (a[i] & b[i]) != b[i] {
return false
}
}

// If all bits required by 'b' are present in 'a', return true
return true
}

func BitsOnCount(b []byte) int {
count := 0
for _, v := range b {
count += bits.OnesCount8(v)
}
return count
}

func MergeBitlists(a, b []byte) {
for i := range b {
a[i] |= b[i]
Expand Down
2 changes: 2 additions & 0 deletions cl/validator/sync_contribution_pool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ type SyncContributionPool interface {

// GetSyncContribution retrieves a sync contribution from the pool.
GetSyncContribution(slot, subcommitteeIndex uint64, beaconBlockRoot common.Hash) *cltypes.Contribution
// Obtain the sync aggregate for the sync messages pointing to a given beacon block root.
GetSyncAggregate(slot uint64, beaconBlockRoot common.Hash) (*cltypes.SyncAggregate, error)
}
15 changes: 15 additions & 0 deletions cl/validator/sync_contribution_pool/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading