Skip to content

Commit

Permalink
optimise sync committee message signature verification
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Oct 30, 2024
1 parent 5498f85 commit 5865338
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 62 deletions.
36 changes: 18 additions & 18 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,28 +351,28 @@ func (a *ApiHandler) PostEthV1BeaconPoolSyncCommittees(w http.ResponseWriter, r
continue
}
for _, subnet := range publishingSubnets {
if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, v); err != nil && !errors.Is(err, services.ErrIgnore) {

var syncCommitteeMessageWithGossipData cltypes.SyncCommitteeMessageWithGossipData
syncCommitteeMessageWithGossipData.SyncCommitteeMessage = v

encodedSSZ, err := syncCommitteeMessageWithGossipData.SyncCommitteeMessage.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

subnetId := subnet
syncCommitteeMessageWithGossipData.GossipData = &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixSyncCommittee,
SubnetId: &subnetId,
}

if err = a.syncCommitteeMessagesService.ProcessMessage(r.Context(), &subnet, &syncCommitteeMessageWithGossipData); err != nil && !errors.Is(err, services.ErrIgnore) {
log.Warn("[Beacon REST] failed to process attestation in syncCommittee service", "err", err)
failures = append(failures, poolingFailure{Index: idx, Message: err.Error()})
break
}
// Broadcast to gossip
if a.sentinel != nil {
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
subnetId := subnet // this effectively makes a copy
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixSyncCommittee,
SubnetId: &subnetId,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
}
if len(failures) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
mockValidatorMonitor := mockMonitor.NewMockValidatorMonitor(ctrl)

// ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error
syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessage) error {
return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg)
syncCommitteeMessagesService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error {
return h.syncMessagePool.AddSyncCommitteeMessage(postState, *subnetID, msg.SyncCommitteeMessage)
}).AnyTimes()

