Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Add MetadataV3 with custody_subnet_count #14274

Merged
merged 21 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
412ca3a
`sendPingRequest`: Add some comments.
nalepae Jul 26, 2024
5376b6e
`sendPingRequest`: Replace `stream.Conn().RemotePeer()` by `peerID`.
nalepae Jul 26, 2024
f6edec3
`pingHandler`: Add comments.
nalepae Jul 27, 2024
04db57c
`sendMetaDataRequest`: Add comments and implement an unique test.
nalepae Jul 29, 2024
4a1bf61
Gather `SchemaVersion`s in the same `const` definition.
nalepae Jul 29, 2024
23b48d6
Define `SchemaVersionV3`.
nalepae Jul 29, 2024
2cdd759
`MetaDataV1`: Fix comment.
nalepae Jul 29, 2024
7a5d2af
Proto: Define `MetaDataV2`.
nalepae Jul 29, 2024
938a744
`MetaDataV2`: Generate SSZ.
nalepae Jul 29, 2024
11cc685
`newColumnSubnetIDs`: Use smaller lines.
nalepae Jul 29, 2024
7229c0d
`metaDataHandler` and `sendMetaDataRequest`: Manage `MetaDataV2`.
nalepae Jul 30, 2024
74ea8c9
`RefreshPersistentSubnets`: Refactor tests (no functional change).
nalepae Aug 1, 2024
364d65d
`RefreshPersistentSubnets`: Refactor and add comments (no functional …
nalepae Jul 30, 2024
e941806
`RefreshPersistentSubnets`: Compare cache with both ENR & metadata.
nalepae Aug 1, 2024
9b8b863
`RefreshPersistentSubnets`: Manage peerDAS.
nalepae Aug 1, 2024
5fe39ae
`registerRPCHandlersPeerDAS`: Register `RPCMetaDataTopicV3`.
nalepae Aug 1, 2024
a3f148b
`CustodyCountFromRemotePeer`: Retrieve the count from metadata.
nalepae Aug 1, 2024
f8de5a0
Update beacon-chain/sync/rpc_metadata.go
nalepae Aug 2, 2024
66ac549
Fix duplicate case.
nalepae Aug 2, 2024
58c40ef
Remove version testing.
nalepae Aug 2, 2024
eb91698
`debug.proto`: Stop breaking ordering.
nalepae Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions beacon-chain/cache/column_subnet_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ var ColumnSubnetIDs = newColumnSubnetIDs()
const columnKey = "columns"

func newColumnSubnetIDs() *columnSubnetIDs {
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
epochDuration := time.Duration(slotsPerEpoch.Mul(secondsPerSlot))

// Set the default duration of a column subnet subscription as the column expiry period.
subLength := epochDuration * time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
minEpochsForDataColumnSidecarsRequest := time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
subLength := epochDuration * minEpochsForDataColumnSidecarsRequest

persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second)
return &columnSubnetIDs{colSubCache: persistentCache}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/core/helpers/sync_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestIsCurrentEpochSyncCommittee_UsingCommittee(t *testing.T) {

func TestIsCurrentEpochSyncCommittee_DoesNotExist(t *testing.T) {
helpers.ClearCache()
params.SetupTestConfigCleanup(t)

validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
syncCommittee := &ethpb.SyncCommittee{
Expand Down Expand Up @@ -264,6 +265,7 @@ func TestCurrentEpochSyncSubcommitteeIndices_UsingCommittee(t *testing.T) {
}

func TestCurrentEpochSyncSubcommitteeIndices_DoesNotExist(t *testing.T) {
params.SetupTestConfigCleanup(t)
helpers.ClearCache()

validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ go_test(
"//network/forks:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//proto/testing:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
Expand Down
17 changes: 16 additions & 1 deletion beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,28 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
custodyRequirement := params.BeaconConfig().CustodyRequirement

// First, try to get the custody count from the peer's metadata.
metadata, err := s.peers.Metadata(pid)
if err != nil {
log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
}

if metadata != nil {
custodyCount := metadata.CustodySubnetCount()
if custodyCount > 0 {
return custodyCount
}
}

log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value")

// Retrieve the ENR of the peer.
record, err := s.peers.ENR(pid)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve ENR for peer, defaulting to the default value")
}).Debug("Failed to retrieve ENR for peer, defaulting to the default value")

return custodyRequirement
}
Expand Down
55 changes: 44 additions & 11 deletions beacon-chain/p2p/custody_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
prysmNetwork "github.com/prysmaticlabs/prysm/v5/network"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)

Expand Down Expand Up @@ -99,11 +102,12 @@ func TestGetValidCustodyPeers(t *testing.T) {

func TestCustodyCountFromRemotePeer(t *testing.T) {
const (
expected uint64 = 7
pid = "test-id"
expectedENR uint64 = 7
expectedMetadata uint64 = 8
pid = "test-id"
)

csc := peerdas.Csc(expected)
csc := peerdas.Csc(expectedENR)

// Define a nil record
var nilRecord *enr.Record = nil
Expand All @@ -115,26 +119,49 @@ func TestCustodyCountFromRemotePeer(t *testing.T) {
nominalRecord := &enr.Record{}
nominalRecord.Set(csc)

// Define a metadata with zero custody.
zeroMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodySubnetCount: 0,
})

// Define a nominal metadata.
nominalMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodySubnetCount: expectedMetadata,
})

testCases := []struct {
name string
record *enr.Record
metadata metadata.Metadata
expected uint64
}{
{
name: "nominal",
record: nominalRecord,
expected: expected,
},
{
name: "nil",
name: "No metadata - No ENR",
record: nilRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "empty",
name: "No metadata - Empty ENR",
record: emptyRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "No Metadata - ENR",
record: nominalRecord,
expected: expectedENR,
},
{
name: "Metadata with 0 value - ENR",
record: nominalRecord,
metadata: zeroMetadata,
expected: expectedENR,
},
{
name: "Metadata - ENR",
record: nominalRecord,
metadata: nominalMetadata,
expected: expectedMetadata,
},
}

for _, tc := range testCases {
Expand All @@ -144,12 +171,18 @@ func TestCustodyCountFromRemotePeer(t *testing.T) {
ScorerParams: &scorers.Config{},
})

// Set the metadata.
if tc.metadata != nil {
peers.SetMetadata(pid, tc.metadata)
}

// Add a new peer with the record.
peers.Add(tc.record, pid, nil, network.DirOutbound)

// Create a new service.
service := &Service{
peers: peers,
peers: peers,
metaData: tc.metadata,
}

// Retrieve the custody count from the remote peer.
Expand Down
133 changes: 104 additions & 29 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,62 +55,137 @@ func (quicProtocol) ENRKey() string { return quickProtocolEnrKey }
// This routine checks for our attestation, sync committee and data column subnets and updates them if they have
// been rotated.
func (s *Service) RefreshPersistentSubnets() {
// return early if discv5 isnt running
// Return early if discv5 service isn't running.
if s.dv5Listener == nil || !s.isInitialized() {
return
}
currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix())))
if err := initializePersistentSubnets(s.dv5Listener.LocalNode().ID(), currEpoch); err != nil {

// Get the current epoch.
currentSlot := slots.CurrentSlot(uint64(s.genesisTime.Unix()))
currentEpoch := slots.ToEpoch(currentSlot)

// Get our node ID.
nodeID := s.dv5Listener.LocalNode().ID()

// Get our node record.
record := s.dv5Listener.Self().Record()

// Get the version of our metadata.
metadataVersion := s.Metadata().Version()

// Initialize persistent subnets.
if err := initializePersistentSubnets(nodeID, currentEpoch); err != nil {
log.WithError(err).Error("Could not initialize persistent subnets")
return
}
if err := initializePersistentColumnSubnets(s.dv5Listener.LocalNode().ID()); err != nil {

// Initialize persistent column subnets.
if err := initializePersistentColumnSubnets(nodeID); err != nil {
log.WithError(err).Error("Could not initialize persistent column subnets")
return
}

// Get the current attestation subnet bitfield.
bitV := bitfield.NewBitvector64()
committees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range committees {
attestationCommittees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range attestationCommittees {
bitV.SetBitAt(idx, true)
}
currentBitV, err := attBitvector(s.dv5Listener.Self().Record())

// Get the attestation subnet bitfield we store in our record.
inRecordBitV, err := attBitvector(record)
if err != nil {
log.WithError(err).Error("Could not retrieve att bitfield")
return
}

// Compare current epoch with our fork epochs
// Get the attestation subnet bitfield in our metadata.
inMetadataBitV := s.Metadata().AttnetsBitfield()

// Is our attestation bitvector record up to date?
isBitVUpToDate := bytes.Equal(bitV, inRecordBitV) && bytes.Equal(bitV, inMetadataBitV)

// Compare current epoch with Altair fork epoch
altairForkEpoch := params.BeaconConfig().AltairForkEpoch
switch {
case currEpoch < altairForkEpoch:

if currentEpoch < altairForkEpoch {
// Phase 0 behaviour.
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
if isBitVUpToDate {
// Return early if bitfield hasn't changed.
return
}

// Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadata(bitV)
default:
// Retrieve sync subnets from application level
// cache.
bitS := bitfield.Bitvector4{byte(0x00)}
committees = cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
for _, idx := range committees {
bitS.SetBitAt(idx, true)
}
currentBitS, err := syncBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.WithError(err).Error("Could not retrieve sync bitfield")
return
}
if bytes.Equal(bitV, currentBitV) && bytes.Equal(bitS, currentBitS) &&
s.Metadata().Version() == version.Altair {
// return early if bitfields haven't changed

// Ping all peers.
s.pingPeers()

return
}

// Get the current sync subnet bitfield.
bitS := bitfield.Bitvector4{byte(0x00)}
syncCommittees := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
for _, idx := range syncCommittees {
bitS.SetBitAt(idx, true)
}

// Get the sync subnet bitfield we store in our record.
inRecordBitS, err := syncBitvector(record)
if err != nil {
log.WithError(err).Error("Could not retrieve sync bitfield")
return
}

// Get the sync subnet bitfield in our metadata.
currentBitSInMetadata := s.Metadata().SyncnetsBitfield()

isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)

// Compare current epoch with EIP-7594 fork epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch

if currentEpoch < eip7594ForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.
return
}

// Some data have changed, update our record and metadata.
s.updateSubnetRecordWithMetadataV2(bitV, bitS)

// Ping all peers to inform them of new metadata
s.pingPeers()

return
}
// ping all peers to inform them of new metadata

// Get the current custody subnet count.
custodySubnetCount := peerdas.CustodySubnetCount()

// Get the custody subnet count we store in our record.
inRecordCustodySubnetCount, err := peerdas.CustodyCountFromRecord(record)
if err != nil {
log.WithError(err).Error("Could not retrieve custody subnet count")
return
}

// Get the custody subnet count in our metadata.
inMetadataCustodySubnetCount := s.Metadata().CustodySubnetCount()

isCustodySubnetCountUpToDate := (custodySubnetCount == inRecordCustodySubnetCount && custodySubnetCount == inMetadataCustodySubnetCount)

if metadataVersion == version.Deneb && isBitVUpToDate && isBitSUpToDate && isCustodySubnetCountUpToDate {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove metadataVersion == version.Deneb we already asserted above that we are beyond the peerDAS fork epoch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 58c40ef.

// Nothing to do, return early.
return
}

// Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount)

// Ping all peers.
s.pingPeers()
}

Expand Down
Loading
Loading