Skip to content

Commit

Permalink
Use Data Column Validation Across Prysm (#14377)
Browse files Browse the repository at this point in the history
* Use Data Column Validation Everywhere

* Fix Build

* Fix Lint

* Fix Clock Synchronizer

* Fix Panic
  • Loading branch information
nisdas authored and nalepae committed Nov 25, 2024
1 parent 4a7b892 commit b29b3f2
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 102 deletions.
31 changes: 12 additions & 19 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/sirupsen/logrus"

"github.com/prysmaticlabs/prysm/v5/async"
Expand Down Expand Up @@ -57,6 +58,8 @@ type dataColumnSampler1D struct {
columnFromPeer map[peer.ID]map[uint64]bool
// peerFromColumn maps a column to the peer responsible for custody.
peerFromColumn map[uint64]map[peer.ID]bool
// columnVerifier verifies a column according to the specified requirements.
columnVerifier verification.NewColumnVerifier
}

// newDataColumnSampler1D creates a new 1D data column sampler.
Expand All @@ -65,6 +68,7 @@ func newDataColumnSampler1D(
clock *startup.Clock,
ctxMap ContextByteVersions,
stateNotifier statefeed.Notifier,
colVerifier verification.NewColumnVerifier,
) *dataColumnSampler1D {
numColumns := params.BeaconConfig().NumberOfColumns
peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
Expand All @@ -79,6 +83,7 @@ func newDataColumnSampler1D(
stateNotifier: stateNotifier,
columnFromPeer: make(map[peer.ID]map[uint64]bool),
peerFromColumn: peerFromColumn,
columnVerifier: colVerifier,
}
}

Expand Down Expand Up @@ -426,7 +431,7 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
}

for _, roDataColumn := range roDataColumns {
if verifyColumn(roDataColumn, root, pid, requestedColumns) {
if verifyColumn(roDataColumn, root, pid, requestedColumns, d.columnVerifier) {
retrievedColumns[roDataColumn.ColumnIndex] = true
}
}
Expand Down Expand Up @@ -500,6 +505,7 @@ func verifyColumn(
root [32]byte,
pid peer.ID,
requestedColumns map[uint64]bool,
columnVerifier verification.NewColumnVerifier,
) bool {
retrievedColumn := roDataColumn.ColumnIndex

Expand Down Expand Up @@ -528,38 +534,25 @@ func verifyColumn(
return false
}

vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
// Filter out columns which did not pass the KZG inclusion proof verification.
if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn); err != nil {
if err := vf.SidecarInclusionProven(); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Failed to verify KZG inclusion proof for retrieved column")

}).WithError(err).Debug("Failed to verify KZG inclusion proof for retrieved column")
return false
}

// Filter out columns which did not pass the KZG proof verification.
verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn)
if err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Error when verifying KZG proof for retrieved column")

return false
}

if !verified {
if err := vf.SidecarKzgProofVerified(); err != nil {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"index": retrievedColumn,
}).Debug("Failed to verify KZG proof for retrieved column")

}).WithError(err).Debug("Failed to verify KZG proof for retrieved column")
return false
}

return true
}
9 changes: 8 additions & 1 deletion beacon-chain/sync/data_columns_sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -195,7 +197,12 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
kzgProofs: kzgProofs,
dataColumnSidecars: dataColumnSidecars,
}
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil)
clockSync := startup.NewClockSynchronizer()
require.NoError(t, clockSync.SetClock(clock))
iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
ini, err := iniWaiter.WaitForInitializer(context.Background())
require.NoError(t, err)
sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil, newColumnVerifierFromInitializer(ini))

return test, sampler
}
Expand Down
12 changes: 10 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
prysmsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
Expand Down Expand Up @@ -81,6 +82,8 @@ type blocksFetcherConfig struct {
peerFilterCapacityWeight float64
mode syncMode
bs filesystem.BlobStorageSummarizer
bv verification.NewBlobVerifier
cv verification.NewColumnVerifier
}

