From c57cf3c3b0a508da8f0b140db8e6adeaf5c86c91 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 3 Sep 2024 15:58:40 +0200 Subject: [PATCH] Fix Initial Sync with 128 data columns subnets (#14403) * `pingPeers`: Add log with new ENR when modified. * `p2p Start`: Use idiomatic go error syntax. * P2P `start`: Fix error message. * Use not bootnodes at all if the `--chain-config-file` flag is used and no `--bootstrap-node` flag is used. Before this commit, if the `--chain-config-file` flag is used and no `--bootstrap-node` flag is used, then bootnodes are (incorrectly) defaulted on `mainnet` ones. * `validPeersExist`: Centralize logs. * `AddConnectionHandler`: Improve logging. "Peer connected" does not really reflect the fact that a new peer is actually connected. --> "New peer connection" is more clear. Also, instead of writing `0`, `1`or `2` for direction, now it's writted "Unknown", "Inbound", "Outbound". * Logging: Add 2 decimals for timestamt in text and JSON logs. * Improve "no valid peers" logging. * Improve "Some columns have no peers responsible for custody" logging. * `pubsubSubscriptionRequestLimit`: Increase to be consistent with data columns. * `sendPingRequest`: Improve logging. * `FindPeersWithSubnet`: Regularly recheck in our current set of peers if we have enough peers for this topic. Before this commit, new peers HAD to be found, even if current peers are eventually acceptable. For very small network, it used to lead to infinite search. * `subscribeDynamicWithSyncSubnets`: Use exactly the same subscription function initially and every slot. * Make deepsource happier. * Nishant's commend: Change peer disconnected log. * NIshant's comment: Change `Too many incoming subscription` log from error to debug. * `FindPeersWithSubnet`: Address Nishant's comment. * `batchSize`: Address Nishant's comment. * `pingPeers` ==> `pingPeersAndLogEnr`. * Update beacon-chain/sync/subscriber.go Co-authored-by: Nishant Das --------- Co-authored-by: Nishant Das --- beacon-chain/p2p/discovery.go | 6 +- beacon-chain/p2p/handshake.go | 11 +- beacon-chain/p2p/pubsub_filter.go | 28 ++- beacon-chain/p2p/service.go | 14 +- beacon-chain/p2p/subnets.go | 185 +++++++++++---- beacon-chain/p2p/testing/p2p.go | 4 +- beacon-chain/sync/data_columns_sampling.go | 12 +- beacon-chain/sync/rpc_ping.go | 2 +- beacon-chain/sync/subscriber.go | 210 +++++++++++------- cmd/beacon-chain/main.go | 6 +- config/features/config.go | 5 +- config/params/BUILD.bazel | 1 + .../params/testnet_custom_network_config.go | 9 + 13 files changed, 349 insertions(+), 144 deletions(-) create mode 100644 config/params/testnet_custom_network_config.go diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index a0166eb70b89..f275328592b1 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -119,7 +119,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadata(bitV) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -157,7 +157,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV2(bitV, bitS) // Ping all peers to inform them of new metadata - s.pingPeers() + s.pingPeersAndLogEnr() return } @@ -186,7 +186,7 @@ func (s *Service) RefreshPersistentSubnets() { s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodySubnetCount) // Ping all peers. - s.pingPeers() + s.pingPeersAndLogEnr() } // listen for new nodes watches for new nodes in the network and adds them to the peerstore. diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go index 97d2af8eed72..3bf9bd7e7ddb 100644 --- a/beacon-chain/p2p/handshake.go +++ b/beacon-chain/p2p/handshake.go @@ -90,14 +90,15 @@ func (s *Service) AddConnectionHandler(reqFunc, goodByeFunc func(ctx context.Con disconnectFromPeer() return } + validPeerConnection := func() { s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerConnected) // Go through the handshake process. log.WithFields(logrus.Fields{ - "direction": conn.Stat().Direction, + "direction": conn.Stat().Direction.String(), "multiAddr": peerMultiaddrString(conn), "activePeers": len(s.peers.Active()), - }).Debug("Peer connected") + }).Debug("New peer connection") } // Do not perform handshake on inbound dials. @@ -173,7 +174,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p s.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnected) // Only log disconnections if we were fully connected. if priorState == peers.PeerConnected { - log.WithField("activePeers", len(s.peers.Active())).Debug("Peer disconnected") + log.WithFields(logrus.Fields{ + "direction": conn.Stat().Direction.String(), + "multiAddr": peerMultiaddrString(conn), + "activePeers": len(s.peers.Active()), + }).Debug("Peer disconnected") } }() }, diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index e02371c587f9..2239f972bac6 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -10,16 +10,27 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/network/forks" + "github.com/sirupsen/logrus" ) var _ pubsub.SubscriptionFilter = (*Service)(nil) // It is set at this limit to handle the possibility // of double topic subscriptions at fork boundaries. -// -> 64 Attestation Subnets * 2. -// -> 4 Sync Committee Subnets * 2. -// -> Block,Aggregate,ProposerSlashing,AttesterSlashing,Exits,SyncContribution * 2. -const pubsubSubscriptionRequestLimit = 200 +// -> BeaconBlock * 2 = 2 +// -> BeaconAggregateAndProof * 2 = 2 +// -> VoluntaryExit * 2 = 2 +// -> ProposerSlashing * 2 = 2 +// -> AttesterSlashing * 2 = 2 +// -> 64 Beacon Attestation * 2 = 128 +// -> SyncContributionAndProof * 2 = 2 +// -> 4 SyncCommitteeSubnets * 2 = 8 +// -> BlsToExecutionChange * 2 = 2 +// -> 128 DataColumnSidecar * 2 = 256 +// ------------------------------------- +// TOTAL = 406 +// (Note: BlobSidecar is not included in this list since it is superseded by DataColumnSidecar) +const pubsubSubscriptionRequestLimit = 500 // CanSubscribe returns true if the topic is of interest and we could subscribe to it. func (s *Service) CanSubscribe(topic string) bool { @@ -95,8 +106,15 @@ func (s *Service) CanSubscribe(topic string) bool { // FilterIncomingSubscriptions is invoked for all RPCs containing subscription notifications. // This method returns only the topics of interest and may return an error if the subscription // request contains too many topics. -func (s *Service) FilterIncomingSubscriptions(_ peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { +func (s *Service) FilterIncomingSubscriptions(peerID peer.ID, subs []*pubsubpb.RPC_SubOpts) ([]*pubsubpb.RPC_SubOpts, error) { if len(subs) > pubsubSubscriptionRequestLimit { + subsCount := len(subs) + log.WithFields(logrus.Fields{ + "peerID": peerID, + "subscriptionCounts": subsCount, + "subscriptionLimit": pubsubSubscriptionRequestLimit, + }).Debug("Too many incoming subscriptions, filtering them") + return nil, pubsub.ErrTooManySubscriptions } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index d3eaeaa2e4b0..bea75488251f 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -202,12 +202,13 @@ func (s *Service) Start() { s.startupErr = err return } - err = s.connectToBootnodes() - if err != nil { - log.WithError(err).Error("Could not add bootnode to the exclusion list") + + if err := s.connectToBootnodes(); err != nil { + log.WithError(err).Error("Could not connect to boot nodes") s.startupErr = err return } + s.dv5Listener = listener go s.listenForNewNodes() } @@ -393,12 +394,17 @@ func (s *Service) AddPingMethod(reqFunc func(ctx context.Context, id peer.ID) er s.pingMethodLock.Unlock() } -func (s *Service) pingPeers() { +func (s *Service) pingPeersAndLogEnr() { s.pingMethodLock.RLock() defer s.pingMethodLock.RUnlock() + + localENR := s.dv5Listener.Self() + log.WithField("ENR", localENR).Info("New node record") + if s.pingMethod == nil { return } + for _, pid := range s.peers.Connected() { go func(id peer.ID) { if err := s.pingMethod(s.ctx, id); err != nil { diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index cd3370cf3b89..5c270f7c0535 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -2,6 +2,7 @@ package p2p import ( "context" + "math" "strings" "sync" "time" @@ -20,9 +21,9 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/v5/crypto/hash" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" - mathutil "github.com/prysmaticlabs/prysm/v5/math" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/sirupsen/logrus" ) var attestationSubnetCount = params.BeaconConfig().AttestationSubnetCount @@ -53,6 +54,79 @@ const blobSubnetLockerVal = 110 // chosen more than sync, attestation and blob subnet (6) combined. const dataColumnSubnetVal = 150 +// nodeFilter return a function that filters nodes based on the subnet topic and subnet index. +func (s *Service) nodeFilter(topic string, index uint64) (func(node *enode.Node) bool, error) { + switch { + case strings.Contains(topic, GossipAttestationMessage): + return s.filterPeerForAttSubnet(index), nil + case strings.Contains(topic, GossipSyncCommitteeMessage): + return s.filterPeerForSyncSubnet(index), nil + case strings.Contains(topic, GossipDataColumnSidecarMessage): + return s.filterPeerForDataColumnsSubnet(index), nil + default: + return nil, errors.Errorf("no subnet exists for provided topic: %s", topic) + } +} + +// searchForPeers performs a network search for peers subscribed to a particular subnet. +// It exits as soon as one of these conditions is met: +// - It looped through `batchSize` nodes. +// - It found `peersToFindCount“ peers corresponding to the `filter` criteria. +// - Iterator is exhausted. +func searchForPeers( + iterator enode.Iterator, + batchSize int, + peersToFindCount int, + filter func(node *enode.Node) bool, +) []*enode.Node { + nodeFromNodeID := make(map[enode.ID]*enode.Node, batchSize) + for i := 0; i < batchSize && len(nodeFromNodeID) <= peersToFindCount && iterator.Next(); i++ { + node := iterator.Node() + + // Filter out nodes that do not meet the criteria. + if !filter(node) { + continue + } + + // Remove duplicates, keeping the node with higher seq. + prevNode, ok := nodeFromNodeID[node.ID()] + if ok && prevNode.Seq() > node.Seq() { + continue + } + + nodeFromNodeID[node.ID()] = node + } + + // Convert the map to a slice. + nodes := make([]*enode.Node, 0, len(nodeFromNodeID)) + for _, node := range nodeFromNodeID { + nodes = append(nodes, node) + } + + return nodes +} + +// dialPeer dials a peer in a separate goroutine. +func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.Node) { + info, _, err := convertToAddrInfo(node) + if err != nil { + return + } + + if info == nil { + return + } + + wg.Add(1) + go func() { + if err := s.connectWithPeer(ctx, *info); err != nil { + log.WithError(err).Tracef("Could not connect with peer %s", info.String()) + } + + wg.Done() + }() +} + // FindPeersWithSubnet performs a network search for peers // subscribed to a particular subnet. Then it tries to connect // with those peers. This method will block until either: @@ -61,69 +135,96 @@ const dataColumnSubnetVal = 150 // On some edge cases, this method may hang indefinitely while peers // are actually found. In such a case, the user should cancel the context // and re-run the method again. -func (s *Service) FindPeersWithSubnet(ctx context.Context, topic string, - index uint64, threshold int) (bool, error) { +func (s *Service) FindPeersWithSubnet( + ctx context.Context, + topic string, + index uint64, + threshold int, +) (bool, error) { + const batchSize = 2000 + ctx, span := trace.StartSpan(ctx, "p2p.FindPeersWithSubnet") defer span.End() span.SetAttributes(trace.Int64Attribute("index", int64(index))) // lint:ignore uintcast -- It's safe to do this for tracing. if s.dv5Listener == nil { - // return if discovery isn't set + // Return if discovery isn't set return false, nil } topic += s.Encoding().ProtocolSuffix() iterator := s.dv5Listener.RandomNodes() defer iterator.Close() - switch { - case strings.Contains(topic, GossipAttestationMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForAttSubnet(index)) - case strings.Contains(topic, GossipSyncCommitteeMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForSyncSubnet(index)) - case strings.Contains(topic, GossipDataColumnSidecarMessage): - iterator = filterNodes(ctx, iterator, s.filterPeerForDataColumnsSubnet(index)) - default: - return false, errors.Errorf("no subnet exists for provided topic: %s", topic) + + filter, err := s.nodeFilter(topic, index) + if err != nil { + return false, errors.Wrap(err, "node filter") + } + + peersSummary := func(topic string, threshold int) (int, int) { + // Retrieve how many peers we have for this topic. + peerCountForTopic := len(s.pubsub.ListPeers(topic)) + + // Compute how many peers we are missing to reach the threshold. + missingPeerCountForTopic := max(0, threshold-peerCountForTopic) + + return peerCountForTopic, missingPeerCountForTopic } + // Compute how many peers we are missing to reach the threshold. + peerCountForTopic, missingPeerCountForTopic := peersSummary(topic, threshold) + + // Exit early if we have enough peers. + if missingPeerCountForTopic == 0 { + return true, nil + } + + log.WithFields(logrus.Fields{ + "topic": topic, + "currentPeerCount": peerCountForTopic, + "targetPeerCount": threshold, + }).Debug("Searching for new peers in the network - Start") + wg := new(sync.WaitGroup) for { - currNum := len(s.pubsub.ListPeers(topic)) - if currNum >= threshold { + // If we have enough peers, we can exit the loop. This is the happy path. + if missingPeerCountForTopic == 0 { break } + + // If the context is done, we can exit the loop. This is the unhappy path. if err := ctx.Err(); err != nil { - return false, errors.Errorf("unable to find requisite number of peers for topic %s - "+ - "only %d out of %d peers were able to be found", topic, currNum, threshold) + return false, errors.Errorf( + "unable to find requisite number of peers for topic %s - only %d out of %d peers available after searching", + topic, peerCountForTopic, threshold, + ) } - nodeCount := int(params.BeaconNetworkConfig().MinimumPeersInSubnetSearch) + + // Search for new peers in the network. + nodes := searchForPeers(iterator, batchSize, missingPeerCountForTopic, filter) + // Restrict dials if limit is applied. + maxConcurrentDials := math.MaxInt if flags.MaxDialIsActive() { - nodeCount = min(nodeCount, flags.Get().MaxConcurrentDials) + maxConcurrentDials = flags.Get().MaxConcurrentDials } - nodes := enode.ReadNodes(iterator, nodeCount) - for _, node := range nodes { - info, _, err := convertToAddrInfo(node) - if err != nil { - continue - } - if info == nil { - continue + // Dial the peers in batches. + for start := 0; start < len(nodes); start += maxConcurrentDials { + stop := min(start+maxConcurrentDials, len(nodes)) + for _, node := range nodes[start:stop] { + s.dialPeer(ctx, wg, node) } - wg.Add(1) - go func() { - if err := s.connectWithPeer(ctx, *info); err != nil { - log.WithError(err).Tracef("Could not connect with peer %s", info.String()) - } - wg.Done() - }() + // Wait for all dials to be completed. + wg.Wait() } - // Wait for all dials to be completed. - wg.Wait() + + _, missingPeerCountForTopic = peersSummary(topic, threshold) } + + log.WithField("topic", topic).Debug("Searching for new peers in the network - Success") return true, nil } @@ -183,11 +284,17 @@ func (s *Service) filterPeerForDataColumnsSubnet(index uint64) func(node *enode. // lower threshold to broadcast object compared to searching // for a subnet. So that even in the event of poor peer // connectivity, we can still broadcast an attestation. -func (s *Service) hasPeerWithSubnet(topic string) bool { +func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { // In the event peer threshold is lower, we will choose the lower // threshold. - minPeers := mathutil.Min(1, uint64(flags.Get().MinimumPeersPerSubnet)) - return len(s.pubsub.ListPeers(topic+s.Encoding().ProtocolSuffix())) >= int(minPeers) // lint:ignore uintcast -- Min peers can be safely cast to int. + minPeers := min(1, flags.Get().MinimumPeersPerSubnet) + topic := subnetTopic + s.Encoding().ProtocolSuffix() + peersWithSubnet := s.pubsub.ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + enoughPeers := peersWithSubnetCount >= minPeers + + return enoughPeers } // Updates the service's discv5 listener record's attestation subnet diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go index 842b490cef8e..9c4ec5521c1f 100644 --- a/beacon-chain/p2p/testing/p2p.go +++ b/beacon-chain/p2p/testing/p2p.go @@ -305,7 +305,7 @@ func (*TestP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { // AddConnectionHandler handles the connection with a newly connected peer. func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - ConnectedF: func(net network.Network, conn network.Conn) { + ConnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.Add(new(enr.Record), conn.RemotePeer(), conn.RemoteMultiaddr(), conn.Stat().Direction) @@ -329,7 +329,7 @@ func (p *TestP2P) AddConnectionHandler(f, _ func(ctx context.Context, id peer.ID // AddDisconnectionHandler -- func (p *TestP2P) AddDisconnectionHandler(f func(ctx context.Context, id peer.ID) error) { p.BHost.Network().Notify(&network.NotifyBundle{ - DisconnectedF: func(net network.Network, conn network.Conn) { + DisconnectedF: func(_ network.Network, conn network.Conn) { // Must be handled in a goroutine as this callback cannot be blocking. go func() { p.peers.SetConnectionState(conn.RemotePeer(), peers.PeerDisconnecting) diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index 27c16fc0f2bf..d83b119bbe8f 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -3,6 +3,7 @@ package sync import ( "context" "fmt" + "slices" "sort" "sync" "time" @@ -182,14 +183,17 @@ func (d *dataColumnSampler1D) refreshPeerInfo() { } } - columnWithNoPeers := make([]uint64, 0) + columnsWithoutPeers := make([]uint64, 0) for column, peers := range d.peerFromColumn { if len(peers) == 0 { - columnWithNoPeers = append(columnWithNoPeers, column) + columnsWithoutPeers = append(columnsWithoutPeers, column) } } - if len(columnWithNoPeers) > 0 { - log.WithField("columnWithNoPeers", columnWithNoPeers).Warn("Some columns have no peers responsible for custody") + + slices.Sort[[]uint64](columnsWithoutPeers) + + if len(columnsWithoutPeers) > 0 { + log.WithField("columns", columnsWithoutPeers).Warn("Some columns have no peers responsible for custody") } } diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go index 872f4a25e391..a1bfa06a876c 100644 --- a/beacon-chain/sync/rpc_ping.go +++ b/beacon-chain/sync/rpc_ping.go @@ -142,7 +142,7 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error { // If the peer responded with an error, increment the bad responses scorer. if code != 0 { s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID) - return errors.New(errMsg) + return errors.Errorf("code: %d - %s", code, errMsg) } // Decode the sequence number from the peer. diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 2097b776ca5c..942d8422ceb2 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -21,6 +21,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v5/config/features" + fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/container/slice" @@ -191,7 +192,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s // Do not resubscribe already seen subscriptions. ok := s.subHandler.topicExists(topic) if ok { - log.Debugf("Provided topic already has an active subscription running: %s", topic) + log.WithField("topic", topic).Debug("Provided topic already has an active subscription running") return nil } @@ -208,6 +209,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s log.WithError(err).Error("Could not subscribe topic") return nil } + s.subHandler.addTopic(sub.Topic(), sub) // Pipeline decodes the incoming subscription data, runs the validation, and handles the @@ -215,6 +217,7 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s pipeline := func(msg *pubsub.Message) { ctx, cancel := context.WithTimeout(s.ctx, pubsubMessageTimeout) defer cancel() + ctx, span := trace.StartSpan(ctx, "sync.pubsub") defer span.End() @@ -389,8 +392,6 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, // Check every slot that there are enough peers for i := uint64(0); i < subnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -454,10 +455,8 @@ func (s *Service) subscribeDynamicWithSubnets( return } wantedSubs := s.retrievePersistentSubs(currentSlot) - // Resize as appropriate. s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - // subscribe desired aggregator subnets. for _, idx := range wantedSubs { s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) } @@ -471,9 +470,15 @@ func (s *Service) subscribeDynamicWithSubnets( }() } -// revalidate that our currently connected subnets are valid. -func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription, - wantedSubs []uint64, topicFormat string, digest [4]byte) { +// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are +// not in the list of wanted subnets. +// TODO: Rename this functions as it does not only revalidate subscriptions. +func (s *Service) reValidateSubscriptions( + subscriptions map[uint64]*pubsub.Subscription, + wantedSubs []uint64, + topicFormat string, + digest [4]byte, +) { for k, v := range subscriptions { var wanted bool for _, idx := range wantedSubs { @@ -482,6 +487,7 @@ func (s *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc break } } + if !wanted && v != nil { v.Cancel() fullTopic := fmt.Sprintf(topicFormat, digest, k) + s.cfg.p2p.Encoding().ProtocolSuffix() @@ -508,34 +514,6 @@ func (s *Service) subscribeAggregatorSubnet( subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) } if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) - if err != nil { - log.WithError(err).Debug("Could not search for peers") - } - } -} - -// subscribe missing subnets for our sync committee members. -func (s *Service) subscribeSyncSubnet( - subscriptions map[uint64]*pubsub.Subscription, - idx uint64, - digest [4]byte, - validate wrappedVal, - handle subHandler, -) { - // do not subscribe if we have no peers in the same - // subnet - topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] - subnetTopic := fmt.Sprintf(topic, digest, idx) - // check if subscription exists and if not subscribe the relevant subnet. - if _, exists := subscriptions[idx]; !exists { - subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle) - } - if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -589,8 +567,6 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped // Check every slot that there are enough peers for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { - log.Debugf("No peers found subscribed to sync gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", i) _, err := s.cfg.p2p.FindPeersWithSubnet( s.ctx, s.addDigestAndIndexToTopic(topic, digest, i), @@ -608,59 +584,138 @@ func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrapped }() } -// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible -// string for the topic name and the list of subnets for subscribed topics that should be -// maintained. +// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed. +// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise. +func (s *Service) subscribeToSyncSubnets( + topicFormat string, + digest [4]byte, + genesisValidatorsRoot [fieldparams.RootLength]byte, + genesisTime time.Time, + subscriptions map[uint64]*pubsub.Subscription, + currentSlot primitives.Slot, + validate wrappedVal, + handle subHandler, +) bool { + // Get sync subnets topic. + topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] + + // Do not subscribe if not synced. + if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { + return true + } + + // Do not subscribe is the digest is not valid. + valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot) + if err != nil { + log.Error(err) + return true + } + + // Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork. + if !valid { + log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.") + s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + return false + } + + // Get the current epoch. + currentEpoch := slots.ToEpoch(currentSlot) + + // Retrieve the subnets we want to subscribe to. + wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch) + + // Remove subscriptions that are no longer wanted. + s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest) + + // Subscribe to wanted subnets. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if subscription exists. + if _, exists := subscriptions[subnetIndex]; exists { + continue + } + + // We need to subscribe to the subnet. + subscription := s.subscribeWithBase(subnetTopic, validate, handle) + subscriptions[subnetIndex] = subscription + } + + // Find new peers for wanted subnets if needed. + for _, subnetIndex := range wantedSubnetsIndex { + subnetTopic := fmt.Sprintf(topic, digest, subnetIndex) + + // Check if we have enough peers in the subnet. Skip if we do. + if s.validPeersExist(subnetTopic) { + continue + } + + // Not enough peers in the subnet, we need to search for more. + _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet) + if err != nil { + log.WithError(err).Debug("Could not search for peers") + } + } + + return true +} + +// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets. func (s *Service) subscribeDynamicWithSyncSubnets( topicFormat string, validate wrappedVal, handle subHandler, digest [4]byte, ) { - genRoot := s.cfg.clock.GenesisValidatorsRoot() - _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) + // Retrieve the number of committee subnets we need to subscribe to. + syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount + + // Initialize the subscriptions map. + subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount) + + // Retrieve the genesis validators root. + genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot() + + // Retrieve the epoch of the fork corresponding to the digest. + _, e, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:]) if err != nil { panic(err) } + + // Retrieve the base protobuf message. base := p2p.GossipTopicMappings(topicFormat, e) if base == nil { panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) } - subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().SyncCommitteeSubnetCount) - genesis := s.cfg.clock.GenesisTime() - ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) + + // Retrieve the genesis time. + genesisTime := s.cfg.clock.GenesisTime() + + // Define a ticker ticking every slot. + secondsPerSlot := params.BeaconConfig().SecondsPerSlot + ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot) + + // Retrieve the current slot. + currentSlot := s.cfg.clock.CurrentSlot() + + // Subscribe to the sync subnets. + s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) go func() { for { select { - case <-s.ctx.Done(): - ticker.Done() - return case currentSlot := <-ticker.C(): - if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { - continue - } - valid, err := isDigestValid(digest, genesis, genRoot) - if err != nil { - log.Error(err) - continue - } - if !valid { - log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) - // Unsubscribes from all our current subnets. - s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest) + isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle) + + // Stop the ticker if the digest is not valid. Likely to happen after a hard fork. + if !isDigestValid { ticker.Done() return } - wantedSubs := s.retrieveActiveSyncSubnets(slots.ToEpoch(currentSlot)) - // Resize as appropriate. - s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest) - - // subscribe desired aggregator subnets. - for _, idx := range wantedSubs { - s.subscribeSyncSubnet(subscriptions, idx, digest, validate, handle) - } + case <-s.ctx.Done(): + ticker.Done() + return } } }() @@ -686,12 +741,6 @@ func (s *Service) subscribeColumnSubnet( minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet if !s.validPeersExist(subnetTopic) { - log.WithFields(logrus.Fields{ - "columnSubnet": idx, - "minimumPeersPerSubnet": minimumPeersPerSubnet, - "topic": subnetTopic, - }).Debug("No peers found subscribed to column gossip subnet. Searching network for peers subscribed to it") - _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, minimumPeersPerSubnet) if err != nil { log.WithError(err).Debug("Could not search for peers") @@ -763,8 +812,6 @@ func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})] subnetTopic := fmt.Sprintf(topic, digest, idx) if !s.validPeersExist(subnetTopic) { - log.Debugf("No peers found subscribed to attestation gossip subnet with "+ - "committee index %d. Searching network for peers subscribed to the subnet.", idx) // perform a search for peers with the desired committee index. _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet) if err != nil { @@ -790,8 +837,13 @@ func (s *Service) unSubscribeFromTopic(topic string) { // find if we have peers who are subscribed to the same subnet func (s *Service) validPeersExist(subnetTopic string) bool { - numOfPeers := s.cfg.p2p.PubSub().ListPeers(subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix()) - return len(numOfPeers) >= flags.Get().MinimumPeersPerSubnet + topic := subnetTopic + s.cfg.p2p.Encoding().ProtocolSuffix() + threshold := flags.Get().MinimumPeersPerSubnet + + peersWithSubnet := s.cfg.p2p.PubSub().ListPeers(topic) + peersWithSubnetCount := len(peersWithSubnet) + + return peersWithSubnetCount >= threshold } func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 { diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index fb2654c27f56..69c7283d8f82 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -168,7 +168,7 @@ func before(ctx *cli.Context) error { switch format { case "text": formatter := new(prefixed.TextFormatter) - formatter.TimestampFormat = "2006-01-02 15:04:05" + formatter.TimestampFormat = "2006-01-02 15:04:05.00" formatter.FullTimestamp = true // If persistent log files are written - we disable the log messages coloring because @@ -184,7 +184,9 @@ func before(ctx *cli.Context) error { logrus.SetFormatter(f) case "json": - logrus.SetFormatter(&logrus.JSONFormatter{}) + logrus.SetFormatter(&logrus.JSONFormatter{ + TimestampFormat: "2006-01-02 15:04:05.00", + }) case "journald": if err := journald.Enable(); err != nil { return err diff --git a/config/features/config.go b/config/features/config.go index de37dd5e258d..8c3033972475 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -145,6 +145,7 @@ func configureTestnet(ctx *cli.Context) error { } else { if ctx.IsSet(cmd.ChainConfigFileFlag.Name) { log.Warn("Running on custom Ethereum network specified in a chain configuration yaml file") + params.UseCustomNetworkConfig() } else { log.Info("Running on Ethereum Mainnet") } @@ -156,11 +157,11 @@ func configureTestnet(ctx *cli.Context) error { } // Insert feature flags within the function to be enabled for Sepolia testnet. -func applySepoliaFeatureFlags(ctx *cli.Context) { +func applySepoliaFeatureFlags(_ *cli.Context) { } // Insert feature flags within the function to be enabled for Holesky testnet. -func applyHoleskyFeatureFlags(ctx *cli.Context) { +func applyHoleskyFeatureFlags(_ *cli.Context) { } // ConfigureBeaconChain sets the global config based diff --git a/config/params/BUILD.bazel b/config/params/BUILD.bazel index eed7a596fe11..574b491d61a6 100644 --- a/config/params/BUILD.bazel +++ b/config/params/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "mainnet_config.go", "minimal_config.go", "network_config.go", + "testnet_custom_network_config.go", "testnet_e2e_config.go", "testnet_holesky_config.go", "testnet_sepolia_config.go", diff --git a/config/params/testnet_custom_network_config.go b/config/params/testnet_custom_network_config.go new file mode 100644 index 000000000000..7ce6780fd59a --- /dev/null +++ b/config/params/testnet_custom_network_config.go @@ -0,0 +1,9 @@ +package params + +func UseCustomNetworkConfig() { + cfg := BeaconNetworkConfig().Copy() + cfg.ContractDeploymentBlock = 0 + cfg.BootstrapNodes = []string{} + + OverrideBeaconNetworkConfig(cfg) +}