Skip to content

Commit

Permalink
PeerDAS: Add MetadataV3 with custody_subnet_count (#14274)
Browse files Browse the repository at this point in the history
* `sendPingRequest`: Add some comments.

* `sendPingRequest`: Replace `stream.Conn().RemotePeer()` by `peerID`.

* `pingHandler`: Add comments.

* `sendMetaDataRequest`: Add comments and implement an unique test.

* Gather `SchemaVersion`s in the same `const` definition.

* Define `SchemaVersionV3`.

* `MetaDataV1`: Fix comment.

* Proto: Define `MetaDataV2`.

* `MetaDataV2`: Generate SSZ.

* `newColumnSubnetIDs`: Use smaller lines.

* `metaDataHandler` and `sendMetaDataRequest`: Manage `MetaDataV2`.

* `RefreshPersistentSubnets`: Refactor tests (no functional change).

* `RefreshPersistentSubnets`: Refactor and add comments (no functional change).

* `RefreshPersistentSubnets`: Compare cache with both ENR & metadata.

* `RefreshPersistentSubnets`: Manage peerDAS.

* `registerRPCHandlersPeerDAS`: Register `RPCMetaDataTopicV3`.

* `CustodyCountFromRemotePeer`: Retrieve the count from metadata.

Then default to ENR, then default to the default value.

* Update beacon-chain/sync/rpc_metadata.go

Co-authored-by: Nishant Das <[email protected]>

* Fix duplicate case.

* Remove version testing.

* `debug.proto`: Stop breaking ordering.

---------

Co-authored-by: Nishant Das <[email protected]>
  • Loading branch information
nalepae and nisdas committed Aug 28, 2024
1 parent 98a3652 commit 8839988
Show file tree
Hide file tree
Showing 34 changed files with 1,528 additions and 590 deletions.
9 changes: 7 additions & 2 deletions beacon-chain/cache/column_subnet_ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ var ColumnSubnetIDs = newColumnSubnetIDs()
const columnKey = "columns"

func newColumnSubnetIDs() *columnSubnetIDs {
epochDuration := time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
epochDuration := time.Duration(slotsPerEpoch.Mul(secondsPerSlot))

// Set the default duration of a column subnet subscription as the column expiry period.
subLength := epochDuration * time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
minEpochsForDataColumnSidecarsRequest := time.Duration(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest)
subLength := epochDuration * minEpochsForDataColumnSidecarsRequest

persistentCache := cache.New(subLength*time.Second, epochDuration*time.Second)
return &columnSubnetIDs{colSubCache: persistentCache}
}
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/core/helpers/sync_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestIsCurrentEpochSyncCommittee_UsingCommittee(t *testing.T) {

func TestIsCurrentEpochSyncCommittee_DoesNotExist(t *testing.T) {
helpers.ClearCache()
params.SetupTestConfigCleanup(t)

validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
syncCommittee := &ethpb.SyncCommittee{
Expand Down Expand Up @@ -264,6 +265,7 @@ func TestCurrentEpochSyncSubcommitteeIndices_UsingCommittee(t *testing.T) {
}

func TestCurrentEpochSyncSubcommitteeIndices_DoesNotExist(t *testing.T) {
params.SetupTestConfigCleanup(t)
helpers.ClearCache()

validators := make([]*ethpb.Validator, params.BeaconConfig().SyncCommitteeSize)
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ go_test(
"//network/forks:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//proto/testing:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
Expand Down
17 changes: 16 additions & 1 deletion beacon-chain/p2p/custody.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,28 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
// By default, we assume the peer custodies the minimum number of subnets.
custodyRequirement := params.BeaconConfig().CustodyRequirement

// First, try to get the custody count from the peer's metadata.
metadata, err := s.peers.Metadata(pid)
if err != nil {
log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
}

if metadata != nil {
custodyCount := metadata.CustodySubnetCount()
if custodyCount > 0 {
return custodyCount
}
}

log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value")

// Retrieve the ENR of the peer.
record, err := s.peers.ENR(pid)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"peerID": pid,
"defaultValue": custodyRequirement,
}).Error("Failed to retrieve ENR for peer, defaulting to the default value")
}).Debug("Failed to retrieve ENR for peer, defaulting to the default value")

return custodyRequirement
}
Expand Down
55 changes: 44 additions & 11 deletions beacon-chain/p2p/custody_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
prysmNetwork "github.com/prysmaticlabs/prysm/v5/network"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)

