Fix CPU usage in small devnets (#14446)
* `CustodyCountFromRemotePeer`: Set happy path in the outer scope.

* `FindPeersWithSubnet`: Improve logging.

* `listenForNewNodes`: Avoid infinite loop in a small subnet.

* Address Nishant's comment.

* Fix Nishant's comment.
nalepae committed Nov 22, 2024
1 parent 89ab9be commit b531397
Showing 8 changed files with 119 additions and 97 deletions.
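The heart of the fix is how discovery asks for nodes. Previously, `listenForNewNodes` wrapped the discv5 iterator in `filterNodes` and drained it with `enode.ReadNodes`, which keeps calling the iterator until it has collected the requested number of unique nodes — in a small devnet with too few peers to ever satisfy the request, that can spin the CPU indefinitely. The commit routes discovery through the bounded `searchForPeers` helper in subnets.go instead, which gives up after a fixed batch. Roughly, the bounded pattern looks like this (a simplified sketch of the helper shown in the diffs below, not the verbatim committed code; `isValid` stands in for the real `filterPeer` predicate):

	// Assumes: import "github.com/ethereum/go-ethereum/p2p/enode"
	//
	// searchForPeers drains at most batchSize entries from the discovery
	// iterator, keeping unique nodes that pass the filter and stopping early
	// once enough candidates are collected. Bounding the pass is what keeps
	// CPU usage flat when the network has no more peers to offer.
	func searchForPeers(iterator enode.Iterator, batchSize int, want uint, isValid func(*enode.Node) bool) []*enode.Node {
		found := make(map[enode.ID]*enode.Node, batchSize)
		for i := 0; i < batchSize && uint(len(found)) <= want && iterator.Next(); i++ {
			node := iterator.Node()
			if !isValid(node) {
				continue // drop nodes that fail the subnet/peer filter
			}
			found[node.ID()] = node // deduplicate by node ID
		}
		nodes := make([]*enode.Node, 0, len(found))
		for _, node := range found {
			nodes = append(nodes, node)
		}
		return nodes
	}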
1 change: 0 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
@@ -18,7 +18,6 @@ go_library(
         "handshake.go",
         "info.go",
         "interfaces.go",
-        "iterator.go",
         "log.go",
         "message_id.go",
         "monitoring.go",
8 changes: 4 additions & 4 deletions beacon-chain/p2p/broadcaster_test.go
@@ -229,11 +229,11 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
 	require.NoError(t, err)
 	defer bootListener.Close()
 
-	// Use shorter period for testing.
-	currentPeriod := pollingPeriod
-	pollingPeriod = 1 * time.Second
+	// Use smaller batch size for testing.
+	currentBatchSize := batchSize
+	batchSize = 2
 	defer func() {
-		pollingPeriod = currentPeriod
+		batchSize = currentBatchSize
 	}()
 
 	bootNode := bootListener.Self()
45 changes: 28 additions & 17 deletions beacon-chain/p2p/custody.go
@@ -72,26 +72,10 @@ loop:
 	return validPeers, nil
 }
 
-// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
-func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
+func (s *Service) custodyCountFromRemotePeerEnr(pid peer.ID) uint64 {
 	// By default, we assume the peer custodies the minimum number of subnets.
 	custodyRequirement := params.BeaconConfig().CustodyRequirement
 
-	// First, try to get the custody count from the peer's metadata.
-	metadata, err := s.peers.Metadata(pid)
-	if err != nil {
-		log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
-	}
-
-	if metadata != nil {
-		custodyCount := metadata.CustodySubnetCount()
-		if custodyCount > 0 {
-			return custodyCount
-		}
-	}
-
-	log.WithField("peerID", pid).Debug("Failed to retrieve custody count from metadata for peer, defaulting to the ENR value")
-
 	// Retrieve the ENR of the peer.
 	record, err := s.peers.ENR(pid)
 	if err != nil {
@@ -116,3 +100,30 @@ func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
 
 	return custodyCount
 }
+
+// CustodyCountFromRemotePeer retrieves the custody count from a remote peer.
+func (s *Service) CustodyCountFromRemotePeer(pid peer.ID) uint64 {
+	// Try to get the custody count from the peer's metadata.
+	metadata, err := s.peers.Metadata(pid)
+	if err != nil {
+		log.WithError(err).WithField("peerID", pid).Debug("Failed to retrieve metadata for peer, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	// If the metadata is nil, default to the ENR value.
+	if metadata == nil {
+		log.WithField("peerID", pid).Debug("Metadata is nil, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	// Get the custody subnets count from the metadata.
+	custodyCount := metadata.CustodySubnetCount()
+
+	// If the custody count is zero, default to the ENR value.
+	if custodyCount == 0 {
+		log.WithField("peerID", pid).Debug("The custody count extracted from the metadata equals 0, defaulting to the ENR value")
+		return s.custodyCountFromRemotePeerEnr(pid)
+	}
+
+	return custodyCount
+}
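This split is what the first commit-message bullet means by setting the happy path in the outer scope: every fallback condition in the wrapper returns early into `custodyCountFromRemotePeerEnr`, so the successful case — a usable metadata count — exits at the bottom of the function without nesting. The shape in isolation (an illustrative sketch with hypothetical names, not code from this commit):

	// firstUsable returns the primary value when it is usable and otherwise
	// bails out early to the fallback. Each guard returns immediately, so the
	// happy path never gains indentation.
	func firstUsable(primary func() (uint64, error), fallback func() uint64) uint64 {
		value, err := primary()
		if err != nil {
			return fallback() // guard: primary source failed
		}
		if value == 0 {
			return fallback() // guard: primary source empty
		}
		return value // happy path, outer scope
	}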
74 changes: 55 additions & 19 deletions beacon-chain/p2p/discovery.go
@@ -15,6 +15,7 @@ import (
 	ma "github.com/multiformats/go-multiaddr"
 	"github.com/pkg/errors"
 	"github.com/prysmaticlabs/go-bitfield"
+	"github.com/sirupsen/logrus"
 
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
@@ -277,7 +278,29 @@ func (s *Service) RefreshPersistentSubnets() {
 
 // listen for new nodes watches for new nodes in the network and adds them to the peerstore.
 func (s *Service) listenForNewNodes() {
-	iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
+	const (
+		minLogInterval = 1 * time.Minute
+		thresholdLimit = 5
+	)
+
+	peersSummary := func(threshold uint) (uint, uint) {
+		// Retrieve how many active peers we have.
+		activePeers := s.Peers().Active()
+		activePeerCount := uint(len(activePeers))
+
+		// Compute how many peers we are missing to reach the threshold.
+		if activePeerCount >= threshold {
+			return activePeerCount, 0
+		}
+
+		missingPeerCount := threshold - activePeerCount
+
+		return activePeerCount, missingPeerCount
+	}
+
+	var lastLogTime time.Time
+
+	iterator := s.dv5Listener.RandomNodes()
 	defer iterator.Close()
 	connectivityTicker := time.NewTicker(1 * time.Minute)
 	thresholdCount := 0
@@ -286,25 +309,31 @@
 		select {
 		case <-s.ctx.Done():
 			return
+
 		case <-connectivityTicker.C:
 			// Skip the connectivity check if not enabled.
 			if !features.Get().EnableDiscoveryReboot {
 				continue
 			}
+
 			if !s.isBelowOutboundPeerThreshold() {
 				// Reset counter if we are beyond the threshold
 				thresholdCount = 0
 				continue
 			}
+
 			thresholdCount++
+
 			// Reboot listener if connectivity drops
-			if thresholdCount > 5 {
-				log.WithField("outboundConnectionCount", len(s.peers.OutboundConnected())).Warn("Rebooting discovery listener, reached threshold.")
+			if thresholdCount > thresholdLimit {
+				outBoundConnectedCount := len(s.peers.OutboundConnected())
+				log.WithField("outboundConnectionCount", outBoundConnectedCount).Warn("Rebooting discovery listener, reached threshold.")
 				if err := s.dv5Listener.RebootListener(); err != nil {
 					log.WithError(err).Error("Could not reboot listener")
 					continue
 				}
-				iterator = filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
+
+				iterator = s.dv5Listener.RandomNodes()
 				thresholdCount = 0
 			}
 		default:
@@ -315,17 +344,35 @@
 			time.Sleep(pollingPeriod)
 			continue
 		}
-		wantedCount := s.wantedPeerDials()
-		if wantedCount == 0 {
+
+		// Compute the number of new peers we want to dial.
+		activePeerCount, missingPeerCount := peersSummary(s.cfg.MaxPeers)
+
+		fields := logrus.Fields{
+			"currentPeerCount": activePeerCount,
+			"targetPeerCount":  s.cfg.MaxPeers,
+		}
+
+		if missingPeerCount == 0 {
 			log.Trace("Not looking for peers, at peer limit")
 			time.Sleep(pollingPeriod)
 			continue
 		}
+
+		if time.Since(lastLogTime) > minLogInterval {
+			lastLogTime = time.Now()
+			log.WithFields(fields).Debug("Searching for new active peers")
+		}
+
 		// Restrict dials if limit is applied.
 		if flags.MaxDialIsActive() {
-			wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
+			maxConcurrentDials := uint(flags.Get().MaxConcurrentDials)
+			missingPeerCount = min(missingPeerCount, maxConcurrentDials)
 		}
-		wantedNodes := enode.ReadNodes(iterator, wantedCount)
+
+		// Search for new peers.
+		wantedNodes := searchForPeers(iterator, batchSize, missingPeerCount, s.filterPeer)
+
 		wg := new(sync.WaitGroup)
 		for i := 0; i < len(wantedNodes); i++ {
 			node := wantedNodes[i]
@@ -615,17 +662,6 @@ func (s *Service) isBelowOutboundPeerThreshold() bool {
 	return outBoundCount < outBoundThreshold
 }
 
-func (s *Service) wantedPeerDials() int {
-	maxPeers := int(s.cfg.MaxPeers)
-
-	activePeers := len(s.Peers().Active())
-	wantedCount := 0
-	if maxPeers > activePeers {
-		wantedCount = maxPeers - activePeers
-	}
-	return wantedCount
-}
-
 // PeersFromStringAddrs converts peer raw ENRs into multiaddrs for p2p.
 func PeersFromStringAddrs(addrs []string) ([]ma.Multiaddr, error) {
 	var allAddrs []ma.Multiaddr
36 changes: 0 additions & 36 deletions beacon-chain/p2p/iterator.go

This file was deleted.

4 changes: 4 additions & 0 deletions beacon-chain/p2p/service.go
@@ -43,6 +43,10 @@ var _ runtime.Service = (*Service)(nil)
 // defined below.
 var pollingPeriod = 6 * time.Second
 
+// When looking for new nodes, if not enough nodes are found,
+// we stop after this many iterations.
+var batchSize = 2_000
+
 // Refresh rate of ENR set at twice per slot.
 var refreshRate = slots.DivideSlotBy(2)
8 changes: 4 additions & 4 deletions beacon-chain/p2p/service_test.go
@@ -202,11 +202,11 @@ func TestListenForNewNodes(t *testing.T) {
 	require.NoError(t, err)
 	defer bootListener.Close()
 
-	// Use shorter period for testing.
-	currentPeriod := pollingPeriod
-	pollingPeriod = 1 * time.Second
+	// Use smaller batch size for testing.
+	currentBatchSize := batchSize
+	batchSize = 5
 	defer func() {
-		pollingPeriod = currentPeriod
+		batchSize = currentBatchSize
 	}()
 
 	bootNode := bootListener.Self()
40 changes: 24 additions & 16 deletions beacon-chain/p2p/subnets.go
@@ -76,11 +76,11 @@ func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) {
 func searchForPeers(
 	iterator enode.Iterator,
 	batchSize int,
-	peersToFindCount int,
+	peersToFindCount uint,
 	filter func(node *enode.Node) bool,
 ) []*enode.Node {
 	nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize)
-	for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ {
+	for i := 0; i < batchSize && uint(len(nodeFromNodeID)) <= peersToFindCount && iterator.Next(); i++ {
 		node := iterator.Node()
 
 		// Filter out nodes that do not meet the criteria.
@@ -141,7 +141,7 @@ func (s *Service) FindPeersWithSubnet(
 	index uint64,
 	threshold int,
 ) (bool, error) {
-	const batchSize = 2000
+	const minLogInterval = 1 * time.Minute
 
 	ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet")
 	defer span.End()
@@ -180,19 +180,17 @@
 		return true, nil
 	}
 
-	log.WithFields(logrus.Fields{
-		"topic":            topic,
-		"currentPeerCount": peerCountForTopic,
-		"targetPeerCount":  threshold,
-	}).Debug("Searching for new peers in the network - Start")
+	log := log.WithFields(logrus.Fields{
+		"topic":           topic,
+		"targetPeerCount": threshold,
+	})
+
+	log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - start")
+
+	lastLogTime := time.Now()
 
 	wg := new(sync.WaitGroup)
 	for {
-		// If we have enough peers, we can exit the loop. This is the happy path.
-		if missingPeerCountForTopic == 0 {
-			break
-		}
-
 		// If the context is done, we can exit the loop. This is the unhappy path.
 		if err := ctx.Err(); err != nil {
 			return false, errors.Errorf(
@@ -202,7 +200,7 @@
 			)
 		}
 
 		// Search for new peers in the network.
-		nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter)
+		nodes := searchForPeers(iterator, batchSize, uint(missingPeerCountForTopic), filter)
 
 		// Restrict dials if limit is applied.
 		maxConcurrentDials := math.MaxInt
@@ -221,10 +219,20 @@
 			wg.Wait()
 		}
 
-		_, missingPeerCountForTopic = peersSummary(topic, threshold)
+		peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold)
+
+		// If we have enough peers, we can exit the loop. This is the happy path.
+		if missingPeerCountForTopic == 0 {
+			break
+		}
+
+		if time.Since(lastLogTime) > minLogInterval {
+			lastLogTime = time.Now()
+			log.WithField("currentPeerCount", peerCountForTopic).Debug("Searching for new peers for a subnet - continue")
+		}
 	}
 
-	log.WithField("topic", topic).Debug("Searching for new peers in the network - Success")
+	log.WithField("currentPeerCount", threshold).Debug("Searching for new peers for a subnet - success")
 	return true, nil
 }
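Both `listenForNewNodes` and `FindPeersWithSubnet` now throttle their progress logs through a `lastLogTime` timestamp checked against `minLogInterval`, so a search loop that runs for minutes emits at most one debug line per interval instead of one per pass. The pattern in isolation (a minimal sketch using the standard library's log and time packages, not the committed code):

	// logEvery emits msg at most once per interval; the caller keeps the
	// lastLog timestamp alive across loop iterations.
	func logEvery(lastLog *time.Time, interval time.Duration, msg string) {
		if time.Since(*lastLog) > interval {
			*lastLog = time.Now()
			log.Println(msg)
		}
	}

Note the two call sites seed the timestamp differently: `listenForNewNodes` declares `var lastLogTime time.Time`, whose zero value lets the first check fire immediately, while `FindPeersWithSubnet` starts from `time.Now()` right after logging its own start line, so its first throttled message waits a full interval.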
