Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Initial Sync with 128 data columns subnets #14403

Merged
merged 20 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
50b4f4b
`pingPeers`: Add log with new ENR when modified.
nalepae Aug 30, 2024
7514b7b
`p2p Start`: Use idiomatic go error syntax.
nalepae Aug 30, 2024
e2220b5
P2P `start`: Fix error message.
nalepae Aug 30, 2024
d95407f
Use not bootnodes at all if the `--chain-config-file` flag is used an…
nalepae Aug 30, 2024
2857344
`validPeersExist`: Centralize logs.
nalepae Aug 30, 2024
be836a9
`AddConnectionHandler`: Improve logging.
nalepae Sep 1, 2024
0abfab4
Logging: Add 2 decimals for timestamt in text and JSON logs.
nalepae Sep 1, 2024
959ff24
Improve "no valid peers" logging.
nalepae Sep 2, 2024
2d446a2
Improve "Some columns have no peers responsible for custody" logging.
nalepae Sep 2, 2024
edb0fc1
`pubsubSubscriptionRequestLimit`: Increase to be consistent with data…
nalepae Sep 3, 2024
c12b239
`sendPingRequest`: Improve logging.
nalepae Sep 3, 2024
7d02a3c
`FindPeersWithSubnet`: Regularly recheck in our current set of peers …
nalepae Sep 3, 2024
eee87bb
`subscribeDynamicWithSyncSubnets`: Use exactly the same subscription …
nalepae Sep 3, 2024
0af4c2e
Make deepsource happier.
nalepae Sep 3, 2024
bd34800
Nishant's commend: Change peer disconnected log.
nalepae Sep 3, 2024
ab0b0e8
NIshant's comment: Change `Too many incoming subscription` log from e…
nalepae Sep 3, 2024
5e2af9f
`FindPeersWithSubnet`: Address Nishant's comment.
nalepae Sep 3, 2024
0b39d26
`batchSize`: Address Nishant's comment.
nalepae Sep 3, 2024
bd82d71
`pingPeers` ==> `pingPeersAndLogEnr`.
nalepae Sep 3, 2024
5c0cc4c
Update beacon-chain/sync/subscriber.go
nalepae Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadata(bitV)

// Ping all peers.
s.pingPeers()
s.pingPeersAndLogEnr()

return
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadataV2(bitV, bitS)

// Ping all peers to inform them of new metadata
s.pingPeers()
s.pingPeersAndLogEnr()

return
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (s *Service) RefreshPersistentSubnets() {
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount)

// Ping all peers.
s.pingPeers()
s.pingPeersAndLogEnr()
}

// listen for new nodes watches for new nodes in the network and adds them to the peerstore.
Expand Down
11 changes: 8 additions & 3 deletions beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con
disconnectFromPeer()
return
}

validPeerConnection := func() {
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected)
// Go through the handshake process.
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction,
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer connected")
}).Debug("New peer connection")
}

// Do not perform handshake on inbound dials.
Expand Down Expand Up @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected)
// Only log disconnections if we were fully connected.
if priorState == peers.PeerConnected {
log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected")
log.WithFields(logrus.Fields{
"direction": conn.Stat().Direction.String(),
"multiAddr": peerMultiaddrString(conn),
"activePeers": len(s.peers.Active()),
}).Debug("Peer disconnected")
}
}()
},
Expand Down
28 changes: 23 additions & 5 deletions beacon-chain/p2p/pubsub_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,27 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/sirupsen/logrus"
)

var _ pubsub.SubscriptionFilter = (*Service)(nil)

// It is set at this limit to handle the possibility
// of double topic subscriptions at fork boundaries.
// -> 64 Attestation Subnets * 2.
// -> 4 Sync Committee Subnets * 2.
// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2.
const pubsubSubscriptionRequestLimit = 200
// -> BeaconBlock * 2 = 2
// -> BeaconAggregateAndProof * 2 = 2
// -> VoluntaryExit * 2 = 2
// -> ProposerSlashing * 2 = 2
// -> AttesterSlashing * 2 = 2
// -> 64 Beacon Attestation * 2 = 128
// -> SyncContributionAndProof * 2 = 2
// -> 4 SyncCommitteeSubnets * 2 = 8
// -> BlsToExecutionChange * 2 = 2
// -> 128 DataColumnSidecar * 2 = 256
// -------------------------------------
// TOTAL = 406
// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar)
const pubsubSubscriptionRequestLimit = 500

