From c63d73a2735e78459abdbbe8f151ec81808bf177 Mon Sep 17 00:00:00 2001
From: Manu NALEPA <enalepa@offchainlabs.com>
Date: Fri, 13 Sep 2024 12:27:08 +0200
Subject: [PATCH] Fix CPU usage in small devnets (#14446)

* `CustodyCountFromRemotePeer`: Set happy path in the outer scope.

* `FindPeersWithSubnet`: Improve logging.

* `listenForNewNodes`: Avoid infinite loop in a small subnet.

* Address Nishant's comment.

* Fix Nishant's comment.
---
 beacon-chain/p2p/BUILD.bazel         |  1 -
 beacon-chain/p2p/broadcaster_test.go |  8 ++--
 beacon-chain/p2p/custody.go          | 45 +++++++++++++--------
 beacon-chain/p2p/discovery.go        | 59 ++++++++++++++++++++--------
 beacon-chain/p2p/iterator.go         | 36 -----------------
 beacon-chain/p2p/service.go          |  4 ++
 beacon-chain/p2p/service_test.go     |  8 ++--
 beacon-chain/p2p/subnets.go          | 40 +++++++++++--------
 8 files changed, 107 insertions(+), 94 deletions(-)
 delete mode 100644 beacon-chain/p2p/iterator.go

diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel
index 09b241f9c9a9..fd31cabb87d1 100644
--- a/beacon-chain/p2p/BUILD.bazel
+++ b/beacon-chain/p2p/BUILD.bazel
@@ -18,7 +18,6 @@ go_library(
         "handshake.go",
         "info.go",
         "interfaces.go",
-        "iterator.go",
         "log.go",
         "message_id.go",
         "monitoring.go",
diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go
index f514d9a16e8b..80e94f044f32 100644
--- a/beacon-chain/p2p/broadcaster_test.go
+++ b/beacon-chain/p2p/broadcaster_test.go
@@ -230,11 +230,11 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
 	require.NoError(t, err)
 	defer bootListener.Close()
 
-	// Use shorter period for testing.
-	currentPeriod := pollingPeriod
-	pollingPeriod = 1 * time.Second
+	// Use smaller batch size for testing.
+	currentBatchSize := batchSize
+	batchSize = 2
 	defer func() {
-		pollingPeriod = currentPeriod
+		batchSize = currentBatchSize
 	}()
 
 	bootNode := bootListener.Self()
diff --git a/beacon-chain/p2p/custody.go b/beacon-chain/p2p/custody.go
index 6fbeb28e20ba..c4462e7f1257 100644
--- a/beacon-chain/p2p/custody.go
+++ b/beacon-chain/p2p/custody.go
@@ -72,26 +72,10 @@ loop:
 	return validPeers, nil
 }
 
-// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
-func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
+func (s *Service) custodyCountFromRemotePeerEnr(pid peer.ID) uint64 {
 	// By default, we assume the peer custodies the minimum number of subnets.
 	custodyRequirement := params.BeaconConfig().CustodyRequirement
 
-	// First, try to get the custody count from the peer's metadata.
-	metadata, err := s.peers.Metadata(pid)
-	if err != nil {
-		log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
-	}
-
-	if metadata != nil {
-		custodyCount := metadata.CustodySubnetCount()
-		if custodyCount > 0 {
-			return custodyCount
-		}
-	}
-
-	log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value")
-
 	// Retrieve the ENR of the peer.
 	record, err := s.peers.ENR(pid)
 	if err != nil {
@@ -116,3 +100,30 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
 
 	return custodyCount
 }
+
+// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
+func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
+	// Try to get the custody count from the peer's metadata.
+	metadata, err := s.peers.Metadata(pid)
+	if err != nil {
+		log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	// If the metadata is nil, default to the ENR value.
+	if metadata == nil {
+		log.WithField("peerID", pid).Debug("Metadata is nil, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	// Get the custody subnets count from the metadata.
+	custodyCount := metadata.CustodySubnetCount()
+
+	// If the custody count is zero, default to the ENR value.
+	if custodyCount == 0 {
+		log.WithField("peerID", pid).Debug("The custody count extracted from the metadata equals 0, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	return custodyCount
+}
diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go
index ac28081e54f3..b9b533b1ce0b 100644
--- a/beacon-chain/p2p/discovery.go
+++ b/beacon-chain/p2p/discovery.go
@@ -15,6 +15,7 @@ import (
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/go-bitfield"
+	"github.com/sirupsen/logrus"
 
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
@@ -191,7 +192,26 @@ func (s *Service) RefreshPersistentSubnets() {
 
 // listen for new nodes watches for new nodes in the network and adds them to the peerstore.
 func (s *Service) listenForNewNodes() {
-	iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
+	const minLogInterval = 1 * time.Minute
+
+	peersSummary := func(threshold uint) (uint, uint) {
+		// Retrieve how many active peers we have.
+		activePeers := s.Peers().Active()
+		activePeerCount := uint(len(activePeers))
+
+		// Compute how many peers we are missing to reach the threshold.
+		if activePeerCount >= threshold {
+			return activePeerCount, 0
+		}
+
+		missingPeerCount := threshold - activePeerCount
+
+		return activePeerCount, missingPeerCount
+	}
+
+	var lastLogTime time.Time
+
+	iterator := s.dv5Listener.RandomNodes()
 	defer iterator.Close()
 
 	for {
@@ -207,17 +227,35 @@ func (s *Service) listenForNewNodes() {
 			time.Sleep(pollingPeriod)
 			continue
 		}
-		wantedCount := s.wantedPeerDials()
-		if wantedCount == 0 {
+
+		// Compute the number of new peers we want to dial.
+		activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers)
+
+		fields := logrus.Fields{
+			"currentPeerCount": activePeerCount,
+			"targetPeerCount":  s.cfg.MaxPeers,
+		}
+
+		if missingPeerCount == 0 {
 			log.Trace("Not looking for peers, at peer limit")
 			time.Sleep(pollingPeriod)
 			continue
 		}
+
+		if time.Since(lastLogTime) > minLogInterval {
+			lastLogTime = time.Now()
+			log.WithFields(fields).Debug("Searching for new active peers")
+		}
+
 		// Restrict dials if limit is applied.
 		if flags.MaxDialIsActive() {
-			wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
+			maxConcurrentDials := uint(flags.Get().MaxConcurrentDials)
+			missingPeerCount = min(missingPeerCount, maxConcurrentDials)
 		}
-		wantedNodes := enode.ReadNodes(iterator, wantedCount)
+
+		// Search for new peers.
+		wantedNodes := searchForPeers(iterator, batchSize, missingPeerCount, s.filterPeer)
+
 		wg := new(sync.WaitGroup)
 		for i := 0; i < len(wantedNodes); i++ {
 			node := wantedNodes[i]
@@ -487,17 +525,6 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
 	return activePeers >= maxPeers || numOfConns >= maxPeers
 }
 
-func (s *Service) wantedPeerDials() int {
-	maxPeers := int(s.cfg.MaxPeers)
-
-	activePeers := len(s.Peers().Active())
-	wantedCount := 0
-	if maxPeers > activePeers {
-		wantedCount = maxPeers - activePeers
-	}
-	return wantedCount
-}
-
 // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
 func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
 	var allAddrs []ma.Multiaddr
diff --git a/beacon-chain/p2p/iterator.go b/beacon-chain/p2p/iterator.go
deleted file mode 100644
index cd5451ba3048..000000000000
--- a/beacon-chain/p2p/iterator.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package p2p
-
-import (
-	"context"
-
-	"github.com/ethereum/go-ethereum/p2p/enode"
-)
-
-// filterNodes wraps an iterator such that Next only returns nodes for which
-// the 'check' function returns true. This custom implementation also
-// checks for context deadlines so that in the event the parent context has
-// expired, we do exit from the search rather than  perform more network
-// lookups for additional peers.
-func filterNodes(ctx context.Context, it enode.Iterator, check func(*enode.Node) bool) enode.Iterator {
-	return &filterIter{ctx, it, check}
-}
-
-type filterIter struct {
-	context.Context
-	enode.Iterator
-	check func(*enode.Node) bool
-}
-
-// Next looks up for the next valid node according to our
-// filter criteria.
-func (f *filterIter) Next() bool {
-	for f.Iterator.Next() {
-		if f.Context.Err() != nil {
-			return false
-		}
-		if f.check(f.Node()) {
-			return true
-		}
-	}
-	return false
-}
diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go
index bea75488251f..428482218fb8 100644
--- a/beacon-chain/p2p/service.go
+++ b/beacon-chain/p2p/service.go
@@ -43,6 +43,10 @@ var _ runtime.Service = (*Service)(nil)
 // defined below.
 var pollingPeriod = 6 * time.Second
 
+// When looking for new nodes, if not enough nodes are found,
+// we stop after this number of iterations.
+var batchSize = 2_000
+
 // Refresh rate of ENR set at twice per slot.
 var refreshRate = slots.DivideSlotBy(2)
 
diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go
index c09ad1db6407..ccc340b310c6 100644
--- a/beacon-chain/p2p/service_test.go
+++ b/beacon-chain/p2p/service_test.go
@@ -201,11 +201,11 @@ func TestListenForNewNodes(t *testing.T) {
 	require.NoError(t, err)
 	defer bootListener.Close()
 
-	// Use shorter period for testing.
-	currentPeriod := pollingPeriod
-	pollingPeriod = 1 * time.Second
+	// Use smaller batch size for testing.
+	currentBatchSize := batchSize
+	batchSize = 5
 	defer func() {
-		pollingPeriod = currentPeriod
+		batchSize = currentBatchSize
 	}()
 
 	bootNode := bootListener.Self()
diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go
index 2adead9cff19..803e370b8f0b 100644
--- a/beacon-chain/p2p/subnets.go
+++ b/beacon-chain/p2p/subnets.go
@@ -76,11 +76,11 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node)
 func searchForPeers(
 	iterator enode.Iterator,
 	batchSize int,
-	peersToFindCount int,
+	peersToFindCount uint,
 	filter func(node *enode.Node) bool,
 ) []*enode.Node {
 	nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
-	for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
+	for i := 0; i < batchSize && uint(len(nodeFromNodeID)) <= peersToFindCount && iterator.Next(); i++ {
 		node := iterator.Node()
 
 		// Filter out nodes that do not meet the criteria.
@@ -141,7 +141,7 @@ func (s *Service) FindPeersWithSubnet(
 	index uint64,
 	threshold int,
 ) (bool, error) {
-	const batchSize = 2000
+	const minLogInterval = 1 * time.Minute
 
 	ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
 	defer span.End()
@@ -180,19 +180,17 @@ func (s *Service) FindPeersWithSubnet(
 		return true, nil
 	}
 
-	log.WithFields(logrus.Fields{
-		"topic":            topic,
-		"currentPeerCount": peerCountForTopic,
-		"targetPeerCount":  threshold,
-	}).Debug("Searching for new peers in the network - Start")
+	log := log.WithFields(logrus.Fields{
+		"topic":           topic,
+		"targetPeerCount": threshold,
+	})
+
+	log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start")
+
+	lastLogTime := time.Now()
 
 	wg := new(sync.WaitGroup)
 	for {
-		// If we have enough peers, we can exit the loop. This is the happy path.
-		if missingPeerCountForTopic == 0 {
-			break
-		}
-
 		// If the context is done, we can exit the loop. This is the unhappy path.
 		if err := ctx.Err(); err != nil {
 			return false, errors.Errorf(
@@ -202,7 +200,7 @@ func (s *Service) FindPeersWithSubnet(
 		}
 
 		// Search for new peers in the network.
-		nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter)
+		nodes := searchForPeers(iterator, batchSize, uint(missingPeerCountForTopic), filter)
 
 		// Restrict dials if limit is applied.
 		maxConcurrentDials := math.MaxInt
@@ -221,10 +219,20 @@ func (s *Service) FindPeersWithSubnet(
 			wg.Wait()
 		}
 
-		_, missingPeerCountForTopic = peersSummary(topic, threshold)
+		peerCountForTopic, missingPeerCountForTopic = peersSummary(topic, threshold)
+
+		// If we have enough peers, we can exit the loop. This is the happy path.
+		if missingPeerCountForTopic == 0 {
+			break
+		}
+
+		if time.Since(lastLogTime) > minLogInterval {
+			lastLogTime = time.Now()
+			log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue")
+		}
 	}
 
-	log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
+	log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success")
 	return true, nil
 }