From 6b098041bec6fad5ec60b5665de8578509a8df40 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 22 Oct 2024 12:40:38 -0300 Subject: [PATCH 1/3] network/records: change subnets type to [commons.SubnetsCount]byte --- api/handlers/node.go | 5 +- network/discovery/dv5_filters.go | 8 +-- network/discovery/dv5_service.go | 13 ++-- network/discovery/dv5_service_test.go | 18 +++--- network/discovery/node_record.go | 3 +- network/discovery/options.go | 5 +- network/discovery/service_test.go | 3 +- network/discovery/util_test.go | 15 ++--- network/p2p/p2p.go | 20 +++---- network/p2p/p2p_pubsub.go | 6 +- network/p2p/p2p_setup.go | 12 ++-- network/peers/conn_manager.go | 13 ++-- network/peers/conn_manager_test.go | 6 +- network/peers/connections/conn_handler.go | 15 ++--- network/peers/index.go | 2 +- network/peers/subnets.go | 17 +++--- network/peers/subnets_test.go | 31 +++++----- network/records/entries.go | 12 ++-- network/records/subnets.go | 73 +++++++++++------------ network/records/subnets_test.go | 6 +- 20 files changed, 140 insertions(+), 143 deletions(-) diff --git a/api/handlers/node.go b/api/handlers/node.go index fc8dc0062a..8d2bcdeb55 100644 --- a/api/handlers/node.go +++ b/api/handlers/node.go @@ -172,7 +172,10 @@ func (h *Node) peers(peers []peer.ID) []peerJSON { resp[i] = peerJSON{ ID: id, Connectedness: h.Network.Connectedness(id).String(), - Subnets: h.PeersIndex.GetPeerSubnets(id).String(), + } + subnets, ok := h.PeersIndex.GetPeerSubnets(id) + if ok { + resp[i].Subnets = subnets.String() } for _, addr := range h.Network.Peerstore().Addrs(id) { diff --git a/network/discovery/dv5_filters.go b/network/discovery/dv5_filters.go index e32bddbbf7..2651294c0e 100644 --- a/network/discovery/dv5_filters.go +++ b/network/discovery/dv5_filters.go @@ -4,8 +4,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" - "github.com/ssvlabs/ssv/network/records" "go.uber.org/zap" + + "github.com/ssvlabs/ssv/network/records" ) // limitNodeFilter returns true if the limit is exceeded @@ -71,14 +72,11 @@ func (dvs *DiscV5Service) sharedSubnetsFilter(n int) func(node *enode.Node) bool if n == 0 { return true } - if len(dvs.subnets) == 0 { - return true - } nodeSubnets, err := records.GetSubnetsEntry(node.Record()) if err != nil { return false } - shared := records.SharedSubnets(dvs.subnets, nodeSubnets, n) + shared := dvs.subnets.SharedSubnets(nodeSubnets, n) // logger.Debug("shared subnets", zap.Ints("shared", shared), // zap.String("node", node.String())) diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index 8733662c82..95e3714e53 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -1,7 +1,6 @@ package discovery import ( - "bytes" "context" "fmt" "net" @@ -62,7 +61,7 @@ type DiscV5Service struct { sharedConn *SharedUDPConn networkConfig networkconfig.NetworkConfig - subnets []byte + subnets records.Subnets publishLock chan struct{} } @@ -177,7 +176,7 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error { if err != nil { return fmt.Errorf("could not read subnets: %w", err) } - if bytes.Equal(zeroSubnets, nodeSubnets) { + if zeroSubnets == nodeSubnets { return errors.New("zero subnets") } @@ -321,11 +320,11 @@ func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...uint64) if len(subnets) == 0 { return false, nil } - updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil) + updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), subnets, nil) if err != nil { return false, errors.Wrap(err, "could not update ENR") } - if updatedSubnets != nil { + if isUpdated { dvs.subnets = updatedSubnets logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) return true, nil @@ -340,11 +339,11 @@ func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...uint6 if len(subnets) == 0 { return false, nil } - updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets) + updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), nil, subnets) if err != nil { return false, errors.Wrap(err, "could not update ENR") } - if updatedSubnets != nil { + if isUpdated { dvs.subnets = updatedSubnets logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) return true, nil diff --git a/network/discovery/dv5_service_test.go b/network/discovery/dv5_service_test.go index e02a8991d6..ac295ba92d 100644 --- a/network/discovery/dv5_service_test.go +++ b/network/discovery/dv5_service_test.go @@ -83,10 +83,11 @@ func TestCheckPeer(t *testing.T) { expectedError: nil, }, { - name: "missing subnets", - domainType: &myDomainType, - subnets: nil, - expectedError: errors.New("could not read subnets"), + name: "missing subnets", + domainType: &myDomainType, + subnets: records.Subnets{}, + missingSubnets: true, + expectedError: errors.New("could not read subnets"), }, { name: "inactive subnets", @@ -138,7 +139,7 @@ func TestCheckPeer(t *testing.T) { err := records.SetDomainTypeEntry(localNode, records.KeyNextDomainType, *test.nextDomainType) require.NoError(t, err) } - if test.subnets != nil { + if !test.missingSubnets { err := records.SetSubnetsEntry(localNode, test.subnets) require.NoError(t, err) } @@ -176,13 +177,14 @@ type checkPeerTest struct { name string domainType *spectypes.DomainType nextDomainType *spectypes.DomainType - subnets []byte + subnets records.Subnets + missingSubnets bool localNode *enode.LocalNode expectedError error } -func mockSubnets(active ...int) []byte { - subnets := make([]byte, commons.Subnets()) +func mockSubnets(active ...int) records.Subnets { + subnets := records.Subnets{} for _, subnet := range active { subnets[subnet] = 1 } diff --git a/network/discovery/node_record.go b/network/discovery/node_record.go index d20d46c4eb..371db27924 100644 --- a/network/discovery/node_record.go +++ b/network/discovery/node_record.go @@ -5,6 +5,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/network/records" ) @@ -16,7 +17,7 @@ func DecorateWithDomainType(key records.ENRKey, domainType spectypes.DomainType) } } -func DecorateWithSubnets(subnets []byte) NodeRecordDecoration { +func DecorateWithSubnets(subnets records.Subnets) NodeRecordDecoration { return func(node *enode.LocalNode) error { return records.SetSubnetsEntry(node, subnets) } diff --git a/network/discovery/options.go b/network/discovery/options.go index 15e1bcb7e8..87b9eb969b 100644 --- a/network/discovery/options.go +++ b/network/discovery/options.go @@ -6,6 +6,7 @@ import ( "github.com/ssvlabs/ssv/logging" compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger" + "github.com/ssvlabs/ssv/network/records" "github.com/ssvlabs/ssv/network/commons" @@ -34,8 +35,8 @@ type DiscV5Options struct { NetworkKey *ecdsa.PrivateKey // Bootnodes is a list of bootstrapper nodes Bootnodes []string - // Subnets is a bool slice represents all the subnets the node is intreseted in - Subnets []byte + // Subnets is a bool slice encoded in hex, it represents all the subnets the node is interested in + Subnets records.Subnets // EnableLogging when true enables logs to be emitted EnableLogging bool } diff --git a/network/discovery/service_test.go b/network/discovery/service_test.go index 54c503ac32..c4f322de3f 100644 --- a/network/discovery/service_test.go +++ b/network/discovery/service_test.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/ssvlabs/ssv/network/records" "github.com/ssvlabs/ssv/networkconfig" ) @@ -281,7 +282,7 @@ func TestDiscV5Service_checkPeer(t *testing.T) { dvs.conns.(*MockConnection).SetAtLimit(false) // Valid peer but no common subnet - subnets := make([]byte, len(records.ZeroSubnets)) + subnets := records.Subnets{} subnets[10] = 1 err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomSubnets(t, subnets))) require.ErrorContains(t, err, "no shared subnets") diff --git a/network/discovery/util_test.go b/network/discovery/util_test.go index 183d222f5a..45adbcb0bf 100644 --- a/network/discovery/util_test.go +++ b/network/discovery/util_test.go @@ -19,11 +19,13 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" spectypes "github.com/ssvlabs/ssv-spec/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/peers" "github.com/ssvlabs/ssv/network/records" "github.com/ssvlabs/ssv/networkconfig" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) var ( @@ -58,8 +60,7 @@ func testingDiscoveryOptions(t *testing.T, networkConfig networkconfig.NetworkCo } // Discovery options - allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - subnetsIndex := peers.NewSubnetsIndex(len(allSubs)) + subnetsIndex := peers.NewSubnetsIndex(commons.Subnets()) connectionIndex := NewMockConnection() return &Options{ @@ -160,7 +161,7 @@ func NodeWithoutNextDomain(t *testing.T) *enode.Node { } func NodeWithoutSubnets(t *testing.T) *enode.Node { - return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, nil) + return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, records.Subnets{}) } func NodeWithCustomDomains(t *testing.T, domainType spectypes.DomainType, nextDomainType spectypes.DomainType) *enode.Node { @@ -171,14 +172,14 @@ func NodeWithZeroSubnets(t *testing.T) *enode.Node { return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, zeroSubnets) } -func NodeWithCustomSubnets(t *testing.T, subnets []byte) *enode.Node { +func NodeWithCustomSubnets(t *testing.T, subnets records.Subnets) *enode.Node { return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, subnets) } func CustomNode(t *testing.T, setDomainType bool, domainType spectypes.DomainType, setNextDomainType bool, nextDomainType spectypes.DomainType, - setSubnets bool, subnets []byte) *enode.Node { + setSubnets bool, subnets records.Subnets) *enode.Node { // Generate key nodeKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 674ab65c94..20e1032b29 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -95,8 +95,8 @@ type p2pNetwork struct { backoffConnector *libp2pdiscbackoff.BackoffConnector - fixedSubnets []byte - activeSubnets []byte + fixedSubnets records.Subnets + activeSubnets records.Subnets libConnManager connmgrcore.ConnManager @@ -298,16 +298,14 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { ctx, cancel := context.WithTimeout(n.ctx, connManagerBalancingTimeout) defer cancel() - mySubnets := records.Subnets(n.activeSubnets).Clone() - // Disconnect from irrelevant peers - disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, mySubnets) + disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, n.activeSubnets) if disconnectedPeers > 0 { return } // Trim peers according to subnet participation (considering the subnet size) - connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers) + connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, n.activeSubnets, allPeers, n.cfg.TopicMaxPeers) connMgr.TrimPeers(ctx, logger, n.host.Network()) } } @@ -350,16 +348,14 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { // there is a pending PR to replace this: https://github.com/ssvlabs/ssv/pull/990 logger = logger.Named(logging.NameP2PNetwork) ticker := time.NewTicker(time.Second) - registeredSubnets := make([]byte, commons.Subnets()) + registeredSubnets := records.Subnets{} defer ticker.Stop() // Run immediately and then every second. for ; true; <-ticker.C { start := time.Now() - // Compute the new subnets according to the active committees/validators. - updatedSubnets := make([]byte, commons.Subnets()) - copy(updatedSubnets, n.fixedSubnets) + updatedSubnets := n.fixedSubnets n.activeCommittees.Range(func(cid string, status validatorStatus) bool { subnet := commons.CommitteeSubnet(spectypes.CommitteeID([]byte(cid))) @@ -399,7 +395,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } n.idx.UpdateSelfRecord(func(self *records.NodeInfo) *records.NodeInfo { - self.Metadata.Subnets = records.Subnets(n.activeSubnets).String() + self.Metadata.Subnets = n.activeSubnets.String() return self }) @@ -437,7 +433,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0) + subnetsList := allSubs.SharedSubnets(n.activeSubnets, 0) logger.Debug("updated subnets", zap.Any("added", addedSubnets), zap.Any("removed", removedSubnets), diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index 68baf5126c..b0a703a9f2 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -107,13 +107,9 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error } } - // Update the subnets slice. - subnets := make([]byte, commons.Subnets()) - copy(subnets, n.fixedSubnets) for _, subnet := range randomSubnets { - subnets[subnet] = byte(1) + n.fixedSubnets[subnet] = byte(1) } - n.fixedSubnets = subnets return nil } diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index 6225b0f0e3..22407dd2b0 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -88,14 +88,12 @@ func (n *p2pNetwork) initCfg() error { n.cfg.UserAgent = userAgent(n.cfg.UserAgent) } if len(n.cfg.Subnets) > 0 { - s := make(records.Subnets, 0) + s := records.Subnets{} subnets, err := s.FromString(strings.Replace(n.cfg.Subnets, "0x", "", 1)) if err != nil { return fmt.Errorf("parse subnet: %w", err) } n.fixedSubnets = subnets - } else { - n.fixedSubnets = make(records.Subnets, p2pcommons.Subnets()) } if n.cfg.MaxPeers <= 0 { n.cfg.MaxPeers = minPeersBuffer @@ -181,7 +179,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { self := records.NewNodeInfo(domain) self.Metadata = &records.NodeMetadata{ NodeVersion: commons.GetNodeVersion(), - Subnets: records.Subnets(n.fixedSubnets).String(), + Subnets: n.fixedSubnets.String(), } getPrivKey := func() crypto.PrivKey { return libPrivKey @@ -253,10 +251,8 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { Bootnodes: n.cfg.TransformBootnodes(), EnableLogging: n.cfg.DiscoveryTrace, } - if len(n.fixedSubnets) > 0 { - discV5Opts.Subnets = n.fixedSubnets - logger = logger.With(zap.String("subnets", records.Subnets(n.fixedSubnets).String())) - } + discV5Opts.Subnets = n.fixedSubnets + logger = logger.With(zap.String("subnets", n.fixedSubnets.String())) logger.Info("discovery: using discv5", zap.Strings("bootnodes", discV5Opts.Bootnodes), zap.String("ip", discV5Opts.IP)) diff --git a/network/peers/conn_manager.go b/network/peers/conn_manager.go index 38b1886aa3..b8594b8116 100644 --- a/network/peers/conn_manager.go +++ b/network/peers/conn_manager.go @@ -116,9 +116,9 @@ func (c connManager) getBestPeers(n int, mySubnets records.Subnets, allPeers []p // Compute the score for each peer according to peer's subnets and subnets' score var peerLogs []peerLog for _, pid := range allPeers { - peerSubnets := c.subnetsIdx.GetPeerSubnets(pid) + peerSubnets, ok := c.subnetsIdx.GetPeerSubnets(pid) var score PeerScore - if len(peerSubnets) == 0 { + if !ok { // TODO: shouldn't we not connect to peers with no subnets? c.logger.Debug("peer has no subnets", zap.String("peer", pid.String())) score = -1000 @@ -129,7 +129,7 @@ func (c connManager) getBestPeers(n int, mySubnets records.Subnets, allPeers []p peerLogs = append(peerLogs, peerLog{ Peer: pid, Score: score, - SharedSubnets: len(records.SharedSubnets(peerSubnets, mySubnets, len(mySubnets))), + SharedSubnets: len(peerSubnets.SharedSubnets(mySubnets, len(mySubnets))), }) } @@ -231,8 +231,11 @@ func (c connManager) DisconnectFromBadPeers(logger *zap.Logger, net libp2pnetwor func (c connManager) DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnectQuota int, net libp2pnetwork.Network, allPeers []peer.ID, mySubnets records.Subnets) int { disconnectedPeers := 0 for _, peerID := range allPeers { - peerSubnets := c.subnetsIdx.GetPeerSubnets(peerID) - sharedSubnets := records.SharedSubnets(mySubnets, peerSubnets, len(mySubnets)) + var sharedSubnets []int + peerSubnets, ok := c.subnetsIdx.GetPeerSubnets(peerID) + if ok { + sharedSubnets = mySubnets.SharedSubnets(peerSubnets, len(mySubnets)) + } // If there's no common subnet, disconnect from peer. if len(sharedSubnets) == 0 { diff --git a/network/peers/conn_manager_test.go b/network/peers/conn_manager_test.go index 17cb8ac53a..6148aae75e 100644 --- a/network/peers/conn_manager_test.go +++ b/network/peers/conn_manager_test.go @@ -12,6 +12,7 @@ import ( "go.uber.org/zap" "github.com/ssvlabs/ssv/logging" + "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/records" ) @@ -19,8 +20,7 @@ func TestTagBestPeers(t *testing.T) { logger := logging.TestLogger(t) connMgrMock := newConnMgr() - allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - si := NewSubnetsIndex(len(allSubs)) + si := NewSubnetsIndex(commons.Subnets()) cm := NewConnManager(zap.NewNop(), connMgrMock, si, nil).(*connManager) @@ -28,7 +28,7 @@ func TestTagBestPeers(t *testing.T) { require.NoError(t, err) for _, pid := range pids { - r := rand.Intn(len(allSubs) / 3) + r := rand.Intn(commons.Subnets() / 3) si.UpdatePeerSubnets(pid, createRandomSubnets(r)) } mySubnets := createRandomSubnets(40) diff --git a/network/peers/connections/conn_handler.go b/network/peers/connections/conn_handler.go index af45f62ce4..6f4b51b289 100644 --- a/network/peers/connections/conn_handler.go +++ b/network/peers/connections/conn_handler.go @@ -216,19 +216,20 @@ func (ch *connHandler) Handle(logger *zap.Logger) *libp2pnetwork.NotifyBundle { func (ch *connHandler) sharesEnoughSubnets(logger *zap.Logger, conn libp2pnetwork.Conn) bool { pid := conn.RemotePeer() - subnets := ch.subnetsIndex.GetPeerSubnets(pid) - if len(subnets) == 0 { - // no subnets for this peer - return false + subnets, ok := ch.subnetsIndex.GetPeerSubnets(pid) + if ok { + logger = logger.With(fields.Subnets(subnets)) + } else { + logger = logger.With(zap.String(fields.FieldSubnets, "-")) } - mySubnets := ch.subnetsProvider() - logger = logger.With(fields.Subnets(subnets), zap.String("my_subnets", mySubnets.String())) + mySubnets := ch.subnetsProvider() + logger = logger.With(zap.String("my_subnets", mySubnets.String())) if mySubnets.String() == records.ZeroSubnets { // this node has no subnets return true } - shared := records.SharedSubnets(mySubnets, subnets, 1) + shared := mySubnets.SharedSubnets(subnets, 1) logger.Debug("checking subnets", zap.Ints("shared", shared)) return len(shared) == 1 diff --git a/network/peers/index.go b/network/peers/index.go index a102aca9fb..1b326cdba2 100644 --- a/network/peers/index.go +++ b/network/peers/index.go @@ -106,7 +106,7 @@ type SubnetsIndex interface { GetSubnetPeers(s int) []peer.ID // GetPeerSubnets returns subnets of the given peer - GetPeerSubnets(id peer.ID) records.Subnets + GetPeerSubnets(id peer.ID) (records.Subnets, bool) // GetSubnetsStats collects and returns subnets stats GetSubnetsStats() *SubnetsStats diff --git a/network/peers/subnets.go b/network/peers/subnets.go index 5b347d80aa..1850aa3f92 100644 --- a/network/peers/subnets.go +++ b/network/peers/subnets.go @@ -29,10 +29,12 @@ func (si *subnetsIndex) UpdatePeerSubnets(id peer.ID, s records.Subnets) bool { defer si.lock.Unlock() existing, ok := si.peerSubnets[id] + var diff map[int]byte if !ok { - existing = make([]byte, 0) + diff = s.ToMap() + } else { + diff = existing.DiffSubnets(s) } - diff := records.DiffSubnets(existing, s) if len(diff) == 0 { return false } @@ -104,17 +106,16 @@ func (si *subnetsIndex) GetSubnetsStats() *SubnetsStats { return stats } -func (si *subnetsIndex) GetPeerSubnets(id peer.ID) records.Subnets { +func (si *subnetsIndex) GetPeerSubnets(id peer.ID) (records.Subnets, bool) { si.lock.RLock() defer si.lock.RUnlock() subnets, ok := si.peerSubnets[id] if !ok { - return nil + return records.Subnets{}, false } - cp := make(records.Subnets, len(subnets)) - copy(cp, subnets) - return cp + + return subnets, false } // GetSubnetsDistributionScores returns current subnets scores based on peers distribution. @@ -124,7 +125,7 @@ func GetSubnetsDistributionScores(stats *SubnetsStats, minPerSubnet int, mySubne const activeSubnetBoost = 0.2 allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - activeSubnets := records.SharedSubnets(allSubs, mySubnets, 0) + activeSubnets := allSubs.SharedSubnets(mySubnets, 0) scores := make([]float64, len(allSubs)) for _, s := range activeSubnets { diff --git a/network/peers/subnets_test.go b/network/peers/subnets_test.go index b187d92522..caac7cbf7b 100644 --- a/network/peers/subnets_test.go +++ b/network/peers/subnets_test.go @@ -7,10 +7,11 @@ import ( "testing" "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "github.com/ssvlabs/ssv/network/commons" "github.com/ssvlabs/ssv/network/records" nettesting "github.com/ssvlabs/ssv/network/testing" - "github.com/stretchr/testify/require" ) func TestSubnetsIndex(t *testing.T) { @@ -35,15 +36,15 @@ func TestSubnetsIndex(t *testing.T) { subnetsIdx := NewSubnetsIndex(128) - subnetsIdx.UpdatePeerSubnets(pids[0], sAll.Clone()) - subnetsIdx.UpdatePeerSubnets(pids[1], sNone.Clone()) - subnetsIdx.UpdatePeerSubnets(pids[2], sPartial.Clone()) - subnetsIdx.UpdatePeerSubnets(pids[3], sPartial.Clone()) + subnetsIdx.UpdatePeerSubnets(pids[0], sAll) + subnetsIdx.UpdatePeerSubnets(pids[1], sNone) + subnetsIdx.UpdatePeerSubnets(pids[2], sPartial) + subnetsIdx.UpdatePeerSubnets(pids[3], sPartial) require.Len(t, subnetsIdx.GetSubnetPeers(0), 3) require.Len(t, subnetsIdx.GetSubnetPeers(10), 1) - subnetsIdx.UpdatePeerSubnets(pids[0], sPartial.Clone()) + subnetsIdx.UpdatePeerSubnets(pids[0], sPartial) require.Len(t, subnetsIdx.GetSubnetPeers(0), 3) require.Len(t, subnetsIdx.GetSubnetPeers(10), 0) @@ -51,23 +52,21 @@ func TestSubnetsIndex(t *testing.T) { stats := subnetsIdx.GetSubnetsStats() require.Equal(t, 3, stats.PeersCount[0]) - subnetsIdx.UpdatePeerSubnets(pids[0], sNone.Clone()) - subnetsIdx.UpdatePeerSubnets(pids[2], sNone.Clone()) - subnetsIdx.UpdatePeerSubnets(pids[3], sNone.Clone()) + subnetsIdx.UpdatePeerSubnets(pids[0], sNone) + subnetsIdx.UpdatePeerSubnets(pids[2], sNone) + subnetsIdx.UpdatePeerSubnets(pids[3], sNone) require.Len(t, subnetsIdx.GetSubnetPeers(0), 0) require.Len(t, subnetsIdx.GetSubnetPeers(10), 0) } func TestSubnetsDistributionScores(t *testing.T) { - nsubnets := 128 - mysubnets := make(records.Subnets, nsubnets) - allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - for sub := range allSubs { - if sub%2 == 0 { - mysubnets[sub] = byte(0) + mysubnets := records.Subnets{} + for i := 0; i < commons.Subnets(); i++ { + if i%2 == 0 { + mysubnets[i] = byte(0) } else { - mysubnets[sub] = byte(1) + mysubnets[i] = byte(1) } } stats := &SubnetsStats{ diff --git a/network/records/entries.go b/network/records/entries.go index 846e7d938d..882abcd9eb 100644 --- a/network/records/entries.go +++ b/network/records/entries.go @@ -69,7 +69,7 @@ func GetDomainTypeEntry(record *enr.Record, key ENRKey) (spectypes.DomainType, e } // SetSubnetsEntry adds subnets entry to our enode.LocalNode -func SetSubnetsEntry(node *enode.LocalNode, subnets []byte) error { +func SetSubnetsEntry(node *enode.LocalNode, subnets Subnets) error { subnetsVec := bitfield.NewBitvector128() for i, subnet := range subnets { // #nosec G115 -- subnets has a constant len of 128 subnetsVec.SetBitAt(uint64(i), subnet > 0) @@ -79,21 +79,21 @@ func SetSubnetsEntry(node *enode.LocalNode, subnets []byte) error { } // GetSubnetsEntry extracts the value of subnets entry from some record -func GetSubnetsEntry(record *enr.Record) ([]byte, error) { +func GetSubnetsEntry(record *enr.Record) (Subnets, error) { subnetsVec := bitfield.NewBitvector128() if err := record.Load(enr.WithEntry("subnets", &subnetsVec)); err != nil { if enr.IsNotFound(err) { - return nil, ErrEntryNotFound + return Subnets{}, ErrEntryNotFound } - return nil, err + return Subnets{}, err } - res := make([]byte, 0, subnetsVec.Len()) + res := Subnets{} for i := uint64(0); i < subnetsVec.Len(); i++ { val := byte(0) if subnetsVec.BitAt(i) { val = 1 } - res = append(res, val) + res[i] = val } return res, nil } diff --git a/network/records/subnets.go b/network/records/subnets.go index de3f2e3faa..9ed85b05ca 100644 --- a/network/records/subnets.go +++ b/network/records/subnets.go @@ -1,7 +1,6 @@ package records import ( - "bytes" "encoding/hex" "fmt" "strconv" @@ -10,6 +9,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" + + "github.com/ssvlabs/ssv/network/commons" ) const ( @@ -21,41 +22,30 @@ const ( // UpdateSubnets updates subnets entry according to the given changes. // count is the amount of subnets, in case that the entry doesn't exist as we want to initialize it -func UpdateSubnets(node *enode.LocalNode, count int, added []uint64, removed []uint64) ([]byte, error) { +func UpdateSubnets(node *enode.LocalNode, added []uint64, removed []uint64) (Subnets, bool, error) { subnets, err := GetSubnetsEntry(node.Node().Record()) if err != nil && !errors.Is(err, ErrEntryNotFound) { - return nil, errors.Wrap(err, "could not read subnets entry") - } - orig := make([]byte, len(subnets)) - if len(subnets) == 0 { // not exist, creating slice - subnets = make([]byte, count) - } else { - copy(orig, subnets) + return Subnets{}, false, errors.Wrap(err, "could not read subnets entry") } + + orig := subnets for _, i := range added { subnets[i] = 1 } for _, i := range removed { subnets[i] = 0 } - if bytes.Equal(orig, subnets) { - return nil, nil + if orig == subnets { + return Subnets{}, false, nil } if err := SetSubnetsEntry(node, subnets); err != nil { - return nil, errors.Wrap(err, "could not update subnets entry") + return Subnets{}, false, errors.Wrap(err, "could not update subnets entry") } - return subnets, nil + return subnets, true, nil } // Subnets holds all the subscribed subnets of a specific node -type Subnets []byte - -// Clone clones the independent byte slice -func (s Subnets) Clone() Subnets { - cp := make([]byte, len(s)) - copy(cp, s) - return cp -} +type Subnets [commons.SubnetsCount]byte func (s Subnets) String() string { subnetsVec := bitfield.NewBitvector128() @@ -74,16 +64,21 @@ func (s Subnets) FromString(subnetsStr string) (Subnets, error) { for i := 0; i+1 < len(subnetsStr); i += 2 { maskData1, err := getCharMask(string(subnetsStr[i])) if err != nil { - return nil, err + return Subnets{}, err } maskData2, err := getCharMask(string(subnetsStr[i+1])) if err != nil { - return nil, err + return Subnets{}, err } data = append(data, maskData2...) data = append(data, maskData1...) } - return data, nil + + if len(data) != commons.Subnets() { + return Subnets{}, fmt.Errorf("invalid subnets length: %d", len(data)) + } + + return Subnets(data), nil } func (s Subnets) Active() int { @@ -96,20 +91,26 @@ func (s Subnets) Active() int { return active } +// ToMap returns a map with all subnets and their values +func (s Subnets) ToMap() map[int]byte { + m := make(map[int]byte) + for subnet, v := range s { + m[subnet] = v + } + return m +} + // SharedSubnets returns the shared subnets -func SharedSubnets(a, b []byte, maxLen int) []int { +func (s Subnets) SharedSubnets(other Subnets, maxLen int) []int { var shared []int if maxLen == 0 { - maxLen = len(a) - } - if len(a) == 0 || len(b) == 0 { - return shared + maxLen = len(s) } - for subnet, aval := range a { - if aval == 0 { + for subnet, v := range s { + if v == 0 { continue } - if b[subnet] == 0 { + if other[subnet] == 0 { continue } shared = append(shared, subnet) @@ -122,12 +123,10 @@ func SharedSubnets(a, b []byte, maxLen int) []int { // DiffSubnets returns a diff of the two given subnets. // returns a map with all the different entries and their post change value -func DiffSubnets(a, b []byte) map[int]byte { +func (s Subnets) DiffSubnets(other Subnets) map[int]byte { diff := make(map[int]byte) - for subnet, bval := range b { - if subnet >= len(a) { - diff[subnet] = bval - } else if aval := a[subnet]; aval != bval { + for subnet, bval := range other { + if aval := s[subnet]; aval != bval { diff[subnet] = bval } } diff --git a/network/records/subnets_test.go b/network/records/subnets_test.go index 25a8de143d..cab9347d87 100644 --- a/network/records/subnets_test.go +++ b/network/records/subnets_test.go @@ -22,7 +22,7 @@ func Test_SubnetsEntry(t *testing.T) { node, err := CreateLocalNode(sk, "", ip, commons.DefaultUDP, commons.DefaultTCP) require.NoError(t, err) - subnets := make([]byte, SubnetsCount) + subnets := Subnets{} for i := 0; i < SubnetsCount; i++ { if i%4 == 0 { subnets[i] = 1 @@ -99,7 +99,7 @@ func TestSharedSubnets(t *testing.T) { expectedShared = append(expectedShared, subnet) } } - shared := SharedSubnets(s1, s2, 0) + shared := s1.SharedSubnets(s2, 0) require.Equal(t, expectedShared, shared) } @@ -109,6 +109,6 @@ func TestDiffSubnets(t *testing.T) { s2, err := Subnets{}.FromString("0x57b080fffd743d9878dc41a184ab160a") require.NoError(t, err) - diff := DiffSubnets(s1, s2) + diff := s1.DiffSubnets(s2) require.Len(t, diff, 128-62) } From abb2424d33ba410c723e0c0c4d6b56534df72eff Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 22 Oct 2024 12:47:55 -0300 Subject: [PATCH 2/3] fix comment --- network/discovery/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/discovery/options.go b/network/discovery/options.go index 87b9eb969b..3b14c75ce4 100644 --- a/network/discovery/options.go +++ b/network/discovery/options.go @@ -35,7 +35,7 @@ type DiscV5Options struct { NetworkKey *ecdsa.PrivateKey // Bootnodes is a list of bootstrapper nodes Bootnodes []string - // Subnets is a bool slice encoded in hex, it represents all the subnets the node is interested in + // Subnets is a bool slice represents all the subnets the node is interested in Subnets records.Subnets // EnableLogging when true enables logs to be emitted EnableLogging bool From a89c4b82bae5b713a89345fd38dd4b2fdbeb3823 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 22 Oct 2024 16:11:51 -0300 Subject: [PATCH 3/3] code review requests --- network/discovery/dv5_filters.go | 2 +- network/p2p/p2p.go | 18 +++++----- network/p2p/p2p_pubsub.go | 10 +++--- network/p2p/p2p_setup.go | 10 +++--- network/peers/conn_manager.go | 4 +-- network/peers/connections/conn_handler.go | 2 +- network/peers/index.go | 4 +-- network/peers/subnets.go | 4 +-- network/peers/subnets_test.go | 41 +++++++++++++++-------- network/records/subnets.go | 20 +++++++---- network/records/subnets_test.go | 2 +- 11 files changed, 69 insertions(+), 48 deletions(-) diff --git a/network/discovery/dv5_filters.go b/network/discovery/dv5_filters.go index 2651294c0e..c2c9b406f0 100644 --- a/network/discovery/dv5_filters.go +++ b/network/discovery/dv5_filters.go @@ -76,7 +76,7 @@ func (dvs *DiscV5Service) sharedSubnetsFilter(n int) func(node *enode.Node) bool if err != nil { return false } - shared := dvs.subnets.SharedSubnets(nodeSubnets, n) + shared := dvs.subnets.SharedSubnetsN(nodeSubnets, n) // logger.Debug("shared subnets", zap.Ints("shared", shared), // zap.String("node", node.String())) diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 20e1032b29..fdaf0cbeee 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -95,8 +95,10 @@ type p2pNetwork struct { backoffConnector *libp2pdiscbackoff.BackoffConnector - fixedSubnets records.Subnets - activeSubnets records.Subnets + // initialSubnets holds subnets on node startup + initialSubnets records.Subnets + // currentSubnets holds current subnets which depend on current active validators and committees + currentSubnets records.Subnets libConnManager connmgrcore.ConnManager @@ -299,13 +301,13 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() { defer cancel() // Disconnect from irrelevant peers - disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, n.activeSubnets) + disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, n.currentSubnets) if disconnectedPeers > 0 { return } // Trim peers according to subnet participation (considering the subnet size) - connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, n.activeSubnets, allPeers, n.cfg.TopicMaxPeers) + connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, n.currentSubnets, allPeers, n.cfg.TopicMaxPeers) connMgr.TrimPeers(ctx, logger, n.host.Network()) } } @@ -355,7 +357,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { for ; true; <-ticker.C { start := time.Now() - updatedSubnets := n.fixedSubnets + updatedSubnets := n.initialSubnets n.activeCommittees.Range(func(cid string, status validatorStatus) bool { subnet := commons.CommitteeSubnet(spectypes.CommitteeID([]byte(cid))) @@ -370,7 +372,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { return true }) } - n.activeSubnets = updatedSubnets + n.currentSubnets = updatedSubnets // Compute the not yet registered subnets. addedSubnets := make([]uint64, 0) @@ -395,7 +397,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } n.idx.UpdateSelfRecord(func(self *records.NodeInfo) *records.NodeInfo { - self.Metadata.Subnets = n.activeSubnets.String() + self.Metadata.Subnets = n.currentSubnets.String() return self }) @@ -433,7 +435,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - subnetsList := allSubs.SharedSubnets(n.activeSubnets, 0) + subnetsList := allSubs.SharedSubnets(n.currentSubnets) logger.Debug("updated subnets", zap.Any("added", addedSubnets), zap.Any("removed", removedSubnets), diff --git a/network/p2p/p2p_pubsub.go b/network/p2p/p2p_pubsub.go index b0a703a9f2..f3b59e4817 100644 --- a/network/p2p/p2p_pubsub.go +++ b/network/p2p/p2p_pubsub.go @@ -76,7 +76,7 @@ func (n *p2pNetwork) SubscribeAll(logger *zap.Logger) error { if !n.isReady() { return p2pprotocol.ErrNetworkIsNotReady } - n.fixedSubnets, _ = records.Subnets{}.FromString(records.AllSubnets) + n.initialSubnets, _ = records.Subnets{}.FromString(records.AllSubnets) for subnet := uint64(0); subnet < commons.SubnetsCount; subnet++ { err := n.topicsCtrl.Subscribe(logger, commons.SubnetTopicID(subnet)) if err != nil { @@ -108,7 +108,7 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error } for _, subnet := range randomSubnets { - n.fixedSubnets[subnet] = byte(1) + n.initialSubnets[subnet] = byte(1) } return nil @@ -245,11 +245,11 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C // subscribeToSubnets subscribes to all the node's subnets func (n *p2pNetwork) subscribeToSubnets(logger *zap.Logger) error { - if len(n.fixedSubnets) == 0 { + if len(n.initialSubnets) == 0 { return nil } - logger.Debug("subscribing to fixed subnets", fields.Subnets(n.fixedSubnets)) - for i, val := range n.fixedSubnets { + logger.Debug("subscribing to fixed subnets", fields.Subnets(n.initialSubnets)) + for i, val := range n.initialSubnets { if val > 0 { subnet := fmt.Sprintf("%d", i) if err := n.topicsCtrl.Subscribe(logger, subnet); err != nil { diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index 22407dd2b0..9637f04f25 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -93,7 +93,7 @@ func (n *p2pNetwork) initCfg() error { if err != nil { return fmt.Errorf("parse subnet: %w", err) } - n.fixedSubnets = subnets + n.initialSubnets = subnets } if n.cfg.MaxPeers <= 0 { n.cfg.MaxPeers = minPeersBuffer @@ -179,7 +179,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { self := records.NewNodeInfo(domain) self.Metadata = &records.NodeMetadata{ NodeVersion: commons.GetNodeVersion(), - Subnets: n.fixedSubnets.String(), + Subnets: n.initialSubnets.String(), } getPrivKey := func() crypto.PrivKey { return libPrivKey @@ -200,7 +200,7 @@ func (n *p2pNetwork) setupPeerServices(logger *zap.Logger) error { } subnetsProvider := func() records.Subnets { - return n.activeSubnets + return n.currentSubnets } // Handshake filters @@ -251,8 +251,8 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { Bootnodes: n.cfg.TransformBootnodes(), EnableLogging: n.cfg.DiscoveryTrace, } - discV5Opts.Subnets = n.fixedSubnets - logger = logger.With(zap.String("subnets", n.fixedSubnets.String())) + discV5Opts.Subnets = n.initialSubnets + logger = logger.With(zap.String("subnets", n.initialSubnets.String())) logger.Info("discovery: using discv5", zap.Strings("bootnodes", discV5Opts.Bootnodes), zap.String("ip", discV5Opts.IP)) diff --git a/network/peers/conn_manager.go b/network/peers/conn_manager.go index b8594b8116..f7c832989d 100644 --- a/network/peers/conn_manager.go +++ b/network/peers/conn_manager.go @@ -129,7 +129,7 @@ func (c connManager) getBestPeers(n int, mySubnets records.Subnets, allPeers []p peerLogs = append(peerLogs, peerLog{ Peer: pid, Score: score, - SharedSubnets: len(peerSubnets.SharedSubnets(mySubnets, len(mySubnets))), + SharedSubnets: len(peerSubnets.SharedSubnets(mySubnets)), }) } @@ -234,7 +234,7 @@ func (c connManager) DisconnectFromIrrelevantPeers(logger *zap.Logger, disconnec var sharedSubnets []int peerSubnets, ok := c.subnetsIdx.GetPeerSubnets(peerID) if ok { - sharedSubnets = mySubnets.SharedSubnets(peerSubnets, len(mySubnets)) + sharedSubnets = mySubnets.SharedSubnets(peerSubnets) } // If there's no common subnet, disconnect from peer. diff --git a/network/peers/connections/conn_handler.go b/network/peers/connections/conn_handler.go index 6f4b51b289..9c686a4e38 100644 --- a/network/peers/connections/conn_handler.go +++ b/network/peers/connections/conn_handler.go @@ -229,7 +229,7 @@ func (ch *connHandler) sharesEnoughSubnets(logger *zap.Logger, conn libp2pnetwor if mySubnets.String() == records.ZeroSubnets { // this node has no subnets return true } - shared := mySubnets.SharedSubnets(subnets, 1) + shared := mySubnets.SharedSubnetsN(subnets, 1) logger.Debug("checking subnets", zap.Ints("shared", shared)) return len(shared) == 1 diff --git a/network/peers/index.go b/network/peers/index.go index 1b326cdba2..08c9e6d66d 100644 --- a/network/peers/index.go +++ b/network/peers/index.go @@ -105,8 +105,8 @@ type SubnetsIndex interface { // GetSubnetPeers returns peers that are interested in the given subnet GetSubnetPeers(s int) []peer.ID - // GetPeerSubnets returns subnets of the given peer - GetPeerSubnets(id peer.ID) (records.Subnets, bool) + // GetPeerSubnets returns subnets of the given peer and whether it was found + GetPeerSubnets(id peer.ID) (subnets records.Subnets, ok bool) // GetSubnetsStats collects and returns subnets stats GetSubnetsStats() *SubnetsStats diff --git a/network/peers/subnets.go b/network/peers/subnets.go index 1850aa3f92..5d77b24112 100644 --- a/network/peers/subnets.go +++ b/network/peers/subnets.go @@ -115,7 +115,7 @@ func (si *subnetsIndex) GetPeerSubnets(id peer.ID) (records.Subnets, bool) { return records.Subnets{}, false } - return subnets, false + return subnets, true } // GetSubnetsDistributionScores returns current subnets scores based on peers distribution. @@ -125,7 +125,7 @@ func GetSubnetsDistributionScores(stats *SubnetsStats, minPerSubnet int, mySubne const activeSubnetBoost = 0.2 allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) - activeSubnets := allSubs.SharedSubnets(mySubnets, 0) + activeSubnets := allSubs.SharedSubnets(mySubnets) scores := make([]float64, len(allSubs)) for _, s := range activeSubnets { diff --git a/network/peers/subnets_test.go b/network/peers/subnets_test.go index caac7cbf7b..1b5bd52610 100644 --- a/network/peers/subnets_test.go +++ b/network/peers/subnets_test.go @@ -36,14 +36,26 @@ func TestSubnetsIndex(t *testing.T) { subnetsIdx := NewSubnetsIndex(128) - subnetsIdx.UpdatePeerSubnets(pids[0], sAll) - subnetsIdx.UpdatePeerSubnets(pids[1], sNone) - subnetsIdx.UpdatePeerSubnets(pids[2], sPartial) - subnetsIdx.UpdatePeerSubnets(pids[3], sPartial) + initialMapping := map[peer.ID]records.Subnets{ + pids[0]: sAll, + pids[1]: sNone, + pids[2]: sPartial, + pids[3]: sPartial, + } + + for pid, subnets := range initialMapping { + subnetsIdx.UpdatePeerSubnets(pid, subnets) + } require.Len(t, subnetsIdx.GetSubnetPeers(0), 3) require.Len(t, subnetsIdx.GetSubnetPeers(10), 1) + for _, pid := range pids { + subnets, ok := subnetsIdx.GetPeerSubnets(pid) + require.True(t, ok) + require.Equal(t, initialMapping[pid], subnets) + } + subnetsIdx.UpdatePeerSubnets(pids[0], sPartial) require.Len(t, subnetsIdx.GetSubnetPeers(0), 3) @@ -61,19 +73,20 @@ func TestSubnetsIndex(t *testing.T) { } func TestSubnetsDistributionScores(t *testing.T) { - mysubnets := records.Subnets{} + mySubnets := records.Subnets{} for i := 0; i < commons.Subnets(); i++ { - if i%2 == 0 { - mysubnets[i] = byte(0) - } else { - mysubnets[i] = byte(1) + if i%2 != 0 { + mySubnets[i] = byte(1) } } + + t.Logf("my subnets: %v", mySubnets.String()) + stats := &SubnetsStats{ - PeersCount: make([]int, len(mysubnets)), - Connected: make([]int, len(mysubnets)), + PeersCount: make([]int, len(mySubnets)), + Connected: make([]int, len(mySubnets)), } - for sub := range mysubnets { + for sub := range mySubnets { stats.Connected[sub] = 1 + rand.Intn(20) stats.PeersCount[sub] = stats.Connected[sub] + rand.Intn(10) } @@ -82,9 +95,9 @@ func TestSubnetsDistributionScores(t *testing.T) { stats.Connected[5] = 30 stats.PeersCount[5] = 30 - distScores := GetSubnetsDistributionScores(stats, 3, mysubnets, 5) + distScores := GetSubnetsDistributionScores(stats, 3, mySubnets, 5) - require.Len(t, distScores, len(mysubnets)) + require.Len(t, distScores, len(mySubnets)) require.Equal(t, float64(0), distScores[0]) require.Equal(t, float64(4.2), distScores[1]) require.Equal(t, float64(2.533333333333333), distScores[3]) diff --git a/network/records/subnets.go b/network/records/subnets.go index 9ed85b05ca..31421bc34c 100644 --- a/network/records/subnets.go +++ b/network/records/subnets.go @@ -44,7 +44,9 @@ func UpdateSubnets(node *enode.LocalNode, added []uint64, removed []uint64) (Sub return subnets, true, nil } -// Subnets holds all the subscribed subnets of a specific node +// Subnets holds all the subscribed subnets of a specific node. +// The array index represents a subnet number, +// the value holds either 0 or 1 representing if the node is subscribed to the subnet number. type Subnets [commons.SubnetsCount]byte func (s Subnets) String() string { @@ -75,7 +77,7 @@ func (s Subnets) FromString(subnetsStr string) (Subnets, error) { } if len(data) != commons.Subnets() { - return Subnets{}, fmt.Errorf("invalid subnets length: %d", len(data)) + return Subnets{}, fmt.Errorf("invalid subnets length %d", len(data)) } return Subnets(data), nil @@ -93,7 +95,7 @@ func (s Subnets) Active() int { // ToMap returns a map with all subnets and their values func (s Subnets) ToMap() map[int]byte { - m := make(map[int]byte) + m := make(map[int]byte, len(s)) for subnet, v := range s { m[subnet] = v } @@ -101,10 +103,14 @@ func (s Subnets) ToMap() map[int]byte { } // SharedSubnets returns the shared subnets -func (s Subnets) SharedSubnets(other Subnets, maxLen int) []int { +func (s Subnets) SharedSubnets(other Subnets) []int { + return s.SharedSubnetsN(other, 0) +} + +func (s Subnets) SharedSubnetsN(other Subnets, n int) []int { var shared []int - if maxLen == 0 { - maxLen = len(s) + if n == 0 { + n = len(s) } for subnet, v := range s { if v == 0 { @@ -114,7 +120,7 @@ func (s Subnets) SharedSubnets(other Subnets, maxLen int) []int { continue } shared = append(shared, subnet) - if len(shared) == maxLen { + if len(shared) == n { break } } diff --git a/network/records/subnets_test.go b/network/records/subnets_test.go index cab9347d87..cf11dc50bb 100644 --- a/network/records/subnets_test.go +++ b/network/records/subnets_test.go @@ -99,7 +99,7 @@ func TestSharedSubnets(t *testing.T) { expectedShared = append(expectedShared, subnet) } } - shared := s1.SharedSubnets(s2, 0) + shared := s1.SharedSubnets(s2) require.Equal(t, expectedShared, shared) }