Skip to content

Commit

Permalink
Add Data Column Verification (#14287)
Browse files Browse the repository at this point in the history
* Persist All Changes

* Fix All Tests

* Fix Build

* Fix Build

* Fix Build

* Fix Test Again

* Add missing verification

* Add Test Cases for Data Column Validation

* Fix comments for methods

* Fix comments for methods

* Fix Test

* Manu's Review
  • Loading branch information
nisdas authored Aug 12, 2024
1 parent 289c44a commit 6587b35
Show file tree
Hide file tree
Showing 30 changed files with 1,296 additions and 193 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func DataColumnSidecarsForReconstruct(

// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc *ethpb.DataColumnSidecar) (bool, error) {
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
return false, errIndexTooLarge
}
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
require.NoError(t, err)

for i, sidecar := range sCars {
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(sidecar)
roCol, err := blocks.NewRODataColumn(sidecar)
require.NoError(t, err)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
require.NoError(t, err)
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/backfill/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type blobBatchVerifier struct {

func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
m := bbv.verifiers[rb.BlockRoot()]
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillBlobSidecarRequirements)
bbv.verifiers[rb.BlockRoot()] = m
return m[rb.Index]
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func verifyColumn(
}

// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil {
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
Expand All @@ -540,7 +540,7 @@ func verifyColumn(
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn)
if err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Service) processFetchedDataRegSync(
}
}
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
}
aStore = avs
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if len(sidecars) != len(req) {
continue
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
if len(sidecars) != len(request) {
return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request))
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueBlobSidecarRequirements)
for _, sidecar := range sidecars {
if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil {
return err
Expand Down
10 changes: 8 additions & 2 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type Service struct {
initialSyncComplete chan struct{}
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnProposerVerifier verification.NewColumnVerifier
newColumnVerifier verification.NewColumnVerifier
availableBlocker coverage.AvailableBlocker
dataColumsnReconstructionLock sync.Mutex
receivedDataColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool
Expand Down Expand Up @@ -228,6 +228,12 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
}
}

func newColumnVerifierFromInitializer(ini *verification.Initializer) verification.NewColumnVerifier {
return func(d blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnVerifier {
return ini.NewColumnVerifier(d, reqs)
}
}

// Start the regular sync service.
func (s *Service) Start() {
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
Expand All @@ -236,7 +242,7 @@ func (s *Service) Start() {
return
}
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
s.newColumnProposerVerifier = v.VerifyProposer
s.newColumnVerifier = newColumnVerifierFromInitializer(v)

go s.verifierRoutine()
go s.startTasksPostInitialSync()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/validate_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Mes
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "roblob conversion failure")
}
vf := s.newBlobVerifier(blob, verification.GossipSidecarRequirements)
vf := s.newBlobVerifier(blob, verification.GossipBlobSidecarRequirements)

if err := vf.BlobIndexInBounds(); err != nil {
return pubsub.ValidationReject, err
Expand Down
137 changes: 54 additions & 83 deletions beacon-chain/sync/validate_data_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
coreBlocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub
Expand Down Expand Up @@ -48,15 +46,19 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
}

// Ignore messages that are not of the expected type.
ds, ok := m.(*eth.DataColumnSidecar)
dspb, ok := m.(*eth.DataColumnSidecar)
if !ok {
log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
return pubsub.ValidationReject, errWrongMessage
}
ds, err := blocks.NewRODataColumn(dspb)
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure")
}
vf := s.newColumnVerifier(ds, verification.GossipColumnSidecarRequirements)

// [REJECT] The sidecar's index is consistent with NUMBER_OF_COLUMNS -- i.e. sidecar.index < NUMBER_OF_COLUMNS.
if ds.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
return pubsub.ValidationReject, errors.Errorf("invalid column index provided, got %d", ds.ColumnIndex)
if err := vf.DataColumnIndexInBounds(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id.
Expand All @@ -66,115 +68,84 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic)
}

