Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Subnet Functionality to its Own File #5179

Merged
merged 9 commits into from
Mar 24, 2020
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"rpc_topic_mappings.go",
"sender.go",
"service.go",
"subnets.go",
"utils.go",
"watch_peers.go",
],
Expand Down Expand Up @@ -89,6 +90,7 @@ go_test(
"parameter_test.go",
"sender_test.go",
"service_test.go",
"subnets_test.go",
],
embed = [":go_default_library"],
flaky = True,
Expand Down
27 changes: 0 additions & 27 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
)

const attestationSubnetCount = 64
const attSubnetEnrKey = "attnets"

// Listener defines the discovery V5 network interface that is used
// to communicate with other peers.
type Listener interface {
Expand Down Expand Up @@ -113,29 +109,6 @@ func startDHTDiscovery(host core.Host, bootstrapAddr string) error {
return err
}

func intializeAttSubnets(node *enode.LocalNode) *enode.LocalNode {
bitV := bitfield.NewBitvector64()
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
node.Set(entry)
return node
}

func retrieveAttSubnets(record *enr.Record) ([]uint64, error) {
bitV := bitfield.NewBitvector64()
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
err := record.Load(entry)
if err != nil {
return nil, err
}
committeeIdxs := []uint64{}
for i := uint64(0); i < 64; i++ {
if bitV.BitAt(i) {
committeeIdxs = append(committeeIdxs, i)
}
}
return committeeIdxs, nil
}

func parseBootStrapAddrs(addrs []string) (discv5Nodes []string, kadDHTNodes []string) {
discv5Nodes, kadDHTNodes = parseGenericAddrs(addrs)
if len(discv5Nodes) == 0 && len(kadDHTNodes) == 0 {
Expand Down
90 changes: 2 additions & 88 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ import (

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/host"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
logTest "github.com/sirupsen/logrus/hooks/test"

rauljordan marked this conversation as resolved.
Show resolved Hide resolved
"github.com/prysmaticlabs/prysm/shared/iputils"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

var discoveryWaitTime = 1 * time.Second
Expand Down Expand Up @@ -110,90 +108,6 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
}
}

func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
bootListener := createListener(ipAddr, pkey, &Config{UDPPort: uint(port)})
defer bootListener.Close()

bootNode := bootListener.Self()
cfg := &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
MaxPeers: 30,
}
// Use shorter period for testing.
currentPeriod := pollingPeriod
pollingPeriod = 1 * time.Second
defer func() {
pollingPeriod = currentPeriod
}()

var listeners []*discover.UDPv5
for i := 1; i <= 3; i++ {
port = 3000 + i
cfg.UDPPort = uint(port)
ipAddr, pkey := createAddrAndPrivKey(t)
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
if err != nil {
t.Errorf("Could not start discovery for node: %v", err)
}
bitV := bitfield.NewBitvector64()
bitV.SetBitAt(uint64(i), true)

entry := enr.WithEntry(attSubnetEnrKey, &bitV)
listener.LocalNode().Set(entry)
listeners = append(listeners, listener)
}

// Make one service on port 3001.
port = 4000
cfg.UDPPort = uint(port)
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
}
s.Start()
defer s.Stop()

// Wait for the nodes to have their local routing tables to be populated with the other nodes
time.Sleep(discoveryWaitTime)

// look up 3 different subnets
exists, err := s.FindPeersWithSubnet(1)
if err != nil {
t.Fatal(err)
}
exists2, err := s.FindPeersWithSubnet(2)
if err != nil {
t.Fatal(err)
}
exists3, err := s.FindPeersWithSubnet(3)
if err != nil {
t.Fatal(err)
}
if !exists || !exists2 || !exists3 {
t.Fatal("Peer with subnet doesn't exist")
}

// update ENR of a peer
testService := &Service{dv5Listener: listeners[0]}
cache.CommitteeIDs.AddIDs([]uint64{10}, 0)
testService.RefreshENR(0)
time.Sleep(2 * time.Second)