// CanSubscribe returns true if the topic is of interest and we could subscribe to it.
func (s *Service) CanSubscribe(topic string) bool {
Expand Down Expand Up @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool {
// FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications.
// This method returns only the topics of interest and may return an error if the subscription
// request contains too many topics.
func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) {
if len(subs) > pubsubSubscriptionRequestLimit {
subsCount := len(subs)
log.WithFields(logrus.Fields{
"peerID": peerID,
"subscriptionCounts": subsCount,
"subscriptionLimit": pubsubSubscriptionRequestLimit,
}).Debug("Too many incoming subscriptions, filtering them")

return nil, pubsub.ErrTooManySubscriptions
}

Expand Down
14 changes: 10 additions & 4 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,13 @@ func (s *Service) Start() {
s.startupErr = err
return
}
err = s.connectToBootnodes()
if err != nil {
log.WithError(err).Error("Could not add bootnode to the exclusion list")

if err := s.connectToBootnodes(); err != nil {
log.WithError(err).Error("Could not connect to boot nodes")
s.startupErr = err
return
}

s.dv5Listener = listener
go s.listenForNewNodes()
}
Expand Down Expand Up @@ -393,12 +394,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er
s.pingMethodLock.Unlock()
}

func (s *Service) pingPeers() {
func (s *Service) pingPeersAndLogEnr() {
s.pingMethodLock.RLock()
defer s.pingMethodLock.RUnlock()

localENR := s.dv5Listener.Self()
log.WithField("ENR", localENR).Info("New node record")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we logging this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

We log the very first ENR here:

INFO p2p: Started discovery v5
ENR=enr:-Me4QOjJD9yIdENgUqCEWvF8-i4ptgYCcE2_-585RTOkZmTbH09IuXNW5Y3hX_WG_-DEncEytbRry_tLe31BECrKPRGGAZG3Q7i0h2F0dG5ldHOIAAAAAAAAAACDY3NjBIRldGgykDdW2IZgAAA4AOH1BQAAAACCaWSCdjSCaXCErBAAEolzZWNwMjU2azGhA4Vo-9vK3iXroeg9TmKANgrXuntjWj6atypZL7bbpjBviHN5bmNuZXRzAIN0Y3CCMsiDdW
RwgjLI

But it turns out this ENR is the real one for a very small amount of time.
This ENR is modified shortly after.

Logging the initial ENR without logging modified ones is quite misleading since the user will think that it is it actual ENR, while it is not any more the case.

My opinion is:

  • We should log all ENR changes, or
  • We should NOT log any ENR at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do want to log enr changes, pingPeers is the wrong place to do it. It is more accurate to do it in RefreshPersistentSubnets as that is where the enr is actually modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.
However, currently pingPeers in only called by RefreshPersistentSubnets.
And the way RefreshPersistentSubnets is coded, we should add this log at three different locations, instead of only one.

I can of course do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also rename pingPeers into pingPeersAndLogEnr.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renaming would be better then

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in bd82d71.


if s.pingMethod == nil {
return
}

for _, pid := range s.peers.Connected() {
go func(id peer.ID) {
if err := s.pingMethod(s.ctx, id); err != nil {
Expand Down
185 changes: 146 additions & 39 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"context"
"math"
"strings"
"sync"
"time"
Expand All @@ -20,9 +21,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)

var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount
Expand Down Expand Up @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110
// chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150

// nodeFilter return a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
switch {
case strings.Contains(topic, GossipAttestationMessage):
return s.filterPeerForAttSubnet(index), nil
case strings.Contains(topic, GossipSyncCommitteeMessage):
return s.filterPeerForSyncSubnet(index), nil
case strings.Contains(topic, GossipDataColumnSidecarMessage):
return s.filterPeerForDataColumnsSubnet(index), nil
default:
return nil, errors.Errorf("no subnet exists for provided topic: %s", topic)
}
}

// searchForPeers performs a network search for peers subscribed to a particular subnet.
// It exits as soon as one of these conditions is met:
// - It looped through `batchSize` nodes.
// - It found `peersToFindCount“ peers corresponding to the `filter` criteria.
// - Iterator is exhausted.
func searchForPeers(
iterator enode.Iterator,
batchSize int,
peersToFindCount int,
filter func(node *enode.Node) bool,
) []*enode.Node {
nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
node := iterator.Node()

// Filter out nodes that do not meet the criteria.
if !filter(node) {
continue
}

// Remove duplicates, keeping the node with higher seq.
prevNode, ok := nodeFromNodeID[node.ID()]
if ok && prevNode.Seq() > node.Seq() {
continue
}

nodeFromNodeID[node.ID()] = node
}

// Convert the map to a slice.
nodes := make([]*enode.Node, 0, len(nodeFromNodeID))
for _, node := range nodeFromNodeID {
nodes = append(nodes, node)
}

return nodes
}

// dialPeer dials a peer in a separate goroutine.
func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) {
info, _, err := convertToAddrInfo(node)
if err != nil {
return
}

if info == nil {
return
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}

wg.Done()
}()
}

// FindPeersWithSubnet performs a network search for peers
// subscribed to a particular subnet. Then it tries to connect
// with those peers. This method will block until either:
Expand All @@ -61,69 +135,96 @@ const dataColumnSubnetVal = 150
// On some edge cases, this method may hang indefinitely while peers
// are actually found. In such a case, the user should cancel the context
// and re-run the method again.
func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string,
index uint64, threshold int) (bool, error) {
func (s *Service) FindPeersWithSubnet(
ctx context.Context,
topic string,
index uint64,
threshold int,
) (bool, error) {
const batchSize = 2000

ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
defer span.End()

span.AddAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing.

if s.dv5Listener == nil {
// return if discovery isn't set
// Return if discovery isn't set
return false, nil
}

topic += s.Encoding().ProtocolSuffix()
iterator := s.dv5Listener.RandomNodes()
defer iterator.Close()
switch {
case strings.Contains(topic, GossipAttestationMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index))
case strings.Contains(topic, GossipSyncCommitteeMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index))
case strings.Contains(topic, GossipDataColumnSidecarMessage):
iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index))
default:
return false, errors.Errorf("no subnet exists for provided topic: %s", topic)

