From 50b4f4ba2221d2c519015447cf18afad4124f666 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 30 Aug 2024 13:37:36 +0200 Subject: [PATCH 01/20] `pingPeers`: Add log with new ENR when modified. --- beacon-chain/p2p/service.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index d3eaeaa2e4b0..965632be256d 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -396,9 +396,14 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er func (s *Service) pingPeers() { s.pingMethodLock.RLock() defer s.pingMethodLock.RUnlock() + + localENR := s.dv5Listener.Self() + log.WithField("ENR", localENR).Info("New node record") + if s.pingMethod == nil { return } + for _, pid := range s.peers.Connected() { go func(id peer.ID) { if err := s.pingMethod(s.ctx, id); err != nil { From 7514b7b6c476b562149608ccbf606e554a5eb276 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 30 Aug 2024 13:39:28 +0200 Subject: [PATCH 02/20] `p2p Start`: Use idiomatic go error syntax. --- beacon-chain/p2p/service.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index 965632be256d..aad340f22bf0 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -202,12 +202,13 @@ func (s *Service) Start() { s.startupErr = err return } - err = s.connectToBootnodes() - if err != nil { + + if err := s.connectToBootnodes(); err != nil { log.WithError(err).Error("Could not add bootnode to the exclusion list") s.startupErr = err return } + s.dv5Listener = listener go s.listenForNewNodes() } From e2220b526d234560829f8c07c3655b49b8f94da2 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 30 Aug 2024 13:40:16 +0200 Subject: [PATCH 03/20] P2P `start`: Fix error message. --- beacon-chain/p2p/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index aad340f22bf0..a0566d628cfc 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -204,7 +204,7 @@ func (s *Service) Start() { } if err := s.connectToBootnodes(); err != nil { - log.WithError(err).Error("Could not add bootnode to the exclusion list") + log.WithError(err).Error("Could not connect to boot nodes") s.startupErr = err return } From d95407f98f070d6031874d1fb3d6cda0798504c0 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 30 Aug 2024 14:27:58 +0200 Subject: [PATCH 04/20] Use not bootnodes at all if the `--chain-config-file` flag is used and no `--bootstrap-node` flag is used. Before this commit, if the `--chain-config-file` flag is used and no `--bootstrap-node` flag is used, then bootnodes are (incorrectly) defaulted on `mainnet` ones. --- config/features/config.go | 1 + config/params/BUILD.bazel | 1 + config/params/testnet_custom_network_config.go | 9 +++++++++ 3 files changed, 11 insertions(+) create mode 100644 config/params/testnet_custom_network_config.go diff --git a/config/features/config.go b/config/features/config.go index 0ddf43f0f715..76e12ede6ce1 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -145,6 +145,7 @@ func configureTestnet(ctx *cli.Context) error { } else { if ctx.IsSet(cmd.ChainConfigFileFlag.Name) { log.Warn("Running on custom Ethereum network specified in a chain configuration yaml file") + params.UseCustomNetworkConfig() } else { log.Info("Running on Ethereum Mainnet") } diff --git a/config/params/BUILD.bazel b/config/params/BUILD.bazel index 35660c89b96f..f06dd6fa33a8 100644 --- a/config/params/BUILD.bazel +++ b/config/params/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "mainnet_config.go", "minimal_config.go", "network_config.go", + "testnet_custom_network_config.go", "testnet_e2e_config.go", "testnet_holesky_config.go", "testnet_sepolia_config.go", diff --git a/config/params/testnet_custom_network_config.go b/config/params/testnet_custom_network_config.go new file mode 100644 index 000000000000..7ce6780fd59a --- /dev/null +++ b/config/params/testnet_custom_network_config.go @@ -0,0 +1,9 @@ +package params + +func UseCustomNetworkConfig() { + cfg := BeaconNetworkConfig().Copy() + cfg.ContractDeploymentBlock = 0 + cfg.BootstrapNodes = []string{} + + OverrideBeaconNetworkConfig(cfg) +} From 2857344d1eecb5e0a36117d7d16c400c3b73d0d6 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 30 Aug 2024 23:58:01 +0200 Subject: [PATCH 05/20] `validPeersExist`: Centralize logs. --- beacon-chain/sync/subscriber.go | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 76168790f1e9..c6ab1d8fbb74 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -389,8 +389,6 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, // Check every slot that there are enough peers for i := uint64(0); i < subnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -508,8 +506,6 @@ func (s *Service) subscribeAggregatorSubnet( subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -534,8 +530,6 @@ func (s *Service) subscribeSyncSubnet( subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -589,8 +583,6 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped // Check every slot that there are enough peers for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -686,12 +678,6 @@ func (s *Service) subscribeColumnSubnet( minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet if !s.validPeersExist(subnetTopic) { - log.WithFields(logrus.Fields{ - "columnSubnet": idx, - "minimumPeersPerSubnet": minimumPeersPerSubnet, - "topic": subnetTopic, - }).Debug("No peers found subscribed to column gossip subnet. Searching network for peers subscribed to it") - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, minimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -763,8 +749,6 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) // perform a search for peers with the desired committee index. _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { @@ -790,8 +774,21 @@ func (s *Service) unSubscribeFromTopic(topic string) { // find if we have peers who are subscribed to the same subnet func (s *Service) validPeersExist(subnetTopic string) bool { - numOfPeers := s.cfg.p2p.PubSub().ListPeers(subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()) - return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet + topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() + threshold := flags.Get().MinimumPeersPerSubnet + peersCount := len(s.cfg.p2p.PubSub().ListPeers(topic)) + + enoughPeers := peersCount >= threshold + + if !enoughPeers { + log.WithFields(logrus.Fields{ + "topic": topic, + "peersCount": peersCount, + "threshold": threshold, + }).Debug("Not enough peers for this subnet, running network search") + } + + return enoughPeers } func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 { From be836a95c6c2b300c64b815d42537d02af5341e1 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Sun, 1 Sep 2024 12:54:02 +0200 Subject: [PATCH 06/20] `AddConnectionHandler`: Improve logging. "Peer connected" does not really reflect the fact that a new peer is actually connected. --> "New peer connection" is more clear. Also, instead of writing `0`, `1`or `2` for direction, now it's writted "Unknown", "Inbound", "Outbound". --- beacon-chain/p2p/handshake.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 97d2af8eed72..a83892f2ddf2 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con disconnectFromPeer() return } + validPeerConnection := func() { s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) // Go through the handshake process. log.WithFields(logrus.Fields{ - "direction": conn.Stat().Direction, + "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), - }).Debug("Peer connected") + }).Debug("New peer connection") } // Do not perform handshake on inbound dials. @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) // Only log disconnections if we were fully connected. if priorState == peers.PeerConnected { - log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected") + log.WithFields(logrus.Fields{ + "direction": conn.Stat().Direction.String(), + "multiAddr": peerMultiaddrString(conn), + "activePeers": len(s.peers.Active()), + }).Debug("New peer disconnection") } }() }, From 0abfab40ce2d99119c4d5ab1934caf1e358a7025 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Sun, 1 Sep 2024 15:39:19 +0200 Subject: [PATCH 07/20] Logging: Add 2 decimals for timestamt in text and JSON logs. --- cmd/beacon-chain/main.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index c1e4cdac67be..0ed6cbb03801 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -169,7 +169,7 @@ func before(ctx *cli.Context) error { switch format { case "text": formatter := new(prefixed.TextFormatter) - formatter.TimestampFormat = "2006-01-02 15:04:05" + formatter.TimestampFormat = "2006-01-02 15:04:05.00" formatter.FullTimestamp = true // If persistent log files are written - we disable the log messages coloring because @@ -185,7 +185,9 @@ func before(ctx *cli.Context) error { logrus.SetFormatter(f) case "json": - logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: "2006-01-02 15:04:05.00", + }) case "journald": if err := journald.Enable(); err != nil { return err From 959ff2421511646059bfa04f7dd9aaee1c265e37 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 2 Sep 2024 11:23:47 +0200 Subject: [PATCH 08/20] Improve "no valid peers" logging. --- beacon-chain/p2p/subnets.go | 30 ++++++++++++++++++++++++------ beacon-chain/sync/subscriber.go | 11 +++++++---- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index af5b0dda216c..c689779709c6 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -20,9 +20,9 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v5/crypto/hash" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" - mathutil "github.com/prysmaticlabs/prysm/v5/math" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/sirupsen/logrus" ) var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount @@ -61,8 +61,12 @@ const dataColumnSubnetVal = 150 // On some edge cases, this method may hang indefinitely while peers // are actually found. In such a case, the user should cancel the context // and re-run the method again. -func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, - index uint64, threshold int) (bool, error) { +func (s *Service) FindPeersWithSubnet( + ctx context.Context, + topic string, + index uint64, + threshold int, +) (bool, error) { ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() @@ -183,11 +187,25 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode. // lower threshold to broadcast object compared to searching // for a subnet. So that even in the event of poor peer // connectivity, we can still broadcast an attestation. -func (s *Service) hasPeerWithSubnet(topic string) bool { +func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { // In the event peer threshold is lower, we will choose the lower // threshold. - minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet)) - return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int. + minPeers := min(1, flags.Get().MinimumPeersPerSubnet) + topic := subnetTopic + s.Encoding().ProtocolSuffix() + peersWithSubnet := s.pubsub.ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + enoughPeers := peersWithSubnetCount >= minPeers + + if !enoughPeers { + log.WithFields(logrus.Fields{ + "topic": topic, + "peersCount": peersWithSubnetCount, + "ctxError": s.ctx.Err(), + }).Debug("No valid peers, starting network search") + } + + return enoughPeers } // Updates the service's discv5 listener record's attestation subnet diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index c6ab1d8fbb74..483097ec496f 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -776,16 +776,19 @@ func (s *Service) unSubscribeFromTopic(topic string) { func (s *Service) validPeersExist(subnetTopic string) bool { topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() threshold := flags.Get().MinimumPeersPerSubnet - peersCount := len(s.cfg.p2p.PubSub().ListPeers(topic)) - enoughPeers := peersCount >= threshold + peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + enoughPeers := peersWithSubnetCount >= threshold if !enoughPeers { log.WithFields(logrus.Fields{ "topic": topic, - "peersCount": peersCount, + "peersCount": peersWithSubnetCount, "threshold": threshold, - }).Debug("Not enough peers for this subnet, running network search") + "ctxError": s.ctx.Err(), + }).Debug("Not enough valid peers, starting network search") } return enoughPeers From 2d446a2cbadc3cdc3e712c6837c57da937c08bbd Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 2 Sep 2024 11:24:04 +0200 Subject: [PATCH 09/20] Improve "Some columns have no peers responsible for custody" logging. --- beacon-chain/sync/data_columns_sampling.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index 27c16fc0f2bf..d83b119bbe8f 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "slices" "sort" "sync" "time" @@ -182,14 +183,17 @@ func (d *dataColumnSampler1D) refreshPeerInfo() { } } - columnWithNoPeers := make([]uint64, 0) + columnsWithoutPeers := make([]uint64, 0) for column, peers := range d.peerFromColumn { if len(peers) == 0 { - columnWithNoPeers = append(columnWithNoPeers, column) + columnsWithoutPeers = append(columnsWithoutPeers, column) } } - if len(columnWithNoPeers) > 0 { - log.WithField("columnWithNoPeers", columnWithNoPeers).Warn("Some columns have no peers responsible for custody") + + slices.Sort[[]uint64](columnsWithoutPeers) + + if len(columnsWithoutPeers) > 0 { + log.WithField("columns", columnsWithoutPeers).Warn("Some columns have no peers responsible for custody") } } From edb0fc14d8a65e5e5cd1518d501523c475fa703c Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 10:43:42 +0200 Subject: [PATCH 10/20] `pubsubSubscriptionRequestLimit`: Increase to be consistent with data columns. --- beacon-chain/p2p/pubsub_filter.go | 28 +++++++++++++++++++++++----- beacon-chain/sync/subscriber.go | 4 +++- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index e02371c587f9..762991b650b5 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -10,16 +10,27 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/network/forks" + "github.com/sirupsen/logrus" ) var _ pubsub.SubscriptionFilter = (*Service)(nil) // It is set at this limit to handle the possibility // of double topic subscriptions at fork boundaries. -// -> 64 Attestation Subnets * 2. -// -> 4 Sync Committee Subnets * 2. -// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2. -const pubsubSubscriptionRequestLimit = 200 +// -> BeaconBlock * 2 = 2 +// -> BeaconAggregateAndProof * 2 = 2 +// -> VoluntaryExit * 2 = 2 +// -> ProposerSlashing * 2 = 2 +// -> AttesterSlashing * 2 = 2 +// -> 64 Beacon Attestation * 2 = 128 +// -> SyncContributionAndProof * 2 = 2 +// -> 4 SyncCommitteeSubnets * 2 = 8 +// -> BlsToExecutionChange * 2 = 2 +// -> 128 DataColumnSidecar * 2 = 256 +// ------------------------------------- +// TOTAL = 406 +// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar) +const pubsubSubscriptionRequestLimit = 500 // CanSubscribe returns true if the topic is of interest and we could subscribe to it. func (s *Service) CanSubscribe(topic string) bool { @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool { // FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications. // This method returns only the topics of interest and may return an error if the subscription // request contains too many topics. -func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { +func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { if len(subs) > pubsubSubscriptionRequestLimit { + subsCount := len(subs) + log.WithFields(logrus.Fields{ + "peerID": peerID, + "subscriptionCounts": subsCount, + "subscriptionLimit": pubsubSubscriptionRequestLimit, + }).Error("Too many incoming subscriptions, filtering them") + return nil, pubsub.ErrTooManySubscriptions } diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 483097ec496f..c16239732df1 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -191,7 +191,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s // Do not resubscribe already seen subscriptions. ok := s.subHandler.topicExists(topic) if ok { - log.Debugf("Provided topic already has an active subscription running: %s", topic) + log.WithField("topic", topic).Debug("Provided topic already has an active subscription running") return nil } @@ -208,6 +208,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s log.WithError(err).Error("Could not subscribe topic") return nil } + s.subHandler.addTopic(sub.Topic(), sub) // Pipeline decodes the incoming subscription data, runs the validation, and handles the @@ -215,6 +216,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s pipeline := func(msg *pubsub.Message) { ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout) defer cancel() + ctx, span := trace.StartSpan(ctx, "sync.pubsub") defer span.End() From c12b2391c45a33284a45894fd7c32cf70df9725c Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 10:45:41 +0200 Subject: [PATCH 11/20] `sendPingRequest`: Improve logging. --- beacon-chain/sync/rpc_ping.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 872f4a25e391..a1bfa06a876c 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -142,7 +142,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error { // If the peer responded with an error, increment the bad responses scorer. if code != 0 { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID) - return errors.New(errMsg) + return errors.Errorf("code: %d - %s", code, errMsg) } // Decode the sequence number from the peer. From 7d02a3c8c997e2cd551c4c110b2044248636dfb9 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 10:48:45 +0200 Subject: [PATCH 12/20] `FindPeersWithSubnet`: Regularly recheck in our current set of peers if we have enough peers for this topic. Before this commit, new peers HAD to be found, even if current peers are eventually acceptable. For very small network, it used to lead to infinite search. --- beacon-chain/p2p/subnets.go | 163 +++++++++++++++++++++++++++--------- 1 file changed, 122 insertions(+), 41 deletions(-) diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index c689779709c6..9870c9204924 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "math" "strings" "sync" "time" @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110 // chosen more than sync, attestation and blob subnet (6) combined. const dataColumnSubnetVal = 150 +// nodeFilter return a function that filters nodes based on the subnet topic and subnet index. +func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { + switch { + case strings.Contains(topic, GossipAttestationMessage): + return s.filterPeerForAttSubnet(index), nil + case strings.Contains(topic, GossipSyncCommitteeMessage): + return s.filterPeerForSyncSubnet(index), nil + case strings.Contains(topic, GossipDataColumnSidecarMessage): + return s.filterPeerForDataColumnsSubnet(index), nil + default: + return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) + } +} + +// searchForPeers performs a network search for peers subscribed to a particular subnet. +// It exits as soon as one of these conditions is met: +// - It looped through `batchSize` nodes. +// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. +// - Iterator is exhausted. +func searchForPeers( + iterator enode.Iterator, + batchSize int, + peersToFindCount int, + filter func(node *enode.Node) bool, +) []*enode.Node { + nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize) + for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ { + node := iterator.Node() + + // Filter out nodes that do not meet the criteria. + if !filter(node) { + continue + } + + // Remove duplicates, keeping the node with higher seq. + prevNode, ok := nodeFromNodeID[node.ID()] + if ok && prevNode.Seq() > node.Seq() { + continue + } + + nodeFromNodeID[node.ID()] = node + } + + // Convert the map to a slice. + nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) + for _, node := range nodeFromNodeID { + nodes = append(nodes, node) + } + + return nodes +} + +// dialPeer dials a peer in a separate goroutine. +func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { + info, _, err := convertToAddrInfo(node) + if err != nil { + return + } + + if info == nil { + return + } + + wg.Add(1) + go func() { + if err := s.connectWithPeer(ctx, *info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + + wg.Done() + }() +} + // FindPeersWithSubnet performs a network search for peers // subscribed to a particular subnet. Then it tries to connect // with those peers. This method will block until either: @@ -67,67 +141,82 @@ func (s *Service) FindPeersWithSubnet( index uint64, threshold int, ) (bool, error) { + const batchSize = 200 + ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() span.AddAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. if s.dv5Listener == nil { - // return if discovery isn't set + // Return if discovery isn't set return false, nil } topic += s.Encoding().ProtocolSuffix() iterator := s.dv5Listener.RandomNodes() defer iterator.Close() - switch { - case strings.Contains(topic, GossipAttestationMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index)) - case strings.Contains(topic, GossipSyncCommitteeMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index)) - case strings.Contains(topic, GossipDataColumnSidecarMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index)) - default: - return false, errors.Errorf("no subnet exists for provided topic: %s", topic) + + filter, err := s.nodeFilter(topic, index) + if err != nil { + return false, errors.Wrap(err, "node filter") } + firstLoop := true + wg := new(sync.WaitGroup) for { - currNum := len(s.pubsub.ListPeers(topic)) - if currNum >= threshold { + // Retrieve how many peers we have for this topic. + peerCountForTopic := len(s.pubsub.ListPeers(topic)) + + // Compute how many peers we are missing to reach the threshold. + missingPeersCount := max(0, threshold-peerCountForTopic) + + // If we have enough peers, we can exit the loop. This is the happy path. + if missingPeersCount == 0 { break } + + if firstLoop { + firstLoop = false + + log.WithFields(logrus.Fields{ + "topic": topic, + "currentPeerCount": peerCountForTopic, + "targetPeerCount": threshold, + }).Debug("Searching for new peers in the network - Start") + } + + // If the context is done, we can exit the loop. This is the unhappy path. if err := ctx.Err(); err != nil { - return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ - "only %d out of %d peers were able to be found", topic, currNum, threshold) + return false, errors.Errorf( + "unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", + topic, peerCountForTopic, threshold, + ) } - nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch) + + // Search for new peers in the network. + nodes := searchForPeers(iterator, batchSize, missingPeersCount, filter) + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt if flags.MaxDialIsActive() { - nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials) + maxConcurrentDials = flags.Get().MaxConcurrentDials } - nodes := enode.ReadNodes(iterator, nodeCount) - for _, node := range nodes { - info, _, err := convertToAddrInfo(node) - if err != nil { - continue - } - if info == nil { - continue + // Dial the peers in batches. + for start := 0; start < len(nodes); start += maxConcurrentDials { + stop := min(start+maxConcurrentDials, len(nodes)) + for _, node := range nodes[start:stop] { + s.dialPeer(ctx, wg, node) } - wg.Add(1) - go func() { - if err := s.connectWithPeer(ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) - } - wg.Done() - }() + // Wait for all dials to be completed. + wg.Wait() } - // Wait for all dials to be completed. - wg.Wait() } + + log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") return true, nil } @@ -197,14 +286,6 @@ func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { enoughPeers := peersWithSubnetCount >= minPeers - if !enoughPeers { - log.WithFields(logrus.Fields{ - "topic": topic, - "peersCount": peersWithSubnetCount, - "ctxError": s.ctx.Err(), - }).Debug("No valid peers, starting network search") - } - return enoughPeers } From eee87bba1a493a5dd4b9b1bae1ef834e8d8fbbf3 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 10:49:52 +0200 Subject: [PATCH 13/20] `subscribeDynamicWithSyncSubnets`: Use exactly the same subscription function initially and every slot. --- beacon-chain/sync/subscriber.go | 194 ++++++++++++++++++++------------ 1 file changed, 122 insertions(+), 72 deletions(-) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index c16239732df1..73b76b4988f4 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -21,6 +21,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/features" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/container/slice" @@ -454,10 +455,8 @@ func (s *Service) subscribeDynamicWithSubnets( return } wantedSubs := s.retrievePersistentSubs(currentSlot) - // Resize as appropriate. s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - // subscribe desired aggregator subnets. for _, idx := range wantedSubs { s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) } @@ -471,9 +470,15 @@ func (s *Service) subscribeDynamicWithSubnets( }() } -// revalidate that our currently connected subnets are valid. -func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, topicFormat string, digest [4]byte) { +// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are +// not in the list of wanted subnets. +// TODO: Rename this functions as it does not only revalidate subscriptions. +func (s *Service) reValidateSubscriptions( + subscriptions map[uint64]*pubsub.Subscription, + wantedSubs []uint64, + topicFormat string, + digest [4]byte, +) { for k, v := range subscriptions { var wanted bool for _, idx := range wantedSubs { @@ -482,6 +487,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc break } } + if !wanted && v != nil { v.Cancel() fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() @@ -515,30 +521,6 @@ func (s *Service) subscribeAggregatorSubnet( } } -// subscribe missing subnets for our sync committee members. -func (s *Service) subscribeSyncSubnet( - subscriptions map[uint64]*pubsub.Subscription, - idx uint64, - digest [4]byte, - validate wrappedVal, - handle subHandler, -) { - // do not subscribe if we have no peers in the same - // subnet - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - // check if subscription exists and if not subscribe the relevant subnet. - if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) - } - if !s.validPeersExist(subnetTopic) { - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } - } -} - // subscribe to a static subnet with the given topic and index. A given validator and subscription handler is // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) { @@ -602,59 +584,138 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped }() } -// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible -// string for the topic name and the list of subnets for subscribed topics that should be -// maintained. +// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. +// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. +func (s *Service) subscribeToSyncSubnets( + topicFormat string, + digest [4]byte, + genesisValidatorsRoot [fieldparams.RootLength]byte, + genesisTime time.Time, + subscriptions map[uint64]*pubsub.Subscription, + currentSlot primitives.Slot, + validate wrappedVal, + handle subHandler, +) bool { + // Get sync subnets topic. + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] + + // Do not subscribe if not synced. + if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { + return true + } + + // Do not subscribe is the digest is not valid. + valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + if err != nil { + log.Error(err) + return true + } + + // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + if !valid { + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.") + s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + return false + } + + // Get the current epoch. + currentEpoch := slots.ToEpoch(currentSlot) + + // Retrieve the subnets we want to subscribe to. + wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch) + + // Remove subscriptions that are no longer wanted. + s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest) + + // Subscribe to wanted subnets. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if subscription exists. + if _, exists := subscriptions[subnetIndex]; exists { + continue + } + + // We need to subscribe to the subnet. + subscription := s.subscribeWithBase(subnetTopic, validate, handle) + subscriptions[subnetIndex] = subscription + } + + // Find new peers for wanted subnets if needed. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if we have enough peers in the subnet. Skip if we do. + if s.validPeersExist(subnetTopic) { + continue + } + + // Not enough peers in the subnet, we need to search for more. + _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet) + if err != nil { + log.WithError(err).Debug("Could not search for peers") + } + } + + return true +} + +// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets. func (s *Service) subscribeDynamicWithSyncSubnets( topicFormat string, validate wrappedVal, handle subHandler, digest [4]byte, ) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) + // Retrieve the number of committee subnets we need to subscribe to. + syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount + + // Initialize the subscriptions map. + subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) + + // Retrieve the fenesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the epoch of the fork corresponding to the digest. + _, e, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { panic(err) } + + // Retrieve the base protobuf message. base := p2p.GossipTopicMappings(topicFormat, e) if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().SyncCommitteeSubnetCount) - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + // Retrieve the genesis time. + genesisTime := s.cfg.clock.GenesisTime() + + // Define a ticker ticking every slot. + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + + // Retrieve the current slot. + currentSlot := s.cfg.clock.CurrentSlot() + + // Subscribe to the sync subnets. + s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) go func() { for { select { - case <-s.ctx.Done(): - ticker.Done() - return case currentSlot := <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + + // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. + if !isDigestValid { ticker.Done() return } - wantedSubs := s.retrieveActiveSyncSubnets(slots.ToEpoch(currentSlot)) - // Resize as appropriate. - s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - - // subscribe desired aggregator subnets. - for _, idx := range wantedSubs { - s.subscribeSyncSubnet(subscriptions, idx, digest, validate, handle) - } + case <-s.ctx.Done(): + ticker.Done() + return } } }() @@ -782,18 +843,7 @@ func (s *Service) validPeersExist(subnetTopic string) bool { peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) peersWithSubnetCount := len(peersWithSubnet) - enoughPeers := peersWithSubnetCount >= threshold - - if !enoughPeers { - log.WithFields(logrus.Fields{ - "topic": topic, - "peersCount": peersWithSubnetCount, - "threshold": threshold, - "ctxError": s.ctx.Err(), - }).Debug("Not enough valid peers, starting network search") - } - - return enoughPeers + return peersWithSubnetCount >= threshold } func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 { From 0af4c2eaf535f607015cfa604a16d7bd4ff68eba Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 11:55:54 +0200 Subject: [PATCH 14/20] Make deepsource happier. --- beacon-chain/p2p/testing/p2p.go | 4 ++-- config/features/config.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index da0e88092f31..524e4e820d2e 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -290,7 +290,7 @@ func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { // AddConnectionHandler handles the connection with a newly connected peer. func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - ConnectedF: func(net network.Network, conn network.Conn) { + ConnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.Add(new(enr.Record), conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction) @@ -314,7 +314,7 @@ func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID // AddDisconnectionHandler -- func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - DisconnectedF: func(net network.Network, conn network.Conn) { + DisconnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) diff --git a/config/features/config.go b/config/features/config.go index 76e12ede6ce1..4d6e716a4ec2 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -157,11 +157,11 @@ func configureTestnet(ctx *cli.Context) error { } // Insert feature flags within the function to be enabled for Sepolia testnet. -func applySepoliaFeatureFlags(ctx *cli.Context) { +func applySepoliaFeatureFlags(_ *cli.Context) { } // Insert feature flags within the function to be enabled for Holesky testnet. -func applyHoleskyFeatureFlags(ctx *cli.Context) { +func applyHoleskyFeatureFlags(_ *cli.Context) { } // ConfigureBeaconChain sets the global config based From bd34800dd8f2dc7a75ee576fbb3f35667d484a27 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 12:21:57 +0200 Subject: [PATCH 15/20] Nishant's commend: Change peer disconnected log. --- beacon-chain/p2p/handshake.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index a83892f2ddf2..3bf9bd7e7ddb 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -178,7 +178,7 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), - }).Debug("New peer disconnection") + }).Debug("Peer disconnected") } }() }, From ab0b0e883a17ba02418c0d3066c99b584bf1dbe8 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 12:25:14 +0200 Subject: [PATCH 16/20] NIshant's comment: Change `Too many incoming subscription` log from error to debug. --- beacon-chain/p2p/pubsub_filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index 762991b650b5..2239f972bac6 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -113,7 +113,7 @@ func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.R "peerID": peerID, "subscriptionCounts": subsCount, "subscriptionLimit": pubsubSubscriptionRequestLimit, - }).Error("Too many incoming subscriptions, filtering them") + }).Debug("Too many incoming subscriptions, filtering them") return nil, pubsub.ErrTooManySubscriptions } From 5e2af9feb680b0d96d6321fd4f3bc83432d8083c Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 12:59:17 +0200 Subject: [PATCH 17/20] `FindPeersWithSubnet`: Address Nishant's comment. --- beacon-chain/p2p/subnets.go | 42 ++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 9870c9204924..31f5b7c31f16 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -162,29 +162,35 @@ func (s *Service) FindPeersWithSubnet( return false, errors.Wrap(err, "node filter") } - firstLoop := true - - wg := new(sync.WaitGroup) - for { + peersSummary := func(topic string, threshold int) (int, int) { // Retrieve how many peers we have for this topic. peerCountForTopic := len(s.pubsub.ListPeers(topic)) // Compute how many peers we are missing to reach the threshold. - missingPeersCount := max(0, threshold-peerCountForTopic) + missingPeerCountForTopic := max(0, threshold-peerCountForTopic) - // If we have enough peers, we can exit the loop. This is the happy path. - if missingPeersCount == 0 { - break - } + return peerCountForTopic, missingPeerCountForTopic + } - if firstLoop { - firstLoop = false + // Compute how many peers we are missing to reach the threshold. + peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) - log.WithFields(logrus.Fields{ - "topic": topic, - "currentPeerCount": peerCountForTopic, - "targetPeerCount": threshold, - }).Debug("Searching for new peers in the network - Start") + // Exit early if we have enough peers. + if missingPeerCountForTopic == 0 { + return true, nil + } + + log.WithFields(logrus.Fields{ + "topic": topic, + "currentPeerCount": peerCountForTopic, + "targetPeerCount": threshold, + }).Debug("Searching for new peers in the network - Start") + + wg := new(sync.WaitGroup) + for { + // If we have enough peers, we can exit the loop. This is the happy path. + if missingPeerCountForTopic == 0 { + break } // If the context is done, we can exit the loop. This is the unhappy path. @@ -196,7 +202,7 @@ func (s *Service) FindPeersWithSubnet( } // Search for new peers in the network. - nodes := searchForPeers(iterator, batchSize, missingPeersCount, filter) + nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter) // Restrict dials if limit is applied. maxConcurrentDials := math.MaxInt @@ -214,6 +220,8 @@ func (s *Service) FindPeersWithSubnet( // Wait for all dials to be completed. wg.Wait() } + + _, missingPeerCountForTopic = peersSummary(topic, threshold) } log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") From 0b39d26be61abd7fc9dd00d780f03f2db4ba4e0e Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 12:59:50 +0200 Subject: [PATCH 18/20] `batchSize`: Address Nishant's comment. --- beacon-chain/p2p/subnets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 31f5b7c31f16..13b15f49cd7c 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -141,7 +141,7 @@ func (s *Service) FindPeersWithSubnet( index uint64, threshold int, ) (bool, error) { - const batchSize = 200 + const batchSize = 2000 ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() From bd82d71d6bec2cd8b86da56086f5aaa656814325 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 14:50:43 +0200 Subject: [PATCH 19/20] `pingPeers` ==> `pingPeersAndLogEnr`. --- beacon-chain/p2p/discovery.go | 6 +++--- beacon-chain/p2p/service.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index a0166eb70b89..f275328592b1 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -119,7 +119,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadata(bitV) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -157,7 +157,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV2(bitV, bitS) // Ping all peers to inform them of new metadata - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -186,7 +186,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() } // listen for new nodes watches for new nodes in the network and adds them to the peerstore. diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index a0566d628cfc..bea75488251f 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -394,7 +394,7 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er s.pingMethodLock.Unlock() } -func (s *Service) pingPeers() { +func (s *Service) pingPeersAndLogEnr() { s.pingMethodLock.RLock() defer s.pingMethodLock.RUnlock() From 5c0cc4c2a787320d33dd340bf7206520fd5840d7 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 14:56:19 +0200 Subject: [PATCH 20/20] Update beacon-chain/sync/subscriber.go Co-authored-by: Nishant Das --- beacon-chain/sync/subscriber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 73b76b4988f4..53a14d1c1b64 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -673,7 +673,7 @@ func (s *Service) subscribeDynamicWithSyncSubnets( // Initialize the subscriptions map. subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) - // Retrieve the fenesis validators root. + // Retrieve the genesis validators root. genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() // Retrieve the epoch of the fork corresponding to the digest.