// blocksFetcher is a service to fetch chain data from peers.
Expand All @@ -97,6 +100,8 @@ type blocksFetcher struct {
p2p p2p.P2P
db db.ReadOnlyDatabase
bs filesystem.BlobStorageSummarizer
bv verification.NewBlobVerifier
cv verification.NewColumnVerifier
blocksPerPeriod uint64
rateLimiter *leakybucket.Collector
peerLocks map[peer.ID]*peerLock
Expand Down Expand Up @@ -156,6 +161,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
p2p: cfg.p2p,
db: cfg.db,
bs: cfg.bs,
bv: cfg.bv,
cv: cfg.cv,
blocksPerPeriod: uint64(blocksPerPeriod),
rateLimiter: rateLimiter,
peerLocks: make(map[peer.ID]*peerLock),
Expand Down Expand Up @@ -956,6 +963,7 @@ func processRetrievedDataColumns(
indicesFromRoot map[[fieldparams.RootLength]byte][]int,
missingColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
bwb []blocks.BlockWithROBlobs,
colVerifier verification.NewColumnVerifier,
) {
retrievedColumnsFromRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool)

Expand All @@ -976,7 +984,7 @@ func processRetrievedDataColumns(
}

// Verify the data column.
if err := verify.ColumnAlignsWithBlock(dataColumn, blockFromRoot[root]); err != nil {
if err := verify.ColumnAlignsWithBlock(dataColumn, blockFromRoot[root], colVerifier); err != nil {
// TODO: Should we downscore the peer for that?
continue
}
Expand Down Expand Up @@ -1071,7 +1079,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers(ctx context.Context,
}

// Process the retrieved data columns.
processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb)
processRetrievedDataColumns(roDataColumns, blockFromRoot, indicesFromRoot, missingColumnsFromRoot, bwb, f.cv)

if len(missingColumnsFromRoot) > 0 {
for root, columns := range missingColumnsFromRoot {
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
Expand Down Expand Up @@ -2094,13 +2095,19 @@ func TestFetchDataColumnsFromPeers(t *testing.T) {
for _, roBlock := range roBlocks {
bwb = append(bwb, blocks.BlockWithROBlobs{Block: roBlock})
}
clockSync := startup.NewClockSynchronizer()
require.NoError(t, clockSync.SetClock(clock))
iniWaiter := verification.NewInitializerWaiter(clockSync, nil, nil)
ini, err := iniWaiter.WaitForInitializer(ctx)
require.NoError(t, err)

// Create the block fetcher.
blocksFetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
clock: clock,
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
p2p: p2pSvc,
bs: blobStorageSummarizer,
cv: newColumnVerifierFromInitializer(ini),
})

// Fetch the data columns from the peers.
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
Expand Down Expand Up @@ -71,6 +72,8 @@ type blocksQueueConfig struct {
db db.ReadOnlyDatabase
mode syncMode
bs filesystem.BlobStorageSummarizer
bv verification.NewBlobVerifier
cv verification.NewColumnVerifier
}

// blocksQueue is a priority queue that serves as a intermediary between block fetchers (producers)
Expand Down Expand Up @@ -113,6 +116,8 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
db: cfg.db,
clock: cfg.clock,
bs: cfg.bs,
bv: cfg.bv,
cv: cfg.cv,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
Expand Down
20 changes: 6 additions & 14 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (s *Service) startBlocksQueue(ctx context.Context, highestSlot primitives.S
highestExpectedSlot: highestSlot,
mode: mode,
bs: summarizer,
bv: s.newBlobVerifier,
cv: s.newColumnVerifier,
}
queue := newBlocksQueue(ctx, cfg)
if err := queue.start(); err != nil {
Expand Down Expand Up @@ -174,7 +176,8 @@ func (s *Service) processFetchedDataRegSync(
return
}
if coreTime.PeerDASIsActive(startSlot) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
"firstUnprocessed": bwb[0].Block.Block().Slot(),
Expand Down Expand Up @@ -363,7 +366,8 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
}
var aStore das.AvailabilityStore
if coreTime.PeerDASIsActive(first.Block().Slot()) {
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
if len(bb.Columns) == 0 {
Expand Down Expand Up @@ -425,15 +429,3 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool
}
return false
}

type emptyVerifier struct {
}

func (_ emptyVerifier) VerifiedRODataColumns(_ context.Context, _ blocks.ROBlock, cols []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error) {
var verCols []blocks.VerifiedRODataColumn
for _, col := range cols {
vCol := blocks.NewVerifiedRODataColumn(col)
verCols = append(verCols, vCol)
}
return verCols, nil
}
33 changes: 21 additions & 12 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,18 @@ type Config struct {

// Service service.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
synced *abool.AtomicBool
chainStarted *abool.AtomicBool
counter *ratecounter.RateCounter
genesisChan chan time.Time
clock *startup.Clock
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
ctxMap sync.ContextByteVersions
cfg *Config
ctx context.Context
cancel context.CancelFunc
synced *abool.AtomicBool
chainStarted *abool.AtomicBool
counter *ratecounter.RateCounter
genesisChan chan time.Time
clock *startup.Clock
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
newColumnVerifier verification.NewColumnVerifier
ctxMap sync.ContextByteVersions
}

// Option is a functional option for the initial-sync Service.
Expand Down Expand Up @@ -151,6 +152,7 @@ func (s *Service) Start() {
return
}
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
s.newColumnVerifier = newColumnVerifierFromInitializer(v)

gt := clock.GenesisTime()
if gt.IsZero() {
Expand Down Expand Up @@ -454,7 +456,8 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
if len(sidecars) != len(req) {
continue
}
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, emptyVerifier{}, s.cfg.P2P.NodeID())
bv := verification.NewDataColumnBatchVerifier(s.newColumnVerifier, verification.InitsyncColumnSidecarRequirements)
avs := das.NewLazilyPersistentStoreColumn(s.cfg.BlobStorage, bv, s.cfg.P2P.NodeID())
current := s.clock.CurrentSlot()
if err := avs.PersistColumns(current, sidecars...); err != nil {
return err
Expand All @@ -481,3 +484,9 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
return ini.NewBlobVerifier(b, reqs)
}
}

func newColumnVerifierFromInitializer(ini *verification.Initializer) verification.NewColumnVerifier {
return func(d blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnVerifier {
return ini.NewColumnVerifier(d, reqs)
}
}
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *Service) sendAndSaveDataColumnSidecars(ctx context.Context, request typ
return err
}
for _, sidecar := range sidecars {
if err := verify.ColumnAlignsWithBlock(sidecar, RoBlock); err != nil {
if err := verify.ColumnAlignsWithBlock(sidecar, RoBlock, s.newColumnVerifier); err != nil {
return err
}
log.WithFields(columnFields(sidecar)).Debug("Received data column sidecar RPC")
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *Service) startTasksPostInitialSync() {

// Start data columns sampling if peerDAS is enabled.
if params.PeerDASEnabled() {
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier, s.newColumnVerifier)
go s.sampler.Run(s.ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/verify/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types/blocks:go_default_library",
"//encoding/bytesutil:go_default_library",
Expand Down
Loading

0 comments on commit b29b3f2

Please sign in to comment.