Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Data Column Verification #14287

Merged
merged 12 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon-chain/core/peerdas/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func DataColumnSidecarsForReconstruct(

// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
// data column.
func VerifyDataColumnSidecarKZGProofs(sc *ethpb.DataColumnSidecar) (bool, error) {
func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
return false, errIndexTooLarge
}
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/core/peerdas/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
require.NoError(t, err)

for i, sidecar := range sCars {
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(sidecar)
roCol, err := blocks.NewRODataColumn(sidecar)
require.NoError(t, err)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
require.NoError(t, err)
require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/backfill/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type blobBatchVerifier struct {

func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
m := bbv.verifiers[rb.BlockRoot()]
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillBlobSidecarRequirements)
bbv.verifiers[rb.BlockRoot()] = m
return m[rb.Index]
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func verifyColumn(
}

// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil {
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn); err != nil {
nisdas marked this conversation as resolved.
Show resolved Hide resolved
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
Expand All @@ -533,7 +533,7 @@ func verifyColumn(
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn)
if err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Service) processFetchedDataRegSync(
}
}
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
Expand Down Expand Up @@ -370,7 +370,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
}
aStore = avs
} else {
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if len(sidecars) != len(req) {
continue
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
if len(sidecars) != len(request) {
return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request))
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueBlobSidecarRequirements)
for _, sidecar := range sidecars {
if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil {
return err
Expand Down
10 changes: 8 additions & 2 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ type Service struct {
initialSyncComplete chan struct{}
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnProposerVerifier verification.NewColumnVerifier
newColumnVerifier verification.NewColumnVerifier
availableBlocker coverage.AvailableBlocker
dataColumsnReconstructionLock sync.Mutex
receivedDataColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool
Expand Down Expand Up @@ -228,6 +228,12 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
}
}

func newColumnVerifierFromInitializer(ini *verification.Initializer) verification.NewColumnVerifier {
return func(d blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnVerifier {
return ini.NewColumnVerifier(d, reqs)
}
}

// Start the regular sync service.
func (s *Service) Start() {
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
Expand All @@ -236,7 +242,7 @@ func (s *Service) Start() {
return
}
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
s.newColumnProposerVerifier = v.VerifyProposer
s.newColumnVerifier = newColumnVerifierFromInitializer(v)

go s.verifierRoutine()
go s.startTasksPostInitialSync()
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/validate_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Mes
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "roblob conversion failure")
}
vf := s.newBlobVerifier(blob, verification.GossipSidecarRequirements)
vf := s.newBlobVerifier(blob, verification.GossipBlobSidecarRequirements)

if err := vf.BlobIndexInBounds(); err != nil {
return pubsub.ValidationReject, err
Expand Down
128 changes: 42 additions & 86 deletions beacon-chain/sync/validate_data_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,15 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
coreBlocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/rand"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)

// https://github.com/ethereum/consensus-specs/blob/dev/specs/_features/eip7594/p2p-interface.md#the-gossip-domain-gossipsub
Expand Down Expand Up @@ -48,15 +46,19 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
}

// Ignore messages that are not of the expected type.
ds, ok := m.(*eth.DataColumnSidecar)
dspb, ok := m.(*eth.DataColumnSidecar)
if !ok {
log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar")
return pubsub.ValidationReject, errWrongMessage
}
ds, err := blocks.NewRODataColumn(dspb)
nalepae marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure")
}
vf := s.newColumnVerifier(ds, verification.GossipColumnSidecarRequirements)

// [REJECT] The sidecar's index is consistent with NUMBER_OF_COLUMNS -- i.e. sidecar.index < NUMBER_OF_COLUMNS.
if ds.ColumnIndex >= params.BeaconConfig().NumberOfColumns {
return pubsub.ValidationReject, errors.Errorf("invalid column index provided, got %d", ds.ColumnIndex)
if err := vf.DataColumnIndexInBounds(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar is for the correct subnet -- i.e. compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id.
Expand All @@ -66,110 +68,64 @@ func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubs
return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic)
}

