From 978091a626a0e1f00a797fc4e2de99f4bfee943b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 09:26:21 -0700 Subject: [PATCH 1/9] feat: implement peering service MVP for #6097 This feature will repeatedly reconnect (with a randomized exponential backoff) to peers in a set of "peered" peers. In the future, this should be extended to: 1. Include a CLI for modifying this list at runtime. 2. Include additional options for peers we want to _protect_ but not connect to. 3. Allow configuring timeouts, backoff, etc. 4. Allow groups? Possibly through textile threads. 5. Allow for runtime-only peering rules. 6. Different reconnect policies. But this MVP should be a significant step forward. --- core/core.go | 2 + core/node/groups.go | 2 + core/node/peering.go | 34 +++++ docs/config.md | 23 +++ go.mod | 3 +- go.sum | 4 +- peering/peering.go | 259 +++++++++++++++++++++++++++++++++ peering/peering_test.go | 6 + test/sharness/t0171-peering.sh | 127 ++++++++++++++++ 9 files changed, 457 insertions(+), 3 deletions(-) create mode 100644 core/node/peering.go create mode 100644 peering/peering.go create mode 100644 peering/peering_test.go create mode 100755 test/sharness/t0171-peering.sh diff --git a/core/core.go b/core/core.go index 2cc67eb6f17..d422a1aa80c 100644 --- a/core/core.go +++ b/core/core.go @@ -48,6 +48,7 @@ import ( "github.com/ipfs/go-ipfs/namesys" ipnsrp "github.com/ipfs/go-ipfs/namesys/republisher" "github.com/ipfs/go-ipfs/p2p" + "github.com/ipfs/go-ipfs/peering" "github.com/ipfs/go-ipfs/repo" ) @@ -83,6 +84,7 @@ type IpfsNode struct { // Online PeerHost p2phost.Host `optional:"true"` // the network host (server+client) + Peering peering.PeeringService `optional:"true"` Filters *ma.Filters `optional:"true"` Bootstrapper io.Closer `optional:"true"` // the periodic bootstrapper Routing routing.Routing `optional:"true"` // the routing system. recommend ipfs-dht diff --git a/core/node/groups.go b/core/node/groups.go index 9078f52b69f..ad51473452e 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -250,6 +250,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(OnlineExchange(shouldBitswapProvide)), maybeProvide(Graphsync, cfg.Experimental.GraphsyncEnabled), fx.Provide(Namesys(ipnsCacheSize)), + fx.Provide(Peering), + PeerWith(cfg.Peering.Peers...), fx.Invoke(IpnsRepublisher(repubPeriod, recordLifetime)), diff --git a/core/node/peering.go b/core/node/peering.go new file mode 100644 index 00000000000..b5e7caadc33 --- /dev/null +++ b/core/node/peering.go @@ -0,0 +1,34 @@ +package node + +import ( + "context" + + "github.com/ipfs/go-ipfs/peering" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "go.uber.org/fx" +) + +// Peering constructs the peering service and hooks it into fx's lifetime +// management system. +func Peering(lc fx.Lifecycle, host host.Host) *peering.PeeringService { + ps := peering.NewPeeringService(host) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + return ps.Start() + }, + OnStop: func(context.Context) error { + return ps.Stop() + }, + }) + return ps +} + +// PeerWith configures the peering service to peer with the specified peers. 
+func PeerWith(peers ...peer.AddrInfo) fx.Option { + return fx.Invoke(func(ps *peering.PeeringService) { + for _, ai := range peers { + ps.AddPeer(ai) + } + }) +} diff --git a/docs/config.md b/docs/config.md index f2211954cb8..dc6db166c49 100644 --- a/docs/config.md +++ b/docs/config.md @@ -139,6 +139,8 @@ documented in `ipfs config profile --help`. - [`Pubsub`](#pubsub) - [`Pubsub.Router`](#pubsubrouter) - [`Pubsub.DisableSigning`](#pubsubdisablesigning) + - [`Peering`](#peering) + - [`Peering.Peers`](#peeringpeers) - [`Reprovider`](#reprovider) - [`Reprovider.Interval`](#reproviderinterval) - [`Reprovider.Strategy`](#reproviderstrategy) @@ -157,6 +159,7 @@ documented in `ipfs config profile --help`. - [`Swarm.ConnMgr.HighWater`](#swarmconnmgrhighwater) - [`Swarm.ConnMgr.GracePeriod`](#swarmconnmgrgraceperiod) + ## `Addresses` Contains information about various listener addresses to be used by this node. @@ -703,6 +706,26 @@ intentionally re-using the real message's message ID. Default: `false` +### `Peering` + +Configures the peering subsystem. The peering subsystem configures go-ipfs to +connect to, remain connected to, and reconnect to a set of peers. Peers should +use this subsystem to create "sticky" links between frequently used peers for +improved reliability. + +#### `Peering.Peers` + +The set of peers with which to peer. Each entry is of the form: + +```js +{ + "ID": "QmSomePeerID", # The peers ID. + "Addrs": ["/ip4/1.2.3.4/tcp/1234"] # Known addresses for the peer. If none are specified, the DHT will be queried. +} +``` + +Additional fields may be added in the future. + ## `Reprovider` ### `Reprovider.Interval` diff --git a/go.mod b/go.mod index 419484e78ec..b26d3bf4e44 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-cmds v0.2.9 - github.com/ipfs/go-ipfs-config v0.6.1 + github.com/ipfs/go-ipfs-config v0.7.0 github.com/ipfs/go-ipfs-ds-help v0.1.1 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 @@ -94,6 +94,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.6.0 + github.com/stretchr/testify v1.5.1 github.com/syndtr/goleveldb v1.0.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/go-sysinfo v0.0.0-20190219211824-4a357d4b90b1 diff --git a/go.sum b/go.sum index e93e390f8dc..074074e9b44 100644 --- a/go.sum +++ b/go.sum @@ -301,8 +301,8 @@ github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7Na github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalYeobXmUyTreP8= github.com/ipfs/go-ipfs-cmds v0.2.9 h1:zQTENe9UJrtCb2bOtRoDGjtuo3rQjmuPdPnVlqoBV/M= github.com/ipfs/go-ipfs-cmds v0.2.9/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk= -github.com/ipfs/go-ipfs-config v0.6.1 h1:d1f0fEEpUQ9R+6c0VZMNy2P+wCl4K4DO4VHJBvgWwFw= -github.com/ipfs/go-ipfs-config v0.6.1/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE= +github.com/ipfs/go-ipfs-config v0.7.0 h1:cClINg8v28//KaYMwt1aSjbS8eGJjNKIEnahpT/2hYk= +github.com/ipfs/go-ipfs-config v0.7.0/go.mod h1:GQUxqb0NfkZmEU92PxqqqLVVFTLpoGGUlBaTyDaAqrE= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod 
h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= diff --git a/peering/peering.go b/peering/peering.go new file mode 100644 index 00000000000..c543712c512 --- /dev/null +++ b/peering/peering.go @@ -0,0 +1,259 @@ +package peering + +import ( + "context" + "errors" + "math/rand" + "sync" + "time" + + "github.com/ipfs/go-log" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/multiformats/go-multiaddr" +) + +// maxBackoff is the maximum time between reconnect attempts. +const ( + maxBackoff = 10 * time.Minute + connmgrTag = "ipfs-peering" + // This needs to be sufficient to prevent two sides from simultaneously + // dialing. + initialDelay = 5 * time.Second +) + +var logger = log.Logger("peering") + +type state int + +const ( + stateInit state = iota + stateRunning + stateStopped +) + +// peerHandler keeps track of all state related to a specific "peering" peer. +type peerHandler struct { + peer peer.ID + host host.Host + ctx context.Context + cancel context.CancelFunc + + mu sync.Mutex + addrs []multiaddr.Multiaddr + timer *time.Timer + + nextDelay time.Duration +} + +func (ph *peerHandler) stop() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer != nil { + ph.timer.Stop() + ph.timer = nil + } +} + +func (ph *peerHandler) nextBackoff() time.Duration { + // calculate the timeout + if ph.nextDelay < maxBackoff { + ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) + } + return ph.nextDelay +} + +func (ph *peerHandler) reconnect() { + // Try connecting + + ph.mu.Lock() + addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...) + ph.mu.Unlock() + + logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) + + err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) + if err != nil { + logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) + // Ok, we failed. Extend the timeout. + ph.mu.Lock() + if ph.timer != nil { + // Only counts if the timer still exists. If not, a + // connection _was_ somehow established. + ph.timer.Reset(ph.nextBackoff()) + } + // Otherwise, someone else has stopped us so we can assume that + // we're either connected or someone else will start us. + ph.mu.Unlock() + } + + // Always call this. We could have connected since we processed the + // error. + ph.stopIfConnected() +} + +func (ph *peerHandler) stopIfConnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + logger.Debugw("successfully reconnected", "peer", ph.peer) + ph.timer.Stop() + ph.timer = nil + ph.nextDelay = initialDelay + } +} + +// startIfDisconnected is the inverse of stopIfConnected. +func (ph *peerHandler) startIfDisconnected() { + ph.mu.Lock() + defer ph.mu.Unlock() + + if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + logger.Debugw("disconnected from peer", "peer", ph.peer) + // Always start with a short timeout so we can stagger things a bit. + ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + } +} + +// PeeringService maintains connections to specified peers, reconnecting on +// disconnect with a back-off. +type PeeringService struct { + host host.Host + + mu sync.RWMutex + peers map[peer.ID]*peerHandler + + ctx context.Context + cancel context.CancelFunc + state state +} + +// NewPeeringService constructs a new peering service. 
Peers can be added and +// removed immediately, but connections won't be formed until `Start` is called. +func NewPeeringService(host host.Host) *PeeringService { + ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} + ps.ctx, ps.cancel = context.WithCancel(context.Background()) + return ps +} + +// Start starts the peering service, connecting and maintaining connections to +// all registered peers. It returns an error if the service has already been +// stopped. +func (ps *PeeringService) Start() error { + ps.mu.Lock() + defer ps.mu.Unlock() + + switch ps.state { + case stateInit: + logger.Infow("starting") + case stateRunning: + return nil + case stateStopped: + return errors.New("already stopped") + } + ps.host.Network().Notify((*netNotifee)(ps)) + ps.state = stateRunning + for _, handler := range ps.peers { + go handler.startIfDisconnected() + } + return nil +} + +// Stop stops the peering service. +func (ps *PeeringService) Stop() error { + ps.cancel() + ps.host.Network().StopNotify((*netNotifee)(ps)) + + ps.mu.Lock() + defer ps.mu.Unlock() + + if ps.state == stateRunning { + logger.Infow("stopping") + for _, handler := range ps.peers { + handler.stop() + } + } + return nil +} + +// AddPeer adds a peer to the peering service. This function may be safely +// called at any time: before the service is started, while running, or after it +// stops. +// +// Add peer may also be called multiple times for the same peer. The new +// addresses will replace the old. +func (ps *PeeringService) AddPeer(info peer.AddrInfo) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[info.ID]; ok { + logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs) + handler.addrs = info.Addrs + } else { + logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs) + ps.host.ConnManager().Protect(info.ID, connmgrTag) + + handler = &peerHandler{ + host: ps.host, + peer: info.ID, + addrs: info.Addrs, + nextDelay: initialDelay, + } + handler.ctx, handler.cancel = context.WithCancel(ps.ctx) + ps.peers[info.ID] = handler + if ps.state == stateRunning { + go handler.startIfDisconnected() + } + } +} + +// RemovePeer removes a peer from the peering service. This function may be +// safely called at any time: before the service is started, while running, or +// after it stops. +func (ps *PeeringService) RemovePeer(id peer.ID) { + ps.mu.Lock() + defer ps.mu.Unlock() + + if handler, ok := ps.peers[id]; ok { + logger.Infow("peer removed", "peer", id) + ps.host.ConnManager().Unprotect(id, connmgrTag) + + handler.stop() + handler.cancel() + delete(ps.peers, id) + } +} + +type netNotifee PeeringService + +func (nn *netNotifee) Connected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. + go handler.stopIfConnected() + } +} +func (nn *netNotifee) Disconnected(_ network.Network, c network.Conn) { + ps := (*PeeringService)(nn) + + p := c.RemotePeer() + ps.mu.RLock() + defer ps.mu.RUnlock() + + if handler, ok := ps.peers[p]; ok { + // use a goroutine to avoid blocking events. 
+ go handler.startIfDisconnected() + } +} +func (nn *netNotifee) OpenedStream(network.Network, network.Stream) {} +func (nn *netNotifee) ClosedStream(network.Network, network.Stream) {} +func (nn *netNotifee) Listen(network.Network, multiaddr.Multiaddr) {} +func (nn *netNotifee) ListenClose(network.Network, multiaddr.Multiaddr) {} diff --git a/peering/peering_test.go b/peering/peering_test.go new file mode 100644 index 00000000000..0be08dcdc80 --- /dev/null +++ b/peering/peering_test.go @@ -0,0 +1,6 @@ +package peering + +import "testing" + +func TestPeeringService(t *testing.T) { +} diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh new file mode 100755 index 00000000000..ed1cbb436c6 --- /dev/null +++ b/test/sharness/t0171-peering.sh @@ -0,0 +1,127 @@ +#!/usr/bin/env bash + +test_description="Test peering service" + +. lib/test-lib.sh + +NUM_NODES=3 + +test_expect_success 'init iptb' ' + rm -rf .iptb/ && + iptb testbed create -type localipfs -count $NUM_NODES -init +' + +test_expect_success 'disabling routing' ' + iptb run -- ipfs config Routing.Type none +' + +for i in $(seq 0 2); do + ADDR="$(printf '["/ip4/127.0.0.1/tcp/%s"]' "$(( 3000 + ( RANDOM % 1000 ) ))")" + test_expect_success "configuring node $i to listen on $ADDR" ' + ipfsi "$i" config --json Addresses.Swarm "$ADDR" + ' +done + +peer_id() { + ipfsi "$1" config Identity.PeerID +} + +peer_addrs() { + ipfsi "$1" config Addresses.Swarm +} + +peer() { + PEER1="$1" && + PEER2="$2" && + PEER_LIST="$(ipfsi "$PEER1" config Peering.Peers)" && + { [[ "$PEER_LIST" == "null" ]] || PEER_LIST_INNER="${PEER_LIST:1:-1}"; } && + ADDR_INFO="$(printf '[%s{"ID": "%s", "Addrs": %s}]' \ + "${PEER_LIST_INNER:+${PEER_LIST_INNER},}" \ + "$(peer_id "$PEER2")" \ + "$(peer_addrs "$PEER2")")" && + ipfsi "$PEER1" config --json Peering.Peers "${ADDR_INFO}" +} + +# Peer: +# - 0 <-> 1 +# - 1 -> 2 +test_expect_success 'configure peering' ' + peer 0 1 && + peer 1 0 && + peer 1 2 +' + +list_peers() { + ipfsi "$1" swarm peers | sed 's|.*/p2p/\([^/]*\)$|\1|' | sort -u +} + +check_peers() { + sleep 20 # give it some time to settle. + test_expect_success 'verifying peering for peer 0' ' + list_peers 0 > peers_0_actual && + peer_id 1 > peers_0_expected && + test_cmp peers_0_expected peers_0_actual + ' + + test_expect_success 'verifying peering for peer 1' ' + list_peers 1 > peers_1_actual && + { peer_id 0 && peer_id 2 ; } | sort -u > peers_1_expected && + test_cmp peers_1_expected peers_1_actual + ' + + test_expect_success 'verifying peering for peer 2' ' + list_peers 2 > peers_2_actual && + peer_id 1 > peers_2_expected && + test_cmp peers_2_expected peers_2_actual + ' +} + +test_expect_success 'startup cluster' ' + iptb start -wait && + iptb run -- ipfs log level peering debug +' + +check_peers + +disconnect() { + ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")" +} + +# Bidiractional peering shouldn't cause problems (e.g., simultaneous connect +# issues). +test_expect_success 'disconnecting 0->1' ' + disconnect 0 1 +' + +check_peers + +# 1 should reconnect to 2 when 2 disconnects from 1. +test_expect_success 'disconnecting 2->1' ' + disconnect 2 1 +' + +check_peers + +# 2 isn't peering. This test ensures that 1 will re-peer with 2 when it comes +# back online. 
+test_expect_success 'stopping 2' ' + iptb stop 2 +' + +# Wait to disconnect +sleep 30 + +test_expect_success 'starting 2' ' + iptb start 2 +' + +# Wait for backoff +sleep 30 + +check_peers + +test_expect_success "stop testbed" ' + iptb stop +' + +test_done From 8e52c7fb2d04482b108489e7cafd95ec6ebb0375 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 18:53:55 -0700 Subject: [PATCH 2/9] fix: doc comment location Co-authored-by: Will --- peering/peering.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/peering/peering.go b/peering/peering.go index c543712c512..ef0aa4d2e8b 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -14,8 +14,8 @@ import ( "github.com/multiformats/go-multiaddr" ) -// maxBackoff is the maximum time between reconnect attempts. const ( + // maxBackoff is the maximum time between reconnect attempts. maxBackoff = 10 * time.Minute connmgrTag = "ipfs-peering" // This needs to be sufficient to prevent two sides from simultaneously From 6ba49cfef24cd7be43d20eaceec9df66b054a0d2 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:05:45 -0700 Subject: [PATCH 3/9] fix: spelling Co-authored-by: Adin Schmahmann --- test/sharness/t0171-peering.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/sharness/t0171-peering.sh b/test/sharness/t0171-peering.sh index ed1cbb436c6..9b775cb3cbe 100755 --- a/test/sharness/t0171-peering.sh +++ b/test/sharness/t0171-peering.sh @@ -87,7 +87,7 @@ disconnect() { ipfsi "$1" swarm disconnect "/p2p/$(peer_id "$2")" } -# Bidiractional peering shouldn't cause problems (e.g., simultaneous connect +# Bidirectional peering shouldn't cause problems (e.g., simultaneous connect # issues). test_expect_success 'disconnecting 0->1' ' disconnect 0 1 From 0551c4dca843fe3a9665fdc49cd2dcecaeb85046 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 19:11:13 -0700 Subject: [PATCH 4/9] fix: address peering service code feedback * better name for timer * cancel context from within stop --- peering/peering.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index ef0aa4d2e8b..9785b65556e 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -40,20 +40,22 @@ type peerHandler struct { ctx context.Context cancel context.CancelFunc - mu sync.Mutex - addrs []multiaddr.Multiaddr - timer *time.Timer + mu sync.Mutex + addrs []multiaddr.Multiaddr + reconnectTimer *time.Timer nextDelay time.Duration } +// stop permanently stops the peer handler. func (ph *peerHandler) stop() { + ph.cancel() + ph.mu.Lock() defer ph.mu.Unlock() - - if ph.timer != nil { - ph.timer.Stop() - ph.timer = nil + if ph.reconnectTimer != nil { + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil } } @@ -79,10 +81,10 @@ func (ph *peerHandler) reconnect() { logger.Debugw("failed to reconnect", "peer", ph.peer, "error", err) // Ok, we failed. Extend the timeout. ph.mu.Lock() - if ph.timer != nil { - // Only counts if the timer still exists. If not, a + if ph.reconnectTimer != nil { + // Only counts if the reconnectTimer still exists. If not, a // connection _was_ somehow established. - ph.timer.Reset(ph.nextBackoff()) + ph.reconnectTimer.Reset(ph.nextBackoff()) } // Otherwise, someone else has stopped us so we can assume that // we're either connected or someone else will start us. 
@@ -98,10 +100,10 @@ func (ph *peerHandler) stopIfConnected() { ph.mu.Lock() defer ph.mu.Unlock() - if ph.timer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { + if ph.reconnectTimer != nil && ph.host.Network().Connectedness(ph.peer) == network.Connected { logger.Debugw("successfully reconnected", "peer", ph.peer) - ph.timer.Stop() - ph.timer = nil + ph.reconnectTimer.Stop() + ph.reconnectTimer = nil ph.nextDelay = initialDelay } } @@ -111,10 +113,10 @@ func (ph *peerHandler) startIfDisconnected() { ph.mu.Lock() defer ph.mu.Unlock() - if ph.timer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { + if ph.reconnectTimer == nil && ph.host.Network().Connectedness(ph.peer) != network.Connected { logger.Debugw("disconnected from peer", "peer", ph.peer) // Always start with a short timeout so we can stagger things a bit. - ph.timer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) + ph.reconnectTimer = time.AfterFunc(ph.nextBackoff(), ph.reconnect) } } @@ -222,7 +224,6 @@ func (ps *PeeringService) RemovePeer(id peer.ID) { ps.host.ConnManager().Unprotect(id, connmgrTag) handler.stop() - handler.cancel() delete(ps.peers, id) } } From 7990d2ccb51c0c9ea205e07b37e21330e3189ad3 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 19:11:42 -0700 Subject: [PATCH 5/9] doc: expand on peering service documentation * Explain _why_ it exists. * Explain how it can be used. --- docs/config.md | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/docs/config.md b/docs/config.md index dc6db166c49..ac5e01682c1 100644 --- a/docs/config.md +++ b/docs/config.md @@ -709,9 +709,39 @@ Default: `false` ### `Peering` Configures the peering subsystem. The peering subsystem configures go-ipfs to -connect to, remain connected to, and reconnect to a set of peers. Peers should -use this subsystem to create "sticky" links between frequently used peers for -improved reliability. +connect to, remain connected to, and reconnect to a set of nodes. Nodes should +use this subsystem to create "sticky" links between frequently useful peers to +improve reliability. + +Use-cases: + +* An IPFS gateway connected to an IPFS cluster should peer to ensure that the + gateway can always fetch content from the cluster. +* A dapp may peer embedded go-ipfs nodes with a set of pinning services or + textile cafes/hubs. +* A set of friends may peer to ensure that they can always fetch each other's + content. + +When a node is added to the set of peered nodes, go-ipfs will: + +1. Protect connections to this node from the connection manager. That is, + go-ipfs will never automatically close the connection to this node and + connections to this node will not count towards the connection limit. +2. Connect to this node on startup. +3. Repeatedly try to reconnect to this node if the last connection dies or the + node goes offline. This repeated re-connect logic is governed by a randomized + exponential backoff delay ranging from ~5 seconds to ~10 minutes to avoid + repeatedly reconnect to a node that's offline. + +Peering can be asymmetric or symmetric: + +* When symmetric, the connection will be protected by both nodes and will likely + be vary stable. +* When asymmetric, only one node (the node that configured peering) will protect + the connection and attempt to re-connect to the peered node on disconnect. If + the peered node is under heavy load and/or has a low connection limit, the + connection may flap repeatedly. 
Be careful when asymmetrically peering to not + overload peers. #### `Peering.Peers` From fe2b289d3002ed11a1dbcc0521bc42d33f0d46ed Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:02:23 -0700 Subject: [PATCH 6/9] test: add unit test for peering service --- peering/peering_test.go | 135 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 134 insertions(+), 1 deletion(-) diff --git a/peering/peering_test.go b/peering/peering_test.go index 0be08dcdc80..0d03aaf8ec0 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -1,6 +1,139 @@ package peering -import "testing" +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + connmgr "github.com/libp2p/go-libp2p-connmgr" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + + "github.com/stretchr/testify/require" +) + +func newNode(ctx context.Context, t *testing.T) host.Host { + h, err := libp2p.New( + ctx, + libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), + // We'd like to set the connection manager low water to 0, but + // that would disable the connection manager. + libp2p.ConnectionManager(connmgr.NewConnManager(1, 100, 0)), + ) + require.NoError(t, err) + return h +} func TestPeeringService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h1 := newNode(ctx, t) + ps1 := NewPeeringService(h1) + + h2 := newNode(ctx, t) + h3 := newNode(ctx, t) + h4 := newNode(ctx, t) + + // peer 1 -> 2 + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + + // We haven't started so we shouldn't have any peers. + require.Never(t, func() bool { + return len(h1.Network().Peers()) > 0 + }, 100*time.Millisecond, 1*time.Second, "expected host 1 to have no peers") + + // Use p4 to take up the one slot we have in the connection manager. + for _, h := range []host.Host{h1, h2} { + require.NoError(t, h.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()})) + h.ConnManager().TagPeer(h4.ID(), "sticky-peer", 1000) + } + + // Now start. + require.NoError(t, ps1.Start()) + // starting twice is fine. + require.NoError(t, ps1.Start()) + + // We should eventually connect. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 10*time.Millisecond) + + // Now explicitly connect to p3. + require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()})) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 100*time.Millisecond) + + require.Len(t, h1.Network().Peers(), 3) + + // force a disconnect + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect from p3. + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should remain connected to p2 + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 1*time.Second) + + // Now force h2 to disconnect (we have an asymmetric peering). + conns := h2.Network().ConnsToPeer(h1.ID()) + require.NotEmpty(t, conns) + h2.ConnManager().TrimOpenConns(ctx) + + // All conns to peer should eventually close. + for _, c := range conns { + require.Eventually(t, func() bool { + s, err := c.NewStream() + if s != nil { + _ = s.Reset() + } + return err != nil + }, 5*time.Second, 10*time.Millisecond) + } + + // Should eventually re-connect. 
+ require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Unprotect 2 from 1. + ps1.RemovePeer(h2.ID()) + + // Trim connections. + h1.ConnManager().TrimOpenConns(ctx) + + // Should disconnect + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) != network.Connected + }, 5*time.Second, 10*time.Millisecond) + + // Should never reconnect. + require.Never(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 20*time.Second, 1*time.Second) + + // Until added back + ps1.AddPeer(peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + ps1.AddPeer(peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h2.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + require.Eventually(t, func() bool { + return h1.Network().Connectedness(h3.ID()) == network.Connected + }, 30*time.Second, 1*time.Second) + + // Should be able to repeatedly stop. + require.NoError(t, ps1.Stop()) + require.NoError(t, ps1.Stop()) + + // Adding and removing should work after stopping. + ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) + ps1.RemovePeer(h2.ID()) } From 87a293f6801b176d9a947c3bf49c511e3dd98dce Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:09:50 -0700 Subject: [PATCH 7/9] fix(peering): fix a race condition --- peering/peering.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 9785b65556e..5f78a44f6c1 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -47,6 +47,24 @@ type peerHandler struct { nextDelay time.Duration } +// setAddrs sets the addresses for this peer. +func (ph *peerHandler) setAddrs(addrs []multiaddr.Multiaddr) { + // Not strictly necessary, but it helps to not trust the calling code. + addrCopy := make([]multiaddr.Multiaddr, len(addrs)) + copy(addrCopy, addrs) + + ph.mu.Lock() + defer ph.mu.Unlock() + ph.addrs = addrCopy +} + +// getAddrs returns a shared slice of addresses for this peer. Do not modify. +func (ph *peerHandler) getAddrs() []multiaddr.Multiaddr { + ph.mu.Lock() + defer ph.mu.Unlock() + return ph.addrs +} + // stop permanently stops the peer handler. func (ph *peerHandler) stop() { ph.cancel() @@ -69,11 +87,7 @@ func (ph *peerHandler) nextBackoff() time.Duration { func (ph *peerHandler) reconnect() { // Try connecting - - ph.mu.Lock() - addrs := append(([]multiaddr.Multiaddr)(nil), ph.addrs...) 
- ph.mu.Unlock() - + addrs := ph.getAddrs() logger.Debugw("reconnecting", "peer", ph.peer, "addrs", addrs) err := ph.host.Connect(ph.ctx, peer.AddrInfo{ID: ph.peer, Addrs: addrs}) @@ -193,7 +207,7 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { if handler, ok := ps.peers[info.ID]; ok { logger.Infow("updating addresses", "peer", info.ID, "addrs", info.Addrs) - handler.addrs = info.Addrs + handler.setAddrs(info.Addrs) } else { logger.Infow("peer added", "peer", info.ID, "addrs", info.Addrs) ps.host.ConnManager().Protect(info.ID, connmgrTag) From 17b3b02549ef96534b4d170674d4ccf174efb1d1 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 20:20:13 -0700 Subject: [PATCH 8/9] fix: remove unecessary context --- peering/peering.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 5f78a44f6c1..663f4101760 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -141,18 +141,13 @@ type PeeringService struct { mu sync.RWMutex peers map[peer.ID]*peerHandler - - ctx context.Context - cancel context.CancelFunc - state state + state state } // NewPeeringService constructs a new peering service. Peers can be added and // removed immediately, but connections won't be formed until `Start` is called. func NewPeeringService(host host.Host) *PeeringService { - ps := &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} - ps.ctx, ps.cancel = context.WithCancel(context.Background()) - return ps + return &PeeringService{host: host, peers: make(map[peer.ID]*peerHandler)} } // Start starts the peering service, connecting and maintaining connections to @@ -180,17 +175,18 @@ func (ps *PeeringService) Start() error { // Stop stops the peering service. func (ps *PeeringService) Stop() error { - ps.cancel() ps.host.Network().StopNotify((*netNotifee)(ps)) ps.mu.Lock() defer ps.mu.Unlock() - if ps.state == stateRunning { + switch ps.state { + case stateInit, stateRunning: logger.Infow("stopping") for _, handler := range ps.peers { handler.stop() } + ps.state = stateStopped } return nil } @@ -218,10 +214,16 @@ func (ps *PeeringService) AddPeer(info peer.AddrInfo) { addrs: info.Addrs, nextDelay: initialDelay, } - handler.ctx, handler.cancel = context.WithCancel(ps.ctx) + handler.ctx, handler.cancel = context.WithCancel(context.Background()) ps.peers[info.ID] = handler - if ps.state == stateRunning { + switch ps.state { + case stateRunning: go handler.startIfDisconnected() + case stateStopped: + // We still construct everything in this state because + // it's easier to reason about. But we should still free + // resources. + handler.cancel() } } } From e10289a93d50e3cc80a5a3692b98391cc1aab62b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 25 May 2020 21:18:45 -0700 Subject: [PATCH 9/9] fix: really cap the max backoff at 10 minutes While preserving some randomness. And add a test. --- peering/peering.go | 18 ++++++++++++++++-- peering/peering_test.go | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/peering/peering.go b/peering/peering.go index 663f4101760..ed0b43226c0 100644 --- a/peering/peering.go +++ b/peering/peering.go @@ -14,10 +14,17 @@ import ( "github.com/multiformats/go-multiaddr" ) +// Seed the random number generator. +// +// We don't need good randomness, but we do need randomness. const ( // maxBackoff is the maximum time between reconnect attempts. 
maxBackoff = 10 * time.Minute - connmgrTag = "ipfs-peering" + // The backoff will be cut off when we get within 10% of the actual max. + // If we go over the max, we'll adjust the delay down to a random value + // between 90-100% of the max backoff. + maxBackoffJitter = 10 // % + connmgrTag = "ipfs-peering" // This needs to be sufficient to prevent two sides from simultaneously // dialing. initialDelay = 5 * time.Second @@ -78,10 +85,17 @@ func (ph *peerHandler) stop() { } func (ph *peerHandler) nextBackoff() time.Duration { - // calculate the timeout if ph.nextDelay < maxBackoff { ph.nextDelay += ph.nextDelay/2 + time.Duration(rand.Int63n(int64(ph.nextDelay))) } + + // If we've gone over the max backoff, reduce it under the max. + if ph.nextDelay > maxBackoff { + ph.nextDelay = maxBackoff + // randomize the backoff a bit (10%). + ph.nextDelay -= time.Duration(rand.Int63n(int64(maxBackoff) * maxBackoffJitter / 100)) + } + return ph.nextDelay } diff --git a/peering/peering_test.go b/peering/peering_test.go index 0d03aaf8ec0..1f21b7816a2 100644 --- a/peering/peering_test.go +++ b/peering/peering_test.go @@ -137,3 +137,22 @@ func TestPeeringService(t *testing.T) { ps1.AddPeer(peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}) ps1.RemovePeer(h2.ID()) } + +func TestNextBackoff(t *testing.T) { + minMaxBackoff := (100 - maxBackoffJitter) / 100 * maxBackoff + for x := 0; x < 1000; x++ { + ph := peerHandler{nextDelay: time.Second} + for min, max := time.Second*3/2, time.Second*5/2; min < minMaxBackoff; min, max = min*3/2, max*5/2 { + b := ph.nextBackoff() + if b > max || b < min { + t.Errorf("expected backoff %s to be between %s and %s", b, min, max) + } + } + for i := 0; i < 100; i++ { + b := ph.nextBackoff() + if b < minMaxBackoff || b > maxBackoff { + t.Fatal("failed to stay within max bounds") + } + } + } +}
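
---

A note on the reconnect schedule, since it is spread across patches 1 and 9: the following standalone sketch copies the arithmetic from the final version of `nextBackoff` in `peering/peering.go` to make the schedule visible. Each failed attempt multiplies the previous delay by a random factor between 1.5x and 2.5x, starting from `initialDelay` (5s); once the delay exceeds `maxBackoff` (10 minutes) it is pulled back to a random value within `maxBackoffJitter` (10%) below the cap. The `main` wrapper and printed output are illustrative only and are not part of the series.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	initialDelay = 5 * time.Second
	maxBackoff   = 10 * time.Minute
)

// nextBackoff mirrors the arithmetic in peering.go (as of patch 9): grow the
// delay by a random factor in [1.5x, 2.5x); if the result exceeds the cap,
// pull it back to a random value within 10% below maxBackoff.
func nextBackoff(d time.Duration) time.Duration {
	if d < maxBackoff {
		d += d/2 + time.Duration(rand.Int63n(int64(d)))
	}
	if d > maxBackoff {
		d = maxBackoff - time.Duration(rand.Int63n(int64(maxBackoff)*10/100))
	}
	return d
}

func main() {
	d := initialDelay
	for i := 1; i <= 10; i++ {
		d = nextBackoff(d)
		fmt.Printf("attempt %2d: next retry in ~%s\n", i, d.Round(time.Second))
	}
	// Typical output climbs from ~10s through ~20s, ~40s, ... and then
	// settles between ~9 and 10 minutes once the cap is reached.
}
```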
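For readers who want to drive the new service outside of fx, here is a minimal usage sketch. The `peering` API calls (`NewPeeringService`, `AddPeer`, `Start`, `Stop`) are exactly those added in this series, and the host construction mirrors the unit test in patch 6; the hypothetical `peer-with` program, the command-line argument handling, and `peer.AddrInfoFromP2pAddr` are illustrative assumptions rather than part of the patches.

```go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"

	"github.com/ipfs/go-ipfs/peering"
	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/multiformats/go-multiaddr"
)

// Hypothetical usage: peer-with /ip4/<addr>/tcp/<port>/p2p/<peer-id>
func main() {
	if len(os.Args) != 2 {
		log.Fatal("expected exactly one multiaddr argument")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// A bare libp2p host, constructed as in the unit test. Inside go-ipfs,
	// the node's PeerHost is injected instead (see core/node/peering.go).
	h, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
	if err != nil {
		log.Fatal(err)
	}
	defer h.Close()

	// Parse the target peer's multiaddr into an AddrInfo.
	maddr, err := multiaddr.NewMultiaddr(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}
	ai, err := peer.AddrInfoFromP2pAddr(maddr)
	if err != nil {
		log.Fatal(err)
	}

	// Peers may be added before or after Start; connections are only
	// attempted once the service is running.
	ps := peering.NewPeeringService(h)
	ps.AddPeer(*ai)

	if err := ps.Start(); err != nil {
		log.Fatal(err)
	}

	// Run until interrupted; the service reconnects with randomized
	// exponential backoff whenever the connection drops.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt)
	<-sigs
	_ = ps.Stop()
}
```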
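Finally, to exercise the `Peering.Peers` option documented in patch 5 on a plain repo (outside the sharness harness), something along these lines should work. The `ipfs config --json` / `ipfs config` invocations are the same ones the sharness test uses via `ipfsi`; the peer ID and address are the placeholders from the config.md example and must be replaced with real values, and the daemon needs a restart to pick the change up, since the series deliberately leaves a runtime CLI as future work (see the TODO list in patch 1).

```sh
# Add a single peer to Peering.Peers (substitute a real ID and addresses),
# then restart the daemon so the peering service sees it.
ipfs config --json Peering.Peers '[
  {
    "ID": "QmSomePeerID",
    "Addrs": ["/ip4/1.2.3.4/tcp/4001"]
  }
]'
ipfs config Peering.Peers   # inspect the result
```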