Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network/records: change subnets type to [128]byte #1813

Open
wants to merge 3 commits into
base: stage
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/handlers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func (h *Node) peers(peers []peer.ID) []peerJSON {
resp[i] = peerJSON{
ID: id,
Connectedness: h.Network.Connectedness(id).String(),
Subnets: h.PeersIndex.GetPeerSubnets(id).String(),
}
subnets, ok := h.PeersIndex.GetPeerSubnets(id)
if ok {
resp[i].Subnets = subnets.String()
}

for _, addr := range h.Network.Peerstore().Addrs(id) {
Expand Down
8 changes: 3 additions & 5 deletions network/discovery/dv5_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
libp2pnetwork "github.com/libp2p/go-libp2p/core/network"
"github.com/ssvlabs/ssv/network/records"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/records"
)

// limitNodeFilter returns true if the limit is exceeded
Expand Down Expand Up @@ -71,14 +72,11 @@ func (dvs *DiscV5Service) sharedSubnetsFilter(n int) func(node *enode.Node) bool
if n == 0 {
return true
}
if len(dvs.subnets) == 0 {
return true
}
nodeSubnets, err := records.GetSubnetsEntry(node.Record())
if err != nil {
return false
}
shared := records.SharedSubnets(dvs.subnets, nodeSubnets, n)
shared := dvs.subnets.SharedSubnetsN(nodeSubnets, n)
// logger.Debug("shared subnets", zap.Ints("shared", shared),
// zap.String("node", node.String()))

Expand Down
13 changes: 6 additions & 7 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package discovery

import (
"bytes"
"context"
"fmt"
"net"
Expand Down Expand Up @@ -62,7 +61,7 @@ type DiscV5Service struct {
sharedConn *SharedUDPConn

networkConfig networkconfig.NetworkConfig
subnets []byte
subnets records.Subnets

publishLock chan struct{}
}
Expand Down Expand Up @@ -177,7 +176,7 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
if err != nil {
return fmt.Errorf("could not read subnets: %w", err)
}
if bytes.Equal(zeroSubnets, nodeSubnets) {
if zeroSubnets == nodeSubnets {
return errors.New("zero subnets")
}

Expand Down Expand Up @@ -321,11 +320,11 @@ func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...uint64)
if len(subnets) == 0 {
return false, nil
}
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil)
updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), subnets, nil)
if err != nil {
return false, errors.Wrap(err, "could not update ENR")
}
if updatedSubnets != nil {
if isUpdated {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
return true, nil
Expand All @@ -340,11 +339,11 @@ func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...uint6
if len(subnets) == 0 {
return false, nil
}
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets)
updatedSubnets, isUpdated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), nil, subnets)
if err != nil {
return false, errors.Wrap(err, "could not update ENR")
}
if updatedSubnets != nil {
if isUpdated {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
return true, nil
Expand Down
18 changes: 10 additions & 8 deletions network/discovery/dv5_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ func TestCheckPeer(t *testing.T) {
expectedError: nil,
},
{
name: "missing subnets",
domainType: &myDomainType,
subnets: nil,
expectedError: errors.New("could not read subnets"),
name: "missing subnets",
domainType: &myDomainType,
subnets: records.Subnets{},
missingSubnets: true,
expectedError: errors.New("could not read subnets"),
},
{
name: "inactive subnets",
Expand Down Expand Up @@ -138,7 +139,7 @@ func TestCheckPeer(t *testing.T) {
err := records.SetDomainTypeEntry(localNode, records.KeyNextDomainType, *test.nextDomainType)
require.NoError(t, err)
}
if test.subnets != nil {
if !test.missingSubnets {
err := records.SetSubnetsEntry(localNode, test.subnets)
require.NoError(t, err)
}
Expand Down Expand Up @@ -176,13 +177,14 @@ type checkPeerTest struct {
name string
domainType *spectypes.DomainType
nextDomainType *spectypes.DomainType
subnets []byte
subnets records.Subnets
missingSubnets bool
localNode *enode.LocalNode
expectedError error
}

func mockSubnets(active ...int) []byte {
subnets := make([]byte, commons.Subnets())
func mockSubnets(active ...int) records.Subnets {
subnets := records.Subnets{}
for _, subnet := range active {
subnets[subnet] = 1
}
Expand Down
3 changes: 2 additions & 1 deletion network/discovery/node_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ethereum/go-ethereum/p2p/enode"
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/network/records"
)

Expand All @@ -16,7 +17,7 @@ func DecorateWithDomainType(key records.ENRKey, domainType spectypes.DomainType)
}
}

func DecorateWithSubnets(subnets []byte) NodeRecordDecoration {
func DecorateWithSubnets(subnets records.Subnets) NodeRecordDecoration {
return func(node *enode.LocalNode) error {
return records.SetSubnetsEntry(node, subnets)
}
Expand Down
5 changes: 3 additions & 2 deletions network/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ssvlabs/ssv/logging"
compatible_logger "github.com/ssvlabs/ssv/network/discovery/logger"
"github.com/ssvlabs/ssv/network/records"

"github.com/ssvlabs/ssv/network/commons"

Expand Down Expand Up @@ -34,8 +35,8 @@ type DiscV5Options struct {
NetworkKey *ecdsa.PrivateKey
// Bootnodes is a list of bootstrapper nodes
Bootnodes []string
// Subnets is a bool slice represents all the subnets the node is intreseted in
Subnets []byte
// Subnets is a bool slice represents all the subnets the node is interested in
Subnets records.Subnets
// EnableLogging when true enables logs to be emitted
EnableLogging bool
}
Expand Down
3 changes: 2 additions & 1 deletion network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
)
Expand Down Expand Up @@ -281,7 +282,7 @@ func TestDiscV5Service_checkPeer(t *testing.T) {
dvs.conns.(*MockConnection).SetAtLimit(false)

// Valid peer but no common subnet
subnets := make([]byte, len(records.ZeroSubnets))
subnets := records.Subnets{}
subnets[10] = 1
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomSubnets(t, subnets)))
require.ErrorContains(t, err, "no shared subnets")
Expand Down
15 changes: 8 additions & 7 deletions network/discovery/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/network/commons"
"github.com/ssvlabs/ssv/network/peers"
"github.com/ssvlabs/ssv/network/records"
"github.com/ssvlabs/ssv/networkconfig"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -58,8 +60,7 @@ func testingDiscoveryOptions(t *testing.T, networkConfig networkconfig.NetworkCo
}

// Discovery options
allSubs, _ := records.Subnets{}.FromString(records.AllSubnets)
subnetsIndex := peers.NewSubnetsIndex(len(allSubs))
subnetsIndex := peers.NewSubnetsIndex(commons.Subnets())
connectionIndex := NewMockConnection()

return &Options{
Expand Down Expand Up @@ -160,7 +161,7 @@ func NodeWithoutNextDomain(t *testing.T) *enode.Node {
}

func NodeWithoutSubnets(t *testing.T) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, nil)
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), false, records.Subnets{})
}

