diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 09b241f9c9a9..fd31cabb87d1 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -18,7 +18,6 @@ go_library( "handshake.go", "info.go", "interfaces.go", - "iterator.go", "log.go", "message_id.go", "monitoring.go", diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 9ab312559046..3051b51ab80f 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -229,11 +229,11 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { require.NoError(t, err) defer bootListener.Close() - // Use shorter period for testing. - currentPeriod := pollingPeriod - pollingPeriod = 1 * time.Second + // Use smaller batch size for testing. + currentBatchSize := batchSize + batchSize = 2 defer func() { - pollingPeriod = currentPeriod + batchSize = currentBatchSize }() bootNode := bootListener.Self() diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go index 6fbeb28e20ba..c4462e7f1257 100644 --- a/beacon-chain/p2p/custody.go +++ b/beacon-chain/p2p/custody.go @@ -72,26 +72,10 @@ loop: return validPeers, nil } -// CustodyCountFromRemotePeer retrieves the custody count from a remote peer. -func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 { +func (s *Service) custodyCountFromRemotePeerEnr(pid peer.ID) uint64 { // By default, we assume the peer custodies the minimum number of subnets. custodyRequirement := params.BeaconConfig().CustodyRequirement - // First, try to get the custody count from the peer's metadata. - metadata, err := s.peers.Metadata(pid) - if err != nil { - log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value") - } - - if metadata != nil { - custodyCount := metadata.CustodySubnetCount() - if custodyCount > 0 { - return custodyCount - } - } - - log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value") - // Retrieve the ENR of the peer. record, err := s.peers.ENR(pid) if err != nil { @@ -116,3 +100,30 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 { return custodyCount } + +// CustodyCountFromRemotePeer retrieves the custody count from a remote peer. +func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 { + // Try to get the custody count from the peer's metadata. + metadata, err := s.peers.Metadata(pid) + if err != nil { + log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value") + return s.custodyCountFromRemotePeerEnr(pid) + } + + // If the metadata is nil, default to the ENR value. + if metadata == nil { + log.WithField("peerID", pid).Debug("Metadata is nil, defaulting to the ENR value") + return s.custodyCountFromRemotePeerEnr(pid) + } + + // Get the custody subnets count from the metadata. + custodyCount := metadata.CustodySubnetCount() + + // If the custody count is null, default to the ENR value. + if custodyCount == 0 { + log.WithField("peerID", pid).Debug("The custody count extracted from the metadata equals to 0, defaulting to the ENR value") + return s.custodyCountFromRemotePeerEnr(pid) + } + + return custodyCount +} diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 5d0823d9e385..3e82f8b7958a 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -15,6 +15,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" + "github.com/sirupsen/logrus" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" @@ -277,7 +278,29 @@ func (s *Service) RefreshPersistentSubnets() { // listen for new nodes watches for new nodes in the network and adds them to the peerstore. func (s *Service) listenForNewNodes() { - iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer) + const ( + minLogInterval = 1 * time.Minute + thresholdLimit = 5 + ) + + peersSummary := func(threshold uint) (uint, uint) { + // Retrieve how many active peers we have. + activePeers := s.Peers().Active() + activePeerCount := uint(len(activePeers)) + + // Compute how many peers we are missing to reach the threshold. + if activePeerCount >= threshold { + return activePeerCount, 0 + } + + missingPeerCount := threshold - activePeerCount + + return activePeerCount, missingPeerCount + } + + var lastLogTime time.Time + + iterator := s.dv5Listener.RandomNodes() defer iterator.Close() connectivityTicker := time.NewTicker(1 * time.Minute) thresholdCount := 0 @@ -286,25 +309,31 @@ func (s *Service) listenForNewNodes() { select { case <-s.ctx.Done(): return + case <-connectivityTicker.C: // Skip the connectivity check if not enabled. if !features.Get().EnableDiscoveryReboot { continue } + if !s.isBelowOutboundPeerThreshold() { // Reset counter if we are beyond the threshold thresholdCount = 0 continue } + thresholdCount++ + // Reboot listener if connectivity drops - if thresholdCount > 5 { - log.WithField("outboundConnectionCount", len(s.peers.OutboundConnected())).Warn("Rebooting discovery listener, reached threshold.") + if thresholdCount > thresholdLimit { + outBoundConnectedCount := len(s.peers.OutboundConnected()) + log.WithField("outboundConnectionCount", outBoundConnectedCount).Warn("Rebooting discovery listener, reached threshold.") if err := s.dv5Listener.RebootListener(); err != nil { log.WithError(err).Error("Could not reboot listener") continue } - iterator = filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer) + + iterator = s.dv5Listener.RandomNodes() thresholdCount = 0 } default: @@ -315,17 +344,35 @@ func (s *Service) listenForNewNodes() { time.Sleep(pollingPeriod) continue } - wantedCount := s.wantedPeerDials() - if wantedCount == 0 { + + // Compute the number of new peers we want to dial. + activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers) + + fields := logrus.Fields{ + "currentPeerCount": activePeerCount, + "targetPeerCount": s.cfg.MaxPeers, + } + + if missingPeerCount == 0 { log.Trace("Not looking for peers, at peer limit") time.Sleep(pollingPeriod) continue } + + if time.Since(lastLogTime) > minLogInterval { + lastLogTime = time.Now() + log.WithFields(fields).Debug("Searching for new active peers") + } + // Restrict dials if limit is applied. if flags.MaxDialIsActive() { - wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials) + maxConcurrentDials := uint(flags.Get().MaxConcurrentDials) + missingPeerCount = min(missingPeerCount, maxConcurrentDials) } - wantedNodes := enode.ReadNodes(iterator, wantedCount) + + // Search for new peers. + wantedNodes := searchForPeers(iterator, batchSize, missingPeerCount, s.filterPeer) + wg := new(sync.WaitGroup) for i := 0; i < len(wantedNodes); i++ { node := wantedNodes[i] @@ -615,17 +662,6 @@ func (s *Service) isBelowOutboundPeerThreshold() bool { return outBoundCount < outBoundThreshold } -func (s *Service) wantedPeerDials() int { - maxPeers := int(s.cfg.MaxPeers) - - activePeers := len(s.Peers().Active()) - wantedCount := 0 - if maxPeers > activePeers { - wantedCount = maxPeers - activePeers - } - return wantedCount -} - // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p. func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) { var allAddrs []ma.Multiaddr diff --git a/beacon-chain/p2p/iterator.go b/beacon-chain/p2p/iterator.go deleted file mode 100644 index cd5451ba3048..000000000000 --- a/beacon-chain/p2p/iterator.go +++ /dev/null @@ -1,36 +0,0 @@ -package p2p - -import ( - "context" - - "github.com/ethereum/go-ethereum/p2p/enode" -) - -// filterNodes wraps an iterator such that Next only returns nodes for which -// the 'check' function returns true. This custom implementation also -// checks for context deadlines so that in the event the parent context has -// expired, we do exit from the search rather than perform more network -// lookups for additional peers. -func filterNodes(ctx context.Context, it enode.Iterator, check func(*enode.Node) bool) enode.Iterator { - return &filterIter{ctx, it, check} -} - -type filterIter struct { - context.Context - enode.Iterator - check func(*enode.Node) bool -} - -// Next looks up for the next valid node according to our -// filter criteria. -func (f *filterIter) Next() bool { - for f.Iterator.Next() { - if f.Context.Err() != nil { - return false - } - if f.check(f.Node()) { - return true - } - } - return false -} diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 122e58a2b894..c7d7147a6d64 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -43,6 +43,10 @@ var _ runtime.Service = (*Service)(nil) // defined below. var pollingPeriod = 6 * time.Second +// When looking for new nodes, if not enough nodes are found, +// we stop after this amount of iterations. +var batchSize = 2_000 + // Refresh rate of ENR set at twice per slot. var refreshRate = slots.DivideSlotBy(2) diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index da7dd426023a..d9bba2c4879d 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -202,11 +202,11 @@ func TestListenForNewNodes(t *testing.T) { require.NoError(t, err) defer bootListener.Close() - // Use shorter period for testing. - currentPeriod := pollingPeriod - pollingPeriod = 1 * time.Second + // Use shorter batch size for testing. + currentBatchSize := batchSize + batchSize = 5 defer func() { - pollingPeriod = currentPeriod + batchSize = currentBatchSize }() bootNode := bootListener.Self() diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 2adead9cff19..803e370b8f0b 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -76,11 +76,11 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) func searchForPeers( iterator enode.Iterator, batchSize int, - peersToFindCount int, + peersToFindCount uint, filter func(node *enode.Node) bool, ) []*enode.Node { nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize) - for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ { + for i := 0; i < batchSize && uint(len(nodeFromNodeID)) <= peersToFindCount && iterator.Next(); i++ { node := iterator.Node() // Filter out nodes that do not meet the criteria. @@ -141,7 +141,7 @@ func (s *Service) FindPeersWithSubnet( index uint64, threshold int, ) (bool, error) { - const batchSize = 2000 + const minLogInterval = 1 * time.Minute ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() @@ -180,19 +180,17 @@ func (s *Service) FindPeersWithSubnet( return true, nil } - log.WithFields(logrus.Fields{ - "topic": topic, - "currentPeerCount": peerCountForTopic, - "targetPeerCount": threshold, - }).Debug("Searching for new peers in the network - Start") + log := log.WithFields(logrus.Fields{ + "topic": topic, + "targetPeerCount": threshold, + }) + + log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start") + + lastLogTime := time.Now() wg := new(sync.WaitGroup) for { - // If we have enough peers, we can exit the loop. This is the happy path. - if missingPeerCountForTopic == 0 { - break - } - // If the context is done, we can exit the loop. This is the unhappy path. if err := ctx.Err(); err != nil { return false, errors.Errorf( @@ -202,7 +200,7 @@ func (s *Service) FindPeersWithSubnet( } // Search for new peers in the network. - nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter) + nodes := searchForPeers(iterator, batchSize, uint(missingPeerCountForTopic), filter) // Restrict dials if limit is applied. maxConcurrentDials := math.MaxInt @@ -221,10 +219,20 @@ func (s *Service) FindPeersWithSubnet( wg.Wait() } - _, missingPeerCountForTopic = peersSummary(topic, threshold) + peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) + + // If we have enough peers, we can exit the loop. This is the happy path. + if missingPeerCountForTopic == 0 { + break + } + + if time.Since(lastLogTime) > minLogInterval { + lastLogTime = time.Now() + log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue") + } } - log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") + log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success") return true, nil }