Expand Down Expand Up @@ -99,11 +102,12 @@ func TestGetValidCustodyPeers(t *testing.T) {

func TestCustodyCountFromRemotePeer(t *testing.T) {
const (
expected uint64 = 7
pid = "test-id"
expectedENR uint64 = 7
expectedMetadata uint64 = 8
pid = "test-id"
)

csc := peerdas.Csc(expected)
csc := peerdas.Csc(expectedENR)

// Define a nil record
var nilRecord *enr.Record = nil
Expand All @@ -115,26 +119,49 @@ func TestCustodyCountFromRemotePeer(t *testing.T) {
nominalRecord := &enr.Record{}
nominalRecord.Set(csc)

// Define a metadata with zero custody.
zeroMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodySubnetCount: 0,
})

// Define a nominal metadata.
nominalMetadata := wrapper.WrappedMetadataV2(&pb.MetaDataV2{
CustodySubnetCount: expectedMetadata,
})

testCases := []struct {
name string
record *enr.Record
metadata metadata.Metadata
expected uint64
}{
{
name: "nominal",
record: nominalRecord,
expected: expected,
},
{
name: "nil",
name: "No metadata - No ENR",
record: nilRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "empty",
name: "No metadata - Empty ENR",
record: emptyRecord,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "No Metadata - ENR",
record: nominalRecord,
expected: expectedENR,
},
{
name: "Metadata with 0 value - ENR",
record: nominalRecord,
metadata: zeroMetadata,
expected: expectedENR,
},
{
name: "Metadata - ENR",
record: nominalRecord,
metadata: nominalMetadata,
expected: expectedMetadata,
},
}

for _, tc := range testCases {
Expand All @@ -144,12 +171,18 @@ func TestCustodyCountFromRemotePeer(t *testing.T) {
ScorerParams: &scorers.Config{},
})

// Set the metadata.
if tc.metadata != nil {
peers.SetMetadata(pid, tc.metadata)
}

// Add a new peer with the record.
peers.Add(tc.record, pid, nil, network.DirOutbound)

// Create a new service.
service := &Service{
peers: peers,
peers: peers,
metaData: tc.metadata,
}

// Retrieve the custody count from the remote peer.
Expand Down
133 changes: 104 additions & 29 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,62 +55,137 @@ func (quicProtocol) ENRKey() string { return quickProtocolEnrKey }
// This routine checks for our attestation, sync committee and data column subnets and updates them if they have
// been rotated.
func (s *Service) RefreshPersistentSubnets() {
// return early if discv5 isnt running
// Return early if discv5 service isn't running.
if s.dv5Listener == nil || !s.isInitialized() {
return
}
currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix())))
if err := initializePersistentSubnets(s.dv5Listener.LocalNode().ID(), currEpoch); err != nil {

// Get the current epoch.
currentSlot := slots.CurrentSlot(uint64(s.genesisTime.Unix()))
currentEpoch := slots.ToEpoch(currentSlot)

// Get our node ID.
nodeID := s.dv5Listener.LocalNode().ID()

// Get our node record.
record := s.dv5Listener.Self().Record()

// Get the version of our metadata.
metadataVersion := s.Metadata().Version()

// Initialize persistent subnets.
if err := initializePersistentSubnets(nodeID, currentEpoch); err != nil {
log.WithError(err).Error("Could not initialize persistent subnets")
return
}
if err := initializePersistentColumnSubnets(s.dv5Listener.LocalNode().ID()); err != nil {

// Initialize persistent column subnets.
if err := initializePersistentColumnSubnets(nodeID); err != nil {
log.WithError(err).Error("Could not initialize persistent column subnets")
return
}

// Get the current attestation subnet bitfield.
bitV := bitfield.NewBitvector64()
committees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range committees {
attestationCommittees := cache.SubnetIDs.GetAllSubnets()
for _, idx := range attestationCommittees {
bitV.SetBitAt(idx, true)
}
currentBitV, err := attBitvector(s.dv5Listener.Self().Record())

// Get the attestation subnet bitfield we store in our record.
inRecordBitV, err := attBitvector(record)
if err != nil {
log.WithError(err).Error("Could not retrieve att bitfield")
return
}

// Compare current epoch with our fork epochs
// Get the attestation subnet bitfield in our metadata.
inMetadataBitV := s.Metadata().AttnetsBitfield()

// Is our attestation bitvector record up to date?
isBitVUpToDate := bytes.Equal(bitV, inRecordBitV) && bytes.Equal(bitV, inMetadataBitV)

// Compare current epoch with Altair fork epoch
altairForkEpoch := params.BeaconConfig().AltairForkEpoch
switch {
case currEpoch < altairForkEpoch:

if currentEpoch < altairForkEpoch {
// Phase 0 behaviour.
if bytes.Equal(bitV, currentBitV) {
// return early if bitfield hasn't changed
if isBitVUpToDate {
// Return early if bitfield hasn't changed.
return
}

// Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadata(bitV)
default:
// Retrieve sync subnets from application level
// cache.
bitS := bitfield.Bitvector4{byte(0x00)}
committees = cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
for _, idx := range committees {
bitS.SetBitAt(idx, true)
}
currentBitS, err := syncBitvector(s.dv5Listener.Self().Record())
if err != nil {
log.WithError(err).Error("Could not retrieve sync bitfield")
return
}
if bytes.Equal(bitV, currentBitV) && bytes.Equal(bitS, currentBitS) &&
s.Metadata().Version() == version.Altair {
// return early if bitfields haven't changed

// Ping all peers.
s.pingPeers()

return
}

// Get the current sync subnet bitfield.
bitS := bitfield.Bitvector4{byte(0x00)}
syncCommittees := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
for _, idx := range syncCommittees {
bitS.SetBitAt(idx, true)
}

// Get the sync subnet bitfield we store in our record.
inRecordBitS, err := syncBitvector(record)
if err != nil {
log.WithError(err).Error("Could not retrieve sync bitfield")
return
}

// Get the sync subnet bitfield in our metadata.
currentBitSInMetadata := s.Metadata().SyncnetsBitfield()

isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)

// Compare current epoch with EIP-7594 fork epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch

if currentEpoch < eip7594ForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.
return
}

// Some data have changed, update our record and metadata.
s.updateSubnetRecordWithMetadataV2(bitV, bitS)

// Ping all peers to inform them of new metadata
s.pingPeers()

return
}
// ping all peers to inform them of new metadata

// Get the current custody subnet count.
custodySubnetCount := peerdas.CustodySubnetCount()

// Get the custody subnet count we store in our record.
inRecordCustodySubnetCount, err := peerdas.CustodyCountFromRecord(record)
if err != nil {
log.WithError(err).Error("Could not retrieve custody subnet count")
return
}

// Get the custody subnet count in our metadata.
inMetadataCustodySubnetCount := s.Metadata().CustodySubnetCount()

isCustodySubnetCountUpToDate := (custodySubnetCount == inRecordCustodySubnetCount && custodySubnetCount == inMetadataCustodySubnetCount)

if isBitVUpToDate && isBitSUpToDate && isCustodySubnetCountUpToDate {
// Nothing to do, return early.
return
}

// Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount)

// Ping all peers.
s.pingPeers()
}

Expand Down
Loading

0 comments on commit 8839988

Please sign in to comment.