exists, err = s.FindPeersWithSubnet(2)
if err != nil {
t.Fatal(err)
}

if !exists {
t.Fatal("Peer with subnet doesn't exist")
}

}

func TestMultiAddrsConversion_InvalidIPAddr(t *testing.T) {
addr := net.ParseIP("invalidIP")
_, pkey := createAddrAndPrivKey(t)
Expand Down
33 changes: 33 additions & 0 deletions beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package p2p

import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/prysmaticlabs/go-bitfield"
)

const attestationSubnetCount = 64
const attSubnetEnrKey = "attnets"

func intializeAttSubnets(node *enode.LocalNode) *enode.LocalNode {
bitV := bitfield.NewBitvector64()
entry := enr.WithEntry(attSubnetEnrKey, bitV.Bytes())
node.Set(entry)
return node
}

func retrieveAttSubnets(record *enr.Record) ([]uint64, error) {
bitV := bitfield.NewBitvector64()
entry := enr.WithEntry(attSubnetEnrKey, &bitV)
err := record.Load(entry)
if err != nil {
return nil, err
}
committeeIdxs := []uint64{}
for i := uint64(0); i < 64; i++ {
if bitV.BitAt(i) {
committeeIdxs = append(committeeIdxs, i)
}
}
return committeeIdxs, nil
}
95 changes: 95 additions & 0 deletions beacon-chain/p2p/subnets_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package p2p

import (
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
)

func TestStartDiscV5_DiscoverPeersWithSubnets(t *testing.T) {
port := 2000
ipAddr, pkey := createAddrAndPrivKey(t)
bootListener := createListener(ipAddr, pkey, &Config{UDPPort: uint(port)})
defer bootListener.Close()

bootNode := bootListener.Self()
cfg := &Config{
BootstrapNodeAddr: []string{bootNode.String()},
Discv5BootStrapAddr: []string{bootNode.String()},
Encoding: "ssz",
MaxPeers: 30,
}
// Use shorter period for testing.
currentPeriod := pollingPeriod
pollingPeriod = 1 * time.Second
defer func() {
pollingPeriod = currentPeriod
}()

var listeners []*discover.UDPv5
for i := 1; i <= 3; i++ {
port = 3000 + i
cfg.UDPPort = uint(port)
ipAddr, pkey := createAddrAndPrivKey(t)
listener, err := startDiscoveryV5(ipAddr, pkey, cfg)
if err != nil {
t.Errorf("Could not start discovery for node: %v", err)
}
bitV := bitfield.NewBitvector64()
bitV.SetBitAt(uint64(i), true)

entry := enr.WithEntry(attSubnetEnrKey, &bitV)
listener.LocalNode().Set(entry)
listeners = append(listeners, listener)
}

// Make one service on port 3001.
port = 4000
cfg.UDPPort = uint(port)
s, err := NewService(cfg)
if err != nil {
t.Fatal(err)
}
s.Start()
defer s.Stop()

// Wait for the nodes to have their local routing tables to be populated with the other nodes
time.Sleep(discoveryWaitTime)

// look up 3 different subnets
exists, err := s.FindPeersWithSubnet(1)
if err != nil {
t.Fatal(err)
}
exists2, err := s.FindPeersWithSubnet(2)
if err != nil {
t.Fatal(err)
}
exists3, err := s.FindPeersWithSubnet(3)
if err != nil {
t.Fatal(err)
}
if !exists || !exists2 || !exists3 {
t.Fatal("Peer with subnet doesn't exist")
}

// update ENR of a peer.
testService := &Service{dv5Listener: listeners[0]}
cache.CommitteeIDs.AddIDs([]uint64{10}, 0)
testService.RefreshENR(0)
time.Sleep(2 * time.Second)

exists, err = s.FindPeersWithSubnet(2)
if err != nil {
t.Fatal(err)
}

if !exists {
t.Fatal("Peer with subnet doesn't exist")
}

}