// [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. validate that block_header.slot <= current_slot (a client MAY queue future sidecars for processing at the appropriate slot).
if err := slots.VerifyTime(uint64(s.cfg.clock.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot, params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil {
log.WithError(err).Debug("Ignored sidecar: could not verify slot time")
return pubsub.ValidationIgnore, nil
if err := vf.NotFromFutureSlot(); err != nil {
return pubsub.ValidationIgnore, err
}

// [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- i.e. validate that block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)
cp := s.cfg.chain.FinalizedCheckpt()
startSlot, err := slots.EpochStart(cp.Epoch)
if err != nil {
log.WithError(err).Debug("Ignored column sidecar: could not calculate epoch start slot")
// [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, sidecar.index) with valid header signature, sidecar inclusion proof, and kzg proof.
if s.hasSeenDataColumnIndex(ds.Slot(), ds.ProposerIndex(), ds.DataColumnSidecar.ColumnIndex) {
return pubsub.ValidationIgnore, nil
}

if startSlot >= ds.SignedBlockHeader.Header.Slot {
err := fmt.Errorf("finalized slot %d greater or equal to block slot %d", startSlot, ds.SignedBlockHeader.Header.Slot)
log.Debug(err)
if err := vf.SlotAboveFinalized(); err != nil {
return pubsub.ValidationIgnore, err
}

// [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved).
if !s.cfg.chain.HasBlock(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
err := errors.Errorf("unknown parent for data column sidecar with slot %d and parent root %#x", ds.SignedBlockHeader.Header.Slot, ds.SignedBlockHeader.Header.ParentRoot)
log.WithError(err).Debug("Could not identify parent for data column sidecar")
if err := vf.SidecarParentSeen(s.hasBadBlock); err != nil {
go func() {
if err := s.sendBatchRootRequest(context.Background(), [][32]byte{ds.ParentRoot()}, rand.NewGenerator()); err != nil {
log.WithError(err).WithFields(columnFields(ds)).Debug("Failed to send batch root request")
}
}()
return pubsub.ValidationIgnore, err
}

// [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation.
if s.hasBadBlock([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
bRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return pubsub.ValidationIgnore, err
}

// If parent is bad, we set the block as bad.
s.setBadBlock(ctx, bRoot)
return pubsub.ValidationReject, errors.Errorf("column sidecar with bad parent provided")
if err := vf.SidecarParentValid(s.hasBadBlock); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by block_header.parent_root).
parentSlot, err := s.cfg.chain.RecentBlockSlot([32]byte(ds.SignedBlockHeader.Header.ParentRoot))
if err != nil {
return pubsub.ValidationIgnore, err
if err := vf.SidecarParentSlotLower(); err != nil {
return pubsub.ValidationReject, err
}

if ds.SignedBlockHeader.Header.Slot <= parentSlot {
return pubsub.ValidationReject, errors.Errorf("invalid column sidecar slot: %d", ds.SignedBlockHeader.Header.Slot)
if err := vf.SidecarDescendsFromFinalized(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root.
if !s.cfg.chain.InForkchoice([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
return pubsub.ValidationReject, blockchain.ErrNotDescendantOfFinalized
if err := vf.SidecarInclusionProven(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by verify_data_column_sidecar_inclusion_proof(sidecar).
if err := blocks.VerifyKZGInclusionProofColumn(ds); err != nil {
if err := vf.SidecarKzgProofVerified(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar's column data is valid as verified by verify_data_column_sidecar_kzg_proofs(sidecar).
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(ds)
if err != nil {
if err := vf.ValidProposerSignature(ctx); err != nil {
return pubsub.ValidationReject, err
}

if !verified {
return pubsub.ValidationReject, errors.New("failed to verify kzg proof of column")
if err := vf.SidecarProposerExpected(ctx); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
parentState, err := s.cfg.stateGen.StateByRoot(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot))
// Get the time at slot start.
startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot)
if err != nil {
return pubsub.ValidationIgnore, err
}

if err := coreBlocks.VerifyBlockHeaderSignatureUsingCurrentFork(parentState, ds.SignedBlockHeader); err != nil {
return pubsub.ValidationReject, err
}
roDataColumn, err := blocks.NewRODataColumn(ds)
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "new RO data columns")
}

if err := s.newColumnProposerVerifier(ctx, roDataColumn); err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "could not verify proposer")
}

// Get the time at slot start.
startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot)
fields := columnFields(ds)
sinceSlotStartTime := receivedTime.Sub(startTime)
validationTime := s.cfg.clock.Now().Sub(receivedTime)
fields["sinceSlotStartTime"] = sinceSlotStartTime
fields["validationTime"] = validationTime
log.WithFields(fields).Debug("Received data column sidecar gossip")

// Add specific debug log.
if err == nil {
log.WithFields(logrus.Fields{
"sinceSlotStartTime": receivedTime.Sub(startTime),
"validationTime": s.cfg.clock.Now().Sub(receivedTime),
"columnIndex": ds.ColumnIndex,
}).Debug("Received data column sidecar")
} else {
log.WithError(err).Error("Failed to calculate slot time")
verifiedRODataColumn, err := vf.VerifiedRODataColumn()
if err != nil {
return pubsub.ValidationReject, err
}

// TODO: Transform this whole function so it looks like to the `validateBlob`
// with the tiny verifiers inside.
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)

msg.ValidatorData = verifiedRODataColumn
return pubsub.ValidationAccept, nil
}

// Returns true if the column with the same slot, proposer index, and column index has been seen before.
func (s *Service) hasSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) bool {
s.seenDataColumnLock.RLock()
defer s.seenDataColumnLock.RUnlock()
b := append(bytesutil.Bytes32(uint64(slot)), bytesutil.Bytes32(uint64(proposerIndex))...)
b = append(b, bytesutil.Bytes32(index)...)
_, seen := s.seenDataColumnCache.Get(string(b))
return seen
}

// Sets the data column with the same slot, proposer index, and data column index as seen.
func (s *Service) setSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) {
s.seenDataColumnLock.Lock()
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/verify/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func ColumnAlignsWithBlock(col blocks.RODataColumn, block blocks.ROBlock) error
}

// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(col.DataColumnSidecar); err != nil {
if err := blocks.VerifyKZGInclusionProofColumn(col); err != nil {
return err
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(col.DataColumnSidecar)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(col)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"blob.go",
"cache.go",
"data_column.go",
"error.go",
"fake.go",
"initializer.go",
Expand All @@ -19,6 +20,7 @@ go_library(
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
Expand Down Expand Up @@ -49,11 +51,14 @@ go_test(
"batch_test.go",
"blob_test.go",
"cache_test.go",
"data_column_test.go",
"initializer_test.go",
"result_test.go",
"verification_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/verification/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestBatchVerifier(t *testing.T) {
blk, blbs := c.bandb(t, c.nblobs)
reqs := c.reqs
if reqs == nil {
reqs = InitsyncSidecarRequirements
reqs = InitsyncBlobSidecarRequirements
}
bbv := NewBlobBatchVerifier(c.nv(), reqs)
if c.cv == nil {
Expand Down
Loading

0 comments on commit 6587b35

Please sign in to comment.