filter, err := s.nodeFilter(topic, index)
if err != nil {
return false, errors.Wrap(err, "node filter")
}

peersSummary := func(topic string, threshold int) (int, int) {
// Retrieve how many peers we have for this topic.
peerCountForTopic := len(s.pubsub.ListPeers(topic))

// Compute how many peers we are missing to reach the threshold.
missingPeerCountForTopic := max(0, threshold-peerCountForTopic)

return peerCountForTopic, missingPeerCountForTopic
}

// Compute how many peers we are missing to reach the threshold.
peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold)

// Exit early if we have enough peers.
if missingPeerCountForTopic == 0 {
return true, nil
}

log.WithFields(logrus.Fields{
"topic": topic,
"currentPeerCount": peerCountForTopic,
"targetPeerCount": threshold,
}).Debug("Searching for new peers in the network - Start")

wg := new(sync.WaitGroup)
for {
currNum := len(s.pubsub.ListPeers(topic))
if currNum >= threshold {
// If we have enough peers, we can exit the loop. This is the happy path.
if missingPeerCountForTopic == 0 {
break
}

// If the context is done, we can exit the loop. This is the unhappy path.
if err := ctx.Err(); err != nil {
return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+
"only %d out of %d peers were able to be found", topic, currNum, threshold)
return false, errors.Errorf(
"unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching",
topic, peerCountForTopic, threshold,
)
}
nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch)

// Search for new peers in the network.
nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter)

// Restrict dials if limit is applied.
maxConcurrentDials := math.MaxInt
if flags.MaxDialIsActive() {
nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials)
maxConcurrentDials = flags.Get().MaxConcurrentDials
}
nodes := enode.ReadNodes(iterator, nodeCount)
for _, node := range nodes {
info, _, err := convertToAddrInfo(node)
if err != nil {
continue
}

if info == nil {
continue
// Dial the peers in batches.
for start := 0; start < len(nodes); start += maxConcurrentDials {
stop := min(start+maxConcurrentDials, len(nodes))
for _, node := range nodes[start:stop] {
s.dialPeer(ctx, wg, node)
}

wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}()
// Wait for all dials to be completed.
wg.Wait()
}
// Wait for all dials to be completed.
wg.Wait()

_, missingPeerCountForTopic = peersSummary(topic, threshold)
}

log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
return true, nil
}

Expand Down Expand Up @@ -183,11 +284,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode.
// lower threshold to broadcast object compared to searching
// for a subnet. So that even in the event of poor peer
// connectivity, we can still broadcast an attestation.
func (s *Service) hasPeerWithSubnet(topic string) bool {
func (s *Service) hasPeerWithSubnet(subnetTopic string) bool {
// In the event peer threshold is lower, we will choose the lower
// threshold.
minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet))
return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int.
minPeers := min(1, flags.Get().MinimumPeersPerSubnet)
topic := subnetTopic + s.Encoding().ProtocolSuffix()
peersWithSubnet := s.pubsub.ListPeers(topic)
peersWithSubnetCount := len(peersWithSubnet)

enoughPeers := peersWithSubnetCount >= minPeers

return enoughPeers
}

// Updates the service's discv5 listener record's attestation subnet
Expand Down
Loading
Loading