From a68a2c6f406a1e5b62e2b9aa76c84143c33b61ee Mon Sep 17 00:00:00 2001 From: corverroos Date: Thu, 18 Aug 2022 07:22:53 +0200 Subject: [PATCH] p2p: invert peer discovery integration (#989) Inverts the integration between discv5 and libp2p from pull to push. category: refactor ticket: #985 feature_flag: invert_discv5 --- app/app.go | 1 + app/app_test.go | 30 +++++++++--- app/featureset/featureset.go | 4 ++ app/lifecycle/order.go | 1 + app/lifecycle/orderstart_string.go | 13 +++--- p2p/discovery.go | 75 +++++++++++++++++++++++++++--- p2p/gater_test.go | 2 +- p2p/p2p.go | 27 ++++++----- 8 files changed, 122 insertions(+), 31 deletions(-) diff --git a/app/app.go b/app/app.go index 5ca3fa02b..2f831b1ce 100644 --- a/app/app.go +++ b/app/app.go @@ -276,6 +276,7 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartRelay, p2p.NewRelayReserver(tcpNode, relay)) } life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(tcpNode)) + life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PDiscAdapter, p2p.NewDiscoveryAdapter(tcpNode, udpNode, peers)) return tcpNode, localEnode, nil } diff --git a/app/app_test.go b/app/app_test.go index 314cd45ce..8e0905cd7 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -40,6 +40,7 @@ import ( "github.com/obolnetwork/charon/cmd" "github.com/obolnetwork/charon/p2p" "github.com/obolnetwork/charon/testutil" + "github.com/obolnetwork/charon/testutil/beaconmock" ) //go:generate go test . -v -run=TestPingCluster -slow @@ -51,7 +52,7 @@ func TestPingCluster(t *testing.T) { // Nodes bind to lock ENR addresses. // Discv5 can just use those as bootnodes. 
t.Run("bind_enrs", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ Slow: false, BootLock: true, BindENRAddrs: true, @@ -62,7 +63,7 @@ func TestPingCluster(t *testing.T) { // Nodes bind to random localhost ports (not the lock ENRs), with only single bootnode. // Discv5 will resolve peers via bootnode. t.Run("bootnode_only", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ BindLocalhost: true, BootLock: false, Bootnode: true, @@ -72,7 +73,7 @@ func TestPingCluster(t *testing.T) { // Nodes bind to random 0.0.0.0 ports (but use 127.0.0.1 as external IP), with only single bootnode. // Discv5 will resolve peers via bootnode and external IP. t.Run("external_ip", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ ExternalIP: "127.0.0.1", BindZeroIP: true, BootLock: false, @@ -83,7 +84,7 @@ func TestPingCluster(t *testing.T) { // Nodes bind to 0.0.0.0 (but use localhost as external host), with only single bootnode. // Discv5 will resolve peers via bootnode and external host. t.Run("external_host", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ ExternalHost: "localhost", BindZeroIP: true, BootLock: false, @@ -96,7 +97,7 @@ func TestPingCluster(t *testing.T) { // Node discv5 will not resolve direct address, nodes will connect to bootnode, // and libp2p will relay via bootnode. t.Run("bootnode_relay", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ BootnodeRelay: true, BindZeroPort: true, Bootnode: true, @@ -108,7 +109,7 @@ func TestPingCluster(t *testing.T) { // Discv5 times out resolving stale ENRs, then resolves peers via external node. // This is slow due to discv5 internal timeouts, run with -slow. 
t.Run("bootnode_and_stale_enrs", func(t *testing.T) { - pingCluster(t, pingTest{ + pingClusterAB(t, pingTest{ Slow: true, BindLocalhost: true, BootLock: true, @@ -134,6 +135,19 @@ type pingTest struct { ExternalHost string } +// TODO(corver): Remove once featureset.InvertDiscv5 launched. +func pingClusterAB(t *testing.T, test pingTest) { + t.Helper() + t.Run("pushdisc", func(t *testing.T) { + featureset.EnableForT(t, featureset.InvertDiscv5) + pingCluster(t, test) + }) + t.Run("pulldisc", func(t *testing.T) { + featureset.DisableForT(t, featureset.InvertDiscv5) + pingCluster(t, test) + }) +} + func pingCluster(t *testing.T, test pingTest) { t.Helper() @@ -178,6 +192,10 @@ func pingCluster(t *testing.T, test pingTest) { P2PKey: p2pKeys[i], PingCallback: asserter.Callback(t, i), DisablePromWrap: true, + SimnetBMockOpts: []beaconmock.Option{ + beaconmock.WithNoAttesterDuties(), + beaconmock.WithNoProposerDuties(), + }, }, P2P: p2p.Config{ UDPBootnodes: bootnodes, diff --git a/app/featureset/featureset.go b/app/featureset/featureset.go index d9b995203..38b4ef687 100644 --- a/app/featureset/featureset.go +++ b/app/featureset/featureset.go @@ -38,12 +38,16 @@ type Feature string const ( // QBFTConsensus introduces qbft consensus, see https://github.com/ObolNetwork/charon/issues/445. QBFTConsensus Feature = "qbft_consensus" + + // InvertDiscv5 enables the new push based discv5 integration and disables the old pull based. + InvertDiscv5 Feature = "invert_discv5" ) var ( // state defines the current rollout status of each feature. state = map[Feature]status{ QBFTConsensus: statusStable, + InvertDiscv5: statusAlpha, // Add all features and there status here. 
} diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index dfbbaa750..83df50330 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -32,6 +32,7 @@ const ( StartMonitoringAPI StartValidatorAPI StartP2PPing + StartP2PDiscAdapter StartP2PConsensus StartSimulator StartScheduler diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index 9d44e717b..0a15e8ca1 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -29,15 +29,16 @@ func _() { _ = x[StartMonitoringAPI-3] _ = x[StartValidatorAPI-4] _ = x[StartP2PPing-5] - _ = x[StartP2PConsensus-6] - _ = x[StartSimulator-7] - _ = x[StartScheduler-8] - _ = x[StartP2PEventCollector-9] + _ = x[StartP2PDiscAdapter-6] + _ = x[StartP2PConsensus-7] + _ = x[StartSimulator-8] + _ = x[StartScheduler-9] + _ = x[StartP2PEventCollector-10] } -const _OrderStart_name = "TrackerAggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PConsensusSimulatorSchedulerP2PEventCollector" +const _OrderStart_name = "TrackerAggSigDBRelayMonitoringAPIValidatorAPIP2PPingP2PDiscAdapterP2PConsensusSimulatorSchedulerP2PEventCollector" -var _OrderStart_index = [...]uint8{0, 7, 15, 20, 33, 45, 52, 64, 73, 82, 99} +var _OrderStart_index = [...]uint8{0, 7, 15, 20, 33, 45, 52, 66, 78, 87, 96, 113} func (i OrderStart) String() string { if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { diff --git a/p2p/discovery.go b/p2p/discovery.go index 49449ecc0..29da5664a 100644 --- a/p2p/discovery.go +++ b/p2p/discovery.go @@ -16,24 +16,28 @@ package p2p import ( + "context" "crypto/ecdsa" "net" + "time" "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/p2p/netutil" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + ma "github.com/multiformats/go-multiaddr" 
"github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/expbackoff" + "github.com/obolnetwork/charon/app/featureset" + "github.com/obolnetwork/charon/app/lifecycle" + "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/z" ) -// UDPNode wraps a discv5 udp node and adds the bootnodes relays. -type UDPNode struct { - *discover.UDPv5 - Relays []Peer -} - // NewUDPNode starts and returns a discv5 UDP implementation. func NewUDPNode(config Config, ln *enode.LocalNode, key *ecdsa.PrivateKey, bootnodes []*enode.Node, @@ -133,3 +137,62 @@ func NewLocalEnode(config Config, key *ecdsa.PrivateKey) (*enode.LocalNode, *eno return node, db, nil } + +// NewDiscoveryAdapter returns a life cycle hook that links discv5 to libp2p by +// continuously polling discv5 for latest peer ENRs and adding them to libp2p peer store. +func NewDiscoveryAdapter(tcpNode host.Host, udpNode *discover.UDPv5, peers []Peer) lifecycle.HookFuncCtx { + return func(ctx context.Context) { + if !featureset.Enabled(featureset.InvertDiscv5) { + return + } + + ctx = log.WithTopic(ctx, "p2p") + ttl := peerstore.TempAddrTTL + baseDelay := expbackoff.WithBaseDelay(time.Millisecond * 100) // Poll quickly on startup + maxDelay := expbackoff.WithMaxDelay(ttl * 9 / 10) // Slow down to 90% of ttl + backoff := expbackoff.New(ctx, baseDelay, maxDelay) + addrs := make(map[peer.ID]string) + + for ctx.Err() == nil { + for _, p := range peers { + if p.ID == tcpNode.ID() { + // Skip self + continue + } + + addr, ok, err := getDiscoveredAddress(udpNode, p) + if err != nil { + log.Error(ctx, "Failed discovering peer address", err) + } else if ok { + addrStr := addr.String() + if addrs[p.ID] != addrStr { + log.Info(ctx, "Discovered new peer address", + z.Str("peer", PeerName(p.ID)), + z.Str("address", addrStr)) + addrs[p.ID] = addrStr + } + + tcpNode.Peerstore().AddAddr(p.ID, addr, ttl) + } + } + + backoff() + } + } +} + +// getDiscoveredAddress returns the peer's address as 
discovered by discv5, +// it returns false if the peer isn't discovered. +func getDiscoveredAddress(udpNode *discover.UDPv5, p Peer) (ma.Multiaddr, bool, error) { + resolved := udpNode.Resolve(&p.Enode) + if resolved.Seq() == 0 || resolved.TCP() == 0 { + return nil, false, nil // Not discovered + } + + addr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP()) + if err != nil { + return nil, false, err + } + + return addr, true, nil +} diff --git a/p2p/gater_test.go b/p2p/gater_test.go index b3b412156..a1883eb46 100644 --- a/p2p/gater_test.go +++ b/p2p/gater_test.go @@ -71,7 +71,7 @@ func TestP2PConnGating(t *testing.T) { err = nodeA.Connect(context.Background(), addr) require.Error(t, err) - require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s and addr %s", addr.ID, addr.Addrs[0])) + require.Contains(t, err.Error(), fmt.Sprintf("gater rejected connection with peer %s", addr.ID)) } func TestOpenGater(t *testing.T) { diff --git a/p2p/p2p.go b/p2p/p2p.go index 528d9646c..9376375c7 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -34,6 +34,7 @@ import ( ma "github.com/multiformats/go-multiaddr" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/featureset" "github.com/obolnetwork/charon/app/lifecycle" "github.com/obolnetwork/charon/app/log" "github.com/obolnetwork/charon/app/version" @@ -128,21 +129,23 @@ func adaptDiscRouting(udpNode *discover.UDPv5, peers, relays []Peer) peerRouting return peer.AddrInfo{}, errors.New("unknown peer") } - resolved := udpNode.Resolve(&node) - if resolved == nil { - return peer.AddrInfo{}, errors.New("peer not resolved") - } - var mAddrs []ma.Multiaddr - // If sequence is 0, we haven't discovered it yet. - // If tcp port is 0, this node isn't bound to a port. 
- if resolved.Seq() != 0 && resolved.TCP() != 0 { - mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP()) - if err != nil { - return peer.AddrInfo{}, err + if !featureset.Enabled(featureset.InvertDiscv5) { + resolved := udpNode.Resolve(&node) + if resolved == nil { + return peer.AddrInfo{}, errors.New("peer not resolved") + } + + // If sequence is 0, we haven't discovered it yet. + // If tcp port is 0, this node isn't bound to a port. + if resolved.Seq() != 0 && resolved.TCP() != 0 { + mAddr, err := multiAddrFromIPPort(resolved.IP(), resolved.TCP()) + if err != nil { + return peer.AddrInfo{}, err + } + mAddrs = append(mAddrs, mAddr) } - mAddrs = append(mAddrs, mAddr) } // Add any circuit relays