// [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- i.e. validate that block_header.slot <= current_slot (a client MAY queue future sidecars for processing at the appropriate slot).
if err := slots.VerifyTime(uint64(s.cfg.clock.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot, params.BeaconConfig().MaximumGossipClockDisparityDuration()); err != nil {
log.WithError(err).Debug("Ignored sidecar: could not verify slot time")
return pubsub.ValidationIgnore, nil
}

// [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- i.e. validate that block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)
cp := s.cfg.chain.FinalizedCheckpt()
startSlot, err := slots.EpochStart(cp.Epoch)
if err != nil {
log.WithError(err).Debug("Ignored column sidecar: could not calculate epoch start slot")
return pubsub.ValidationIgnore, nil
}

if startSlot >= ds.SignedBlockHeader.Header.Slot {
err := fmt.Errorf("finalized slot %d greater or equal to block slot %d", startSlot, ds.SignedBlockHeader.Header.Slot)
log.Debug(err)
if err := vf.NotFromFutureSlot(); err != nil {
return pubsub.ValidationIgnore, err
}

// [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved).
if !s.cfg.chain.HasBlock(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
err := errors.Errorf("unknown parent for data column sidecar with slot %d and parent root %#x", ds.SignedBlockHeader.Header.Slot, ds.SignedBlockHeader.Header.ParentRoot)
log.WithError(err).Debug("Could not identify parent for data column sidecar")
if err := vf.SlotAboveFinalized(); err != nil {
return pubsub.ValidationIgnore, err
}

// [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation.
if s.hasBadBlock([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
bRoot, err := ds.SignedBlockHeader.Header.HashTreeRoot()
if err != nil {
return pubsub.ValidationIgnore, err
}

// If parent is bad, we set the block as bad.
s.setBadBlock(ctx, bRoot)
return pubsub.ValidationReject, errors.Errorf("column sidecar with bad parent provided")
}

// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by block_header.parent_root).
parentSlot, err := s.cfg.chain.RecentBlockSlot([32]byte(ds.SignedBlockHeader.Header.ParentRoot))
if err != nil {
if err := vf.SidecarParentSeen(s.hasBadBlock); err != nil {
go func() {
if err := s.sendBatchRootRequest(context.Background(), [][32]byte{ds.ParentRoot()}, rand.NewGenerator()); err != nil {
log.WithError(err).WithFields(columnFields(ds)).Debug("Failed to send batch root request")
}
}()
return pubsub.ValidationIgnore, err
}

if ds.SignedBlockHeader.Header.Slot <= parentSlot {
return pubsub.ValidationReject, errors.Errorf("invalid column sidecar slot: %d", ds.SignedBlockHeader.Header.Slot)
}

// [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root.
if !s.cfg.chain.InForkchoice([32]byte(ds.SignedBlockHeader.Header.ParentRoot)) {
return pubsub.ValidationReject, blockchain.ErrNotDescendantOfFinalized
}

// [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by verify_data_column_sidecar_inclusion_proof(sidecar).
if err := blocks.VerifyKZGInclusionProofColumn(ds); err != nil {
if err := vf.SidecarParentValid(s.hasBadBlock); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The sidecar's column data is valid as verified by verify_data_column_sidecar_kzg_proofs(sidecar).
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(ds)
if err != nil {
if err := vf.SidecarParentSlotLower(); err != nil {
return pubsub.ValidationReject, err
}

if !verified {
return pubsub.ValidationReject, errors.New("failed to verify kzg proof of column")
if err := vf.SidecarDescendsFromFinalized(); err != nil {
return pubsub.ValidationReject, err
}

// [REJECT] The proposer signature of sidecar.signed_block_header, is valid with respect to the block_header.proposer_index pubkey.
parentState, err := s.cfg.stateGen.StateByRoot(ctx, [32]byte(ds.SignedBlockHeader.Header.ParentRoot))
if err != nil {
return pubsub.ValidationIgnore, err
if err := vf.SidecarInclusionProven(); err != nil {
return pubsub.ValidationReject, err
}

if err := coreBlocks.VerifyBlockHeaderSignatureUsingCurrentFork(parentState, ds.SignedBlockHeader); err != nil {
if err := vf.SidecarKzgProofVerified(); err != nil {
return pubsub.ValidationReject, err
}
roDataColumn, err := blocks.NewRODataColumn(ds)
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "new RO data columns")
if err := vf.ValidProposerSignature(ctx); err != nil {
return pubsub.ValidationReject, err
}

if err := s.newColumnProposerVerifier(ctx, roDataColumn); err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "could not verify proposer")
if err := vf.SidecarProposerExpected(ctx); err != nil {
return pubsub.ValidationReject, err
}

// Get the time at slot start.
startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), ds.SignedBlockHeader.Header.Slot)

// Add specific debug log.
if err == nil {
log.WithFields(logrus.Fields{
"sinceSlotStartTime": receivedTime.Sub(startTime),
"validationTime": s.cfg.clock.Now().Sub(receivedTime),
"columnIndex": ds.ColumnIndex,
}).Debug("Received data column sidecar")
} else {
log.WithError(err).Error("Failed to calculate slot time")
if err != nil {
return pubsub.ValidationIgnore, err
}

// TODO: Transform this whole function so it looks like to the `validateBlob`
// with the tiny verifiers inside.
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
fields := columnFields(ds)
sinceSlotStartTime := receivedTime.Sub(startTime)
validationTime := s.cfg.clock.Now().Sub(receivedTime)
fields["sinceSlotStartTime"] = sinceSlotStartTime
fields["validationTime"] = validationTime
log.WithFields(fields).Debug("Received data column sidecar gossip")

verifiedRODataColumn, err := vf.VerifiedRODataColumn()
if err != nil {
return pubsub.ValidationReject, err
}

msg.ValidatorData = verifiedRODataColumn
return pubsub.ValidationAccept, nil
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/verify/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ func ColumnAlignsWithBlock(col blocks.RODataColumn, block blocks.ROBlock) error
}

// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(col.DataColumnSidecar); err != nil {
if err := blocks.VerifyKZGInclusionProofColumn(col); err != nil {
return err
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(col.DataColumnSidecar)
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(col)
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/verification/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"blob.go",
"cache.go",
"data_column.go",
"error.go",
"fake.go",
"initializer.go",
Expand All @@ -19,6 +20,7 @@ go_library(
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
Expand Down Expand Up @@ -49,11 +51,14 @@ go_test(
"batch_test.go",
"blob_test.go",
"cache_test.go",
"data_column_test.go",
"initializer_test.go",
"result_test.go",
"verification_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/forkchoice/types:go_default_library",
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/verification/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestBatchVerifier(t *testing.T) {
blk, blbs := c.bandb(t, c.nblobs)
reqs := c.reqs
if reqs == nil {
reqs = InitsyncSidecarRequirements
reqs = InitsyncBlobSidecarRequirements
}
bbv := NewBlobBatchVerifier(c.nv(), reqs)
if c.cv == nil {
Expand Down
Loading
Loading