Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: invert peer discovery integration #989

Merged
merged 2 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay))
}
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(tcpNode))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiscAdapter, p2p.NewDiscoveryAdapter(tcpNode, udpNode, peers))

return tcpNode, localEnode, nil
}
Expand Down
32 changes: 25 additions & 7 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ import (
"github.com/obolnetwork/charon/cmd"
"github.com/obolnetwork/charon/p2p"
"github.com/obolnetwork/charon/testutil"
"github.com/obolnetwork/charon/testutil/beaconmock"
)

//go:generate go test . -v -run=TestPingCluster -slow
var slow = flag.Bool("slow", false, "enable slow tests")
var slow = flag.Bool("slow", true, "enable slow tests")

// TestPingCluster starts a cluster of charon nodes and waits for each node to ping all the others.
// It relies on discv5 for peer discovery.
func TestPingCluster(t *testing.T) {
// Nodes bind to lock ENR addresses.
// Discv5 can just use those as bootnodes.
t.Run("bind_enrs", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
Slow: false,
BootLock: true,
BindENRAddrs: true,
Expand All @@ -62,7 +63,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random localhost ports (not the lock ENRs), with only single bootnode.
// Discv5 will resolve peers via bootnode.
t.Run("bootnode_only", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
BindLocalhost: true,
BootLock: false,
Bootnode: true,
Expand All @@ -72,7 +73,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to random 0.0.0.0 ports (but use 127.0.0.1 as external IP), with only single bootnode.
// Discv5 will resolve peers via bootnode and external IP.
t.Run("external_ip", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
ExternalIP: "127.0.0.1",
BindZeroIP: true,
BootLock: false,
Expand All @@ -83,7 +84,7 @@ func TestPingCluster(t *testing.T) {
// Nodes bind to 0.0.0.0 (but use localhost as external host), with only single bootnode.
// Discv5 will resolve peers via bootnode and external host.
t.Run("external_host", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
ExternalHost: "localhost",
BindZeroIP: true,
BootLock: false,
Expand All @@ -96,7 +97,7 @@ func TestPingCluster(t *testing.T) {
// Node discv5 will not resolve direct address, nodes will connect to bootnode,
// and libp2p will relay via bootnode.
t.Run("bootnode_relay", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
BootnodeRelay: true,
BindZeroPort: true,
Bootnode: true,
Expand All @@ -108,7 +109,7 @@ func TestPingCluster(t *testing.T) {
// Discv5 times out resolving stale ENRs, then resolves peers via external node.
// This is slow due to discv5 internal timeouts, run with -slow.
t.Run("bootnode_and_stale_enrs", func(t *testing.T) {
pingCluster(t, pingTest{
pingClusterAB(t, pingTest{
Slow: true,
BindLocalhost: true,
BootLock: true,
Expand All @@ -134,6 +135,19 @@ type pingTest struct {
ExternalHost string
}

// TODO(corver): Remove once featureset.InvertDiscv5 launched.
func pingClusterAB(t *testing.T, test pingTest) {
t.Helper()
t.Run("pushdisc", func(t *testing.T) {
featureset.EnableForT(t, featureset.InvertDiscv5)
pingCluster(t, test)
})
t.Run("pulldisc", func(t *testing.T) {
featureset.DisableForT(t, featureset.InvertDiscv5)
pingCluster(t, test)
})
}

func pingCluster(t *testing.T, test pingTest) {
t.Helper()

Expand Down Expand Up @@ -178,6 +192,10 @@ func pingCluster(t *testing.T, test pingTest) {
P2PKey: p2pKeys[i],
PingCallback: asserter.Callback(t, i),
DisablePromWrap: true,
SimnetBMockOpts: []beaconmock.Option{
beaconmock.WithNoAttesterDuties(),
beaconmock.WithNoProposerDuties(),
},
},
P2P: p2p.Config{
UDPBootnodes: bootnodes,
Expand Down
4 changes: 4 additions & 0 deletions app/featureset/featureset.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ type Feature string
const (
// QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445.
QBFTConsensus Feature = "qbft_consensus"

// InvertDiscv5 enables the new push based discv5 integration and disables the old pull based.
InvertDiscv5 Feature = "invert_discv5"
)

var (
// state defines the current rollout status of each feature.
state = map[Feature]status{
QBFTConsensus: statusStable,
InvertDiscv5: statusAlpha,
// Add all features and there status here.
}

Expand Down
1 change: 1 addition & 0 deletions app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
StartMonitoringAPI
StartValidatorAPI
StartP2PPing
StartP2PDiscAdapter
StartP2PConsensus
StartSimulator
StartScheduler
Expand Down
13 changes: 7 additions & 6 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 69 additions & 6 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,28 @@
package p2p

import (
"context"
"crypto/ecdsa"
"net"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/expbackoff"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/z"
)

// UDPNode wraps a discv5 udp node and adds the bootnodes relays.
type UDPNode struct {
*discover.UDPv5
Relays []Peer
}
Comment on lines -31 to -35
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not used


// NewUDPNode starts and returns a discv5 UDP implementation.
func NewUDPNode(config Config, ln *enode.LocalNode,
key *ecdsa.PrivateKey, bootnodes []*enode.Node,
Expand Down Expand Up @@ -133,3 +137,62 @@ func NewLocalEnode(config Config, key *ecdsa.PrivateKey) (*enode.LocalNode, *eno

return node, db, nil
}

// NewDiscoveryAdapter returns a life cycle hook that links discv5 to libp2p by
// continuously polling discv5 for latest peer ERNs and adding then to libp2p peer store.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// continuously polling discv5 for latest peer ERNs and adding then to libp2p peer store.
// continuously polling discv5 for latest peer ENRs and adding then to libp2p peer store.

func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx {
return func(ctx context.Context) {
if !featureset.Enabled(featureset.InvertDiscv5) {
return
}

ctx = log.WithTopic(ctx, "p2p")
ttl := peerstore.TempAddrTTL
baseDelay := expbackoff.WithBaseDelay(time.Millisecond * 100) // Poll quickly on startup
maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl
maxDelay := expbackoff.WithMaxDelay(ttl * 0.9) // Slow down to 90% of ttl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't compile

backoff := expbackoff.New(ctx, baseDelay, maxDelay)
addrs := make(map[peer.ID]string)

for ctx.Err() == nil {
for _, p := range peers {
if p.ID == tcpNode.ID() {
// Skip self
continue
}

addr, ok, err := getDiscoveredAddress(udpNode, p)
if err != nil {
log.Error(ctx, "Failed discovering peer address", err)
} else if ok {
addrStr := addr.String()
if addrs[p.ID] != addrStr {
log.Info(ctx, "Discovered new peer address",
z.Str("peer", PeerName(p.ID)),
z.Str("address", addrStr))
addrs[p.ID] = addrStr
}

tcpNode.Peerstore().AddAddr(p.ID, addr, ttl)
}
}

backoff()
}
}
}

// getDiscoveredAddress returns the peer's address as discovered by discv5,
// it returns false if the peer isn't discovered.
func getDiscoveredAddress(udpNode *discover.UDPv5, p Peer) (ma.Multiaddr, bool, error) {
resolved := udpNode.Resolve(&p.Enode)
if resolved.Seq() == 0 || resolved.TCP() == 0 {
return nil, false, nil // Not discovered
}

addr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return nil, false, err
}

return addr, true, nil
}
2 changes: 1 addition & 1 deletion p2p/gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestP2PConnGating(t *testing.T) {

err = nodeA.Connect(context.Background(), addr)
require.Error(t, err)
require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s and addr %s", addr.ID, addr.Addrs[0]))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flapping test, somehow used ipv6 on my local machine

require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s", addr.ID))
}

func TestOpenGater(t *testing.T) {
Expand Down
27 changes: 15 additions & 12 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
ma "github.com/multiformats/go-multiaddr"

"github.com/obolnetwork/charon/app/errors"
"github.com/obolnetwork/charon/app/featureset"
"github.com/obolnetwork/charon/app/lifecycle"
"github.com/obolnetwork/charon/app/log"
"github.com/obolnetwork/charon/app/version"
Expand Down Expand Up @@ -128,21 +129,23 @@ func adaptDiscRouting(udpNode *discover.UDPv5, peers, relays []Peer) peerRouting
return peer.AddrInfo{}, errors.New("unknown peer")
}

resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}

var mAddrs []ma.Multiaddr

// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
if !featureset.Enabled(featureset.InvertDiscv5) {
resolved := udpNode.Resolve(&node)
if resolved == nil {
return peer.AddrInfo{}, errors.New("peer not resolved")
}

// If sequence is 0, we haven't discovered it yet.
// If tcp port is 0, this node isn't bound to a port.
if resolved.Seq() != 0 && resolved.TCP() != 0 {
mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP())
if err != nil {
return peer.AddrInfo{}, err
}
mAddrs = append(mAddrs, mAddr)
}
mAddrs = append(mAddrs, mAddr)
}

// Add any circuit relays
Expand Down