func NodeWithCustomDomains(t *testing.T, domainType spectypes.DomainType, nextDomainType spectypes.DomainType) *enode.Node {
Expand All @@ -171,14 +172,14 @@ func NodeWithZeroSubnets(t *testing.T) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, zeroSubnets)
}

func NodeWithCustomSubnets(t *testing.T, subnets []byte) *enode.Node {
func NodeWithCustomSubnets(t *testing.T, subnets records.Subnets) *enode.Node {
return CustomNode(t, true, testNetConfig.DomainType(), true, testNetConfig.NextDomainType(), true, subnets)
}

func CustomNode(t *testing.T,
setDomainType bool, domainType spectypes.DomainType,
setNextDomainType bool, nextDomainType spectypes.DomainType,
setSubnets bool, subnets []byte) *enode.Node {
setSubnets bool, subnets records.Subnets) *enode.Node {

// Generate key
nodeKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
Expand Down
24 changes: 11 additions & 13 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ type p2pNetwork struct {

backoffConnector *libp2pdiscbackoff.BackoffConnector

fixedSubnets []byte
activeSubnets []byte
// initialSubnets holds subnets on node startup
initialSubnets records.Subnets
// currentSubnets holds current subnets which depend on current active validators and committees
currentSubnets records.Subnets

libConnManager connmgrcore.ConnManager

Expand Down Expand Up @@ -298,16 +300,14 @@ func (n *p2pNetwork) peersBalancing(logger *zap.Logger) func() {
ctx, cancel := context.WithTimeout(n.ctx, connManagerBalancingTimeout)
defer cancel()

mySubnets := records.Subnets(n.activeSubnets).Clone()

// Disconnect from irrelevant peers
disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, mySubnets)
disconnectedPeers := connMgr.DisconnectFromIrrelevantPeers(logger, maximumIrrelevantPeersToDisconnect, n.host.Network(), allPeers, n.currentSubnets)
if disconnectedPeers > 0 {
return
}

// Trim peers according to subnet participation (considering the subnet size)
connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, mySubnets, allPeers, n.cfg.TopicMaxPeers)
connMgr.TagBestPeers(logger, n.cfg.MaxPeers-1, n.currentSubnets, allPeers, n.cfg.TopicMaxPeers)
connMgr.TrimPeers(ctx, logger, n.host.Network())
}
}
Expand Down Expand Up @@ -350,16 +350,14 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
// there is a pending PR to replace this: https://github.com/ssvlabs/ssv/pull/990
logger = logger.Named(logging.NameP2PNetwork)
ticker := time.NewTicker(time.Second)
registeredSubnets := make([]byte, commons.Subnets())
registeredSubnets := records.Subnets{}
defer ticker.Stop()