syncContributionService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedContributionAndProof) error {
Expand Down
7 changes: 7 additions & 0 deletions cl/cltypes/contribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/hexutility"
"github.com/erigontech/erigon-lib/common/length"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/types/clonable"
"github.com/erigontech/erigon/cl/merkle_tree"
ssz2 "github.com/erigontech/erigon/cl/ssz"
Expand Down Expand Up @@ -179,6 +180,12 @@ func (agg *SyncContribution) HashSSZ() ([32]byte, error) {

}

type SyncCommitteeMessageWithGossipData struct {
SyncCommitteeMessage *SyncCommitteeMessage
GossipData *sentinel.GossipData
ImmediateVerification bool
}

type SyncCommitteeMessage struct {
Slot uint64 `json:"slot,string"`
BeaconBlockRoot libcommon.Hash `json:"beacon_block_root"`
Expand Down
4 changes: 2 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
// The background checks above are enough for now.
return g.blobService.ProcessMessage(ctx, data.SubnetId, blobSideCar)
case gossip.IsTopicSyncCommittee(data.Name):
msg := &cltypes.SyncCommitteeMessage{}
if err := msg.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
msg := &cltypes.SyncCommitteeMessageWithGossipData{}
if err := msg.SyncCommitteeMessage.DecodeSSZ(common.CopyBytes(data.Data), int(version)); err != nil {
return err
}
return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
Expand Down
7 changes: 7 additions & 0 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type BatchSignatureVerifier struct {
attVerifyAndExecute chan *AggregateVerificationData
aggregateProofVerify chan *AggregateVerificationData
blsToExecutionChangeVerify chan *AggregateVerificationData
syncCommitteeMessage chan *AggregateVerificationData
ctx context.Context
}

Expand All @@ -49,6 +50,7 @@ func NewBatchSignatureVerifier(ctx context.Context, sentinel sentinel.SentinelCl
attVerifyAndExecute: make(chan *AggregateVerificationData, 1024),
aggregateProofVerify: make(chan *AggregateVerificationData, 1024),
blsToExecutionChangeVerify: make(chan *AggregateVerificationData, 1024),
syncCommitteeMessage: make(chan *AggregateVerificationData, 1024),
}
}

Expand All @@ -65,6 +67,10 @@ func (b *BatchSignatureVerifier) AsyncVerifyBlsToExecutionChange(data *Aggregate
b.blsToExecutionChangeVerify <- data
}

func (b *BatchSignatureVerifier) AsyncVerifySyncCommitteeMessage(data *AggregateVerificationData) {
b.syncCommitteeMessage <- data
}

func (b *BatchSignatureVerifier) ImmediateVerification(data *AggregateVerificationData) error {
return b.processSignatureVerification([]*AggregateVerificationData{data})
}
Expand All @@ -74,6 +80,7 @@ func (b *BatchSignatureVerifier) Start() {
go b.start(b.attVerifyAndExecute)
go b.start(b.aggregateProofVerify)
go b.start(b.blsToExecutionChangeVerify)
go b.start(b.syncCommitteeMessage)
}

// When receiving AggregateVerificationData, we simply collect all the signature verification data
Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/network/services/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type BlockService Service[*cltypes.SignedBeaconBlock]
type BlobSidecarsService Service[*cltypes.BlobSidecar]

//go:generate mockgen -typed=true -destination=./mock_services/sync_committee_messages_service_mock.go -package=mock_services . SyncCommitteeMessagesService
type SyncCommitteeMessagesService Service[*cltypes.SyncCommitteeMessage]
type SyncCommitteeMessagesService Service[*cltypes.SyncCommitteeMessageWithGossipData]

//go:generate mockgen -typed=true -destination=./mock_services/sync_contribution_service_mock.go -package=mock_services . SyncContributionService
type SyncContributionService Service[*cltypes.SignedContributionAndProof]
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 40 additions & 25 deletions cl/phase1/network/services/sync_committee_messages_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ package services

import (
"context"
"errors"
"fmt"
"slices"
"sync"

"github.com/Giulio2002/bls"

"github.com/erigontech/erigon/cl/beacon/synced_data"
"github.com/erigontech/erigon/cl/clparams"
"github.com/erigontech/erigon/cl/cltypes"
Expand All @@ -47,6 +44,7 @@ type syncCommitteeMessagesService struct {
beaconChainCfg *clparams.BeaconChainConfig
syncContributionPool sync_contribution_pool.SyncContributionPool
ethClock eth_clock.EthereumClock
batchSignatureVerifier *BatchSignatureVerifier
test bool

mu sync.Mutex
Expand All @@ -58,6 +56,7 @@ func NewSyncCommitteeMessagesService(
ethClock eth_clock.EthereumClock,
syncedDataManager *synced_data.SyncedDataManager,
syncContributionPool sync_contribution_pool.SyncContributionPool,
batchSignatureVerifier *BatchSignatureVerifier,
test bool,
) SyncCommitteeMessagesService {
return &syncCommitteeMessagesService{
Expand All @@ -66,33 +65,34 @@ func NewSyncCommitteeMessagesService(
syncedDataManager: syncedDataManager,
beaconChainCfg: beaconChainCfg,
syncContributionPool: syncContributionPool,
batchSignatureVerifier: batchSignatureVerifier,
test: test,
}
}

// ProcessMessage processes a sync committee message
func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SyncCommitteeMessage) error {
func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SyncCommitteeMessageWithGossipData) error {
s.mu.Lock()
defer s.mu.Unlock()
headState := s.syncedDataManager.HeadState()
if headState == nil {
return ErrIgnore
}
// [IGNORE] The message's slot is for the current slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance), i.e. sync_committee_message.slot == current_slot.
if !s.ethClock.IsSlotCurrentSlotWithMaximumClockDisparity(msg.Slot) {
if !s.ethClock.IsSlotCurrentSlotWithMaximumClockDisparity(msg.SyncCommitteeMessage.Slot) {
return ErrIgnore
}

// [REJECT] The subnet_id is valid for the given validator, i.e. subnet_id in compute_subnets_for_sync_committee(state, sync_committee_message.validator_index).
// Note this validation implies the validator is part of the broader current sync committee along with the correct subcommittee.
subnets, err := subnets.ComputeSubnetsForSyncCommittee(headState, msg.ValidatorIndex)
subnets, err := subnets.ComputeSubnetsForSyncCommittee(headState, msg.SyncCommitteeMessage.ValidatorIndex)
if err != nil {
return err
}
seenSyncCommitteeMessageIdentifier := seenSyncCommitteeMessage{
subnet: *subnet,
slot: msg.Slot,
validatorIndex: msg.ValidatorIndex,
slot: msg.SyncCommitteeMessage.Slot,
validatorIndex: msg.SyncCommitteeMessage.ValidatorIndex,
}

if !slices.Contains(subnets, *subnet) {
Expand All @@ -103,13 +103,35 @@ func (s *syncCommitteeMessagesService) ProcessMessage(ctx context.Context, subne
return ErrIgnore
}
// [REJECT] The signature is valid for the message beacon_block_root for the validator referenced by validator_index
if err := verifySyncCommitteeMessageSignature(headState, msg); !s.test && err != nil {
signature, signingRoot, pubKey, err := verifySyncCommitteeMessageSignature(headState, msg.SyncCommitteeMessage)
if !s.test && err != nil {
return err
}
s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier] = struct{}{}
s.cleanupOldSyncCommitteeMessages() // cleanup old messages
// Aggregate the message
return s.syncContributionPool.AddSyncCommitteeMessage(headState, *subnet, msg)
aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{signature},
SignRoots: [][]byte{signingRoot},
Pks: [][]byte{pubKey},
GossipData: msg.GossipData,
F: func() {
s.seenSyncCommitteeMessages[seenSyncCommitteeMessageIdentifier] = struct{}{}
s.cleanupOldSyncCommitteeMessages() // cleanup old messages
// Aggregate the message
s.syncContributionPool.AddSyncCommitteeMessage(headState, *subnet, msg.SyncCommitteeMessage)
},
}

if msg.ImmediateVerification {
return s.batchSignatureVerifier.ImmediateVerification(aggregateVerificationData)
}

// push the signatures to verify asynchronously and run final functions after that.
s.batchSignatureVerifier.AsyncVerifySyncCommitteeMessage(aggregateVerificationData)

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
// in BatchSignatureVerifier service. After validating signatures, if they are valid we will publish the
// gossip ourselves or ban the peer which sent that particular invalid signature.
return ErrIgnore
}

// cleanupOldSyncCommitteeMessages removes old sync committee messages from the cache
Expand All @@ -123,26 +145,19 @@ func (s *syncCommitteeMessagesService) cleanupOldSyncCommitteeMessages() {
}

// verifySyncCommitteeMessageSignature verifies the signature of a sync committee message
func verifySyncCommitteeMessageSignature(s *state.CachingBeaconState, msg *cltypes.SyncCommitteeMessage) error {
func verifySyncCommitteeMessageSignature(s *state.CachingBeaconState, msg *cltypes.SyncCommitteeMessage) ([]byte, []byte, []byte, error) {
publicKey, err := s.ValidatorPublicKey(int(msg.ValidatorIndex))
if err != nil {
return err
return nil, nil, nil, err
}
cfg := s.BeaconConfig()
domain, err := s.GetDomain(cfg.DomainSyncCommittee, state.Epoch(s))
if err != nil {
return err
return nil, nil, nil, err
}
signingRoot, err := utils.Sha256(msg.BeaconBlockRoot[:], domain), nil
if err != nil {
return err
}
valid, err := bls.Verify(msg.Signature[:], signingRoot[:], publicKey[:])
if err != nil {
return errors.New("invalid signature")
}
if !valid {
return errors.New("invalid signature")
return nil, nil, nil, err
}
return nil
return msg.Signature[:], signingRoot[:], publicKey[:], nil
}
29 changes: 19 additions & 10 deletions cl/phase1/network/services/sync_committee_messages_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,24 @@ func setupSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (Sync
syncedDataManager := synced_data.NewSyncedDataManager(true, cfg)
ethClock := eth_clock.NewMockEthereumClock(ctrl)
syncContributionPool := syncpoolmock.NewMockSyncContributionPool(ctrl)
s := NewSyncCommitteeMessagesService(cfg, ethClock, syncedDataManager, syncContributionPool, true)
batchSignatureVerifier := NewBatchSignatureVerifier(context.TODO(), nil)
go batchSignatureVerifier.Start()
s := NewSyncCommitteeMessagesService(cfg, ethClock, syncedDataManager, syncContributionPool, batchSignatureVerifier, true)
syncContributionPool.EXPECT().AddSyncCommitteeMessage(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
return s, syncedDataManager, ethClock
}

func getObjectsForSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *cltypes.SyncCommitteeMessage) {
func getObjectsForSyncCommitteesServiceTest(t *testing.T, ctrl *gomock.Controller) (*state.CachingBeaconState, *cltypes.SyncCommitteeMessageWithGossipData) {
_, _, state := tests.GetBellatrixRandom()
br, _ := state.BlockRoot()
msg := &cltypes.SyncCommitteeMessage{
Slot: state.Slot(),
BeaconBlockRoot: br,
ValidatorIndex: 0,
msg := &cltypes.SyncCommitteeMessageWithGossipData{
SyncCommitteeMessage: &cltypes.SyncCommitteeMessage{
Slot: state.Slot(),
BeaconBlockRoot: br,
ValidatorIndex: 0,
},
GossipData: nil,
ImmediateVerification: true,
}
return state, msg
}
Expand All @@ -68,7 +74,7 @@ func TestSyncCommitteesBadTiming(t *testing.T) {

s, synced, ethClock := setupSyncCommitteesServiceTest(t, ctrl)
synced.OnHeadState(state)
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.Slot).Return(false).AnyTimes()
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.SyncCommitteeMessage.Slot).Return(false).AnyTimes()
require.Error(t, s.ProcessMessage(context.Background(), nil, msg))
}

Expand All @@ -81,19 +87,22 @@ func TestSyncCommitteesBadSubnet(t *testing.T) {

s, synced, ethClock := setupSyncCommitteesServiceTest(t, ctrl)
synced.OnHeadState(state)
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.Slot).Return(true).AnyTimes()
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.SyncCommitteeMessage.Slot).Return(true).AnyTimes()
require.Error(t, s.ProcessMessage(context.Background(), &sn, msg))
}

func TestSyncCommitteesSuccess(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

state, msg := getObjectsForSyncCommitteesServiceTest(t, ctrl)
mockFuncs := &mockFuncs{ctrl: ctrl}
blsVerifyMultipleSignatures = mockFuncs.BlsVerifyMultipleSignatures

state, msg := getObjectsForSyncCommitteesServiceTest(t, ctrl)
ctrl.RecordCall(mockFuncs, "BlsVerifyMultipleSignatures", gomock.Any(), gomock.Any(), gomock.Any()).Return(true, nil)
s, synced, ethClock := setupSyncCommitteesServiceTest(t, ctrl)
synced.OnHeadState(state)
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.Slot).Return(true).AnyTimes()
ethClock.EXPECT().IsSlotCurrentSlotWithMaximumClockDisparity(msg.SyncCommitteeMessage.Slot).Return(true).AnyTimes()
require.NoError(t, s.ProcessMessage(context.Background(), new(uint64), msg))
require.Error(t, s.ProcessMessage(context.Background(), new(uint64), msg)) // Ignore if done twice
}
2 changes: 1 addition & 1 deletion cmd/caplin/caplin1/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
// Define gossip services
blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
blobService := services.NewBlobSidecarService(ctx, beaconConfig, forkChoice, syncedDataManager, ethClock, emitters, false)
syncCommitteeMessagesService := services.NewSyncCommitteeMessagesService(beaconConfig, ethClock, syncedDataManager, syncContributionPool, false)
syncCommitteeMessagesService := services.NewSyncCommitteeMessagesService(beaconConfig, ethClock, syncedDataManager, syncContributionPool, batchSignatureVerifier, false)
attestationService := services.NewAttestationService(ctx, forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig, emitters, batchSignatureVerifier)
syncContributionService := services.NewSyncContributionService(syncedDataManager, beaconConfig, syncContributionPool, ethClock, emitters, false)
aggregateAndProofService := services.NewAggregateAndProofService(ctx, syncedDataManager, forkChoice, beaconConfig, pool, false, batchSignatureVerifier)
Expand Down

0 comments on commit 5865338

Please sign in to comment.