Skip to content

Commit

Permalink
PeerDAS: Fix initial sync (#14208)
Browse files Browse the repository at this point in the history
* `SendDataColumnsByRangeRequest`: Add some new fields in logs.

* `BlobStorageSummary`: Implement `HasDataColumnIndex` and `AllDataColumnsAvailable`.

* Implement `fetchDataColumnsFromPeers`.

* `fetchBlobsFromPeer`: Return only one error.
  • Loading branch information
nalepae committed Nov 20, 2024
1 parent 9c10c00 commit 082c813
Show file tree
Hide file tree
Showing 11 changed files with 1,725 additions and 258 deletions.
24 changes: 24 additions & 0 deletions beacon-chain/db/filesystem/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ func (s BlobStorageSummary) HasIndex(idx uint64) bool {
return s.mask[idx]
}

// HasDataColumnIndex true if the DataColumnSidecar at the given index is available in the filesystem.
func (s BlobStorageSummary) HasDataColumnIndex(idx uint64) bool {
// Protect from panic, but assume callers are sophisticated enough to not need an error telling them they have an invalid idx.
if idx >= fieldparams.NumberOfColumns {
return false
}
return s.mask[idx]
}

// AllAvailable returns true if we have all blobs for all indices from 0 to count-1.
func (s BlobStorageSummary) AllAvailable(count int) bool {
if count > fieldparams.MaxBlobsPerBlock {
Expand All @@ -39,6 +48,21 @@ func (s BlobStorageSummary) AllAvailable(count int) bool {
return true
}

// AllDataColumnsAvailable returns true if we have all datacolumns for corresponding indices.
func (s BlobStorageSummary) AllDataColumnsAvailable(indices map[uint64]bool) bool {
if uint64(len(indices)) > fieldparams.NumberOfColumns {
return false
}

for indice := range indices {
if !s.mask[indice] {
return false
}
}

return true
}

// BlobStorageSummarizer can be used to receive a summary of metadata about blobs on disk for a given root.
// The BlobStorageSummary can be used to check which indices (if any) are available for a given block by root.
type BlobStorageSummarizer interface {
Expand Down
105 changes: 105 additions & 0 deletions beacon-chain/db/filesystem/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,108 @@ func TestAllAvailable(t *testing.T) {
})
}
}

func TestHasDataColumnIndex(t *testing.T) {
storedIndices := map[uint64]bool{
1: true,
3: true,
5: true,
}

cases := []struct {
name string
idx uint64
expected bool
}{
{
name: "index is too high",
idx: fieldparams.NumberOfColumns,
expected: false,
},
{
name: "non existing index",
idx: 2,
expected: false,
},
{
name: "existing index",
idx: 3,
expected: true,
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var mask blobIndexMask

for idx := range storedIndices {
mask[idx] = true
}

sum := BlobStorageSummary{mask: mask}
require.Equal(t, c.expected, sum.HasDataColumnIndex(c.idx))
})
}
}

func TestAllDataColumnAvailable(t *testing.T) {
tooManyColumns := make(map[uint64]bool, fieldparams.NumberOfColumns+1)
for i := uint64(0); i < fieldparams.NumberOfColumns+1; i++ {
tooManyColumns[i] = true
}

columns346 := map[uint64]bool{
3: true,
4: true,
6: true,
}

columns36 := map[uint64]bool{
3: true,
6: true,
}

cases := []struct {
name string
storedIndices map[uint64]bool
testedIndices map[uint64]bool
expected bool
}{
{
name: "no tested indices",
storedIndices: columns346,
testedIndices: map[uint64]bool{},
expected: true,
},
{
name: "too many tested indices",
storedIndices: columns346,
testedIndices: tooManyColumns,
expected: false,
},
{
name: "not all tested indices are stored",
storedIndices: columns36,
testedIndices: columns346,
expected: false,
},
{
name: "all tested indices are stored",
storedIndices: columns346,
testedIndices: columns36,
expected: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
var mask blobIndexMask

for idx := range c.storedIndices {
mask[idx] = true
}

sum := BlobStorageSummary{mask: mask}
require.Equal(t, c.expected, sum.AllDataColumnsAvailable(c.testedIndices))
})
}
}
4 changes: 2 additions & 2 deletions beacon-chain/p2p/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func (s *BlobSidecarsByRootReq) Len() int {
return len(*s)
}

// =====================================
// ===================================
// DataColumnSidecarsByRootReq section
// =====================================
// ===================================
var _ ssz.Marshaler = (*DataColumnSidecarsByRootReq)(nil)
var _ ssz.Unmarshaler = (*DataColumnSidecarsByRootReq)(nil)
var _ sort.Interface = (*DataColumnSidecarsByRootReq)(nil)
Expand Down
9 changes: 6 additions & 3 deletions beacon-chain/sync/data_columns_sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
kzg "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
Expand Down Expand Up @@ -56,7 +57,10 @@ func TestRandomizeColumns(t *testing.T) {
}

// createAndConnectPeer creates a peer with a private key `offset` fixed.
// The peer is added and connected to `p2pService`
// The peer is added and connected to `p2pService`.
// If a `RPCDataColumnSidecarsByRootTopicV1` request is made with column index `i`,
// then the peer will respond with the `dataColumnSidecars[i]` if it is not in `columnsNotToRespond`.
// (If `len(dataColumnSidecars) < i`, then this function will panic.)
func createAndConnectPeer(
t *testing.T,
p2pService *p2ptest.TestP2P,
Expand All @@ -78,8 +82,7 @@ func createAndConnectPeer(
// Create the peer.
peer := p2ptest.NewTestP2P(t, libp2p.Identity(privateKey))

// TODO: Do not hardcode the topic.
peer.SetStreamHandler("/eth2/beacon_chain/req/data_column_sidecars_by_root/1/ssz_snappy", func(stream network.Stream) {
peer.SetStreamHandler(p2p.RPCDataColumnSidecarsByRootTopicV1+"/ssz_snappy", func(stream network.Stream) {
// Decode the request.
req := new(p2pTypes.DataColumnSidecarsByRootReq)
err := peer.Encoding().DecodeWithMaxLength(stream, req)
Expand Down
11 changes: 11 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//beacon-chain/sync/verify:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
Expand Down Expand Up @@ -71,7 +72,9 @@ go_test(
tags = ["CI_race_detection"],
deps = [
"//async/abool:go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
Expand All @@ -94,18 +97,26 @@ go_test(
"//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library",
"//container/slice:go_default_library",
"//crypto/ecdsa:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//network/forks:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_consensys_gnark_crypto//ecc/bls12-381/fr:go_default_library",
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
Expand Down
Loading

0 comments on commit 082c813

Please sign in to comment.