// Run immediately and then every second.
for ; true; <-ticker.C {
start := time.Now()

// Compute the new subnets according to the active committees/validators.
updatedSubnets := make([]byte, commons.Subnets())
copy(updatedSubnets, n.fixedSubnets)
updatedSubnets := n.initialSubnets

n.activeCommittees.Range(func(cid string, status validatorStatus) bool {
subnet := commons.CommitteeSubnet(spectypes.CommitteeID([]byte(cid)))
Expand All @@ -374,7 +372,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
return true
})
}
n.activeSubnets = updatedSubnets
n.currentSubnets = updatedSubnets

// Compute the not yet registered subnets.
addedSubnets := make([]uint64, 0)
Expand All @@ -399,7 +397,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
}

n.idx.UpdateSelfRecord(func(self *records.NodeInfo) *records.NodeInfo {
self.Metadata.Subnets = records.Subnets(n.activeSubnets).String()
self.Metadata.Subnets = n.currentSubnets.String()
return self
})

Expand Down Expand Up @@ -437,7 +435,7 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
}

allSubs, _ := records.Subnets{}.FromString(records.AllSubnets)
subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0)
subnetsList := allSubs.SharedSubnets(n.currentSubnets)
logger.Debug("updated subnets",
zap.Any("added", addedSubnets),
zap.Any("removed", removedSubnets),
Expand Down
14 changes: 5 additions & 9 deletions network/p2p/p2p_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (n *p2pNetwork) SubscribeAll(logger *zap.Logger) error {
if !n.isReady() {
return p2pprotocol.ErrNetworkIsNotReady
}
n.fixedSubnets, _ = records.Subnets{}.FromString(records.AllSubnets)
n.initialSubnets, _ = records.Subnets{}.FromString(records.AllSubnets)
for subnet := uint64(0); subnet < commons.SubnetsCount; subnet++ {
err := n.topicsCtrl.Subscribe(logger, commons.SubnetTopicID(subnet))
if err != nil {
Expand Down Expand Up @@ -107,13 +107,9 @@ func (n *p2pNetwork) SubscribeRandoms(logger *zap.Logger, numSubnets int) error
}
}

// Update the subnets slice.
subnets := make([]byte, commons.Subnets())
copy(subnets, n.fixedSubnets)
for _, subnet := range randomSubnets {
subnets[subnet] = byte(1)
n.initialSubnets[subnet] = byte(1)
}
n.fixedSubnets = subnets

return nil
}
Expand Down Expand Up @@ -249,11 +245,11 @@ func (n *p2pNetwork) handlePubsubMessages(logger *zap.Logger) func(ctx context.C

// subscribeToSubnets subscribes to all the node's subnets
func (n *p2pNetwork) subscribeToSubnets(logger *zap.Logger) error {
if len(n.fixedSubnets) == 0 {
if len(n.initialSubnets) == 0 {
return nil
}
logger.Debug("subscribing to fixed subnets", fields.Subnets(n.fixedSubnets))
for i, val := range n.fixedSubnets {
logger.Debug("subscribing to fixed subnets", fields.Subnets(n.initialSubnets))
for i, val := range n.initialSubnets {
if val > 0 {
subnet := fmt.Sprintf("%d", i)
if err := n.topicsCtrl.Subscribe(logger, subnet); err != nil {
Expand Down
Loading
Loading